Streams

Server-Sent Events, WebSocket, ranged file streaming, and WebTransport over HTTP/3 in tako.

Tako's streaming surface covers Server-Sent Events, WebSocket, file streaming with range / conditional GET, and WebTransport over HTTP/3. All four live in the tako-streams crate and are re-exported under tako::*.

Server-Sent Events

Sse::new(stream) accepts any Stream whose items convert into Bytes (Item: Into<Bytes>) and frames it as text/event-stream. The default headers include Cache-Control: no-cache and X-Accel-Buffering: no, so that reverse proxies do not buffer the response.

use bytes::Bytes;
use futures_util::{StreamExt, stream};
use tako::Method;
use tako::responder::Responder;
use tako::router::Router;
use tako::sse::Sse;

async fn ticker() -> impl Responder {
  let s = stream::unfold(0u64, |i| async move {
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    Some((Bytes::from(format!("tick: {i}")), i + 1))
  });
  Sse::new(s)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
  let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?;
  let mut router = Router::new();
  router.route(Method::GET, "/events", ticker);
  tako::serve(listener, router).await;
  Ok(())
}

For structured SSE with event:, id:, and retry: fields, use Sse::events(stream) plus the SseEvent builder, and call .keep_alive(Duration::from_secs(15)) to emit :keep-alive comments during idle periods. tako::sse::last_event_id(headers) reads the Last-Event-ID header sent by EventSource clients on reconnect.
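
To make the event:, id:, retry:, and comment framing concrete, here is a minimal standard-library sketch of the text/event-stream wire format. The Event struct and the encode_event and keep_alive_comment functions are hypothetical names for illustration only; they are not tako's SseEvent builder, and the exact keep-alive text tako emits is assumed from the description above.

```rust
/// Hypothetical stand-in for a structured SSE event (not tako's `SseEvent`).
#[derive(Default)]
struct Event<'a> {
    event: Option<&'a str>,
    id: Option<&'a str>,
    retry_ms: Option<u64>,
    data: &'a str,
}

/// Serialize one event in the `text/event-stream` wire format.
fn encode_event(ev: &Event<'_>) -> String {
    let mut out = String::new();
    if let Some(name) = ev.event {
        out.push_str(&format!("event: {name}\n"));
    }
    if let Some(id) = ev.id {
        out.push_str(&format!("id: {id}\n"));
    }
    if let Some(ms) = ev.retry_ms {
        out.push_str(&format!("retry: {ms}\n"));
    }
    // Each line of a multi-line payload gets its own `data:` field.
    for line in ev.data.split('\n') {
        out.push_str(&format!("data: {line}\n"));
    }
    // A blank line terminates the event.
    out.push('\n');
    out
}

/// A line starting with a colon is a comment; clients ignore it, but it
/// keeps idle connections from being closed by intermediaries.
/// The exact comment text is an assumption.
fn keep_alive_comment() -> &'static str {
    ":keep-alive\n\n"
}

fn main() {
    let ev = Event {
        event: Some("tick"),
        id: Some("42"),
        retry_ms: Some(3000),
        data: "first line\nsecond line",
    };
    print!("{}", encode_event(&ev));
    print!("{}", keep_alive_comment());
}
```

On reconnect, an EventSource client sends the last id: value it saw in the Last-Event-ID header, which is why setting id: on events matters if the stream should be resumable.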

See SSE transport for the full transport reference.

WebSocket

TakoWs::new(req, fut) performs the upgrade from inside an HTTP handler. The closure passed as the second argument receives the upgraded WebSocket connection (tokio_tungstenite on tokio, compio_ws on compio), which acts as both a message stream and a sink, and drives it to completion:

use futures_util::{SinkExt, StreamExt};
use tako::Method;
use tako::responder::Responder;
use tako::types::Request;
use tako::ws::TakoWs;
use tokio_tungstenite::tungstenite::Message;

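async fn ws_echo(req: Request) -> impl Responder {
  TakoWs::new(req, |mut ws| async move {
    // Read until the peer disconnects or a protocol error occurs.
    while let Some(Ok(msg)) = ws.next().await {
      // Echo text frames back; all other message types are ignored.
      if let Message::Text(t) = msg {
        // Stop once the peer can no longer be written to.
        if ws.send(Message::Text(t)).await.is_err() {
          break;
        }
      }
    }
  })
}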

Subprotocol negotiation, max frame / message sizes, allowed origins, upgrade timeout, ping / pong keep-alive policy, and permessage-deflate all live on WebSocketConfig — pass it via TakoWs::with_config.
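
As an illustration of what subprotocol negotiation involves, the following standard-library sketch picks a protocol from the client's Sec-WebSocket-Protocol header. The negotiate_subprotocol function is a hypothetical helper, not part of WebSocketConfig, and preferring the client's order is an assumption; tako may select differently.

```rust
/// Pick the first subprotocol offered by the client that the server supports.
/// `offered` is the raw `Sec-WebSocket-Protocol` request header value,
/// a comma-separated list of tokens. Comparison is case-sensitive.
fn negotiate_subprotocol<'a>(offered: &'a str, supported: &[&str]) -> Option<&'a str> {
    offered
        .split(',')
        .map(str::trim)
        .filter(|p| !p.is_empty())
        .find(|p| supported.iter().any(|s| s == p))
}

fn main() {
    let supported = ["superchat", "chat"];
    // The client lists `chat` first, so it wins even though the server
    // lists `superchat` first.
    println!("{:?}", negotiate_subprotocol("chat, superchat", &supported));
    // No overlap: the upgrade would proceed without a subprotocol
    // (or be rejected, depending on policy).
    println!("{:?}", negotiate_subprotocol("graphql-ws", &supported));
}
```

The selected value, if any, is echoed back in the Sec-WebSocket-Protocol response header; the server must not return a protocol the client did not offer.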

See WebSocket transport for the full transport reference.

File serving

tako::file_stream::FileStream serves a single file with strong ETag / If-None-Match / If-Modified-Since support and serves precompressed sidecars (<file>.br, <file>.gz) when present. tako::r#static::ServeDirBuilder mounts a directory with SPA fallback (fallback("/index.html")) and traversal hardening.

use anyhow::Result;
use tako::Method;
use tako::file_stream::FileStream;
use tako::responder::Responder;
use tako::router::Router;
use tako::types::Request;
use tokio::fs::File;
use tokio::net::TcpListener;
use tokio_util::io::ReaderStream;

async fn serve_file(_: Request) -> impl Responder {
  let file = File::open("test.txt").await.unwrap();
  let stream = ReaderStream::new(file);
  let file_stream = FileStream::new(stream, Some("test.txt".to_string()), None);
  file_stream.into_response()
}

#[tokio::main]
async fn main() -> Result<()> {
  let listener = TcpListener::bind("127.0.0.1:8080").await?;

  let mut router = Router::new();
  router.route(Method::GET, "/file", serve_file);

  tako::serve(listener, router).await;
  Ok(())
}
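
For readers unfamiliar with ranged GET, this standard-library sketch shows how a single-range Range header can be resolved against a file length. The resolve_range function is a hypothetical helper written for illustration, not FileStream's internal logic, and it deliberately rejects multi-range requests.

```rust
/// Resolve a single-range `Range` header value against a file of `len` bytes.
/// Returns inclusive `(start, end)` offsets, or `None` if the header is
/// malformed, unsatisfiable, or names more than one range.
fn resolve_range(header: &str, len: u64) -> Option<(u64, u64)> {
    let spec = header.trim().strip_prefix("bytes=")?;
    if spec.contains(',') || len == 0 {
        return None; // multi-range requests are out of scope for this sketch
    }
    let (first, last) = spec.split_once('-')?;
    let (first, last) = (first.trim(), last.trim());
    let (start, end) = match (first.is_empty(), last.is_empty()) {
        // `bytes=-N`: the final N bytes of the file.
        (true, false) => {
            let n = last.parse::<u64>().ok()?;
            if n == 0 {
                return None;
            }
            (len.saturating_sub(n), len - 1)
        }
        // `bytes=N-`: from offset N to the end of the file.
        (false, true) => (first.parse::<u64>().ok()?, len - 1),
        // `bytes=N-M`: clamp M to the last byte of the file.
        (false, false) => {
            let s = first.parse::<u64>().ok()?;
            let e = last.parse::<u64>().ok()?;
            (s, e.min(len - 1))
        }
        (true, true) => return None,
    };
    (start <= end && start < len).then_some((start, end))
}

fn main() {
    let len = 1000;
    if let Some((start, end)) = resolve_range("bytes=500-", len) {
        // A 206 Partial Content response carries this header.
        println!("Content-Range: bytes {start}-{end}/{len}");
    }
}
```

A `None` result for a syntactically valid but unsatisfiable range would normally map to 416 Range Not Satisfiable, while a missing or malformed header falls back to a full 200 response.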

WebTransport

WebTransport is behind the webtransport feature, which requires http3. The type is also exported as RawQuicSession because the W3C WebTransport CONNECT handshake is deferred: the current implementation gives you raw QUIC bidirectional and unidirectional streams over the same h3 listener.

For file serving, multipart byteranges and Linux sendfile(2) support are deferred follow-up items.
