Server-Sent Events
Stream text/event-stream responses from any HTTP handler with the Sse responder, structured events, and proxy-safe keep-alive.
Server-Sent Events (SSE) push a one-way, long-lived stream of text/event-stream
frames from the server to the browser's EventSource. In Tako an SSE response is
just a Responder returned from an ordinary HTTP handler, so it
flows through the same Router, middleware, and runtime as the
rest of your app. tako::sse::Sse lives in tako-rs-streams and works on both
the tokio and compio runtimes — no feature flag required (it ships in the
default set).
There are two construction paths:
- Sse::new(stream): the legacy raw-bytes path. Each Bytes item is wrapped as data: …\n\n.
- Sse::events(stream): the structured path. Each item is an SseEvent carrying any of the event:, id:, retry:, comment, and data: fields.
A minimal event stream
Sse::new accepts any Stream<Item = Bytes> that is Send + 'static. Each yielded
chunk becomes a single data: event:
use bytes::Bytes;
use futures_util::{StreamExt, stream};
use tako::Method;
use tako::responder::Responder;
use tako::router::Router;
use tako::sse::Sse;
use tako::types::Request;

async fn ticker(_: Request) -> impl Responder {
    let s = stream::unfold(0u64, |i| async move {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        Some((Bytes::from(format!("tick: {i}")), i + 1))
    });
    Sse::new(s)
}

#[tokio::main]
async fn main() {
    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await.unwrap();
    let mut router = Router::new();
    router.route(Method::GET, "/events", ticker);
    tako::serve(listener, router).await;
}

The browser side is the standard EventSource:
const es = new EventSource("/events");
es.onmessage = (e) => console.log(e.data);

Response headers
Both constructors emit a 200 OK response with headers tuned for streaming
through reverse proxies:
- Content-Type: text/event-stream
- Cache-Control: no-cache, no-store, must-revalidate
- Connection: keep-alive
- X-Accel-Buffering: no (disables nginx response buffering, which would otherwise hold frames until the connection closes)
You do not set these yourself; the Responder impl builds the response.
Structured events
For named events, reconnection ids, or retry hints, build a Stream<Item = SseEvent>
and hand it to Sse::events. SseEvent is a builder:
- SseEvent::data(d): a data: payload (multi-line strings are split into one data: field per line).
- SseEvent::comment(c): a : comment line, invisible to the EventSource.
- SseEvent::retry(duration): a retry: reconnection-delay hint.
- .event(name): sets the event: field so the client can addEventListener(name, …).
- .id(value): sets the id: field, which the browser echoes back as Last-Event-ID on reconnect.
use std::time::Duration;
use futures_util::stream;
use tako::responder::Responder;
use tako::sse::{Sse, SseEvent};
use tako::types::Request;

async fn feed(_: Request) -> impl Responder {
    let events = stream::iter([
        SseEvent::data("hello"),
        SseEvent::data("again").event("greeting").id("1"),
        SseEvent::retry(Duration::from_secs(5)),
    ]);
    Sse::events(events)
}

Sse::new does not sanitize embedded \n / \r in the raw bytes. A message
containing \n\nevent:click\n\n is parsed by the browser as a second, synthetic
event — a field-injection bug if the payload comes from untrusted input. The
Sse::events path rebuilds every line with strict prefixes and strips CR, so it
is safe for caller-controlled data. Reserve Sse::new for bytes you have already
encoded yourself.
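To make the injection risk concrete, here is a minimal, std-only sketch. It is a model of the behavior described above, not Tako's implementation. The helper names wrap_raw, encode_data, and parse_events are hypothetical: the first imitates the raw data: …\n\n wrapping, the second imitates a line-by-line rebuild that strips CR, and the third is a simplified EventSource-style parser that tracks only the event: and data: fields. The forged block in the payload carries its own data: line, because a browser dispatches a block only when it contains data.

```rust
/// Hypothetical model of the raw path: the payload is wrapped verbatim.
fn wrap_raw(payload: &str) -> String {
    format!("data: {payload}\n\n")
}

/// Hypothetical model of the structured path: CR is stripped and every
/// line of the payload gets its own `data: ` prefix.
fn encode_data(payload: &str) -> String {
    let cleaned = payload.replace('\r', "");
    let mut frame = String::new();
    for line in cleaned.split('\n') {
        frame.push_str("data: ");
        frame.push_str(line);
        frame.push('\n');
    }
    frame.push('\n'); // blank line terminates the event
    frame
}

/// Simplified client-side parser returning (event type, data) pairs.
/// It handles only `event:` and `data:` fields and LF line endings.
fn parse_events(wire: &str) -> Vec<(String, String)> {
    let mut events = Vec::new();
    let mut event_type = String::new();
    let mut data: Vec<&str> = Vec::new();
    for line in wire.split('\n') {
        if line.is_empty() {
            // A blank line dispatches the buffered event, if it has data.
            if !data.is_empty() {
                let name = if event_type.is_empty() {
                    "message".to_string()
                } else {
                    event_type.clone()
                };
                events.push((name, data.join("\n")));
            }
            event_type.clear();
            data.clear();
            continue;
        }
        if line.starts_with(':') {
            continue; // comment line
        }
        let (field, value) = match line.split_once(':') {
            Some((f, v)) => (f, v.strip_prefix(' ').unwrap_or(v)),
            None => (line, ""),
        };
        match field {
            "event" => event_type = value.to_string(),
            "data" => data.push(value),
            _ => {} // other fields are ignored in this sketch
        }
    }
    events
}

fn main() {
    // Untrusted input that smuggles a second event into the stream.
    let payload = "hello\n\nevent: click\ndata: forged";
    println!("raw path:        {:?}", parse_events(&wrap_raw(payload)));
    println!("structured path: {:?}", parse_events(&encode_data(payload)));
}
```

In this model the raw wrapping yields two events, the second one a forged click, while the per-line encoding yields a single message event whose data is the original payload.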
Keep-alive
Idle SSE connections can be closed by intermediaries that enforce read timeouts.
Call .keep_alive(period) on either responder to interleave a :keepalive\n\n
comment frame whenever the inner stream has been silent for period. The timer
resets every time a real event is emitted, so active streams pay nothing.
use std::time::Duration;
use futures_util::stream;
use tako::sse::{Sse, SseEvent};
let events = stream::iter([SseEvent::data("hi")]);
let response = Sse::events(events).keep_alive(Duration::from_secs(15));

Keep-alive is backed by tokio::time::Sleep. It is available on the structured
and raw paths through the same .keep_alive method.
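The idle-timer behavior can be sketched without an async runtime. The following is a hypothetical model built on std::sync::mpsc, not how Tako implements .keep_alive: frames_with_keep_alive is an illustrative name, and a blocking recv_timeout stands in for the tokio timer. Each call to recv_timeout starts a fresh wait, so the timer restarts after every real event and a comment frame is produced only after a full period of silence.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Drains `rx` until the sender is dropped, emitting a `:keepalive`
/// comment frame whenever no payload arrives within `period`.
/// Payloads are assumed to be single-line strings.
fn frames_with_keep_alive(rx: Receiver<String>, period: Duration) -> Vec<String> {
    let mut frames = Vec::new();
    loop {
        match rx.recv_timeout(period) {
            // A real event arrived; the next loop iteration restarts the wait.
            Ok(payload) => frames.push(format!("data: {payload}\n\n")),
            // The stream was silent for `period`: emit a comment frame.
            Err(RecvTimeoutError::Timeout) => frames.push(":keepalive\n\n".to_string()),
            // The producer is gone: end the response.
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    frames
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        tx.send("first".to_string()).unwrap();
        // Stay silent for several periods before the next event.
        thread::sleep(Duration::from_millis(300));
        tx.send("second".to_string()).unwrap();
    });

    for frame in frames_with_keep_alive(rx, Duration::from_millis(50)) {
        print!("{frame}");
    }
    producer.join().unwrap();
}
```

The number of keep-alive frames printed between the two events depends on scheduling, which is why the sketch makes no claim about an exact count.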
Honoring reconnection
When a client reconnects it sends the last id: it saw back as the Last-Event-ID
request header. Two helpers read it so you can resume from the right cursor:
- tako::sse::last_event_id(headers): returns the trimmed value as String when it is valid UTF-8.
- tako::sse::last_event_id_bytes(headers): returns the raw trimmed bytes, preserving non-UTF-8 (e.g. opaque binary cursors) that the UTF-8 helper would drop.
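Once the cursor is known, resuming usually means replaying only the events the client has not yet seen. A minimal sketch of that step, assuming numeric, monotonically increasing ids and an in-memory backlog; replay_after and the backlog shape are hypothetical and not part of Tako:

```rust
/// Returns wire frames for every backlog entry newer than the cursor.
/// `last_event_id` models the (optional) Last-Event-ID header value.
/// A missing or non-numeric cursor replays the whole backlog.
/// Payloads are assumed to be single-line strings.
fn replay_after(last_event_id: Option<&str>, backlog: &[(u64, &str)]) -> Vec<String> {
    // Assumption: ids are decimal integers; anything else is treated as "no cursor".
    let cursor = last_event_id.and_then(|raw| raw.trim().parse::<u64>().ok());
    backlog
        .iter()
        .filter(|(id, _)| cursor.map_or(true, |seen| *id > seen))
        .map(|(id, payload)| format!("id: {id}\ndata: {payload}\n\n"))
        .collect()
}

fn main() {
    let backlog = [(1, "created"), (2, "updated"), (3, "deleted")];

    // A client that last saw id 2 receives only the newer event.
    for frame in replay_after(Some("2"), &backlog) {
        print!("{frame}");
    }

    // A first-time client (no header) receives everything.
    println!("fresh client gets {} frames", replay_after(None, &backlog).len());
}
```

In a handler, the value read by last_event_id would supply the cursor, and each replayed event would carry its own id so the next reconnect resumes from the right place.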
use tako::responder::Responder;
use tako::sse::{last_event_id, Sse, SseEvent};
use tako::types::Request;
use futures_util::stream;

async fn resume(req: Request) -> impl Responder {
    let cursor = last_event_id(req.headers()).unwrap_or_default();
    let events = stream::iter([
        SseEvent::data(format!("resuming after {cursor}")).id("42"),
    ]);
    Sse::events(events)
}

When to reach for something else
SSE is one-way (server → client), text-framed, and rides on a normal HTTP request,
which makes it ideal for live tickers, log tails, and progress feeds. For
full-duplex messaging use a WebSocket; for binary,
datagram-style channels over QUIC use WebTransport.
SSE also works over HTTP/3 — see the http3-sse example.
Examples
- examples/streams: Sse::new ticker alongside hand-rolled text/event-stream bodies.
- examples/http3-sse: SSE served over HTTP/3.