A production-grade distributed task queue built on PostgreSQL + Redis — demonstrating durable job delivery, atomic lease acquisition, exponential retry with jitter, and dead-letter handling. Designed to show the systems-design and reliability thinking expected at senior/staff level.
- System Architecture
- Guarantees
- Design Decisions & Tradeoffs
- Failure Modes & Mitigations
- Job State Machine
- Retry Policy
- Observability
- Quick Start
- API Reference
- Project Structure
- Configuration
- Running Tests
- Load Test Results
┌─────────────────────────────────────────────────────────────────────┐
│ CLIENT │
│ POST /v1/jobs Idempotency-Key: <caller-uuid> │
└───────────────────────────┬─────────────────────────────────────────┘
│ HTTPS
▼
┌─────────────────────────────────────────────────────────────────────┐
│ API SERVICE (FastAPI / uvicorn) │
│ │
│ 1. BEGIN TRANSACTION │
│ 2. SELECT idempotency_keys WHERE key = ? ──► duplicate? return │
│ 3. INSERT INTO jobs (status='queued') │
│ 4. INSERT INTO idempotency_keys │
│ 5. COMMIT │
│ 6. LPUSH job_id → Redis (post-commit, at-least-once) │
│ │
│ GET /v1/jobs/:id ──► SELECT jobs WHERE id = ? │
│ GET /metrics ──► Prometheus text format │
└──────────────┬────────────────────────────┬────────────────────────┘
│ SQL (asyncpg) │
▼ ▼
┌──────────────────────┐ ┌──────────────────────────┐
│ PostgreSQL 16 │ │ Redis 7 │
│ │ │ │
│ jobs │ │ atlasqueue:jobs LIST │
│ idempotency_keys │◄──────│ atlasqueue:dlq LIST │
│ │ lease │ │
└──────────────────────┘ guard └──────────┬───────────────┘
▲ │ BLPOP
│ ▼
┌──────────────────────────────────────────────────────────────────────┐
│ WORKER SERVICE (asyncio) │
│ │
│ BLPOP atlasqueue:jobs ──► job_id │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ LEASE ACQUISITION (atomic UPDATE…RETURNING) │ │
│ │ │ │
│ │ UPDATE jobs SET │ │
│ │ status='running', │ │
│ │ locked_until = now() + lease_seconds, │ │
│ │ lock_owner = worker_id, │ │
│ │ attempts = attempts + 1 │ │
│ │ WHERE id = ? │ │
│ │ AND status IN ('queued','failed') │ │
│ │ AND run_at <= now() │ │
│ │ AND (locked_until IS NULL OR locked_until < now()) │ │
│ │ RETURNING * │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ handler(payload) ── registered via @register("job.type") │
│ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ ON SUCCESS → UPDATE status='succeeded' │ │
│ │ ON FAILURE → UPDATE status='failed', run_at=backoff │ │
│ │ ON EXHAUSTED→ UPDATE status='dead', LPUSH dlq │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ │
│ BLPOP timeout → sweep_failed_jobs() │
│ SELECT id WHERE status='failed' AND run_at <= now() │
│ FOR UPDATE SKIP LOCKED ──► LPUSH → UPDATE status='queued' │
│ │
│ Prometheus sidecar HTTP server on :9091 │
└──────────────────────────────────────────────────────────────────────┘
Job is written to Postgres BEFORE Redis is touched.
If Redis LPUSH fails → job stays status=queued.
Worker sweep re-enqueues all queued/failed jobs with run_at ≤ now.
Result: a job is never silently dropped.
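The write-then-signal ordering can be sketched with in-memory stand-ins: a dict plays the Postgres `jobs` table and a `FakeRedis` list plays the Redis LIST. `FakeRedis`, `submit` and `sweep` are hypothetical names, not the project's code. The sweep follows the rule stated above (re-signal `queued`/`failed` jobs whose `run_at` is due). It omits the `FOR UPDATE SKIP LOCKED` row locking that a multi-worker Postgres sweep needs.

```python
import time
import uuid


class FakeRedis:
    """In-memory stand-in for a Redis LIST that can be switched off."""

    def __init__(self) -> None:
        self.items: list[str] = []
        self.down = False

    def lpush(self, key: str, value: str) -> None:
        if self.down:
            raise ConnectionError("redis unavailable")
        self.items.insert(0, value)  # LPUSH adds at the head of the list


def submit(db: dict, redis: FakeRedis, job_type: str) -> str:
    job_id = str(uuid.uuid4())
    # Step 1: durable write (the "COMMIT"). From here on the job cannot be lost.
    db[job_id] = {"type": job_type, "status": "queued", "run_at": time.time()}
    # Step 2: post-commit signal. If this raises, the caller sees an error,
    # but the row above is already stored with status='queued'.
    redis.lpush("atlasqueue:jobs", job_id)
    return job_id


def sweep(db: dict, redis: FakeRedis, now: float) -> int:
    """Re-signal every queued/failed job whose run_at has passed."""
    due = [
        job_id
        for job_id, row in db.items()
        if row["status"] in ("queued", "failed") and row["run_at"] <= now
    ]
    for job_id in due:
        redis.lpush("atlasqueue:jobs", job_id)
        db[job_id]["status"] = "queued"
    return len(due)
```

If the sweep re-signals a job that is already in the list, the result is a duplicate message. The lease guard turns that duplicate into a no-op.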
Duplicate delivery is possible (e.g., worker crashes after claiming but before marking succeeded). The lease guard absorbs this:
- Expired lease (`locked_until < now`) → the next worker can reclaim the job.
- Succeeded/dead jobs don't match the `WHERE status IN ('queued','failed')` predicate → safe no-op.
Every POST /v1/jobs requires an Idempotency-Key header. The flow:
SELECT idempotency_keys WHERE key = <caller-key>
found → return existing job (200 OK)
not found → INSERT job + key atomically (201 Created) → LPUSH
The idempotency_keys primary key constraint acts as a last-resort guard against concurrent duplicate inserts — only one INSERT wins; the loser gets an IntegrityError.
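Here is a minimal sketch of this flow. It uses stdlib `sqlite3` in place of Postgres and SQLAlchemy, so it runs without any services. The schema keeps only the columns the flow needs. `init_db`, `_lookup` and `submit_job` are illustrative names, not the code in `services/jobs.py`, and the post-commit LPUSH is omitted.

```python
import json
import sqlite3
import uuid


def init_db(conn: sqlite3.Connection) -> None:
    conn.executescript(
        """
        CREATE TABLE jobs (
            id TEXT PRIMARY KEY,
            type TEXT NOT NULL,
            payload TEXT NOT NULL,
            status TEXT NOT NULL DEFAULT 'queued'
        );
        CREATE TABLE idempotency_keys (
            key TEXT PRIMARY KEY,
            job_id TEXT NOT NULL REFERENCES jobs (id)
        );
        """
    )


def _lookup(conn: sqlite3.Connection, key: str):
    return conn.execute(
        "SELECT job_id FROM idempotency_keys WHERE key = ?", (key,)
    ).fetchone()


def submit_job(
    conn: sqlite3.Connection, key: str, job_type: str, payload: dict
) -> tuple[int, str]:
    """Return (http_status, job_id): 201 for a new job, 200 for a duplicate."""
    row = _lookup(conn, key)
    if row is not None:
        return 200, row[0]
    job_id = str(uuid.uuid4())
    try:
        with conn:  # one transaction: both rows commit, or neither does
            conn.execute(
                "INSERT INTO jobs (id, type, payload) VALUES (?, ?, ?)",
                (job_id, job_type, json.dumps(payload)),
            )
            conn.execute(
                "INSERT INTO idempotency_keys (key, job_id) VALUES (?, ?)",
                (key, job_id),
            )
    except sqlite3.IntegrityError:
        # Lost a race on the same key: our inserts rolled back, return the winner.
        row = _lookup(conn, key)
        if row is None:
            raise
        return 200, row[0]
    return 201, job_id
```

Two requests with the same key can both get past the `SELECT`. When that happens, the primary key lets only one transaction commit. The loser rolls back both of its inserts and reads back the winner's job.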
Exactly-once execution is not guaranteed (distributed systems can't provide it without 2PC), but the system is designed to make duplicate execution detectable:
- `job.attempts` is the authoritative attempt counter, incremented atomically in the same `UPDATE` that claims the lease.
- Handlers receive the full `job` object, including `attempts`, so they can implement their own idempotency if needed.
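One way a handler can make its own side effect idempotent is sketched below. The handler records the job id in a marker table inside the same transaction as the side effect. A re-delivered job then fails on the primary key instead of repeating the work. This sketch assumes the handler can see the job id, although the `@register` handler example in this README receives only `payload`. It also only holds when the side effect lives in the same database as the marker. `processed_jobs`, `sent_emails`, `init_handler_db` and `send_email_once` are hypothetical.

```python
import sqlite3


def init_handler_db(conn: sqlite3.Connection) -> None:
    conn.executescript(
        """
        CREATE TABLE processed_jobs (job_id TEXT PRIMARY KEY);
        CREATE TABLE sent_emails (job_id TEXT NOT NULL, recipient TEXT NOT NULL);
        """
    )


def send_email_once(conn: sqlite3.Connection, job_id: str, recipient: str) -> bool:
    """Apply the side effect at most once per job_id; return True if it ran now."""
    try:
        with conn:  # marker row and side effect commit together or not at all
            conn.execute("INSERT INTO processed_jobs (job_id) VALUES (?)", (job_id,))
            # The "email" here is just a row standing in for the real side effect.
            conn.execute(
                "INSERT INTO sent_emails (job_id, recipient) VALUES (?, ?)",
                (job_id, recipient),
            )
    except sqlite3.IntegrityError:
        return False  # an earlier attempt already committed this job
    return True
```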

| | LIST (LPUSH/BLPOP) | ZSET (score=run_at) |
|---|---|---|
| Blocking pop | ✅ Native BLPOP, no polling | ❌ Must poll with ZRANGEBYSCORE |
| Ordered retry scheduling | ❌ Not native | ✅ Score = run_at timestamp |
| Simplicity | ✅ Two commands | More complex |
| Our solution | Main queue uses LIST | Retries use Postgres run_at + DB sweep |

Decision: Main queue is a Redis LIST — BLPOP eliminates polling busy-loops. Retry scheduling is handled by storing run_at in Postgres and using a DB sweep (SELECT FOR UPDATE SKIP LOCKED) rather than a Redis ZSET. This keeps Redis as a simple signalling layer and Postgres as the durable source of truth.
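The consumer pattern can be sketched with an `asyncio.Queue` standing in for BLPOP: block for a job id, and treat a timeout as the cue to sweep. `consume`, `process` and `sweep` are hypothetical callables, not the code in `worker/consumer.py`. One difference matters here. `asyncio.Queue` is FIFO, but LPUSH followed by BLPOP takes from the same end of a Redis list, so the newest job id is served first. Pairing LPUSH with BRPOP gives FIFO order.

```python
import asyncio
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)


async def consume(
    queue: asyncio.Queue,
    process: Callable[[str], Awaitable[None]],
    sweep: Callable[[], Awaitable[int]],
    poll_timeout: float,
    stop: asyncio.Event,
) -> None:
    """Process job ids as they arrive; sweep whenever the wait times out."""
    while not stop.is_set():
        try:
            # Stand-in for: BLPOP atlasqueue:jobs <poll_timeout>
            job_id = await asyncio.wait_for(queue.get(), timeout=poll_timeout)
        except asyncio.TimeoutError:
            try:
                await sweep()
            except Exception:
                # Log and carry on; the next timeout retries the sweep.
                logger.exception("sweep failed")
            continue
        await process(job_id)
```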

| | Postgres UPDATE…WHERE…RETURNING | Redis SET NX + TTL |
|---|---|---|
| Atomicity | ✅ Single statement, no TOCTOU | |
| Crash recovery | ✅ `locked_until < now` predicate built-in | |
| Audit trail | ✅ lock_owner, attempts, last_error in same row | ❌ Separate store |
| Multi-worker safety | ✅ Postgres row-level locking | ✅ Redis is single-threaded |

Decision: Postgres lease via UPDATE…RETURNING — eliminates TOCTOU races entirely. The row is the lock. locked_until gives automatic crash recovery without a separate TTL mechanism.
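The lease can be exercised end to end with stdlib `sqlite3`. The `UPDATE` below copies the predicate from the architecture diagram, with two substitutions. `now()` becomes a `now` parameter in epoch seconds, which is an assumption made for determinism. `RETURNING *` becomes a `rowcount` check, because RETURNING needs SQLite 3.35+. `create_jobs_table` and `acquire_lease` are illustrative names, not the code in `worker/lease.py`.

```python
import sqlite3

# Predicate copied from the architecture diagram; now() is a parameter here.
LEASE_SQL = """
UPDATE jobs
SET status = 'running',
    locked_until = :now + :lease_seconds,
    lock_owner = :worker_id,
    attempts = attempts + 1
WHERE id = :job_id
  AND status IN ('queued', 'failed')
  AND run_at <= :now
  AND (locked_until IS NULL OR locked_until < :now)
"""


def create_jobs_table(conn: sqlite3.Connection) -> None:
    conn.execute(
        """
        CREATE TABLE jobs (
            id TEXT PRIMARY KEY,
            status TEXT NOT NULL,
            run_at REAL NOT NULL,
            locked_until REAL,
            lock_owner TEXT,
            attempts INTEGER NOT NULL DEFAULT 0
        )
        """
    )


def acquire_lease(
    conn: sqlite3.Connection,
    job_id: str,
    worker_id: str,
    now: float,
    lease_seconds: float = 30.0,
) -> bool:
    """Claim a job with one conditional UPDATE; True only for the winning worker."""
    with conn:
        cur = conn.execute(
            LEASE_SQL,
            {"job_id": job_id, "worker_id": worker_id, "now": now,
             "lease_seconds": lease_seconds},
        )
    # rowcount replaces RETURNING *: 1 row changed means this worker holds the lease.
    return cur.rowcount == 1
```

A second worker's `UPDATE` finds `status = 'running'` and matches zero rows, which is the rejected-lease path. Note that this predicate, as written, admits only `queued`/`failed` rows. This statement alone therefore does not reclaim a `running` row, even after its `locked_until` has passed.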
- No broker abstraction needed — the system is the broker; full control of delivery semantics.
- No serialization overhead — jobs are native SQLAlchemy objects.
- Single-process async — one worker process handles hundreds of concurrent lease checks via the event loop, no threading complexity.
- Tradeoff: no built-in task routing, rate-limiting, or canvas — these would need to be built for v2.
- APM agents (Datadog, New Relic) add vendor lock-in and ~2–5ms per-request latency.
- `structlog` JSON logs are sink-agnostic: ship them to any log aggregator (Loki, Splunk, CloudWatch).
- `prometheus_client` is language-native: no agent, no network call on the hot path.

| Scenario | What Happens | Recovery |
|---|---|---|
| API crashes after DB COMMIT, before Redis LPUSH | Job is `queued` in Postgres with no Redis entry | Worker sweep re-enqueues all queued jobs with `run_at ≤ now` on the next BLPOP timeout |
| Worker crashes mid-handler | Job stays `running` with `locked_until = T` | After T expires, any worker's lease attempt matches `locked_until < now` and reclaims the job |
| Redis unavailable at submit time | LPUSH raises; the API returns 500 to the caller | Caller retries with the same Idempotency-Key; the worker sweep covers any that slipped through |
| Redis unavailable during sweep LPUSH | `sweep_failed_jobs` propagates the exception | Caught in the consumer loop, logged, retried on the next timeout |
| Postgres unavailable | Both API and worker fail fast with a logged error | No silent data loss: Postgres is the source of truth |
| Handler raises an unhandled exception | `execute_job` catches it, transitions `running → failed`, stores `last_error` | Retried up to `max_attempts`, then moved to the DLQ |
| Duplicate Redis message (at-least-once) | Second worker's UPDATE matches 0 rows (job already running/succeeded) | `LEASE_REJECTED` counter incremented; job skipped silently |
| Multiple workers sweep simultaneously | `SELECT FOR UPDATE SKIP LOCKED` ensures non-overlapping row sets | Each worker gets a disjoint set of due-retry jobs |
| Enormous handler error message | Truncated to 4096 chars before storage | Prevents `last_error` column overflow |

┌─────────────────────────┐
│ queued │◄──── LPUSH (submit / sweep)
└────────────┬────────────┘
│ acquire_lease()
│ UPDATE…RETURNING (atomic)
▼
┌─────────────────────────┐
│ running │
└──┬─────────────────┬───┘
│ success │ failure
▼ ▼
┌─────────────────┐ ┌─────────────────────────┐
│ succeeded │ │ failed │
│ (terminal) │ │ run_at = now+backoff │
└─────────────────┘ └──────────┬──────────────┘
│ attempts < max_attempts
│ sweep re-enqueues
▼
┌─────────────────────────┐
│ queued │ (retry cycle)
└─────────────────────────┘
│ attempts >= max_attempts
▼
┌─────────────────────────┐
│ dead │
│ LPUSH atlasqueue:dlq │
│ (terminal) │
└─────────────────────────┘
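The diagram can be encoded as a transition table so that illegal moves fail loudly. This is a hypothetical sketch. The project's `JobStatus` lives in `core/enums.py` and may be shaped differently. The table also includes `failed → running`, because the lease predicate (`status IN ('queued','failed')`) permits it. `next_status` encodes the retry rule. Since `attempts` is incremented when the lease is taken, a failure with `attempts >= max_attempts` goes straight to `dead`.

```python
from enum import Enum


class JobStatus(str, Enum):
    QUEUED = "queued"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"
    DEAD = "dead"


# Edges from the state diagram, plus failed -> running, which the lease
# predicate also allows. succeeded and dead have no outgoing edges.
TRANSITIONS = {
    JobStatus.QUEUED: {JobStatus.RUNNING},
    JobStatus.RUNNING: {JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.DEAD},
    JobStatus.FAILED: {JobStatus.QUEUED, JobStatus.RUNNING},
    JobStatus.SUCCEEDED: set(),
    JobStatus.DEAD: set(),
}


def next_status(ok: bool, attempts: int, max_attempts: int) -> JobStatus:
    """Status a running job moves to after one handler attempt."""
    if ok:
        return JobStatus.SUCCEEDED
    return JobStatus.DEAD if attempts >= max_attempts else JobStatus.FAILED


def transition(current: JobStatus, target: JobStatus) -> JobStatus:
    """Validate a move against TRANSITIONS and return the new status."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target
```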
Exponential backoff with additive random jitter (PRD §9):
run_at = now + (base_delay × 2^attempts) + random(0, jitter_max)

| Variable | Default | Env var |
|---|---|---|
| `base_delay` | 5 s | `RETRY_BASE_DELAY_SECONDS` |
| `jitter_max` | 2 s | `RETRY_JITTER_MAX_SECONDS` |
| `max_attempts` | per-job (API default 5) | set at submission time |

Example (base=5, jitter=0 for clarity):

| Attempt | Delay |
|---|---|
| 1 | 5 × 2¹ = 10 s |
| 2 | 5 × 2² = 20 s |
| 3 | 5 × 2³ = 40 s |
| 4 | 5 × 2⁴ = 80 s |

Each delay gets up to `jitter_max` seconds of random jitter. This keeps jobs that fail at the same moment from all retrying at the same instant (thundering herd). The jitter is added on top of the exponential delay. It is not the "full jitter" scheme, which draws the whole delay uniformly from [0, base_delay × 2^attempts].
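The formula and the example table can be checked with a short function. `compute_backoff` and `next_run_at` are sketches with assumed signatures; the project's `compute_backoff()` in `worker/retry.py` may differ. The `rng` parameter is an addition that lets tests make the jitter deterministic.

```python
import random
from datetime import datetime, timedelta, timezone
from typing import Optional


def compute_backoff(
    attempts: int,
    base_delay: float = 5.0,
    jitter_max: float = 2.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Seconds to wait: base_delay * 2**attempts plus uniform jitter in [0, jitter_max]."""
    if rng is None:
        rng = random.Random()
    return base_delay * (2 ** attempts) + rng.uniform(0.0, jitter_max)


def next_run_at(attempts: int, now: Optional[datetime] = None, **kwargs) -> datetime:
    """Absolute retry time: now + compute_backoff(attempts, ...)."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now + timedelta(seconds=compute_backoff(attempts, **kwargs))
```

With `jitter_max=0.0`, attempts 1 to 4 reproduce the 10/20/40/80 s column above.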
Every key event emits a structured log line with consistent fields:
{"event": "job_created", "job_id": "...", "job_type": "email.send", "level": "info"}
{"event": "lease_acquired", "job_id": "...", "attempt": 1, "locked_until": "...", "worker_id": "..."}
{"event": "handler_start", "job_id": "...", "job_type": "email.send", "attempt": 1}
{"event": "job_succeeded", "job_id": "...", "level": "info"}
{"event": "job_failed", "job_id": "...", "retry_at": "...", "error": "...", "level": "warning"}
{"event": "job_dead", "job_id": "...", "dlq_key": "atlasqueue:dlq", "level": "error"}
{"event": "sweep_complete", "requeued": 3, "level": "info"}

| Metric | Type | Labels | Description |
|---|---|---|---|
| `atlasqueue_jobs_submitted_total` | Counter | `job_type` | New jobs submitted |
| `atlasqueue_jobs_succeeded_total` | Counter | `job_type` | Successful completions |
| `atlasqueue_jobs_failed_total` | Counter | `job_type` | Retryable failures |
| `atlasqueue_jobs_dead_total` | Counter | `job_type` | Jobs that exhausted retries → DLQ |
| `atlasqueue_jobs_idempotent_total` | Counter | `job_type` | Duplicate submissions absorbed |
| `atlasqueue_jobs_retried_total` | Counter | (none) | Jobs re-enqueued by sweep |
| `atlasqueue_lease_acquired_total` | Counter | (none) | Successful lease acquisitions |
| `atlasqueue_lease_rejected_total` | Counter | (none) | Contested/non-claimable leases |
| `atlasqueue_worker_active_jobs` | Gauge | (none) | Jobs currently executing |
| `atlasqueue_job_duration_seconds` | Histogram | `job_type`, `outcome` | Handler wall-clock time |
| `atlasqueue_http_request_duration_seconds` | Histogram | `method`, `path_template`, `status_code` | API latency |
| `atlasqueue_http_requests_total` | Counter | `method`, `path_template`, `status_code` | Total HTTP requests |

Endpoints:
- API: `GET http://localhost:8000/metrics/`
- Worker: `GET http://localhost:9091/`
- Docker ≥ 24 + Docker Compose v2
- `curl` and `python3` for smoke tests
git clone https://github.com/AghahowaJeffrey/AtlasQueue.git
cd AtlasQueue
# Start Postgres, Redis, run migrations, then API + worker
docker compose up --build
curl -s http://localhost:8000/health | python3 -m json.tool
# → {"status": "ok", "service": "atlasqueue-api"}
# New job: 201 Created
curl -s -X POST http://localhost:8000/v1/jobs \
-H "Content-Type: application/json" \
-H "Idempotency-Key: my-unique-key-001" \
-d '{"type": "noop", "payload": {"hello": "world"}, "max_attempts": 3}' \
| python3 -m json.tool
Response:
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"type": "noop",
"status": "queued",
"attempts": 0,
"max_attempts": 3,
"payload": {"hello": "world"},
"run_at": "2026-03-03T00:00:00Z",
"created_at": "2026-03-03T00:00:00Z"
}
curl -s http://localhost:8000/v1/jobs/<JOB_ID> | python3 -m json.tool
# status: "queued" → "running" → "succeeded" (within ~1s)
# Same Idempotency-Key → 200 OK, same job returned; no duplicate
curl -s -X POST http://localhost:8000/v1/jobs \
-H "Content-Type: application/json" \
-H "Idempotency-Key: my-unique-key-001" \
-d '{"type": "noop", "payload": {"hello": "world"}, "max_attempts": 3}'
docker compose logs -f worker
# Submit a job that will always fail (unregistered type)
curl -X POST http://localhost:8000/v1/jobs \
-H "Content-Type: application/json" \
-H "Idempotency-Key: dlq-test-001" \
-d '{"type": "no.handler", "payload": {}, "max_attempts": 2}'
# After retries exhaust, check DLQ
docker compose exec redis redis-cli LRANGE atlasqueue:dlq 0 -1
docker compose down # keep data
docker compose down -v # wipe volumes
`POST /v1/jobs`: submit a new job. Idempotent on `Idempotency-Key`.
Headers

| Header | Required | Description |
|---|---|---|
| `Idempotency-Key` | ✅ | Caller-generated unique key (1–512 chars) |
| `Content-Type` | ✅ | `application/json` |

Body
{
"type": "email.send",
"payload": { "to": "user@example.com" },
"max_attempts": 5
}

| Field | Type | Constraints | Default |
|---|---|---|---|
| `type` | string | 1–128 chars | required |
| `payload` | object | any JSON object | required |
| `max_attempts` | integer | 1–25 | 5 |

Responses

| Code | Meaning |
|---|---|
| `201 Created` | New job created and enqueued |
| `200 OK` | Duplicate: existing job returned |
| `422 Unprocessable Entity` | Validation error (missing header, bad body) |

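On the client side, this contract suggests generating the `Idempotency-Key` once and reusing it on every retry. Suppose a first attempt committed but its response was lost. The retry then gets `200 OK` and the same job rather than a duplicate. Below is a stdlib-only sketch. `submit_job`, `urlopen_send` and the injectable `send` transport are hypothetical names. Only 5xx responses and connection errors are retried, since a `422` will not change on retry.

```python
import json
import time
import urllib.error
import urllib.request
import uuid
from typing import Callable, Optional

# A transport takes a prepared request and returns (status_code, decoded_json).
Send = Callable[[urllib.request.Request], tuple[int, dict]]


def urlopen_send(req: urllib.request.Request) -> tuple[int, dict]:
    """Default transport: perform the request with urllib (4xx/5xx raise HTTPError)."""
    with urllib.request.urlopen(req, timeout=10) as resp:
        return resp.status, json.loads(resp.read())


def submit_job(
    base_url: str,
    job: dict,
    send: Send = urlopen_send,
    retries: int = 3,
    delay: float = 0.5,
    key: Optional[str] = None,
) -> tuple[int, dict]:
    """POST /v1/jobs, retrying transient failures with a single Idempotency-Key."""
    key = key or str(uuid.uuid4())  # generated once, reused on every attempt
    body = json.dumps(job).encode()
    last_error: Optional[Exception] = None
    for attempt in range(1, retries + 1):
        req = urllib.request.Request(
            f"{base_url}/v1/jobs",
            data=body,
            method="POST",
            headers={"Content-Type": "application/json", "Idempotency-Key": key},
        )
        try:
            return send(req)
        except urllib.error.HTTPError as exc:
            if exc.code < 500:
                raise  # e.g. 422: resending the same body will not help
            last_error = exc
        except (urllib.error.URLError, ConnectionError, TimeoutError) as exc:
            last_error = exc
        if attempt < retries:
            time.sleep(delay * attempt)  # linear pause between attempts
    raise RuntimeError(f"POST /v1/jobs failed after {retries} attempts") from last_error
```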
`GET /v1/jobs/{id}`: retrieve a job by UUID.
Responses

| Code | Meaning |
|---|---|
| `200 OK` | Job found |
| `404 Not Found` | No job with that ID |
| `422 Unprocessable Entity` | Malformed UUID |

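`failed` is a retryable state, so a client waiting for an outcome should keep polling until it sees one of the two terminal states, `succeeded` or `dead`. The sketch below assumes a hypothetical zero-argument `fetch` callable that performs `GET /v1/jobs/{id}` and returns the decoded JSON. `wait_for_job` and `TERMINAL_STATUSES` are illustrative names.

```python
import time
from typing import Callable

# failed is retryable, so only these two statuses end a job's lifecycle.
TERMINAL_STATUSES = {"succeeded", "dead"}


def wait_for_job(
    fetch: Callable[[], dict], timeout: float = 30.0, interval: float = 0.5
) -> dict:
    """Call fetch() until the job is terminal or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        job = fetch()
        if job["status"] in TERMINAL_STATUSES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {job['status']!r} after {timeout}s")
        time.sleep(interval)
```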
`GET /health` returns:
{"status": "ok", "service": "atlasqueue-api"}
Add a handler in `atlasqueue/worker/handlers.py` or any module imported at worker startup:
from atlasqueue.worker.handlers import register


@register("email.send")
async def send_email(payload: dict) -> None:
    """Raise any exception to signal failure and trigger retry."""
    recipient = payload["to"]
    # ... call mail API


@register("invoice.generate")
async def generate_invoice(payload: dict) -> None:
    invoice_id = payload["invoice_id"]
    # ...

Then rebuild the worker:
docker compose up --build worker
atlasqueue/
├── atlasqueue/
│ ├── core/
│ │ ├── config.py # pydantic-settings — all env vars
│ │ ├── enums.py # JobStatus enum (queued→running→succeeded/failed/dead)
│ │ ├── logging.py # structlog JSON config
│ │ └── metrics.py # Prometheus metric definitions (single source of truth)
│ ├── db/
│ │ ├── models.py # SQLAlchemy ORM (Job, IdempotencyKey)
│ │ └── session.py # async engine + session factory
│ ├── api/
│ │ ├── main.py # FastAPI app factory, HTTP metrics middleware, /metrics mount
│ │ ├── deps.py # DI: get_db (plain session), get_redis
│ │ ├── schemas.py # Pydantic request/response models
│ │ └── routers/
│ │ ├── health.py # GET /health
│ │ └── jobs.py # POST /v1/jobs, GET /v1/jobs/{id}
│ ├── services/
│ │ └── jobs.py # submit_job (idempotency + enqueue), get_job
│ └── worker/
│ ├── main.py # Entry point, Prometheus sidecar on :9091
│ ├── consumer.py # BLPOP loop + sweep on timeout
│ ├── lease.py # acquire_lease() — atomic UPDATE…RETURNING
│ ├── executor.py # execute_job() — dispatch + status transitions
│ ├── retry.py # compute_backoff() + sweep_failed_jobs()
│ └── handlers.py # @register decorator + built-in noop handler
├── migrations/
│ └── versions/
│ └── 0001_initial.py # jobs + idempotency_keys tables
├── tests/
│ ├── conftest.py # shared fixtures (mock_session, mock_redis, client)
│ ├── integration/
│ │ └── test_job_submission.py # 11 tests — API layer
│ └── unit/
│ ├── test_lease.py # 7 tests — acquire_lease + consumer dispatch
│ ├── test_executor.py # 5 tests — status transition paths
│ ├── test_retry.py # 7 tests — backoff formula + sweep
│ └── test_metrics.py # 7 tests — metric registration + /metrics endpoint
├── Dockerfile.api
├── Dockerfile.worker
├── docker-compose.yml
├── pyproject.toml
└── .env.docker
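According to the tree, `worker/handlers.py` holds the `@register` decorator and a built-in `noop` handler. A decorator-based registry of that shape might look like the sketch below. It is hypothetical and not the project's file. An unregistered type raises at dispatch time, which matches the Quick Start's `no.handler` job failing on every attempt.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]
HANDLERS: dict[str, Handler] = {}


def register(job_type: str) -> Callable[[Handler], Handler]:
    """Decorator that maps a job type string to an async handler."""
    def decorator(fn: Handler) -> Handler:
        if job_type in HANDLERS:
            raise ValueError(f"handler already registered for {job_type!r}")
        HANDLERS[job_type] = fn
        return fn
    return decorator


@register("noop")
async def noop(payload: dict) -> None:
    """Built-in handler that does nothing and always succeeds."""


async def dispatch(job_type: str, payload: dict) -> None:
    """Look up and await the handler; raise if the type is unknown."""
    try:
        handler = HANDLERS[job_type]
    except KeyError:
        # Raising lets the caller count an unknown type as a failed attempt.
        raise LookupError(f"no handler registered for {job_type!r}") from None
    await handler(payload)
```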
All settings via environment variables (see .env.docker):

| Variable | Default | Description |
|---|---|---|
| `DATABASE_URL` | `postgresql+asyncpg://...` | Async Postgres DSN |
| `REDIS_URL` | `redis://redis:6379/0` | Redis connection string |
| `JOB_QUEUE_KEY` | `atlasqueue:jobs` | Main queue Redis key |
| `DLQ_KEY` | `atlasqueue:dlq` | Dead-letter queue Redis key |
| `WORKER_LEASE_SECONDS` | `30` | Lease TTL: auto-reclaim after crash |
| `WORKER_POLL_TIMEOUT` | `5` | BLPOP timeout in seconds; a timeout triggers the sweep |
| `WORKER_METRICS_PORT` | `9091` | Worker Prometheus sidecar port |
| `RETRY_BASE_DELAY_SECONDS` | `5` | Backoff base delay |
| `RETRY_JITTER_MAX_SECONDS` | `2` | Max random jitter added to backoff |
| `LOG_LEVEL` | `INFO` | Logging verbosity (DEBUG/INFO/WARNING/ERROR) |

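The project parses these variables with pydantic-settings in `core/config.py`. The stdlib-only dataclass below is a stand-in that mirrors the table's names and defaults. `Settings.from_env` is a hypothetical helper. `DATABASE_URL` is left without a default because its value is elided above.

```python
import os
from dataclasses import MISSING, dataclass, fields
from typing import Mapping, Optional

LOG_LEVELS = {"DEBUG", "INFO", "WARNING", "ERROR"}


@dataclass(frozen=True)
class Settings:
    database_url: str  # no default: must be supplied by the environment
    redis_url: str = "redis://redis:6379/0"
    job_queue_key: str = "atlasqueue:jobs"
    dlq_key: str = "atlasqueue:dlq"
    worker_lease_seconds: int = 30
    worker_poll_timeout: int = 5
    worker_metrics_port: int = 9091
    retry_base_delay_seconds: float = 5.0
    retry_jitter_max_seconds: float = 2.0
    log_level: str = "INFO"

    def __post_init__(self) -> None:
        if self.log_level not in LOG_LEVELS:
            raise ValueError(f"invalid LOG_LEVEL {self.log_level!r}")

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "Settings":
        """Read FIELD_NAME.upper() for each field, coercing to its annotated type."""
        env = os.environ if env is None else env
        values = {}
        for f in fields(cls):
            raw = env.get(f.name.upper())
            if raw is not None:
                values[f.name] = f.type(raw)  # str, int or float
        missing = [
            f.name.upper()
            for f in fields(cls)
            if f.default is MISSING and f.name not in values
        ]
        if missing:
            raise ValueError(f"missing required settings: {', '.join(missing)}")
        return cls(**values)
```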
Tests are fully mocked — no Postgres or Redis required.
# Install dev dependencies
python -m venv .venv && source .venv/bin/activate
pip install -r requirements/dev.txt
# Run all 37 tests
pytest tests/ -v
# By layer
pytest tests/unit/ -v # 26 unit tests
pytest tests/integration/ -v # 11 integration tests (API layer)
Current results:
37 passed in 1.89s
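The suite's `conftest.py` provides mocked sessions and Redis clients. The stdlib sketch below applies the same idea, with `unittest.mock.AsyncMock` standing in for an async Redis client so that no server is needed. `enqueue` is a hypothetical function under test, not project code.

```python
import unittest
from unittest.mock import AsyncMock


async def enqueue(redis, queue_key: str, job_id: str) -> None:
    """Hypothetical post-commit signal: push the job id onto the queue list."""
    await redis.lpush(queue_key, job_id)


class EnqueueTest(unittest.IsolatedAsyncioTestCase):
    async def test_pushes_job_id(self) -> None:
        redis = AsyncMock()  # every attribute call returns an awaitable mock
        await enqueue(redis, "atlasqueue:jobs", "job-1")
        redis.lpush.assert_awaited_once_with("atlasqueue:jobs", "job-1")

    async def test_redis_error_propagates(self) -> None:
        redis = AsyncMock()
        redis.lpush.side_effect = ConnectionError("redis down")
        with self.assertRaises(ConnectionError):
            await enqueue(redis, "atlasqueue:jobs", "job-1")
```

Run it with `python -m unittest`.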
🚧 Load tests are planned for a future milestone using k6. Results will be published here including:
- Sustained throughput (jobs/sec at p99 < 200ms)
- Spike behaviour (5× traffic burst)
- Worker scale-out (horizontal, multiple consumer processes)
- Redis and Postgres saturation points
| Layer | Technology | Why |
|---|---|---|
| API | FastAPI + uvicorn | Async-native, auto OpenAPI, fast |
| ORM | SQLAlchemy 2 (asyncpg) | Async-first, type-safe, mature |
| Queue signalling | Redis 7 LIST | Native BLPOP, no polling |
| Persistence | PostgreSQL 16 | ACID, row-level locks, SKIP LOCKED |
| Migrations | Alembic | Battle-tested, async env support |
| Logging | structlog | JSON-first, sink-agnostic |
| Metrics | prometheus-client | In-process, no agent, Grafana-compatible |
| Config | pydantic-settings | Type-safe env var parsing |
| Runtime | Python 3.12 | async/await, performance improvements |
MIT