Realtime over WebSocket
Build a full-duplex WebSocket feature with an echo handler and a server-push ticker, then compare it to the SSE alternative.
This tutorial builds a realtime feature over a full-duplex WebSocket
connection. It is grounded in the runnable examples/websocket crate, which
serves two endpoints:
- GET /ws/echo: reads each client message and echoes it back
- GET /ws/tick: pushes a tick #N message to the client once per second
These are the two halves of any chat-style feature: receiving messages from a client, and pushing messages to it on a schedule or from another source. WebSocket ships in the default feature set on the tokio runtime, so no flag is needed.
If you have not served a handler before, read the Quickstart first.
1. The full example
A WebSocket endpoint in Tako is an ordinary HTTP handler that returns
TakoWs::new(req, closure). The closure receives an upgraded socket and drives
the conversation. This is the whole file:
use std::time::Duration;

use futures_util::SinkExt;
use futures_util::StreamExt;
use tako::Method;
use tako::responder::Responder;
use tako::types::Request;
use tako::ws::TakoWs;
use tokio_stream::wrappers::IntervalStream;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::Utf8Bytes;

pub async fn ws_echo(req: Request) -> impl Responder {
    TakoWs::new(req, |mut ws| async move {
        let _ = ws.send(Message::Text("Welcome to Tako WS!".into())).await;

        while let Some(Ok(msg)) = ws.next().await {
            match msg {
                Message::Text(txt) => {
                    let _ = ws
                        .send(Message::Text(Utf8Bytes::from(format!("Echo: {txt}"))))
                        .await;
                }
                Message::Binary(bin) => {
                    let _ = ws.send(Message::Binary(bin)).await;
                }
                Message::Ping(p) => {
                    let _ = ws.send(Message::Pong(p)).await;
                }
                Message::Close(_) => {
                    let _ = ws.send(Message::Close(None)).await;
                    break;
                }
                _ => {}
            }
        }
    })
}

pub async fn ws_tick(req: Request) -> impl Responder {
    TakoWs::new(req, |mut ws| async move {
        let mut ticker = IntervalStream::new(tokio::time::interval(Duration::from_secs(1))).enumerate();

        loop {
            tokio::select! {
                msg = ws.next() => {
                    match msg {
                        Some(Ok(Message::Close(_))) | None => break,
                        _ => {}
                    }
                }
                Some((i, _)) = ticker.next() => {
                    let _ = ws.send(Message::Text(Utf8Bytes::from(format!("tick #{i}")))).await;
                }
            }
        }
    })
}

#[tokio::main]
async fn main() {
    let listener = tokio::net::TcpListener::bind("127.0.0.1:8080")
        .await
        .unwrap();

    let mut router = tako::router::Router::new();
    router.route(Method::GET, "/ws/echo", ws_echo);
    router.route(Method::GET, "/ws/tick", ws_tick);

    tako::serve(listener, router).await;
}
2. How the upgrade works
TakoWs::new(req, |ws| async move { … }) is a Responder: when
the handler returns it, Tako performs the WebSocket handshake on the incoming
Request and then runs your closure with the upgraded connection. Inside the
closure, ws is both a Stream of inbound Messages and a Sink you can
send outbound Messages into — the futures_util::{StreamExt, SinkExt} traits
provide .next() and .send().
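To make "performs the WebSocket handshake" concrete, here is a minimal std-only sketch of the checks a server typically applies to the classic HTTP/1.1 upgrade request described in RFC 6455. It is not Tako's implementation: the `header` and `check_upgrade` helpers are hypothetical names, and the sketch assumes header names have already been lowercased.

```rust
use std::collections::HashMap;

/// Look up a header by its lowercased name and trim surrounding whitespace.
fn header<'a>(headers: &'a HashMap<String, String>, name: &str) -> Option<&'a str> {
    headers.get(name).map(|v| v.trim())
}

/// Hypothetical helper (not Tako's API): validate an RFC 6455 upgrade request.
/// Assumes header names were lowercased by the caller.
/// Returns the client's Sec-WebSocket-Key on success.
fn check_upgrade(headers: &HashMap<String, String>) -> Result<&str, &'static str> {
    match header(headers, "upgrade") {
        Some(v) if v.eq_ignore_ascii_case("websocket") => {}
        _ => return Err("expected `Upgrade: websocket`"),
    }
    // `Connection` is a comma-separated token list, e.g. "keep-alive, Upgrade".
    let has_upgrade_token = header(headers, "connection")
        .map(|v| v.split(',').any(|t| t.trim().eq_ignore_ascii_case("upgrade")))
        .unwrap_or(false);
    if !has_upgrade_token {
        return Err("expected `Connection: Upgrade`");
    }
    if header(headers, "sec-websocket-version") != Some("13") {
        return Err("expected `Sec-WebSocket-Version: 13`");
    }
    match header(headers, "sec-websocket-key") {
        Some(key) if !key.is_empty() => Ok(key),
        _ => Err("missing `Sec-WebSocket-Key`"),
    }
}

fn main() {
    let mut headers = HashMap::new();
    for (k, v) in [
        ("upgrade", "websocket"),
        ("connection", "keep-alive, Upgrade"),
        ("sec-websocket-version", "13"),
        ("sec-websocket-key", "dGhlIHNhbXBsZSBub25jZQ=="),
    ] {
        headers.insert(k.to_string(), v.to_string());
    }
    println!("{:?}", check_upgrade(&headers));
}
```

On success the server answers with 101 Switching Protocols and a Sec-WebSocket-Accept value derived from the key. Only after that exchange does the closure see the upgraded socket.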
The echo handler is a straight read-respond loop:
while let Some(Ok(msg)) = ws.next().await {
    match msg {
        Message::Text(txt) => {
            let _ = ws.send(Message::Text(format!("Echo: {txt}").into())).await;
        }
        Message::Binary(bin) => { let _ = ws.send(Message::Binary(bin)).await; }
        Message::Ping(p) => { let _ = ws.send(Message::Pong(p)).await; }
        Message::Close(_) => { let _ = ws.send(Message::Close(None)).await; break; }
        _ => {}
    }
}

The Message type comes from tokio-tungstenite. Handling Ping/Close
explicitly keeps the connection healthy and shuts it down cleanly.
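One way to reason about the echo loop, and to unit-test it without opening a socket, is to factor the match into a pure function. The sketch below is std-only and uses a local `Msg` enum as a simplified stand-in for tungstenite's `Message`. Both `Msg` and `reply` are hypothetical names introduced for illustration.

```rust
/// Simplified stand-in for the `Message` variants the echo handler matches on.
#[derive(Debug, Clone, PartialEq)]
enum Msg {
    Text(String),
    Binary(Vec<u8>),
    Ping(Vec<u8>),
    Pong(Vec<u8>),
    Close,
}

/// Decide what to send back for one inbound message.
/// The bool says whether the read loop should keep running.
fn reply(msg: Msg) -> (Option<Msg>, bool) {
    match msg {
        Msg::Text(txt) => (Some(Msg::Text(format!("Echo: {txt}"))), true),
        Msg::Binary(bin) => (Some(Msg::Binary(bin)), true),
        Msg::Ping(p) => (Some(Msg::Pong(p)), true),
        // Acknowledge the close, then stop reading.
        Msg::Close => (Some(Msg::Close), false),
        // Anything else (here: Pong) is ignored.
        _ => (None, true),
    }
}

fn main() {
    let inbound = vec![
        Msg::Text("hi".into()),
        Msg::Ping(vec![1]),
        Msg::Pong(vec![]),
        Msg::Close,
        Msg::Text("never processed".into()),
    ];
    for msg in inbound {
        let (out, keep_going) = reply(msg);
        if let Some(out) = out {
            println!("send {out:?}");
        }
        if !keep_going {
            break;
        }
    }
}
```

In the real handler the returned message would go to `ws.send(...)` and the flag would drive the `break`. The message after `Close` is never processed, mirroring the loop above.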
3. Server-initiated pushes
A chat feed has to push messages the client did not ask for — a new message from
another user, a periodic heartbeat. The ws_tick handler shows the pattern:
tokio::select! races the inbound stream against a timer, so the task both
reacts to the client (closing when it disconnects) and pushes on its own
schedule:
loop {
    tokio::select! {
        msg = ws.next() => match msg {
            Some(Ok(Message::Close(_))) | None => break,
            _ => {}
        },
        Some((i, _)) = ticker.next() => {
            let _ = ws.send(Message::Text(format!("tick #{i}").into())).await;
        }
    }
}

To turn this into a real chat fan-out, replace the ticker arm with a
tokio::sync::broadcast::Receiver: every connected socket subscribes to a shared
broadcast::Sender, and posting a message sends it to every subscriber. The
select! skeleton stays identical: one arm reads the client, the other reads
the broadcast channel.
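The fan-out semantics can be modelled without any async runtime. The sketch below is a std-only stand-in built on `std::sync::mpsc`, not tokio's broadcast channel: the `Hub` type and its methods are hypothetical, and it keeps one unbounded queue per subscriber. Tokio's broadcast channel instead uses a single bounded buffer, so a slow receiver can lag and miss messages.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical fan-out hub: one sender per connected subscriber.
struct Hub {
    subscribers: Vec<Sender<String>>,
}

impl Hub {
    fn new() -> Self {
        Hub { subscribers: Vec::new() }
    }

    /// Called once per connected socket; the socket task keeps the receiver.
    fn subscribe(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Send `msg` to every live subscriber and forget the ones that hung up.
    /// Returns how many subscribers received the message.
    fn publish(&mut self, msg: &str) -> usize {
        // `send` fails once the matching receiver has been dropped.
        self.subscribers.retain(|tx| tx.send(msg.to_string()).is_ok());
        self.subscribers.len()
    }
}

fn main() {
    let mut hub = Hub::new();
    let alice = hub.subscribe();
    let bob = hub.subscribe();

    let delivered = hub.publish("carol: hello");
    println!("delivered to {delivered} subscribers");
    println!("alice got {:?}", alice.try_recv());
    println!("bob got {:?}", bob.try_recv());

    // Bob disconnects; the next publish prunes his sender.
    drop(bob);
    let delivered = hub.publish("carol: anyone there?");
    println!("delivered to {delivered} subscribers");
}
```

In the handler itself, each socket task would hold its own receiver and await it in the second select! arm, forwarding every received message with `ws.send(...)`.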
4. The Cargo.toml
The example pulls in the umbrella crate plus the stream/codec helpers it uses in the closures:
[dependencies]
futures-util = "0.3"
tako-rs = "2"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
tokio-tungstenite = "0.29"

tako-rs re-exports the WebSocket support as tako::ws::TakoWs. futures-util
provides the Stream/Sink extension traits, and tokio-tungstenite provides
the Message enum used on the wire.
5. Running it
cargo run

Connect with any WebSocket client. With websocat:
# echo
websocat ws://127.0.0.1:8080/ws/echo
# type a line; the server replies "Echo: <line>"
# server push
websocat ws://127.0.0.1:8080/ws/tick
# prints "tick #0", "tick #1", … once per second

In the browser, the standard WebSocket API works unchanged:
const ws = new WebSocket("ws://127.0.0.1:8080/ws/echo");
ws.onmessage = (e) => console.log(e.data);
ws.onopen = () => ws.send("hello");

Tuning the connection
TakoWs is a builder. Before returning it you can constrain the handshake and
the frames — protocols, max_frame_size, max_message_size,
allowed_origins, upgrade_timeout, and keep_alive(WsKeepAlive). These guard
against oversized frames and slow-loris upgrades on a public endpoint. See the
WebSocket transport reference for the full builder.
WebSocket runs on tokio by default and on compio behind the compio-ws feature.
HTTP/2 WebSocket (RFC 8441) and the deflate extension are covered in the
transport reference.
When to use SSE instead
If the data only flows server → client — a live ticker, a log tail, a
progress feed — a WebSocket is more machinery than you need. Server-Sent
Events give you a one-way text/event-stream from an
ordinary HTTP handler, with browser auto-reconnect built in, and also ship in the
default feature set:
use futures_util::stream;
use tako::responder::Responder;
use tako::sse::{Sse, SseEvent};
use tako::types::Request;
async fn feed(_: Request) -> impl Responder {
    let events = stream::iter([
        SseEvent::data("user joined").event("presence"),
        SseEvent::data("hello").event("message"),
    ]);
    Sse::events(events)
}

Reach for a WebSocket when the client also needs to send: chat input, collaborative editing, game state. Reach for SSE when it only needs to listen.
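To show what a text/event-stream looks like on the wire, the std-only sketch below frames the two events from the feed handler above using the standard SSE field syntax. The `frame_event` helper is hypothetical and only illustrates the format. It is not Tako's serializer, whose exact byte output may differ (for example, by adding id or retry fields).

```rust
/// Hypothetical helper: frame one event in text/event-stream syntax.
fn frame_event(event: Option<&str>, data: &str) -> String {
    let mut out = String::new();
    if let Some(name) = event {
        out.push_str("event: ");
        out.push_str(name);
        out.push('\n');
    }
    // Each line of the payload becomes its own `data:` field.
    for line in data.split('\n') {
        out.push_str("data: ");
        out.push_str(line);
        out.push('\n');
    }
    // A blank line terminates the event.
    out.push('\n');
    out
}

fn main() {
    print!("{}", frame_event(Some("presence"), "user joined"));
    print!("{}", frame_event(Some("message"), "hello"));
}
```

In the browser, an EventSource delivers the event named message to onmessage, while the presence event needs addEventListener("presence", ...).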
Next steps
- WebSocket transport: the full TakoWs builder and configuration.
- Server-Sent Events: the one-way streaming alternative.
- Building a REST API: the companion tutorial.
- Deployment: ship the binary.