- 支持试读
SSE 入门
本章我们将对服务端发送事件(Server-Sent Events, SSE)进行简单介绍,并使用 AXUM 实现一个简单的 SSE 服务:每秒钟向客户端推送服务器当前时间。同时,我们将分别介绍使用 Rust、原生 JS 和 React 编写 SSE 客户端。 站内通知:基础设施及异步回调构建流
本章我们开始站内通知的开发,除了准备项目的基础设施外,我们还要学习另一种构建流的方法。该方法的回调是一个异步函数,相对于前一章介绍的 `repeat_with(同步回调)`,该方法比较符合我们的预期。站内通知:SSE 状态共享及数据库访问
本章我们将介绍 SSE 状态共享及在 SSE 处理函数中访问数据库。
SSE 入门
- 3
- 2026-04-27 12:53:50
SSE 介绍
服务端发送事件(Server-Sent Events, SSE) 是一种基于 HTTP 的协议,允许服务器向客户端主动推送实时、单向文本数据流。它是一种轻量级技术,适用于聊天、股票行情或实时通知等场景。核心特点为:持久连接、支持自动重连、原生使用 UTF-8 编码的纯文本流。
- 核心特性
- 基于 HTTP:不需要特定协议,使用现有的 HTTP 协议
- 单向通信:数据只能从服务端推送到客户端
- 持久连接:使用
EventSourceAPI 保持长连接,无需反复发起请求 - 简单且标准:相较于 WebSocket,SSE 实现更简单,主要用于单向消息推送
- 技术实现
- 服务器响应:必须将响应头设置为
Content-Type: text/event-stream - 数据格式:每条消息用
\n\n分隔,消息由以下字段组成data:内容event:事件类型id:数据IDretry:重连时间
- 客户端 API:浏览器通过
new EventSource(url)接收数据
- 服务器响应:必须将响应头设置为
- 适用场景
- 实时仪表盘
- 推送通知
- 聊天:单向推送聊天消息
和 WebSocket 对比
Server-Sent Events (SSE) (单向) 和 WebSocket (WS) (双向) 均用于实时Web应用。SSE 基于HTTP协议,轻量、支持自动重连,适用股票、新闻单向推送;WebSocket 是独立的全双工协议,延迟更低、能力更强,适用于聊天、互动游戏双向交互。
- 通信方向:SSE 是单向通信(服务端 ➡️ 客户端);WS 为全双工双向(服务器 ⬅️➡️ 客户端)
- 协议基础:SSE 基于标准 HTTP 协议;WS 是基于 TCP 的独立协议
- 复杂性:SSE 实现简单,支持自动重连、数据类型轻量;WS 实现复杂,需要独立服务器和更复杂的连接管理
- 适用场景:
- SSE:价格实时更新、新闻 FEED、实时看板、消息通知推送等
- WS:实时聊天、多人在线游戏、协作编辑等
- 性能与连接:WS 适合频繁的双向传输,延迟更低;SSE 适合服务器主动推送数据的轻量级场景
AXUM 既支持 SSE 也支持 WebSocket 的服务实现
首先,我们需要安装依赖:
[dependencies]
anyhow = "1.0.102"
axum = "0.8.9"
chrono = { version = "0.4.44", features = ["serde"] }
futures-util = { version = "0.3.32", default-features = false, features = ["sink", "std"] }
serde = { version = "1.0.228", features = ["derive"] }
tokio = { version = "1.52.1", features = ["rt-multi-thread", "macros", "time", "net"] }
tokio-stream = "0.1.18"
handler:
// server-time/src/handler.rs
use std::{convert::Infallible, time::Duration};
use axum::response::{Html, Sse, sse::Event};
use futures_util::{Stream, stream};
use tokio_stream::StreamExt;
pub async fn sse_handler() -> Sse<impl Stream<Item = Result<Event, Infallible>>> { // 1️⃣
let stream = stream::repeat_with(|| {
let now = chrono::Utc::now().to_rfc3339();
Event::default().data(now)
})
.map(Ok)
.throttle(Duration::from_secs(1)); // 2️⃣
Sse::new(stream).keep_alive( // 3️⃣
axum::response::sse::KeepAlive::new()
.interval(Duration::from_secs(3600))
.text("keep-alive-text"),
)
}
pub async fn index() -> Html<String> {
let html = include_str!("../asset/index.html"); // 4️⃣
Html(html.to_string())
}
主函数:
// server-time/src/main.rs
use axum::{Router, routing::get};
use tokio::net::TcpListener;
mod handler;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let listener = TcpListener::bind("0.0.0.0:9527").await?;
let app = Router::new()
.route("/", get(handler::index)) // 1️⃣
.route("/api/sse", get(handler::sse_handler)); // 2️⃣
axum::serve(listener, app).await?;
Ok(())
}
- 1️⃣ 首页的路由,它将显示一个 HTML 页面
- 2️⃣ SSE 的路由
我们来看一下首页的 HTML,它使用原生 JS 实现了一个简单的 SSE 客户端:
<!-- server-time/asset/index.html -->
<!doctype html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>SSE:服务器推送时间</title>
</head>
<body>
<h1>SSE:服务器推送时间</h1>
<textarea
id="data"
style="width: 36rem; height: 18rem; font-size: larger"
></textarea>
<script>
const eventSource = new EventSource("/api/sse"); // 1️⃣
eventSource.onmessage = (event) => { // 2️⃣
console.log(event.data);
document.getElementById("data").value =
new Date(event.data).toLocaleString() +
"\n" +
document.getElementById("data").value;
};
</script>
</body>
</html>
- 1️⃣ 创建 SSE 连接
- 2️⃣ 当接收到服务器推送过来的数据时,会由该函数进行处理
使用 React 实现 SSE 客户端
// server-time-ui/src/App.tsx
import { useEffect, useState } from "react";
export default function App() {
const [dateList, setDateList] = useState<string[]>([]); // 1️⃣
useEffect(() => { // 2️⃣
const eventSource = new EventSource("/api/sse"); // 3️⃣
eventSource.onmessage = (event: MessageEvent<string>) => { // 4️⃣
setDateList((list) => [event.data, ...list]);
console.log(event.data);
};
return () => eventSource.close(); // 5️⃣
}, []);
return (
<div className="container mx-auto p-3 my-3 space-y-6">
<h1 className="text-3xl">SSE:服务器推送时间</h1>
<div className="outline-0 ring ring-gray-400 rounded w-xl h-72 overflow-y-auto px-2 py-1 font-mono">
{dateList
.map((d) => new Date(d))
.map((date, idx) => (
<div key={idx}>{date.toLocaleString()}</div>
))}
</div>
</div>
);
}
- 1️⃣ 定义一个状态数组,用于维护服务器通过 SSE 推送过来的服务器时间
- 2️⃣ 定义一个
Effect钩子,它将在页面组件挂载后自动执行 - 3️⃣ 连接 SSE 服务端
- 4️⃣ 处理 SSE 服务端推送过来的数据
- 5️⃣ 组件卸载时,关闭 SSE 连接
依赖:
[dev-dependencies]
eventsource-stream = "0.2"
reqwest = { version = "0.13", features = ["stream"] }
代码:
// server-time/examples/sse-client.rs
use eventsource_stream::Eventsource;
use tokio_stream::StreamExt;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut cli = reqwest::Client::new()
.get("http://127.0.0.1:9527/api/sse")
.send()
.await?
.bytes_stream()
.eventsource();
while let Some(event) = cli.next().await {
match event {
Ok(event) => println!("收到事件:{:?}", event),
Err(err) => eprintln!("发生错误:{:?}", err),
}
}
Ok(())
}
本章代码位于01.入门分支
