SSE 入门

SSE 介绍

服务端发送事件(Server-Sent Events, SSE) 是一种基于 HTTP 的协议,允许服务器向客户端主动推送实时、单向文本数据流。它是一种轻量级技术,适用于聊天、股票行情或实时通知等场景。核心特点为:持久连接、支持自动重连、原生使用 UTF-8 编码的纯文本流。

  • 核心特性
    • 基于 HTTP:不需要特定协议,使用现有的 HTTP 协议
    • 单向通信:数据只能从服务端推送到客户端
    • 持久连接:使用 EventSource API 保持长连接,无需反复发起请求
    • 简单且标准:相较于 WebSocket,SSE 实现更简单,主要用于单向消息推送
  • 技术实现
    • 服务器响应:必须将响应头设置为 Content-Type: text/event-stream
    • 数据格式:每条消息用 \n\n 分隔,消息由以下字段组成
      • data:内容
      • event:事件类型
      • id:数据ID
      • retry:重连时间
    • 客户端 API:浏览器通过 new EventSource(url) 接收数据
  • 适用场景
    • 实时仪表盘
    • 推送通知
    • 聊天:单向推送聊天消息

和 WebSocket 对比

Server-Sent Events (SSE) (单向) 和 WebSocket (WS) (双向) 均用于实时Web应用。SSE 基于HTTP协议,轻量、支持自动重连,适用股票、新闻单向推送;WebSocket 是独立的全双工协议,延迟更低、能力更强,适用于聊天、互动游戏双向交互。

  • 通信方向:SSE 是单向通信(服务端 ➡️ 客户端);WS 为全双工双向(服务器 ⬅️➡️ 客户端)
  • 协议基础:SSE 基于标准 HTTP 协议;WS 是基于 TCP 的独立协议
  • 复杂性:SSE 实现简单,支持自动重连、数据类型轻量;WS 实现复杂,需要独立服务器和更复杂的连接管理
  • 适用场景:
    • SSE:价格实时更新、新闻 FEED、实时看板、消息通知推送等
    • WS:实时聊天、多人在线游戏、协作编辑等
  • 性能与连接:WS 适合频繁的双向传输,延迟更低;SSE 适合服务器主动推送数据的轻量级场景

AXUM 既支持 SSE 也支持 WebSocket 的服务实现

首先,我们需要安装依赖:

[dependencies]
anyhow = "1.0.102"
axum = "0.8.9"
chrono = { version = "0.4.44", features = ["serde"] }
futures-util = { version = "0.3.32", default-features = false, features = ["sink", "std"] }
serde = { version = "1.0.228", features = ["derive"] }
tokio = { version = "1.52.1", features = ["rt-multi-thread", "macros", "time", "net"] }
tokio-stream = "0.1.18"

handler:

// server-time/src/handler.rs

use std::{convert::Infallible, time::Duration};

use axum::response::{Html, Sse, sse::Event};
use futures_util::{Stream, stream};
use tokio_stream::StreamExt;

pub async fn sse_handler() -> Sse<impl Stream<Item = Result<Event, Infallible>>> { // 1️⃣
    let stream = stream::repeat_with(|| {
        let now = chrono::Utc::now().to_rfc3339();
        Event::default().data(now)
    })
    .map(Ok)
    .throttle(Duration::from_secs(1)); // 2️⃣

    Sse::new(stream).keep_alive( // 3️⃣
        axum::response::sse::KeepAlive::new()
            .interval(Duration::from_secs(3600))
            .text("keep-alive-text"),
    )
}

pub async fn index() -> Html<String> {
    let html = include_str!("../asset/index.html"); // 4️⃣

    Html(html.to_string())
}

主函数:

// server-time/src/main.rs

use axum::{Router, routing::get};
use tokio::net::TcpListener;

mod handler;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let listener = TcpListener::bind("0.0.0.0:9527").await?;

    let app = Router::new()
        .route("/", get(handler::index)) // 1️⃣
        .route("/api/sse", get(handler::sse_handler)); // 2️⃣

    axum::serve(listener, app).await?;

    Ok(())
}

  • 1️⃣ 首页的路由,它将显示一个 HTML 页面
  • 2️⃣ SSE 的路由

我们来看一下首页的 HTML,它使用原生 JS 实现了一个简单的 SSE 客户端:

 <!-- server-time/asset/index.html -->

<!doctype html>
<html lang="en">
  <head>
    <meta charset="UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>SSE:服务器推送时间</title>
  </head>

  <body>
    <h1>SSE:服务器推送时间</h1>
    <textarea
      id="data"
      style="width: 36rem; height: 18rem; font-size: larger"
    ></textarea>
    <script>
      const eventSource = new EventSource("/api/sse"); // 1️⃣
      eventSource.onmessage = (event) => { // 2️⃣
        console.log(event.data);
        document.getElementById("data").value =
          new Date(event.data).toLocaleString() +
          "\n" +
          document.getElementById("data").value;
      };
    </script>
  </body>
</html>

  • 1️⃣ 创建 SSE 连接
  • 2️⃣ 当接收到服务器推送过来的数据时,会由该函数进行处理

使用 React 实现 SSE 客户端

// server-time-ui/src/App.tsx

import { useEffect, useState } from "react";

export default function App() {
  const [dateList, setDateList] = useState<string[]>([]); // 1️⃣
  useEffect(() => { // 2️⃣
    const eventSource = new EventSource("/api/sse"); // 3️⃣
    eventSource.onmessage = (event: MessageEvent<string>) => { // 4️⃣
      setDateList((list) => [event.data, ...list]);
      console.log(event.data);
    };
    return () => eventSource.close(); // 5️⃣
  }, []);
  return (
    <div className="container mx-auto p-3 my-3 space-y-6">
      <h1 className="text-3xl">SSE:服务器推送时间</h1>
      <div className="outline-0 ring ring-gray-400 rounded w-xl h-72 overflow-y-auto px-2 py-1 font-mono">
        {dateList
          .map((d) => new Date(d))
          .map((date, idx) => (
            <div key={idx}>{date.toLocaleString()}</div>
          ))}
      </div>
    </div>
  );
}
  • 1️⃣ 定义一个状态数组,用于维护服务器通过 SSE 推送过来的服务器时间
  • 2️⃣ 定义一个 Effect 钩子,它将在页面组件挂载后自动执行
  • 3️⃣ 连接 SSE 服务端
  • 4️⃣ 处理 SSE 服务端推送过来的数据
  • 5️⃣ 组件卸载时,关闭 SSE 连接

依赖:

[dev-dependencies]
eventsource-stream = "0.2"
reqwest = { version = "0.13", features = ["stream"] }

代码:

// server-time/examples/sse-client.rs

use eventsource_stream::Eventsource;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut cli = reqwest::Client::new()
        .get("http://127.0.0.1:9527/api/sse")
        .send()
        .await?
        .bytes_stream()
        .eventsource();

    while let Some(event) = cli.next().await {
        match event {
            Ok(event) => println!("收到事件:{:?}", event),
            Err(err) => eprintln!("发生错误:{:?}", err),
        }
    }

    Ok(())
}

本章代码位于01.入门分支

要查看完整内容,请先登录