Streams
Server-Sent Events, WebSocket, ranged file streaming, and WebTransport over HTTP/3 in tako.
Tako's streaming surface covers Server-Sent Events, WebSocket,
file streaming with range / conditional GET, and WebTransport over
HTTP/3. All four live in the tako-streams crate and are re-exported
under tako::*.
Server-Sent Events
Sse::new(stream) accepts any Stream whose items implement Into<Bytes>
and frames it as text/event-stream. Default headers include
Cache-Control: no-cache plus X-Accel-Buffering: no so reverse proxies
do not buffer the response.
use bytes::Bytes;
use futures_util::{StreamExt, stream};
use tako::Method;
use tako::responder::Responder;
use tako::router::Router;
use tako::sse::Sse;

async fn ticker() -> impl Responder {
    let s = stream::unfold(0u64, |i| async move {
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        Some((Bytes::from(format!("tick: {i}")), i + 1))
    });
    Sse::new(s)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?;
    let mut router = Router::new();
    router.route(Method::GET, "/events", ticker);
    tako::serve(listener, router).await;
    Ok(())
}

For structured SSE with event:, id:, and retry: fields, use
Sse::events(stream) plus the SseEvent builder, and call
.keep_alive(Duration::from_secs(15)) to emit :keep-alive comments
during idle periods. tako::sse::last_event_id(headers) reads the
Last-Event-ID header sent by EventSource clients on reconnect.
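To make the field names above concrete, here is a minimal, std-only sketch of the text/event-stream wire format that a structured event and a keep-alive comment reduce to. The helpers frame_event and keep_alive_comment are hypothetical names for illustration; they are not tako's SseEvent builder, whose exact methods are not shown in this page.

```rust
use std::fmt::Write;

/// Hypothetical helper (not tako's API): renders one event in the
/// text/event-stream wire format.
fn frame_event(
    event: Option<&str>,
    id: Option<&str>,
    retry_ms: Option<u64>,
    data: &str,
) -> String {
    let mut out = String::new();
    if let Some(name) = event {
        let _ = writeln!(out, "event: {name}");
    }
    if let Some(id) = id {
        let _ = writeln!(out, "id: {id}");
    }
    if let Some(ms) = retry_ms {
        // Reconnection delay hint for the client, in milliseconds.
        let _ = writeln!(out, "retry: {ms}");
    }
    // Each line of a multi-line payload gets its own data: field.
    for line in data.split('\n') {
        let _ = writeln!(out, "data: {line}");
    }
    // A blank line terminates the event.
    out.push('\n');
    out
}

/// Lines starting with a colon are comments; EventSource clients ignore
/// them, so they only serve to keep the connection from going idle.
fn keep_alive_comment() -> &'static str {
    ":keep-alive\n\n"
}
```

A client that reconnects sends the last id: value it saw back in Last-Event-ID, so a handler that numbers its events can resume from the next one.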
See SSE transport for the full transport reference.
WebSocket
TakoWs::new(req, fut) performs the upgrade from inside an HTTP
handler. The closure receives a full-duplex WebSocket stream
(tokio_tungstenite on tokio, compio_ws on compio) that you drive
to completion:
use futures_util::{SinkExt, StreamExt};
use tako::Method;
use tako::responder::Responder;
use tako::types::Request;
use tako::ws::TakoWs;
use tokio_tungstenite::tungstenite::Message;

async fn ws_echo(req: Request) -> impl Responder {
    TakoWs::new(req, |mut ws| async move {
        while let Some(Ok(msg)) = ws.next().await {
            if let Message::Text(t) = msg {
                let _ = ws.send(Message::Text(t)).await;
            }
        }
    })
}

Subprotocol negotiation, max frame / message sizes, allowed origins,
upgrade timeout, ping / pong keep-alive policy, and
permessage-deflate all live on WebSocketConfig — pass it via
TakoWs::with_config.
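As a rough illustration of two of those policies, the std-only sketch below shows what subprotocol negotiation and an origin allowlist check amount to at the header level. The functions negotiate_subprotocol and origin_allowed are hypothetical and are not WebSocketConfig methods; rejecting requests that carry no Origin header is an assumed policy, not necessarily tako's default.

```rust
/// Hypothetical helper: picks the first client-offered subprotocol that
/// the server also supports. `offered` is the raw value of the
/// Sec-WebSocket-Protocol request header, listed in client preference order.
fn negotiate_subprotocol<'a>(offered: &str, supported: &[&'a str]) -> Option<&'a str> {
    offered
        .split(',')
        .map(str::trim)
        .filter(|p| !p.is_empty())
        // Subprotocol names are compared case-sensitively.
        .find_map(|p| supported.iter().copied().find(|s| *s == p))
}

/// Hypothetical helper: returns true when the Origin header is on the
/// allowlist. A missing Origin is rejected here (assumed policy).
fn origin_allowed(origin: Option<&str>, allowed: &[&str]) -> bool {
    match origin {
        Some(o) => allowed.iter().any(|a| a.eq_ignore_ascii_case(o)),
        None => false,
    }
}
```

When no subprotocol matches, the function returns None, which corresponds to completing the handshake without a Sec-WebSocket-Protocol response header or refusing the upgrade, depending on policy.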
See WebSocket transport for the full transport reference.
File serving
tako::file_stream::FileStream serves a single file with strong
ETag / If-None-Match / If-Modified-Since support and serves
precompressed sidecars (<file>.br, <file>.gz) when present.
tako::r#static::ServeDirBuilder mounts a directory with SPA
fallback (fallback("/index.html")) and traversal hardening.
use anyhow::Result;
use tako::Method;
use tako::file_stream::FileStream;
use tako::responder::Responder;
use tako::router::Router;
use tako::types::Request;
use tokio::fs::File;
use tokio::net::TcpListener;
use tokio_util::io::ReaderStream;

async fn serve_file(_: Request) -> impl Responder {
    let file = File::open("test.txt").await.unwrap();
    let stream = ReaderStream::new(file);
    let file_stream = FileStream::new(stream, Some("test.txt".to_string()), None);
    file_stream.into_response()
}

#[tokio::main]
async fn main() -> Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    let mut router = Router::new();
    router.route(Method::GET, "/file", serve_file);
    tako::serve(listener, router).await;
    Ok(())
}
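For readers unfamiliar with range and conditional GET, the following std-only sketch shows the header logic involved: resolving a single-range Range header against a file length, and comparing If-None-Match against the current ETag. Both functions are hypothetical illustrations of the HTTP semantics, not FileStream internals, and the comma split assumes ETags that contain no commas.

```rust
/// Hypothetical helper: resolves a single-range `Range` header value
/// against a file of `len` bytes. Returns inclusive (start, end) offsets,
/// or None when the header is malformed, multi-range, or unsatisfiable.
fn parse_single_range(header: &str, len: u64) -> Option<(u64, u64)> {
    let spec = header.strip_prefix("bytes=")?;
    if spec.contains(',') || len == 0 {
        return None; // multipart byteranges are out of scope here
    }
    let (start, end) = spec.split_once('-')?;
    let (start, end) = (start.trim(), end.trim());
    if start.is_empty() {
        // Suffix form: "-N" means the last N bytes.
        let n: u64 = end.parse().ok()?;
        if n == 0 {
            return None;
        }
        return Some((len.saturating_sub(n), len - 1));
    }
    let start: u64 = start.parse().ok()?;
    let end: u64 = if end.is_empty() {
        len - 1 // open-ended form: "N-" runs to the end of the file
    } else {
        end.parse::<u64>().ok()?.min(len - 1)
    };
    if start > end || start >= len { None } else { Some((start, end)) }
}

/// Strips the weak-validator prefix so tags can be compared weakly.
fn opaque_tag(tag: &str) -> &str {
    let tag = tag.trim();
    tag.strip_prefix("W/").unwrap_or(tag)
}

/// Hypothetical helper: true when If-None-Match covers the current ETag,
/// in which case a GET can be answered with 304 Not Modified.
fn if_none_match_hits(header: &str, current_etag: &str) -> bool {
    let current = opaque_tag(current_etag);
    header.trim() == "*" || header.split(',').any(|c| opaque_tag(c) == current)
}
```

A satisfiable range maps to a 206 Partial Content response with Content-Range: bytes start-end/len, while an unsatisfiable one maps to 416 Range Not Satisfiable.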
WebTransport
WebTransport sits behind the webtransport feature, which requires
http3. The type is also exported as RawQuicSession because the W3C
WebTransport CONNECT handshake is deferred: the current implementation
gives you raw QUIC bidi/uni streams over the same h3 listener.
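Because the session exposes raw QUIC streams, it can help to recall how QUIC itself distinguishes them: per RFC 9000, the two least significant bits of a stream ID encode who opened the stream and whether it is bidirectional or unidirectional. The sketch below is std-only and illustrative; the enum and function names are hypothetical and do not come from tako or any QUIC crate.

```rust
#[derive(Debug, PartialEq, Eq)]
enum Initiator {
    Client,
    Server,
}

#[derive(Debug, PartialEq, Eq)]
enum Direction {
    Bidirectional,
    Unidirectional,
}

/// Hypothetical helper: decodes the two low bits of a QUIC stream ID.
/// Bit 0 selects the initiator, bit 1 selects the direction.
fn classify_stream_id(id: u64) -> (Initiator, Direction) {
    let initiator = if id & 0x1 == 0 {
        Initiator::Client
    } else {
        Initiator::Server
    };
    let direction = if id & 0x2 == 0 {
        Direction::Bidirectional
    } else {
        Direction::Unidirectional
    };
    (initiator, direction)
}
```

For example, stream 0 is the first client-initiated bidirectional stream, and stream 3 is the first server-initiated unidirectional stream.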
See also:
examples/streams and examples/http3-sse for SSE; examples/websocket, examples/websocket-http2, and examples/websocket-compio for WebSocket; examples/file-stream for ranged file serving; examples/webtransport for raw QUIC sessions.
Multipart byteranges and Linux sendfile(2) are deferred follow-up
items.