🐙 tako
Tutorials

Realtime over WebSocket

Build a full-duplex WebSocket feature with an echo handler and a server-push ticker, then compare it to the SSE alternative.

Realtime over WebSocket

This tutorial builds a realtime feature over a full-duplex WebSocket connection. It is grounded in the runnable examples/websocket crate, which serves two endpoints:

  • GET /ws/echo — reads each client message and echoes it back
  • GET /ws/tick — pushes a tick #N message to the client once per second

These are the two halves of any chat-style feature: receiving messages from a client, and pushing messages to it on a schedule or from another source. WebSocket ships in the default feature set on the tokio runtime, so no flag is needed.

If you have not served a handler before, read the Quickstart first.

1. The full example

A WebSocket endpoint in Tako is an ordinary HTTP handler that returns TakoWs::new(req, closure). The closure receives an upgraded socket and drives the conversation. This is the whole file:

use std::time::Duration;

use futures_util::SinkExt;
use futures_util::StreamExt;
use tako::Method;
use tako::responder::Responder;
use tako::types::Request;
use tako::ws::TakoWs;
use tokio_stream::wrappers::IntervalStream;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::Utf8Bytes;

pub async fn ws_echo(req: Request) -> impl Responder {
  TakoWs::new(req, |mut ws| async move {
    let _ = ws.send(Message::Text("Welcome to Tako WS!".into())).await;

    while let Some(Ok(msg)) = ws.next().await {
      match msg {
        Message::Text(txt) => {
          let _ = ws
            .send(Message::Text(Utf8Bytes::from(format!("Echo: {txt}"))))
            .await;
        }
        Message::Binary(bin) => {
          let _ = ws.send(Message::Binary(bin)).await;
        }
        Message::Ping(p) => {
          let _ = ws.send(Message::Pong(p)).await;
        }
        Message::Close(_) => {
          let _ = ws.send(Message::Close(None)).await;
          break;
        }
        _ => {}
      }
    }
  })
}

pub async fn ws_tick(req: Request) -> impl Responder {
  TakoWs::new(req, |mut ws| async move {
    let mut ticker = IntervalStream::new(tokio::time::interval(Duration::from_secs(1))).enumerate();

    loop {
      tokio::select! {
          msg = ws.next() => {
              match msg {
                  Some(Ok(Message::Close(_))) | None => break,
                  _ => {}
              }
          }

          Some((i, _)) = ticker.next() => {
              let _ = ws.send(Message::Text(Utf8Bytes::from(format!("tick #{i}")))).await;
          }
      }
    }
  })
}

#[tokio::main]
async fn main() {
  let listener = tokio::net::TcpListener::bind("127.0.0.1:8080")
    .await
    .unwrap();
  let mut router = tako::router::Router::new();

  router.route(Method::GET, "/ws/echo", ws_echo);
  router.route(Method::GET, "/ws/tick", ws_tick);

  tako::serve(listener, router).await;
}

2. How the upgrade works

TakoWs::new(req, |ws| async move { … }) is a Responder: when the handler returns it, Tako performs the WebSocket handshake on the incoming Request and then runs your closure with the upgraded connection. Inside the closure, ws is both a Stream of inbound Messages and a Sink you can send outbound Messages into — the futures_util::{StreamExt, SinkExt} traits provide .next() and .send().

The echo handler is a straight read-respond loop:

while let Some(Ok(msg)) = ws.next().await {
  match msg {
    Message::Text(txt) => {
      let _ = ws.send(Message::Text(format!("Echo: {txt}").into())).await;
    }
    Message::Binary(bin) => { let _ = ws.send(Message::Binary(bin)).await; }
    Message::Ping(p)     => { let _ = ws.send(Message::Pong(p)).await; }
    Message::Close(_)    => { let _ = ws.send(Message::Close(None)).await; break; }
    _ => {}
  }
}

The Message type comes from tokio-tungstenite. Handling Ping/Close explicitly keeps the connection healthy and shuts it down cleanly.

3. Server-initiated pushes

A chat feed has to push messages the client did not ask for — a new message from another user, a periodic heartbeat. The ws_tick handler shows the pattern: tokio::select! races the inbound stream against a timer, so the task both reacts to the client (closing when it disconnects) and pushes on its own schedule:

loop {
  tokio::select! {
    msg = ws.next() => match msg {
      Some(Ok(Message::Close(_))) | None => break,
      _ => {}
    },
    Some((i, _)) = ticker.next() => {
      let _ = ws.send(Message::Text(format!("tick #{i}").into())).await;
    }
  }
}

To turn this into a real chat fan-out, replace the ticker arm with a tokio::sync::broadcast::Receiver: every connected socket subscribes to a shared broadcast::Sender, and posting a message sends it to every subscriber. The select! skeleton stays identical — one arm reads the client, the other reads the broadcast channel.

4. The Cargo.toml

The example pulls in the umbrella crate plus the stream/codec helpers it uses in the closures:

[dependencies]
futures-util = "0.3"
tako-rs = "2"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
tokio-tungstenite = "0.29"

tako-rs re-exports the WebSocket support as tako::ws::TakoWs. futures-util provides the Stream/Sink extension traits, and tokio-tungstenite provides the Message enum used on the wire.

5. Running it

cargo run

Connect with any WebSocket client. With websocat:

# echo
websocat ws://127.0.0.1:8080/ws/echo
# type a line; the server replies "Echo: <line>"

# server push
websocat ws://127.0.0.1:8080/ws/tick
# prints "tick #0", "tick #1", … once per second

In the browser, the standard WebSocket API works unchanged:

const ws = new WebSocket("ws://127.0.0.1:8080/ws/echo");
ws.onmessage = (e) => console.log(e.data);
ws.onopen = () => ws.send("hello");

Tuning the connection

TakoWs is a builder. Before returning it you can constrain the handshake and the frames — protocols, max_frame_size, max_message_size, allowed_origins, upgrade_timeout, and keep_alive(WsKeepAlive). These guard against oversized frames and slow-loris upgrades on a public endpoint. See the WebSocket transport reference for the full builder.

WebSocket runs on tokio by default and on compio behind the compio-ws feature. HTTP/2 WebSocket (RFC 8441) and the deflate extension are covered in the transport reference.

When to use SSE instead

If the data only flows server → client — a live ticker, a log tail, a progress feed — a WebSocket is more machinery than you need. Server-Sent Events give you a one-way text/event-stream from an ordinary HTTP handler, with browser auto-reconnect built in, and also ship in the default feature set:

use futures_util::stream;
use tako::responder::Responder;
use tako::sse::{Sse, SseEvent};
use tako::types::Request;

async fn feed(_: Request) -> impl Responder {
  let events = stream::iter([
    SseEvent::data("user joined").event("presence"),
    SseEvent::data("hello").event("message"),
  ]);
  Sse::events(events)
}

Reach for a WebSocket when the client also needs to send — chat input, collaborative editing, game state. Reach for SSE when it only needs to listen.

Next steps

On this page