WebSocket

Upgrade an HTTP request to a WebSocket inside a handler, then run the send/receive message loop on either runtime.

WebSocket is not a separate listener: it lives inside an HTTP handler on whichever HTTP transport you already serve. TakoWs::new(req, fut) performs the RFC 6455 server-side handshake and hands the upgraded stream to your closure, which owns the stream and drives the message loop.
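
To make the handshake step concrete, here is a minimal sketch of the request checks RFC 6455 requires before a server may answer with 101. It is not Tako's implementation: is_websocket_upgrade and header are hypothetical helpers, and headers are modelled as plain (name, value) pairs so the sketch does not depend on any HTTP library.

```rust
/// Hypothetical helper (not part of Tako): case-insensitive header lookup
/// over a plain list of (name, value) pairs.
fn header<'a>(headers: &[(&str, &'a str)], name: &str) -> Option<&'a str> {
    headers
        .iter()
        .find(|(k, _)| k.eq_ignore_ascii_case(name))
        .map(|(_, v)| v.trim())
}

/// Hypothetical helper (not part of Tako): the request-side checks from
/// RFC 6455 that must pass before a server replies 101 Switching Protocols.
fn is_websocket_upgrade(method: &str, headers: &[(&str, &str)]) -> bool {
    // `Connection` is a comma-separated token list, e.g. "keep-alive, Upgrade".
    let connection_has_upgrade = header(headers, "connection")
        .is_some_and(|v| v.split(',').any(|t| t.trim().eq_ignore_ascii_case("upgrade")));

    method == "GET"
        && connection_has_upgrade
        // `Upgrade` must name the websocket protocol, compared case-insensitively.
        && header(headers, "upgrade").is_some_and(|v| v.eq_ignore_ascii_case("websocket"))
        // Version 13 is the only version RFC 6455 defines.
        && header(headers, "sec-websocket-version") == Some("13")
        // The key is echoed back (hashed) in Sec-WebSocket-Accept; it must be present.
        && header(headers, "sec-websocket-key").is_some_and(|k| !k.is_empty())
}
```

A request that fails any of these checks is an ordinary HTTP request, not an upgrade. The sketch stops short of computing Sec-WebSocket-Accept, which needs SHA-1 and Base64.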

WebSocket is available on both runtimes: it is on by default for Tokio (tako::ws), and behind the compio-ws feature for Compio (tako::ws_compio).

Tokio

On Tokio, TakoWs hands your closure a tokio_tungstenite WebSocketStream. It implements both Sink and Stream over Message, so you send with SinkExt::send and receive with StreamExt::next.

use futures_util::SinkExt;
use futures_util::StreamExt;
use tako::Method;
use tako::responder::Responder;
use tako::types::Request;
use tako::ws::TakoWs;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::Utf8Bytes;

async fn ws_echo(req: Request) -> impl Responder {
  TakoWs::new(req, |mut ws| async move {
    let _ = ws.send(Message::Text("Welcome to Tako WS!".into())).await;

    while let Some(Ok(msg)) = ws.next().await {
      match msg {
        Message::Text(txt) => {
          let _ = ws
            .send(Message::Text(Utf8Bytes::from(format!("Echo: {txt}"))))
            .await;
        }
        Message::Binary(bin) => {
          let _ = ws.send(Message::Binary(bin)).await;
        }
        Message::Ping(p) => {
          let _ = ws.send(Message::Pong(p)).await;
        }
        Message::Close(_) => {
          let _ = ws.send(Message::Close(None)).await;
          break;
        }
        _ => {}
      }
    }
  })
}

#[tokio::main]
async fn main() {
  let listener = tokio::net::TcpListener::bind("127.0.0.1:8080")
    .await
    .unwrap();
  let mut router = tako::router::Router::new();
  router.route(Method::GET, "/ws/echo", ws_echo);
  tako::serve(listener, router).await;
}

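The handler is just another route: register it with router.route and serve it on any HTTP transport. Because the TakoWs value the handler returns is a Responder, the upgrade response (101 Switching Protocols) is produced for you; the upgraded socket is driven on a spawned task once the handshake completes.

The full example below adds a second route, ws_tick, which pushes a message every second while watching the socket for a Close frame.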

use std::time::Duration;

use futures_util::SinkExt;
use futures_util::StreamExt;
use tako::Method;
use tako::responder::Responder;
use tako::types::Request;
use tako::ws::TakoWs;
use tokio_stream::wrappers::IntervalStream;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::Utf8Bytes;

pub async fn ws_echo(req: Request) -> impl Responder {
  TakoWs::new(req, |mut ws| async move {
    let _ = ws.send(Message::Text("Welcome to Tako WS!".into())).await;

    while let Some(Ok(msg)) = ws.next().await {
      match msg {
        Message::Text(txt) => {
          let _ = ws
            .send(Message::Text(Utf8Bytes::from(format!("Echo: {txt}"))))
            .await;
        }
        Message::Binary(bin) => {
          let _ = ws.send(Message::Binary(bin)).await;
        }
        Message::Ping(p) => {
          let _ = ws.send(Message::Pong(p)).await;
        }
        Message::Close(_) => {
          let _ = ws.send(Message::Close(None)).await;
          break;
        }
        _ => {}
      }
    }
  })
}

pub async fn ws_tick(req: Request) -> impl Responder {
  TakoWs::new(req, |mut ws| async move {
    let mut ticker = IntervalStream::new(tokio::time::interval(Duration::from_secs(1))).enumerate();

    loop {
      tokio::select! {
          msg = ws.next() => {
              match msg {
                  Some(Ok(Message::Close(_))) | None => break,
                  _ => {}
              }
          }

          Some((i, _)) = ticker.next() => {
              let _ = ws.send(Message::Text(Utf8Bytes::from(format!("tick #{i}")))).await;
          }
      }
    }
  })
}

#[tokio::main]
async fn main() {
  let listener = tokio::net::TcpListener::bind("127.0.0.1:8080")
    .await
    .unwrap();
  let mut router = tako::router::Router::new();

  router.route(Method::GET, "/ws/echo", ws_echo);
  router.route(Method::GET, "/ws/tick", ws_tick);

  tako::serve(listener, router).await;
}

Compio

On Compio, enable the compio-ws feature and use TakoWsCompio. The upgraded type is CompioWebSocket<UpgradedStream>, which exposes an explicit read() method rather than the Stream interface.

use compio::ws::tungstenite::Message;
use tako::Method;
use tako::responder::Responder;
use tako::types::Request;
use tako::ws_compio::CompioWebSocket;
use tako::ws_compio::TakoWsCompio;
use tako::ws_compio::UpgradedStream;

async fn ws_echo(req: Request) -> impl Responder {
  TakoWsCompio::new(req, |mut ws: CompioWebSocket<UpgradedStream>| async move {
    let _ = ws.send(Message::Text("Welcome to Tako WS (compio)!".into())).await;

    loop {
      match ws.read().await {
        Ok(Message::Text(txt)) => {
          let _ = ws.send(Message::Text(format!("Echo: {txt}").into())).await;
        }
        Ok(Message::Binary(bin)) => {
          let _ = ws.send(Message::Binary(bin)).await;
        }
        Ok(Message::Ping(p)) => {
          let _ = ws.send(Message::Pong(p)).await;
        }
        Ok(Message::Close(_)) => {
          let _ = ws.close(None).await;
          break;
        }
        Ok(_) => {}
        Err(_) => break,
      }
    }
  })
}

#[compio::main]
async fn main() {
  let listener = compio::net::TcpListener::bind("127.0.0.1:8080")
    .await
    .unwrap();
  let mut router = tako::router::Router::new();
  router.route(Method::GET, "/ws/echo", ws_echo);
  tako::serve(listener, router).await;
}
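
The full example adds error logging through tracing and a second route, ws_count, which sends an incrementing counter once per second.

//! WebSocket example using compio runtime.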
//!
//! Run with: cargo run --example websocket-compio --features compio-ws

use std::time::Duration;

use compio::ws::tungstenite::Message;
use tako::Method;
use tako::responder::Responder;
use tako::types::Request;
use tako::ws_compio::CompioWebSocket;
use tako::ws_compio::TakoWsCompio;
use tako::ws_compio::UpgradedStream;

pub async fn ws_echo(req: Request) -> impl Responder {
  TakoWsCompio::new(req, |mut ws: CompioWebSocket<UpgradedStream>| async move {
    if let Err(e) = ws
      .send(Message::Text("Welcome to Tako WS (compio)!".into()))
      .await
    {
      tracing::error!("Failed to send welcome message: {}", e);
      return;
    }

    loop {
      match ws.read().await {
        Ok(Message::Text(txt)) => {
          let response = format!("Echo: {}", txt);
          if let Err(e) = ws.send(Message::Text(response.into())).await {
            tracing::error!("Failed to send echo: {}", e);
            break;
          }
        }
        Ok(Message::Binary(bin)) => {
          if let Err(e) = ws.send(Message::Binary(bin)).await {
            tracing::error!("Failed to send binary: {}", e);
            break;
          }
        }
        Ok(Message::Ping(p)) => {
          if let Err(e) = ws.send(Message::Pong(p)).await {
            tracing::error!("Failed to send pong: {}", e);
            break;
          }
        }
        Ok(Message::Close(_)) => {
          let _ = ws.close(None).await;
          break;
        }
        Ok(_) => {}
        Err(e) => {
          tracing::error!("WebSocket error: {}", e);
          break;
        }
      }
    }
  })
}

pub async fn ws_count(req: Request) -> impl Responder {
  TakoWsCompio::new(req, |mut ws: CompioWebSocket<UpgradedStream>| async move {
    let mut count: u64 = 0;

    loop {
      // Send current count
      let msg = format!("count: {}", count);
      if let Err(e) = ws.send(Message::Text(msg.into())).await {
        tracing::error!("Failed to send count: {}", e);
        break;
      }
      count += 1;

      // Wait 1 second
      compio::time::sleep(Duration::from_secs(1)).await;

      // This loop never reads from the socket, so a Close frame from the
      // client goes unnoticed; it ends only when a send fails.
    }
  })
}

#[compio::main]
async fn main() {
  let listener = compio::net::TcpListener::bind("127.0.0.1:8080")
    .await
    .unwrap();
  let mut router = tako::router::Router::new();

  router.route(Method::GET, "/ws/echo", ws_echo);
  router.route(Method::GET, "/ws/count", ws_count);

  println!("WebSocket server running at:");
  println!("  - ws://127.0.0.1:8080/ws/echo");
  println!("  - ws://127.0.0.1:8080/ws/count");
  tako::serve(listener, router).await;
}

Configuration

On Tokio, TakoWs is a builder. Each option is a method chained onto TakoWs::new(...) and takes effect before the handler runs:

| Method | Effect |
| --- | --- |
| .protocols(list) | Subprotocol allow-list; the first server-preferred match the client offers is echoed back. |
| .max_frame_size(n) | Cap a single WebSocket frame, in bytes. |
| .max_message_size(n) | Cap a reassembled message, in bytes. |
| .allowed_origins(list) | Reject upgrades whose Origin is not on the list with 403. |
| .upgrade_timeout(d) | Drop the task if the client never completes the upgrade. |
| .max_lifetime(d) | Hard cap on total conversation lifetime after upgrade. |
| .keep_alive(WsKeepAlive { .. }) | Surface ping_interval / pong_timeout hints to the handler. |

use std::time::Duration;

use tako::responder::Responder;
use tako::types::Request;
use tako::ws::TakoWs;

async fn ws_configured(req: Request) -> impl Responder {
  TakoWs::new(req, |ws| async move { /* ... */ })
    .protocols(["chat", "superchat"])
    .max_message_size(1 << 20) // 1 MiB per reassembled message
    .allowed_origins(["https://app.example.com"])
    .upgrade_timeout(Duration::from_secs(10))
    .max_lifetime(Duration::from_secs(3600))
}
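
Two of these options, .protocols and .allowed_origins, come down to list checks made during the upgrade. The sketch below illustrates the rules as described in the table; it is not Tako's code. select_protocol and origin_allowed are hypothetical names, and exact string matching of the Origin value is an assumption.

```rust
/// Hypothetical helper (not part of Tako): pick the first protocol in the
/// server's preference order that the client also offered in its
/// comma-separated `Sec-WebSocket-Protocol` header. `None` means no match.
fn select_protocol<'a>(server_prefs: &[&'a str], client_header: &str) -> Option<&'a str> {
    // Split the client's offer into trimmed tokens.
    let offered: Vec<&str> = client_header.split(',').map(str::trim).collect();
    // Walk the server list in order, so server preference wins over client order.
    server_prefs.iter().copied().find(|p| offered.contains(p))
}

/// Hypothetical helper (not part of Tako): exact-match check of an `Origin`
/// header value against an allow-list. Exact matching is an assumption.
fn origin_allowed(allowed: &[&str], origin: &str) -> bool {
    allowed.contains(&origin)
}
```

With the server list ["chat", "superchat"] and a client offering "superchat, chat", select_protocol returns "chat", because the server's order decides. An origin for which origin_allowed returns false corresponds to the 403 case in the table.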

WsKeepAlive (ping_interval, pong_timeout) is surfaced to the handler as a hint: the framework does not run the ping loop itself, because the handler owns the stream. To disconnect a peer unconditionally, whether or not it is idle, prefer max_lifetime.
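
Because the ping loop is left to the handler, it helps to see what bookkeeping it involves. The sketch below is one possible way to turn ping_interval and pong_timeout into decisions. KeepAlive and KeepAliveAction are hypothetical types for illustration, and treating both hints as Duration values is an assumption. It uses only the standard library and takes the current time as a parameter, so it is independent of either runtime.

```rust
use std::time::{Duration, Instant};

/// What the handler should do next (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum KeepAliveAction {
    /// Nothing is due yet.
    Wait,
    /// It is time to send a Ping frame.
    SendPing,
    /// A Ping has gone unanswered for at least `pong_timeout`.
    Close,
}

/// Hypothetical bookkeeping a handler could keep next to its stream.
struct KeepAlive {
    ping_interval: Duration,
    pong_timeout: Duration,
    last_ping: Instant,
    /// `Some(t)` while a Ping sent at `t` is still unanswered.
    awaiting_pong_since: Option<Instant>,
}

impl KeepAlive {
    fn new(ping_interval: Duration, pong_timeout: Duration, now: Instant) -> Self {
        Self { ping_interval, pong_timeout, last_ping: now, awaiting_pong_since: None }
    }

    /// Call when a Pong frame arrives.
    fn on_pong(&mut self) {
        self.awaiting_pong_since = None;
    }

    /// Call on every timer tick. Records the ping when it returns `SendPing`.
    fn poll(&mut self, now: Instant) -> KeepAliveAction {
        if let Some(sent) = self.awaiting_pong_since {
            // A ping is outstanding: either it has timed out or we keep waiting.
            if now.duration_since(sent) >= self.pong_timeout {
                return KeepAliveAction::Close;
            }
            return KeepAliveAction::Wait;
        }
        if now.duration_since(self.last_ping) >= self.ping_interval {
            self.last_ping = now;
            self.awaiting_pong_since = Some(now);
            return KeepAliveAction::SendPing;
        }
        KeepAliveAction::Wait
    }
}
```

In a Tokio handler, poll could be driven from the timer branch of a select loop like the one in ws_tick, with on_pong called whenever a Message::Pong is read.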

Behind HTTP/2

WebSocket can also be served over an HTTP/2 or TLS listener. The examples/websocket-http2 example registers the same TakoWs handlers and serves them with tako::serve_tls. The upgrade and message loop are identical; only the transport you bind changes.

  • Routing — registering the upgrade handler as a route.
  • HTTP — the listeners WebSocket rides on.
  • Streams — SSE and other streaming responses.
  • Feature flags — the compio-ws feature.
