Server-Sent Events

Stream text/event-stream responses from any HTTP handler with the Sse responder, structured events, and proxy-safe keep-alive.

Server-Sent Events (SSE) push a one-way, long-lived stream of text/event-stream frames from the server to the browser's EventSource. In Tako an SSE response is just a Responder returned from an ordinary HTTP handler, so it flows through the same Router, middleware, and runtime as the rest of your app. tako::sse::Sse lives in tako-rs-streams and works on both the tokio and compio runtimes — no feature flag required (it ships in the default set).

There are two construction paths:

  • Sse::new(stream) — the legacy raw-bytes path. Each Bytes item is wrapped as data: …\n\n.
  • Sse::events(stream) — the structured path. Each item is an SseEvent carrying any of event:, id:, retry:, comment, and data: fields.

A minimal event stream

Sse::new accepts any Stream<Item = Bytes> that is Send + 'static. Each yielded chunk becomes a single data: event:

use bytes::Bytes;
use futures_util::stream;
use tako::Method;
use tako::responder::Responder;
use tako::router::Router;
use tako::sse::Sse;
use tako::types::Request;

async fn ticker(_: Request) -> impl Responder {
  let s = stream::unfold(0u64, |i| async move {
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    Some((Bytes::from(format!("tick: {i}")), i + 1))
  });
  Sse::new(s)
}

#[tokio::main]
async fn main() {
  let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await.unwrap();
  let mut router = Router::new();
  router.route(Method::GET, "/events", ticker);
  tako::serve(listener, router).await;
}
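
On the wire, each chunk the ticker yields becomes one frame. The sketch below is a std-only illustration of that framing as described above, not Tako's actual implementation: frame_raw is a hypothetical name, and the single space after data: is assumed from the data: …\n\n notation.

```rust
/// Hypothetical helper (not part of Tako): frames one raw chunk as
/// `data: <chunk>\n\n`, copying the bytes through without any sanitizing.
fn frame_raw(chunk: &[u8]) -> Vec<u8> {
  let mut out = Vec::with_capacity(chunk.len() + 8);
  out.extend_from_slice(b"data: ");
  out.extend_from_slice(chunk);
  // The blank line terminates the event for the EventSource parser.
  out.extend_from_slice(b"\n\n");
  out
}
```

Under these assumptions the first tick would reach the browser as the bytes data: tick: 0 followed by a blank line, and e.data would be the string "tick: 0".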

The browser side is the standard EventSource:

const es = new EventSource("/events");
es.onmessage = (e) => console.log(e.data);

Response headers

Both constructors emit a 200 OK response with headers tuned for streaming through reverse proxies:

  • Content-Type: text/event-stream
  • Cache-Control: no-cache, no-store, must-revalidate
  • Connection: keep-alive
  • X-Accel-Buffering: no — disables nginx response buffering, which would otherwise hold frames until the connection closes.

You do not set these yourself; the Responder impl builds the response.

Structured events

For named events, reconnection ids, or retry hints, build a Stream<Item = SseEvent> and hand it to Sse::events. SseEvent is a builder:

  • SseEvent::data(d) — a data: payload (multi-line strings are split into one data: field per line).
  • SseEvent::comment(c) — a : comment line, invisible to the EventSource.
  • SseEvent::retry(duration) — a retry: reconnection-delay hint.
  • .event(name) — sets the event: field so the client can addEventListener(name, …).
  • .id(value) — sets the id: field, which the browser echoes back as Last-Event-ID on reconnect.

For example, a handler that emits a plain message, a named event with an id, and a retry hint:

use std::time::Duration;
use futures_util::stream;
use tako::responder::Responder;
use tako::sse::{Sse, SseEvent};
use tako::types::Request;

async fn feed(_: Request) -> impl Responder {
  let events = stream::iter([
    SseEvent::data("hello"),
    SseEvent::data("again").event("greeting").id("1"),
    SseEvent::retry(Duration::from_secs(5)),
  ]);
  Sse::events(events)
}

Sse::new does not sanitize embedded \n / \r in the raw bytes. A message containing \n\nevent:click\n\n is parsed by the browser as a second, synthetic event — a field-injection bug if the payload comes from untrusted input. The Sse::events path rebuilds every line with strict prefixes and strips CR, so it is safe for caller-controlled data. Reserve Sse::new for bytes you have already encoded yourself.
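
To make the difference concrete, here is a minimal std-only sketch of a line-safe encoder in the spirit of the structured path. It is not Tako's code: the Event struct, single_line, and encode are hypothetical names, and the field order and the choice to drop CR/LF from event: and id: values are assumptions made for illustration.

```rust
/// Hypothetical stand-in for a structured event; not Tako's SseEvent.
#[derive(Default)]
struct Event {
  event: Option<String>,
  id: Option<String>,
  retry_ms: Option<u64>,
  data: Option<String>,
}

/// Drops CR and LF so a single-line field cannot break out of its line.
fn single_line(s: &str) -> String {
  s.chars().filter(|c| *c != '\r' && *c != '\n').collect()
}

/// Serializes one event, prefixing every output line with a field name.
fn encode(ev: &Event) -> String {
  let mut out = String::new();
  if let Some(name) = &ev.event {
    out.push_str(&format!("event: {}\n", single_line(name)));
  }
  if let Some(id) = &ev.id {
    out.push_str(&format!("id: {}\n", single_line(id)));
  }
  if let Some(ms) = ev.retry_ms {
    out.push_str(&format!("retry: {ms}\n"));
  }
  if let Some(data) = &ev.data {
    // Strip CR, then emit one `data:` field per LF-separated line.
    let cleaned = data.replace('\r', "");
    for line in cleaned.split('\n') {
      out.push_str("data: ");
      out.push_str(line);
      out.push('\n');
    }
  }
  // A single blank line ends the event.
  out.push('\n');
  out
}
```

With this encoder, a hostile payload such as "hi\n\nevent:click\n\n" never produces a bare blank line or a bare event: line in the middle of the frame. Every fragment stays behind a data: prefix, so the browser sees one message whose data contains the text event:click rather than a second event.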

Keep-alive

Idle SSE connections can be closed by intermediaries that enforce read timeouts. Call .keep_alive(period) on either responder to interleave a :keepalive\n\n comment frame whenever the inner stream has been silent for period. The timer resets every time a real event is emitted, so active streams pay nothing.

use std::time::Duration;
use futures_util::stream;
use tako::sse::{Sse, SseEvent};

let events = stream::iter([SseEvent::data("hi")]);
let response = Sse::events(events).keep_alive(Duration::from_secs(15));

Keep-alive is backed by tokio::time::Sleep. It is available on the structured and raw paths through the same .keep_alive method.
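
The reset-on-activity rule can be illustrated with a small simulation that needs no runtime. This is a sketch of the timing logic only, not Tako's implementation: keep_alive_times is a hypothetical function, and it assumes the idle timer starts when the connection opens and restarts after every frame, including the keep-alive comment itself.

```rust
/// Given the times (ms since connection start, ascending) at which real
/// events are emitted, returns the times at which a keep-alive comment would
/// be sent up to `until_ms`. Assumes the idle timer restarts after every
/// frame, whether it is a real event or a keep-alive.
fn keep_alive_times(event_times_ms: &[u64], period_ms: u64, until_ms: u64) -> Vec<u64> {
  assert!(period_ms > 0, "period must be non-zero");
  let mut keep_alives = Vec::new();
  let mut last_frame = 0u64;
  let mut events = event_times_ms.iter().copied().peekable();
  loop {
    let deadline = last_frame + period_ms;
    let next_event = events.peek().copied();
    match next_event {
      // A real event arrives before the idle deadline: the timer resets.
      Some(t) if t <= deadline => {
        events.next();
        last_frame = t;
      }
      // The stream stayed silent for a full period: emit a keep-alive.
      _ if deadline <= until_ms => {
        keep_alives.push(deadline);
        last_frame = deadline;
      }
      _ => break,
    }
  }
  keep_alives
}
```

In this model, a stream that emits an event every second never triggers a keep-alive with a 15-second period, while a stream with events at 5 s and 40 s gets comments at 20 s, 35 s, and 55 s within the first minute.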

Honoring reconnection

When a client reconnects it sends the last id: it saw back as the Last-Event-ID request header. Two helpers read it so you can resume from the right cursor:

  • tako::sse::last_event_id(headers) — returns the trimmed value as String when it is valid UTF-8.
  • tako::sse::last_event_id_bytes(headers) — returns the raw trimmed bytes, preserving non-UTF-8 (e.g. opaque binary cursors) that the UTF-8 helper would drop.

For example, a handler that reads the cursor on reconnect:

use tako::responder::Responder;
use tako::sse::{last_event_id, Sse, SseEvent};
use tako::types::Request;
use futures_util::stream;

async fn resume(req: Request) -> impl Responder {
  let cursor = last_event_id(req.headers()).unwrap_or_default();
  let events = stream::iter([
    SseEvent::data(format!("resuming after {cursor}")).id("42"),
  ]);
  Sse::events(events)
}
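
The difference between the two helpers comes down to trimming and UTF-8 validation. The sketch below reproduces that behavior on a raw header value using only the standard library. The names trim_ascii_ws, id_bytes, and id_str are hypothetical, and treating an empty or all-whitespace value as absent is an assumption, not documented Tako behavior.

```rust
/// Trims leading and trailing ASCII whitespace from a byte slice.
fn trim_ascii_ws(raw: &[u8]) -> &[u8] {
  let start = raw
    .iter()
    .position(|b| !b.is_ascii_whitespace())
    .unwrap_or(raw.len());
  let end = raw
    .iter()
    .rposition(|b| !b.is_ascii_whitespace())
    .map_or(start, |i| i + 1);
  &raw[start..end]
}

/// Byte-preserving variant: keeps opaque, possibly non-UTF-8 cursors.
/// Assumption: an empty value after trimming counts as "no cursor".
fn id_bytes(raw: &[u8]) -> Option<&[u8]> {
  let trimmed = trim_ascii_ws(raw);
  if trimmed.is_empty() { None } else { Some(trimmed) }
}

/// String variant: yields a value only when the trimmed bytes are valid UTF-8.
fn id_str(raw: &[u8]) -> Option<String> {
  id_bytes(raw)
    .and_then(|b| std::str::from_utf8(b).ok())
    .map(str::to_owned)
}
```

A value such as "  42 " yields "42" from both variants, while a binary cursor like the bytes 0xFF 0xFE survives only the byte-preserving one. That is the case where a handler would want the bytes helper rather than the String helper.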

When to reach for something else

SSE is one-way (server → client), text-framed, and rides on a normal HTTP request, which makes it ideal for live tickers, log tails, and progress feeds. For full-duplex messaging use a WebSocket; for binary, datagram-style channels over QUIC use WebTransport. SSE also works over HTTP/3 — see the http3-sse example.

Examples

  • examples/streams: Sse::new ticker alongside hand-rolled text/event-stream bodies.
  • examples/http3-sse: SSE served over HTTP/3.
