查看原文
其他

解密 SSE,用 Python 像 ChatGPT 一样返回流式响应

The following article is from 古明地觉的编程教室 Author 古明地觉

我们知道目前的 HTTP/1.1 采用的是标准的请求-响应模型,客户端主动发请求,服务端被动地返回响应。这种模型在客户端需要实时获取结果的场景下是不合适的,因为这意味着客户端需要不断地轮询,所以最好的做法是服务端生成结果之后,主动推送给客户端。

比如 ChatGPT,它在生成内容时,也是生成一部分,就主动向客户端推送一部分。而在这个过程中,客户端不需要做任何事情,只需等待 ChatGPT 服务端返回内容即可。

说到这儿,你肯定想到了 WebSocket,没错这是一种解决方案。但 WebSocket 太重了,它和 HTTP 都是基于 TCP 的应用层传输协议,只不过握手的时候搭了 HTTP 的便车,利用 HTTP 本身的协议升级特性,伪装成 HTTP,这样就能绕过浏览器沙箱、网络防火墙等限制。

当完成握手之后,后续传输的数据就不再是 HTTP 报文,而是 WebSocket 格式的二进制帧。所以这两者完全是不同的协议,那有没有一种办法,我们仍然使用 HTTP 协议,同时还能让服务端主动推送数据呢?

答案是有的,也就是本文将要介绍的 SSE 技术,它的英文全称是 Server-Sent Events(服务端推送事件)。通过 SSE 可以让服务端即时推送数据到客户端,而不需要客户端轮询服务端以获取更新。

到这你可能会问,那 WebSocket 和 SSE 有什么区别呢?

1)通信方式

WebSocket 提供全双工通信,服务端和客户端都可以在同一个连接上同时发送和接收数据。最重要的是,WebSocket 独立于 HTTP 协议,尽管它开始于一个 HTTP 握手。

SSE 仅提供服务端到客户端的单向通信,客户端不能通过 SSE 给服务端发信息。

2)协议和实现

WebSocket 使用自己的协议(ws:// 或 wss://),需要服务端和客户端都支持,并且协议比较复杂。

SSE 则是使用标准的 HTTP 协议,实现起来更简单,尤其是在服务端。

3)适用场景

WebSocket 适用于服务端和客户端之间双向实时通信的场景,如在线游戏、聊天应用等。

SSE 适用于服务端向客户端单向推送数据的场景,如消息通知、数据更新。并且 SSE 自动支持断线重连,而 WebSocket 则需要额外部署。

4)复杂性和资源使用

WebSocket 由于其双向通信的能力,通常比 SSE 更复杂,可能需要更多的资源来维护和管理连接。

SSE 因为其单向性和基于 HTTP 的特性,它可以利用现有的网络基础设施,如代理服务器、负载均衡器和防火墙等等,通常更容易实现和维护。



相信现在你已经明白 SSE 是做什么的了,它的目的就是让服务端能够主动推送数据给客户端。如果不需要和服务端动态交互,只是希望服务端在有数据的时候推过来,那么 WebSocket 就有些太重了,因为这意味着要替换 HTTP 协议,而使用 SSE 无疑是更好的选择。

SSE 是什么我们已经知道了,那它是怎么实现的呢?原理是什么呢?

1)建立连接

客户端发起一个标准的 HTTP 请求来开启 SSE 会话,这个请求的特殊之处在于它包含一个头字段。

Accept: text/event-stream

相当于客户端告诉服务端,期望接收 SSE 消息流。而服务端在看到该字段时,也知道这是一个 SSE 请求,于是立即向客户端返回响应头,注意:返回的只有响应头,里面会包含如下头字段。

Content-Type: text/event-stream

响应头返回之后标志着 SSE 连接成功建立,并且连接会保持开放状态,服务端后续可以随时通过此连接向客户端发送数据。此外当连接不小心断开时,客户端也会自动进行重连。

所以在普通的 HTTP 请求中,一旦服务端返回,那么请求结束了。虽然可以将 Connection 头字段设置为 keep-alive 保证连接不断开,但每次访问都包含了 HTTP 请求/响应的完整过程。

而在 SSE 中,服务端会保持一个开放的连接,只要有新数据可用,就会直接发送给客户端。所以服务端会将响应以流的形式发送给客户端,每次发送的消息都是响应流的一部分,而不是独立的 HTTP 响应。

因此 SSE 的服务端在发送数据时,并不遵循传统的一次请求,一次响应模式。它在建立连接之后会保持连接开放,并通过这个持续的连接流式地发送数据,这种方式就使得 SSE 非常适合实时数据推送的场景。

2)发送消息

客户端发送请求,服务端返回响应头之后,SSE 连接就建立成功了。此时客户端只需要躺平,安静地等待服务端的输出即可。所以现在的关键就在于服务端要返回什么格式的数据呢?很简单,一个基本的消息由以下几部分组成:

  • data:实际的消息数据;

  • id:可选,消息的唯一标识符,用于在连接重新建立时同步消息;

  • event:可选,定义事件类型,用于客户端区分消息的类型;

  • retry:可选,自动重连的时间(毫秒),如果连接中断,客户端在自动重新连接之前,需要等待多长时间;


注意:每个消息要以两个换行符(\n\n)结束,举个例子,我们发送一个 Hello World。

data: Hello World\n\n

也可以发送带有事件类型的消息:

event: userUpdate
data: {"username": "Serpen", "age": 18}\n\n

还是比较简单的,服务端可以保持连接并随时发送更多数据。然后客户端在收到时会进行处理,但不需要(也不能)对服务端作出任何回应,它只需要被动地接收来自服务端的数据即可。当服务端认为数据已经全部发送完毕、无需再发时,那么便可以主动断开连接。


关于 SSE 的原理我们就解释清楚了,下面来实际编程实现它,这里我们先使用原生的 asyncio 实现 SSE。

import asyncio
from asyncio import StreamReader, StreamWriter

class SSE:

    def __init__(self, host="0.0.0.0", port=9999):
        self.host = host
        self.port = port

    @staticmethod
    def parse_request_headers(data: bytes) -> dict:
        """
        此函数负责从原始字节流中解析出请求头
        """
        headers = data.split(b"\r\n\r\n")[0].split(b"\r\n")
        header_dict = {}
        for header in headers[1:]:
            key, val = header.decode("utf-8").split(":", 1)
            header_dict[key.lower()] = val.strip()
        return header_dict

    async def handler_requests(self,
                               reader: StreamReader,
                               writer: StreamWriter):
        """
        负责处理来自客户端的请求
        每来一个客户端连接,就会基于此函数创建一个协程
        并且自动传递两个参数:reader 和 writer
        reader.read  负责读取数据,等价于 socket.recv
        writer.write 负责发送数据,等价于 socket.send
        """
        # 获取客户端的请求报文,这里对请求方法、请求地址不做限制
        data = await reader.readuntil(b"\r\n\r\n")
        # 解析出请求头
        request_headers = self.parse_request_headers(data)
        # 简单检测一下 accept 字段,如果不是建立 SSE,那么直接关闭连接
        if request_headers.get("accept") != "text/event-stream":
            writer.close()
            return await writer.wait_closed()
        # 如果是 SSE 连接,那么返回响应头
        response_header = (
            b"HTTP/1.1 200 OK\r\n"
            b"Content-Type: text/event-stream\r\n"
            b"Cache-Control: no-cache\r\n"
            b"Connection: keep-alive\r\n"
            b'Access-Control-Allow-Origin: *\r\n'
            b"\r\n"
        )
        writer.write(response_header)
        await writer.drain()

        # 然后便可以不断地向客户端返回数据了
        for _ in range(5):
            # 每隔 1 秒返回数据
            data = "data: 高老师总能分享出好东西\r\n\r\n".encode("utf-8")
            writer.write(data)
            await writer.drain()
            await asyncio.sleep(1)
        # 数据传输完毕
        writer.close()
        await writer.wait_closed()

    async def __create_server(self):
        # 创建服务,第一个参数是一个回调函数
        # 当连接过来的时候就会根据此函数创建一个协程
        # 后面是绑定的 ip 和 端口
        server = await asyncio.start_server(self.handler_requests,
                                            self.host,
                                            self.port)
        # 然后开启无限循环
        async with server:
            await server.serve_forever()

    def run_server(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.__create_server())

if __name__ == '__main__':
    sse = SSE()
    sse.run_server()

服务端代码编写完毕,下面编写前端代码。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <style>
        #data {
            font-weight: bold;
            color: cadetblue;
            font-size: large;
        }
    </style>
</head>
<body>
    <h1>SSE Test</h1>
    <div id="data"></div>
    <script>
        document.addEventListener("DOMContentLoaded", function () {
            // 和服务端建立 SSE 连接
            var eventSource = new EventSource("http://localhost:9999");

            eventSource.onmessage = function (e) {
                // 将数据渲染在 <div id="data"></div> 的内部
                var data = e.data + "\n";
                document.getElementById('data').innerText += data;
            };

            eventSource.onerror = function (e) {
                console.error('Error occurred:', e);
                eventSource.close();
            };
        });
    </script>
</body>
</html>

代码编写完毕,我们用浏览器打开 HTML 文件,便可看到如下效果。

以上我们就简单实现了 SSE,当然为了加深印象,这里的后端是使用原生的 asyncio 编写的,但在工作中,我们会使用现成的 Web 框架,比如 FastAPI,Blacksheep 等等。

需要说明的是,虽然通过 SSE 技术可以实现类似 ChatGPT 的效果,但 ChatGPT 内部并没有用到 SSE,它内部是基于 HTTP 的分块传输实现的。因为 SSE 只能通过 GET 请求发出,并且无法自定义请求头。

如果想实现 ChatGPT 的效果,需要使用 HTTP 的分块传输。而像 FastAPI、BlackSheep 等框架提供的流式响应,便是基于 HTTP 的分块传输实现的,比如 FastAPI:

import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

async def event_generator():
    for _ in range(5):
        # 每隔 1 秒返回数据
        data = "data: 高老师总能分享出好东西\r\n\r\n".encode("utf-8")
        yield data
        await asyncio.sleep(1)

@app.get("/")
async def sse():
    return StreamingResponse(event_generator(), 
                             media_type="text/event-stream")

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=9999)

首先之前的前端代码依旧可以正常访问,通过修改数据格式和 Content-Type 可以让其支持 SSE。但最正确的做法是直接访问 localhost:9999,效果如下:

所以基于 StreamingResponse 可以实现 SSE,也可以直接访问。而直接访问的话,此时里面的 data: 和 \r\n 就是实体数据的一部分。并且这种方式和 ChatGPT 的工作机制是相似的,都使用了 HTTP 的分块传输,支持所有的请求方法,而 SSE 只支持 GET 请求。

BlackSheep 也是类似的,它同样也支持流式响应。

import asyncio
from blacksheep import Application, Response, StreamedContent
import uvicorn

app = Application()
app.use_cors(
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

async def event_generator():
    for _ in range(5):
        # 每隔 1 秒返回数据
        data = "data: 高老师总能分享出好东西\r\n\r\n".encode("utf-8")
        yield data
        await asyncio.sleep(1)

@app.router.get("/")
async def sse():
    return Response(
        200,
        content=StreamedContent(b"text/event-stream", event_generator),
    )

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=9999)

可以测试一下,效果是一样的。如果你不想实现 SSE,只是希望固定的数据以流的形式一点一点返回,那么记得将数据中多余的 data: 和 \r\n 给去掉,并最好修改 Content-Type 为合适的类型。

所以 SSE 一般用于需要服务端推数据,但数据不知道什么时候会过来,于是通过 SSE 保持连接开放。后续当服务端有数据了,直接通过连接发送给客户端即可。

而 FastAPI 和 BlackSheep 提供的流式响应更像是,返回的数据比较庞大,如果全部准备好再一次性返回,会让用户陷入长时间的等待,造成不好的体验。于是通过分块传输,准备好一部分就返回一部分。虽然整体时间没变,但可以让用户立刻获取到数据,从而提升用户体验。

比如 ChatGPT,当它回答的内容比较多的时候,那么整个过程耗费几十秒钟是常有的事情,假设 30 秒。相比让用户等待 30 秒,然后内容一下子刷出来,显然生成一部分返回一部分这种方式更让人喜欢。

因此使用 SSE 还是流式响应,则取决于你当前的业务。如果你返回的数据是确定的,只是准备的时间比较长,或者数据量比较大,那么推荐使用流式响应。

至于 SSE,在这些现成的 Web 框架里面,也可以通过流式响应来实现,只需要将 Content-Type 设置为 text/event-stream,并将数据加上前缀 data: 和后缀 \r\n\r\n。

但说实话,如果想实现 SSE,不建议通过流式响应来实现,而是使用专门的库。以 FastAPI 为例:

from sse_starlette.sse import EventSourceResponse

FastAPI 其实就是在 starlette 的基础上套了一层壳,通过安装 sse_starlette 可以让 FastAPI 更好地支持 SSE。

以上就是本文的内容,如果对你有帮助,就点个赞吧。

推荐阅读  点击标题可跳转

1、Polars (最强Pandas平替)

2、小白看得懂的 Transformer (图解)

3、大厂是如何防止订单重复支付的?三分钟彻底搞懂!

继续滑动看下一个
向上滑动看下一个

您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存