Queue
In-process background job queue with retry, dead-lettering, deduplication, and delayed execution.
Queue
Tako ships an in-process background job queue at tako::queue. It is
designed for the "fire-and-forget" workloads that usually end up
hand-rolled on top of tokio::spawn — send-email, dispatch-webhook,
reindex, etc. — with retry, dead-lettering, deduplication, and
delayed execution built in.
use std::time::Duration;
use serde::{Deserialize, Serialize};
use tako::Method;
use tako::extractors::json::Json;
use tako::extractors::state::State;
use tako::queue::{Job, Queue, RetryPolicy};
use tako::responder::Responder;
use tako::router::Router;
#[derive(Serialize, Deserialize)]
struct Email { to: String, subject: String }
async fn enqueue(
State(q): State<Queue>,
Json(req): Json<Email>,
) -> impl Responder {
match q.0.push("send_email", &req).await {
Ok(id) => (tako::StatusCode::ACCEPTED, format!("queued id={id}\n")),
Err(e) => (tako::StatusCode::INTERNAL_SERVER_ERROR, e.to_string()),
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let queue = Queue::builder()
.workers(4)
.retry(RetryPolicy::exponential(3, Duration::from_millis(500)))
.build();
queue.register("send_email", |job: Job| async move {
let payload: Email = job.deserialize()?;
println!("send_email -> {} / {}", payload.to, payload.subject);
Ok(())
});
queue.start();
let mut router = Router::new();
router.with_state(queue);
router.route(Method::POST, "/emails", enqueue);
let listener = tokio::net::TcpListener::bind("127.0.0.1:8080").await?;
tako::serve(listener, router).await;
Ok(())
}The builder gives you a fluent way to set worker count and retry
policy. Job handlers are registered by name with
queue.register(name, handler); each handler returns Result<(), QueueError>. Push jobs with queue.push(name, payload),
queue.push_delayed(name, payload, duration), or
queue.push_dedup(name, payload, key) to collapse duplicate pending
jobs by an idempotency key.
Operational helpers:
queue.pending_count(),queue.inflight_count()— gauge-style metrics for dashboards.queue.dead_letters()— read the dead-letter queue. Jobs that exhaust their retry budget land here for manual inspection.queue.shutdown(timeout)— drain workers gracefully on SIGTERM.tako::queue::cron::CronScheduler(featurequeue-cron) — wire a crontab spec intoqueue.push.
Signals are emitted for every job lifecycle event:
queue.job.queued,.started,.completed,.failed,.retrying,.dead_letter
The canonical strings live under tako::queue::signal_ids. They are
part of the stable signal contract — see
API stability. The queue integrates with
the in-process signals bus, so any listener can react
to job lifecycle events.
The default MemoryBackend keeps everything in process. Companion
crates tako-stores-redis and tako-stores-postgres are on the
follow-up list and will implement QueueBackend plus the session /
rate-limit / idempotency / JWKS / CSRF stores.
See examples/job-queue
for a full end-to-end demo including retries, delayed jobs, and the
dead-letter queue.