WebSocket
Upgrade an HTTP request to a WebSocket inside a handler, then run the send/receive message loop on either runtime.
WebSocket
WebSocket is not a separate listener — it lives inside an HTTP handler on
whichever HTTP transport you already serve. TakoWs::new(req, fut) performs the
RFC-6455 server-side handshake and hands the upgraded stream to your closure.
The handler owns the stream and drives the message loop.
WebSocket is available on both runtimes: it is on by default for Tokio
(tako::ws), and behind the compio-ws feature for Compio (tako::ws_compio).
Tokio
On Tokio, TakoWs returns a tokio_tungstenite WebSocketStream. It is a
Sink + Stream of Message, so you use SinkExt::send and StreamExt::next.
use futures_util::SinkExt;
use futures_util::StreamExt;
use tako::Method;
use tako::responder::Responder;
use tako::types::Request;
use tako::ws::TakoWs;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::Utf8Bytes;
async fn ws_echo(req: Request) -> impl Responder {
TakoWs::new(req, |mut ws| async move {
let _ = ws.send(Message::Text("Welcome to Tako WS!".into())).await;
while let Some(Ok(msg)) = ws.next().await {
match msg {
Message::Text(txt) => {
let _ = ws
.send(Message::Text(Utf8Bytes::from(format!("Echo: {txt}"))))
.await;
}
Message::Binary(bin) => {
let _ = ws.send(Message::Binary(bin)).await;
}
Message::Ping(p) => {
let _ = ws.send(Message::Pong(p)).await;
}
Message::Close(_) => {
let _ = ws.send(Message::Close(None)).await;
break;
}
_ => {}
}
}
})
}
#[tokio::main]
async fn main() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:8080")
.await
.unwrap();
let mut router = tako::router::Router::new();
router.route(Method::GET, "/ws/echo", ws_echo);
tako::serve(listener, router).await;
}The handler is just another route — register it with router.route and serve
it on any HTTP transport. Because the closure returns a Responder, the upgrade
response (101 Switching Protocols) is produced for you; the upgraded socket is
driven on a spawned task once the handshake completes.
use std::time::Duration;
use futures_util::SinkExt;
use futures_util::StreamExt;
use tako::Method;
use tako::responder::Responder;
use tako::types::Request;
use tako::ws::TakoWs;
use tokio_stream::wrappers::IntervalStream;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::Utf8Bytes;
pub async fn ws_echo(req: Request) -> impl Responder {
TakoWs::new(req, |mut ws| async move {
let _ = ws.send(Message::Text("Welcome to Tako WS!".into())).await;
while let Some(Ok(msg)) = ws.next().await {
match msg {
Message::Text(txt) => {
let _ = ws
.send(Message::Text(Utf8Bytes::from(format!("Echo: {txt}"))))
.await;
}
Message::Binary(bin) => {
let _ = ws.send(Message::Binary(bin)).await;
}
Message::Ping(p) => {
let _ = ws.send(Message::Pong(p)).await;
}
Message::Close(_) => {
let _ = ws.send(Message::Close(None)).await;
break;
}
_ => {}
}
}
})
}
pub async fn ws_tick(req: Request) -> impl Responder {
TakoWs::new(req, |mut ws| async move {
let mut ticker = IntervalStream::new(tokio::time::interval(Duration::from_secs(1))).enumerate();
loop {
tokio::select! {
msg = ws.next() => {
match msg {
Some(Ok(Message::Close(_))) | None => break,
_ => {}
}
}
Some((i, _)) = ticker.next() => {
let _ = ws.send(Message::Text(Utf8Bytes::from(format!("tick #{i}")))).await;
}
}
}
})
}
#[tokio::main]
async fn main() {
let listener = tokio::net::TcpListener::bind("127.0.0.1:8080")
.await
.unwrap();
let mut router = tako::router::Router::new();
router.route(Method::GET, "/ws/echo", ws_echo);
router.route(Method::GET, "/ws/tick", ws_tick);
tako::serve(listener, router).await;
}
Compio
On Compio, enable the compio-ws feature and use TakoWsCompio. The upgraded
type is CompioWebSocket<UpgradedStream>, which exposes an explicit read()
method rather than the Stream interface.
use compio::ws::tungstenite::Message;
use tako::Method;
use tako::responder::Responder;
use tako::types::Request;
use tako::ws_compio::CompioWebSocket;
use tako::ws_compio::TakoWsCompio;
use tako::ws_compio::UpgradedStream;
async fn ws_echo(req: Request) -> impl Responder {
TakoWsCompio::new(req, |mut ws: CompioWebSocket<UpgradedStream>| async move {
let _ = ws.send(Message::Text("Welcome to Tako WS (compio)!".into())).await;
loop {
match ws.read().await {
Ok(Message::Text(txt)) => {
let _ = ws.send(Message::Text(format!("Echo: {txt}").into())).await;
}
Ok(Message::Binary(bin)) => {
let _ = ws.send(Message::Binary(bin)).await;
}
Ok(Message::Ping(p)) => {
let _ = ws.send(Message::Pong(p)).await;
}
Ok(Message::Close(_)) => {
let _ = ws.close(None).await;
break;
}
Ok(_) => {}
Err(_) => break,
}
}
})
}
#[compio::main]
async fn main() {
let listener = compio::net::TcpListener::bind("127.0.0.1:8080")
.await
.unwrap();
let mut router = tako::router::Router::new();
router.route(Method::GET, "/ws/echo", ws_echo);
tako::serve(listener, router).await;
}//! WebSocket example using compio runtime.
//!
//! Run with: cargo run --example websocket-compio --features compio-ws
use std::time::Duration;
use compio::ws::tungstenite::Message;
use tako::Method;
use tako::responder::Responder;
use tako::types::Request;
use tako::ws_compio::CompioWebSocket;
use tako::ws_compio::TakoWsCompio;
use tako::ws_compio::UpgradedStream;
pub async fn ws_echo(req: Request) -> impl Responder {
TakoWsCompio::new(req, |mut ws: CompioWebSocket<UpgradedStream>| async move {
if let Err(e) = ws
.send(Message::Text("Welcome to Tako WS (compio)!".into()))
.await
{
tracing::error!("Failed to send welcome message: {}", e);
return;
}
loop {
match ws.read().await {
Ok(Message::Text(txt)) => {
let response = format!("Echo: {}", txt);
if let Err(e) = ws.send(Message::Text(response.into())).await {
tracing::error!("Failed to send echo: {}", e);
break;
}
}
Ok(Message::Binary(bin)) => {
if let Err(e) = ws.send(Message::Binary(bin)).await {
tracing::error!("Failed to send binary: {}", e);
break;
}
}
Ok(Message::Ping(p)) => {
if let Err(e) = ws.send(Message::Pong(p)).await {
tracing::error!("Failed to send pong: {}", e);
break;
}
}
Ok(Message::Close(_)) => {
let _ = ws.close(None).await;
break;
}
Ok(_) => {}
Err(e) => {
tracing::error!("WebSocket error: {}", e);
break;
}
}
}
})
}
pub async fn ws_count(req: Request) -> impl Responder {
TakoWsCompio::new(req, |mut ws: CompioWebSocket<UpgradedStream>| async move {
let mut count: u64 = 0;
loop {
// Send current count
let msg = format!("count: {}", count);
if let Err(e) = ws.send(Message::Text(msg.into())).await {
tracing::error!("Failed to send count: {}", e);
break;
}
count += 1;
// Wait 1 second
compio::time::sleep(Duration::from_secs(1)).await;
// Check for close message (non-blocking read attempt)
// For simplicity, we just keep counting until connection drops
}
})
}
#[compio::main]
async fn main() {
let listener = compio::net::TcpListener::bind("127.0.0.1:8080")
.await
.unwrap();
let mut router = tako::router::Router::new();
router.route(Method::GET, "/ws/echo", ws_echo);
router.route(Method::GET, "/ws/count", ws_count);
println!("WebSocket server running at:");
println!(" - ws://127.0.0.1:8080/ws/echo");
println!(" - ws://127.0.0.1:8080/ws/count");
tako::serve(listener, router).await;
}
Configuration
On Tokio, TakoWs is a builder. Each option is a chained method before the
handler runs:
| Method | Effect |
|---|---|
.protocols(list) | Subprotocol allow-list; the first server-preferred match the client offers is echoed back. |
.max_frame_size(n) | Cap a single WebSocket frame, in bytes. |
.max_message_size(n) | Cap a reassembled message, in bytes. |
.allowed_origins(list) | Reject upgrades whose Origin is not on the list with 403. |
.upgrade_timeout(d) | Drop the task if the client never completes the upgrade. |
.max_lifetime(d) | Hard cap on total conversation lifetime after upgrade. |
.keep_alive(WsKeepAlive { .. }) | Surface ping_interval / pong_timeout hints to the handler. |
use std::time::Duration;
use tako::ws::TakoWs;
# fn _doc(req: tako::types::Request) -> impl tako::responder::Responder {
TakoWs::new(req, |ws| async move { /* ... */ })
.protocols(["chat", "superchat"])
.max_message_size(1 << 20)
.allowed_origins(["https://app.example.com"])
.upgrade_timeout(Duration::from_secs(10))
.max_lifetime(Duration::from_secs(3600))
# }WsKeepAlive (ping_interval, pong_timeout) is surfaced to the handler as a
hint — the framework does not run the ping loop itself, because the handler owns
the stream. For unconditional disconnection of an idle peer, prefer
max_lifetime.
Behind HTTP/2
WebSocket can also be served over an HTTP/2-or-TLS listener. The
examples/websocket-http2 example registers the same TakoWs handlers and
serves them with tako::serve_tls. The upgrade and message loop are identical;
only the transport you bind changes.
Related
- Routing — registering the upgrade handler as a route.
- HTTP — the listeners WebSocket rides on.
- Streams — SSE and other streaming responses.
- Feature flags — the
compio-wsfeature.