fastapi sse 实现打印机式流式输出
安装依赖
1
| pip install sse-starlette==2.0.0
|
示例代码
1 2 3 4 5 6 7 8 9 10 11
| import asyncio from sse_starlette import EventSourceResponse
@router.get('/stream/') async def stream(): async def generate_response_data(): for i in range(10): yield i await asyncio.sleep(1)
return EventSourceResponse(generate_response_data())
|
会有坑的地方
1 2 3
| from fastapi.middleware.gzip import GZipMiddleware
app.add_middleware(GZipMiddleware, minimum_size=1000)
|
如果该app加上了gzip压缩的中间件的话,会导致sse效果失效(原因暂不明)。
解决方法
把需要sse输出的接口单独写在一个subapp里面,通过mount挂载。
1 2 3 4 5 6 7
| app = FastAPI()
gzip_app = FastAPI() stream_app = FastAPI()
app.mount('/xx', gzip_app) app.mount('/xx', stream_app)
|
测试接口
https://dev.lllyyb.com/sse/
版权声明:本文首发于
lybtt的博客,转载请注明出处!