fastapi sse 实现打印机式流式输出
安装依赖
pip install sse-starlette==2.0.0
示例代码
import asyncio
from sse_starlette import EventSourceResponse
@router.get('/stream/')
async def stream():
async def generate_response_data():
for i in range(10):
yield i
await asyncio.sleep(1)
return EventSourceResponse(generate_response_data())
会有坑的地方
from fastapi.middleware.gzip import GZipMiddleware
app.add_middleware(GZipMiddleware, minimum_size=1000)
如果该app加上了gzip压缩的中间件的话,会导致sse效果失效(原因暂不明)。
解决方法
把需要sse输出的接口单独写在一个subapp里面,通过mount挂载。
app = FastAPI()
gzip_app = FastAPI()
stream_app = FastAPI()
app.mount('/xx', gzip_app)
app.mount('/xx', stream_app)