fastapi sse 实现打印机式流式输出

安装依赖

pip install sse-starlette==2.0.0

示例代码

import asyncio
from sse_starlette import EventSourceResponse

@router.get('/stream/')
async def stream():
    async def generate_response_data():
        for i in range(10):
            yield i
            await asyncio.sleep(1)

    return EventSourceResponse(generate_response_data())

会有坑的地方

from fastapi.middleware.gzip import GZipMiddleware

app.add_middleware(GZipMiddleware, minimum_size=1000)

如果该app加上了gzip压缩的中间件的话,会导致sse效果失效(原因暂不明)。

解决方法

把需要sse输出的接口单独写在一个subapp里面,通过mount挂载。

app = FastAPI()

gzip_app = FastAPI()
stream_app = FastAPI()

app.mount('/xx', gzip_app)
app.mount('/xx', stream_app)

测试接口

https://dev.lllyyb.com/sse/