411sst/Conduit

Conduit

Distributed real-time event delivery system built from scratch in Go. Conduit is the infrastructure layer — a multi-tenant pub/sub platform where services publish events and subscribers receive them with at-least-once delivery guarantees, per-topic message ordering, and fault tolerance across distributed nodes. No Kafka wrapper, no RabbitMQ shim — the core mechanics are implemented directly.

Architecture

  Publishers                                                    Subscribers
  (HTTP)                                                        (WebSocket)
     │                                                              ▲
     │ POST /api/publish                                            │
     ▼                                                              │
┌─────────────────────────────────────────────────────────────────────────┐
│                          Conduit Node                                   │
│                                                                         │
│  ┌──────────────┐   ┌──────────┐   ┌──────────────────────────────┐    │
│  │  REST API    │──▶│  Auth    │──▶│          Broker               │    │
│  │  /api/*      │   │  (Keys)  │   │  ┌────────┐  ┌───────────┐  │    │
│  └──────────────┘   └──────────┘   │  │ Topics │  │  Fan-out  │  │    │
│                                     │  └────────┘  └─────┬─────┘  │    │
│  ┌──────────────┐                  │         ▲          │        │    │
│  │  WebSocket   │◀─────────────────┤         │          ▼        │    │
│  │  Handler     │                  │  ┌──────┴──────────────┐   │    │
│  └──────┬───────┘                  │  │ Subscriber Channels │   │    │
│         │                          │  │ (256 buffered each) │   │    │
│         │                          │  └─────────────────────┘   │    │
│         │                          └──────────────────────────────┘    │
│         │                                                              │
│         ▼                                                              │
│  ┌───────────────┐  ┌────────────────┐  ┌──────────────────────────┐  │
│  │ Delivery Eng. │  │  Hash Ring     │  │     Registry (Redis)     │  │
│  │ (Retry+DLQ)   │  │  (CRC32/150vn) │  │  Subscriber → Node map  │  │
│  └───────┬───────┘  └────────────────┘  └──────────────────────────┘  │
│          │                                                             │
└──────────┼─────────────────────────────────────────────────────────────┘
           │
           ▼
┌─────────────────────┐
│     PostgreSQL      │
│  ┌───────────────┐  │
│  │  Message WAL  │  │
│  │  Delivery Log │  │
│  │  DLQ Storage  │  │
│  └───────────────┘  │
└─────────────────────┘

See docs/architecture.md for full multi-node diagrams and a component deep-dive.

Key Design Decisions

At-least-once vs exactly-once delivery

Exactly-once requires distributed transactions (two-phase commit or a transaction log shared across all consumers), which introduces significant latency and complexity. At-least-once delivery with idempotency guidance to consumers is the pragmatic choice: it is the default model in Kafka, in SQS standard queues, and in most production messaging systems. Consumers are expected to handle duplicate messages by deduplicating on message_id.
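Deduplicating on message_id can be as simple as remembering recently seen IDs. A minimal consumer-side sketch, assuming an in-memory window of the last N IDs is enough for the application; the Deduper type is hypothetical and not part of Conduit:

```go
package main

import (
	"container/list"
	"fmt"
	"sync"
)

// Deduper remembers the most recent message IDs a consumer has processed so
// that redeliveries (same message_id) can be skipped. Hypothetical helper.
type Deduper struct {
	mu    sync.Mutex
	limit int
	order *list.List               // IDs in arrival order, oldest at the front
	seen  map[string]*list.Element // message_id -> its element in order
}

func NewDeduper(limit int) *Deduper {
	return &Deduper{limit: limit, order: list.New(), seen: make(map[string]*list.Element)}
}

// FirstTime reports whether id is new and records it. Once more than limit
// IDs are remembered, the oldest one is forgotten.
func (d *Deduper) FirstTime(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[id]; ok {
		return false
	}
	d.seen[id] = d.order.PushBack(id)
	if d.order.Len() > d.limit {
		oldest := d.order.Front()
		d.order.Remove(oldest)
		delete(d.seen, oldest.Value.(string))
	}
	return true
}

func main() {
	d := NewDeduper(10000)
	for _, id := range []string{"m-1", "m-2", "m-1"} { // m-1 is redelivered
		if d.FirstTime(id) {
			fmt.Println("process", id)
		} else {
			fmt.Println("skip duplicate", id)
		}
	}
}
```

A bounded window only catches duplicates that arrive while the ID is still remembered. When a side effect must never repeat (a payment, for example), recording message_id in durable storage under a uniqueness constraint is the safer choice.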

Consistent hashing vs random distribution (Phase 3)

When nodes are added or removed, random subscriber distribution causes a full reshuffle — every subscriber potentially needs to reconnect to a different node. Consistent hashing limits remapping to only the key range affected by the topology change, making scaling events non-disruptive.
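A minimal sketch of a consistent hash ring, assuming CRC32 (IEEE) hashing and 150 virtual nodes per physical node as shown in the architecture diagram. The Ring type, its methods, and the "node#i" virtual-node naming are illustrative, not the actual internal/hashing API:

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
)

// Ring places each physical node on the ring at vnodes positions (the CRC32
// of "node#i"), which evens out how many keys each node owns.
type Ring struct {
	vnodes int
	hashes []uint32          // sorted virtual-node positions
	owner  map[uint32]string // position -> physical node
}

func NewRing(vnodes int) *Ring {
	return &Ring{vnodes: vnodes, owner: make(map[uint32]string)}
}

// Add inserts node's virtual nodes, skipping positions that are already taken.
func (r *Ring) Add(node string) {
	for i := 0; i < r.vnodes; i++ {
		h := crc32.ChecksumIEEE([]byte(node + "#" + strconv.Itoa(i)))
		if _, taken := r.owner[h]; taken {
			continue
		}
		r.owner[h] = node
		r.hashes = append(r.hashes, h)
	}
	sort.Slice(r.hashes, func(i, j int) bool { return r.hashes[i] < r.hashes[j] })
}

// Remove deletes node's virtual nodes; the remaining positions stay sorted.
func (r *Ring) Remove(node string) {
	kept := r.hashes[:0]
	for _, h := range r.hashes {
		if r.owner[h] == node {
			delete(r.owner, h)
		} else {
			kept = append(kept, h)
		}
	}
	r.hashes = kept
}

// Get returns the owner of key: the first virtual node clockwise from the
// key's hash, wrapping around to the start of the ring.
func (r *Ring) Get(key string) string {
	if len(r.hashes) == 0 {
		return ""
	}
	h := crc32.ChecksumIEEE([]byte(key))
	i := sort.Search(len(r.hashes), func(i int) bool { return r.hashes[i] >= h })
	if i == len(r.hashes) {
		i = 0
	}
	return r.owner[r.hashes[i]]
}

func main() {
	r := NewRing(150)
	for _, n := range []string{"node-1", "node-2", "node-3"} {
		r.Add(n)
	}
	fmt.Println("subscriber-42 ->", r.Get("subscriber-42"))
}
```

Adding a fourth node only moves keys whose hash lands just before one of its virtual nodes, and every moved key goes to the new node (roughly a quarter of all keys with four equally weighted nodes). Keys never shuffle between the nodes that were already present.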

WAL-inspired Postgres log vs a dedicated message queue (Phase 2)

Using Postgres as the durable message log avoids introducing another infrastructure dependency. The tradeoff is throughput ceiling — Postgres cannot match a purpose-built queue at extreme scale. For Conduit's target of 10k+ msg/sec, Postgres with an append-only write pattern and proper indexing is sufficient and dramatically simpler to operate.

Redis for coordination only, not message storage (Phase 3)

Redis is volatile by default — data is lost on restart unless persistence is explicitly configured. Using it only for the ephemeral subscriber registry and node presence means a Redis restart affects routing temporarily, not message durability. Postgres owns durability; Redis owns coordination.

Guarantees

  • At-least-once delivery — Every published message is delivered to every active subscriber at least once. Messages are written to the Postgres WAL before fan-out, and unacknowledged deliveries are retried with exponential backoff (1s → 60s cap, 5 attempts before DLQ).
  • Per-topic message ordering — Messages published to the same topic are delivered in publish order. The broker serializes fan-out under a topic lock, and the Postgres WAL assigns monotonically increasing sequence numbers per topic.
  • Durability across restarts — Every message is persisted to the Postgres append log before fan-out begins. Unacknowledged messages are automatically retried after restart.
  • Tenant isolation — API key authentication with automatic topic namespacing. Tenants cannot see or publish to other tenants' topics.
  • Graceful degradation — All distributed features (Postgres, Redis, auth) are optional. Without them, Conduit runs as a single-node in-memory broker.
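The retry schedule in the first guarantee can be sketched as follows, assuming the delay starts at 1s and doubles after each failed attempt. The doubling factor, the absence of jitter, and the function names are assumptions, not taken from internal/delivery:

```go
package main

import (
	"fmt"
	"time"
)

const (
	baseDelay   = 1 * time.Second
	maxDelay    = 60 * time.Second
	maxAttempts = 5
)

// retryDelay returns the wait before the retry that follows failed attempt
// number n (1-based): 1s, 2s, 4s, ... doubling each time, capped at maxDelay.
func retryDelay(n int) time.Duration {
	d := baseDelay
	for i := 1; i < n; i++ {
		d *= 2
		if d >= maxDelay {
			return maxDelay
		}
	}
	return d
}

// nextAction decides what happens after `failed` delivery attempts have failed.
func nextAction(failed int) (retry bool, wait time.Duration) {
	if failed >= maxAttempts {
		return false, 0 // give up: route the message to the dead-letter queue
	}
	return true, retryDelay(failed)
}

func main() {
	for failed := 1; ; failed++ {
		retry, wait := nextAction(failed)
		if !retry {
			fmt.Printf("attempt %d failed: move to DLQ\n", failed)
			return
		}
		fmt.Printf("attempt %d failed: retry in %v\n", failed, wait)
	}
}
```

Under these assumptions, 5 attempts give waits of 1s, 2s, 4s and 8s before the DLQ, so the 60s cap only comes into play if the attempt limit is raised. Production retry loops usually also add random jitter so that many failed deliveries do not retry in lockstep.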

Message Ordering

Ordering is guaranteed per topic, not globally. Global ordering across topics would require a single serialization point, which is incompatible with horizontal scaling.

Within a single topic, ordering is enforced at two levels:

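  1. Broker fan-out (Phase 1): The Publish method holds a read lock on the topic while iterating all subscribers. Go's net/http server handles each request in its own goroutine, so ordering depends on the broker: messages published sequentially to the same topic (each request completing before the next begins) are delivered in order. A read lock does not exclude other publishers, however. With sync.RWMutex, several goroutines can hold RLock at once, so two concurrent Publish calls can interleave their sends differently for different subscribers. Serializing concurrent publishes to the same topic requires an exclusive lock (Lock) held for the duration of the fan-out.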

  2. WAL sequence numbers (Phase 2): Each message written to the Postgres append log receives a monotonically increasing sequence ID scoped to its topic. Subscribers that reconnect or fall behind can replay from a known sequence number, and the delivery engine respects sequence order during retries.
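A minimal in-memory sketch of per-topic sequence numbers and replay, standing in for the Postgres log. The Log and Entry types are hypothetical; in Postgres the same idea becomes a query filtered by topic and by sequence greater than the subscriber's last seen value, ordered by sequence:

```go
package main

import (
	"fmt"
	"sync"
)

// Entry is one record in a topic's append-only log.
type Entry struct {
	Seq     uint64 // per topic, starts at 1, increases by 1 per message
	Payload string
}

// Log is an in-memory stand-in for the Postgres message log.
type Log struct {
	mu     sync.Mutex
	topics map[string][]Entry
}

func NewLog() *Log { return &Log{topics: make(map[string][]Entry)} }

// Append assigns the topic's next sequence number under the lock, so
// concurrent appends to one topic never share a number.
func (l *Log) Append(topic, payload string) uint64 {
	l.mu.Lock()
	defer l.mu.Unlock()
	entries := l.topics[topic]
	seq := uint64(len(entries)) + 1
	l.topics[topic] = append(entries, Entry{Seq: seq, Payload: payload})
	return seq
}

// ReplayAfter returns the topic's entries with Seq > after, in sequence order:
// what a reconnecting subscriber needs to catch up from its last seen Seq.
func (l *Log) ReplayAfter(topic string, after uint64) []Entry {
	l.mu.Lock()
	defer l.mu.Unlock()
	entries := l.topics[topic]
	if after >= uint64(len(entries)) {
		return nil
	}
	return append([]Entry(nil), entries[after:]...) // copy, so callers cannot mutate the log
}

func main() {
	l := NewLog()
	for _, p := range []string{"created", "paid", "shipped"} {
		l.Append("orders.created", p)
	}
	// A subscriber that last saw seq 1 reconnects and catches up.
	for _, e := range l.ReplayAfter("orders.created", 1) {
		fmt.Println(e.Seq, e.Payload)
	}
}
```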

This is the same per-partition ordering model used by Kafka. It is the strongest ordering guarantee that does not sacrifice throughput.
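The difference between a read lock and an exclusive lock in item 1 is easiest to see in code. A minimal sketch of a topic that holds a sync.Mutex across the whole fan-out loop, so concurrent publishers cannot interleave; the Topic type, the non-blocking send, and reporting full buffers as undelivered are assumptions, not Conduit's broker code:

```go
package main

import (
	"fmt"
	"sync"
)

// Topic fans messages out to subscriber channels. The mutex is held for the
// whole fan-out loop, so two concurrent Publish calls cannot interleave their
// sends: every subscriber sees the same order.
type Topic struct {
	mu   sync.Mutex
	subs []chan string
}

func (t *Topic) Subscribe(buf int) <-chan string {
	ch := make(chan string, buf)
	t.mu.Lock()
	t.subs = append(t.subs, ch)
	t.mu.Unlock()
	return ch
}

// Publish delivers msg to every subscriber and returns how many accepted it.
// A full buffer is skipped here; a real broker would hand it to the retry path.
func (t *Topic) Publish(msg string) (delivered int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, ch := range t.subs {
		select {
		case ch <- msg:
			delivered++
		default:
		}
	}
	return delivered
}

func main() {
	t := &Topic{}
	a, b := t.Subscribe(256), t.Subscribe(256)
	var wg sync.WaitGroup
	for p := 0; p < 4; p++ { // four concurrent publishers
		wg.Add(1)
		go func(p int) {
			defer wg.Done()
			for i := 0; i < 50; i++ {
				t.Publish(fmt.Sprintf("p%d-%d", p, i))
			}
		}(p)
	}
	wg.Wait()
	// Both subscribers received the same 200 messages in the same order.
	same := true
	for i := 0; i < 200; i++ {
		if <-a != <-b {
			same = false
		}
	}
	fmt.Println("identical order:", same)
}
```

Swapping sync.Mutex for sync.RWMutex and taking RLock in Publish would let the four publishers iterate simultaneously, and the two subscribers could then observe different orders.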

API Reference

REST Endpoints

POST /api/publish

Publish a message to a topic. The topic must already exist (created explicitly or by a prior subscription).

Request:

{
  "topic": "orders.created",
  "payload": { "order_id": "abc-123", "amount": 99.95 }
}

Response (200 OK):

{
  "message_id": "550e8400-e29b-41d4-a716-446655440000",
  "topic": "orders.created",
  "subscribers": 3,
  "published_at": "2026-02-23T10:30:00Z"
}

Errors:

  • 400 — Missing or invalid request body, or topic is empty.
  • 404 — Topic does not exist.
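A minimal Go client for this endpoint, assuming the request and response shapes above. Authentication is omitted because the API key header is not documented in this section; the helper names are hypothetical:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

// PublishRequest and PublishResponse mirror the JSON shapes documented above.
type PublishRequest struct {
	Topic   string          `json:"topic"`
	Payload json.RawMessage `json:"payload"`
}

type PublishResponse struct {
	MessageID   string    `json:"message_id"`
	Topic       string    `json:"topic"`
	Subscribers int       `json:"subscribers"`
	PublishedAt time.Time `json:"published_at"`
}

// newPublishRequest builds POST {baseURL}/api/publish with a JSON body.
func newPublishRequest(baseURL, topic string, payload any) (*http.Request, error) {
	raw, err := json.Marshal(payload)
	if err != nil {
		return nil, err
	}
	body, err := json.Marshal(PublishRequest{Topic: topic, Payload: raw})
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPost, baseURL+"/api/publish", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	return req, nil
}

// parsePublishResponse maps the documented status codes to a result or an error.
func parsePublishResponse(status int, body []byte, topic string) (*PublishResponse, error) {
	switch status {
	case http.StatusOK:
		var out PublishResponse
		if err := json.Unmarshal(body, &out); err != nil {
			return nil, err
		}
		return &out, nil
	case http.StatusBadRequest:
		return nil, fmt.Errorf("invalid publish request: %s", body)
	case http.StatusNotFound:
		return nil, fmt.Errorf("topic %q does not exist; create it with POST /api/topics", topic)
	default:
		return nil, fmt.Errorf("unexpected status %d", status)
	}
}

// Publish sends one message and returns the node's acknowledgement.
func Publish(client *http.Client, baseURL, topic string, payload any) (*PublishResponse, error) {
	req, err := newPublishRequest(baseURL, topic, payload)
	if err != nil {
		return nil, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return parsePublishResponse(resp.StatusCode, body, topic)
}

func main() {
	client := &http.Client{Timeout: 5 * time.Second}
	res, err := Publish(client, "http://localhost:8080", "demo", map[string]string{"hello": "world"})
	if err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Printf("message %s reached %d subscribers\n", res.MessageID, res.Subscribers)
}
```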

GET /api/topics

List all topics with subscriber counts and message counts.

Response (200 OK):

{
  "topics": [
    {
      "name": "orders.created",
      "subscriber_count": 3,
      "message_count": 1520,
      "created_at": "2026-02-23T09:00:00Z"
    }
  ]
}

POST /api/topics

Create a named topic explicitly.

Request:

{
  "name": "orders.created"
}

Response (201 Created):

{
  "name": "orders.created",
  "subscriber_count": 0,
  "message_count": 0,
  "created_at": "2026-02-23T10:30:00Z"
}

Errors:

  • 400 — Missing or invalid request body, or name is empty.
  • 409 — Topic already exists.

DELETE /api/topics/{name}

Delete a topic and disconnect all its subscribers.

Response: 204 No Content

Errors:

  • 404 — Topic not found.

GET /api/stats

System-wide statistics.

Response (200 OK):

{
  "messages_per_sec": 1523.4,
  "total_messages": 45231,
  "active_subscribers": 12,
  "active_topics": 5,
  "uptime_seconds": 3600.5
}

GET /health

Health check.

Response (200 OK):

{
  "status": "ok"
}

WebSocket Protocol

Connect to ws://host:port/ws to subscribe to topics and receive messages.

Client → Server

Subscribe to a topic:

{ "type": "subscribe", "topic": "orders.created" }

Unsubscribe from a topic:

{ "type": "unsubscribe", "topic": "orders.created" }

Acknowledge a message (Phase 2):

{ "type": "ack", "message_id": "550e8400-e29b-41d4-a716-446655440000" }

Heartbeat:

{ "type": "heartbeat" }

Server → Client

Message delivery:

{
  "type": "message",
  "message_id": "550e8400-e29b-41d4-a716-446655440000",
  "topic": "orders.created",
  "payload": { "order_id": "abc-123", "amount": 99.95 },
  "published_at": "2026-02-23T10:30:00Z"
}

Subscription confirmed:

{ "type": "subscribed", "topic": "orders.created" }

Unsubscription confirmed:

{ "type": "unsubscribed", "topic": "orders.created" }

Heartbeat response:

{ "type": "heartbeat_ack" }

Error:

{ "type": "error", "message": "topic not found" }
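The frames above map naturally onto two Go structs. A sketch of encoding and handling them, assuming one struct per direction with omitempty for unused fields; the type and function names are hypothetical, and the transport itself (reading and writing frames through a WebSocket library such as gorilla/websocket) is left out:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ClientFrame covers every client -> server frame; unused fields are omitted.
type ClientFrame struct {
	Type      string `json:"type"` // subscribe, unsubscribe, ack, heartbeat
	Topic     string `json:"topic,omitempty"`
	MessageID string `json:"message_id,omitempty"`
}

// Encode serializes the frame; Marshal cannot fail for plain string fields.
func (f ClientFrame) Encode() string {
	b, _ := json.Marshal(f)
	return string(b)
}

// ServerFrame covers every server -> client frame.
type ServerFrame struct {
	Type        string          `json:"type"` // message, subscribed, unsubscribed, heartbeat_ack, error
	MessageID   string          `json:"message_id,omitempty"`
	Topic       string          `json:"topic,omitempty"`
	Payload     json.RawMessage `json:"payload,omitempty"`
	PublishedAt string          `json:"published_at,omitempty"`
	Message     string          `json:"message,omitempty"`
}

// handle decodes one server frame and returns the frame to send back, if any.
// Every delivered message is acknowledged by message_id (the Phase 2 ack).
func handle(raw []byte) (*ClientFrame, error) {
	var f ServerFrame
	if err := json.Unmarshal(raw, &f); err != nil {
		return nil, err
	}
	switch f.Type {
	case "message":
		// Process f.Payload here (idempotently), then acknowledge it.
		return &ClientFrame{Type: "ack", MessageID: f.MessageID}, nil
	case "error":
		return nil, fmt.Errorf("server error: %s", f.Message)
	default: // subscribed, unsubscribed and heartbeat_ack need no reply
		return nil, nil
	}
}

func main() {
	fmt.Println(ClientFrame{Type: "subscribe", Topic: "orders.created"}.Encode())
	reply, err := handle([]byte(`{"type":"message","message_id":"m-1","topic":"orders.created","payload":{"order_id":"abc-123"}}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	if reply != nil {
		fmt.Println(reply.Encode())
	}
}
```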

Environment Variables

Variable          Default     Description
PORT              8080        HTTP server port
DATABASE_URL      (unset)     Postgres connection string (enables WAL)
REDIS_URL         (unset)     Redis URL (enables registry)
ENABLE_AUTH       false       Enable API key authentication
DEFAULT_API_KEY   (unset)     Use a specific API key for the default tenant
NODE_ID           hostname    Unique node identifier for clustering
NODE_ADDR         :PORT       Advertised address other nodes use to reach this one (e.g. server-1:8080)
INTERNAL_SECRET   (unset)     Shared secret for the cross-node /internal/forward endpoint
CORS_ORIGIN       *           Allowed CORS origin (restrict in production)

All are optional. Without DATABASE_URL, messages are held in memory only. Without REDIS_URL, there is no subscriber registry or node coordination. Without INTERNAL_SECRET, cross-node forwarding is disabled.
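A sketch of loading these variables with the documented defaults, assuming ENABLE_AUTH is parsed with strconv.ParseBool. The Config type and its methods are hypothetical, not Conduit's actual configuration code:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// Config holds the documented environment variables.
type Config struct {
	Port, DatabaseURL, RedisURL, DefaultAPIKey   string
	NodeID, NodeAddr, InternalSecret, CORSOrigin string
	EnableAuth                                   bool
}

// LoadConfig applies the defaults from the table above. lookup is normally
// os.LookupEnv; taking it as a parameter keeps the function easy to test.
func LoadConfig(lookup func(string) (string, bool), hostname string) Config {
	get := func(key, def string) string {
		if v, ok := lookup(key); ok && v != "" {
			return v
		}
		return def
	}
	port := get("PORT", "8080")
	auth, _ := strconv.ParseBool(get("ENABLE_AUTH", "false")) // unrecognized values such as "yes" yield false
	return Config{
		Port:           port,
		DatabaseURL:    get("DATABASE_URL", ""),
		RedisURL:       get("REDIS_URL", ""),
		EnableAuth:     auth,
		DefaultAPIKey:  get("DEFAULT_API_KEY", ""),
		NodeID:         get("NODE_ID", hostname),
		NodeAddr:       get("NODE_ADDR", ":"+port),
		InternalSecret: get("INTERNAL_SECRET", ""),
		CORSOrigin:     get("CORS_ORIGIN", "*"),
	}
}

// Each optional dependency is switched on only when its variable is set.
func (c Config) WALEnabled() bool        { return c.DatabaseURL != "" }
func (c Config) RegistryEnabled() bool   { return c.RedisURL != "" }
func (c Config) ForwardingEnabled() bool { return c.InternalSecret != "" }

func main() {
	host, _ := os.Hostname()
	cfg := LoadConfig(os.LookupEnv, host)
	fmt.Printf("node %s on %s: wal=%v registry=%v forwarding=%v auth=%v\n",
		cfg.NodeID, cfg.NodeAddr, cfg.WALEnabled(), cfg.RegistryEnabled(), cfg.ForwardingEnabled(), cfg.EnableAuth)
}
```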

Cross-Node Routing

When running multiple Conduit nodes behind a load balancer, a message published to node A may need to reach subscribers connected to node B. Cross-node routing handles this automatically.

How it works:

  1. A publisher sends POST /api/publish to any node.
  2. The receiving node writes the message to Postgres (WAL) and creates delivery records.
  3. The node delivers to all locally connected subscribers via channel fan-out.
  4. Concurrently, it queries the Redis registry for all other active nodes and forwards the message to each one via POST /internal/forward.
  5. Each remote node receives the forward and delivers to its locally connected subscribers — without writing to the WAL again (the publishing node already persisted it).
  6. If forwarding fails (node down, network error), the error is logged and the retry engine handles redelivery via the delivery records already in Postgres.

Configuration:

  • NODE_ADDR — The address other nodes use to reach this one. In Docker Compose, this is the service name + port (e.g. server-1:8080). Must be reachable from other nodes.
  • INTERNAL_SECRET — A shared secret that all nodes must have. The /internal/forward endpoint rejects requests without a matching X-Internal-Secret header. This prevents external clients from using the internal forwarding endpoint.
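A sketch of both sides of the forwarding path: the shared-secret check on /internal/forward and a publisher-side loop that forwards to every peer concurrently and only logs failures. The helper names, the http:// scheme prepended to NODE_ADDR, and the 403 status for a bad secret are assumptions; crypto/subtle.ConstantTimeCompare is used so the comparison time does not leak how much of the secret matched:

```go
package main

import (
	"bytes"
	"crypto/subtle"
	"fmt"
	"log"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// authorized compares the presented secret in constant time. An empty
// configured secret never authorizes, so forwarding stays disabled.
func authorized(secret, presented string) bool {
	return secret != "" && subtle.ConstantTimeCompare([]byte(presented), []byte(secret)) == 1
}

// requireInternalSecret guards /internal/forward (403 is an assumed status).
func requireInternalSecret(secret string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !authorized(secret, r.Header.Get("X-Internal-Secret")) {
			http.Error(w, "forbidden", http.StatusForbidden)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// forwardToPeers POSTs body to every peer concurrently and returns the peers
// that failed. Failures are only logged; redelivery is left to the retry
// engine, which works from the delivery records already in Postgres.
func forwardToPeers(client *http.Client, peers []string, secret string, body []byte) []string {
	if client == nil {
		client = http.DefaultClient
	}
	var (
		mu     sync.Mutex
		wg     sync.WaitGroup
		failed []string
	)
	for _, peer := range peers {
		wg.Add(1)
		go func(peer string) {
			defer wg.Done()
			if err := post(client, "http://"+peer+"/internal/forward", secret, body); err != nil {
				log.Printf("forward to %s failed: %v", peer, err)
				mu.Lock()
				failed = append(failed, peer)
				mu.Unlock()
			}
		}(peer)
	}
	wg.Wait()
	return failed
}

func post(client *http.Client, url, secret string, body []byte) error {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Internal-Secret", secret)
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	resp.Body.Close()
	if resp.StatusCode >= 300 {
		return fmt.Errorf("status %d", resp.StatusCode)
	}
	return nil
}

func main() {
	// Stand-in for a remote node that accepts forwards carrying the shared secret.
	peer := httptest.NewServer(requireInternalSecret("s3cret", http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusAccepted) // a real node would fan out locally here
		})))
	defer peer.Close()
	addr := strings.TrimPrefix(peer.URL, "http://")
	fmt.Println("failed (right secret):", forwardToPeers(nil, []string{addr}, "s3cret", []byte(`{}`)))
	fmt.Println("failed (wrong secret):", forwardToPeers(nil, []string{addr}, "guess", []byte(`{}`)))
}
```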

Running multiple nodes locally:

# docker-compose.yml runs two nodes by default:
docker-compose up

# Node 1 is on localhost:8080, Node 2 is on localhost:8081
# Subscribe to node 1, publish to node 2 — the message arrives via forwarding.

Local Development

# Clone and run
git clone https://github.com/411sst/conduit.git
cd conduit

# Full stack with Docker (recommended)
docker-compose up

# Backend only (in-memory mode)
go run ./cmd/server

# Frontend dev server
cd frontend && npm install && npm run dev

# Run unit tests
go test ./...

# Run integration tests (requires Docker)
./tests/integration/run.sh

# Run k6 load tests (requires running server)
k6 run tests/load/sustained.js

The server starts on port 8080 by default.

Quick Demo

  1. Create a topic:

    curl -X POST http://localhost:8080/api/topics \
      -H "Content-Type: application/json" \
      -d '{"name": "demo"}'
  2. Subscribe via WebSocket (using websocat or any WS client):

    websocat ws://localhost:8080/ws
    # then send: {"type":"subscribe","topic":"demo"}
  3. Publish a message:

    curl -X POST http://localhost:8080/api/publish \
      -H "Content-Type: application/json" \
      -d '{"topic": "demo", "payload": {"hello": "world"}}'
  4. Watch the message arrive in the WebSocket client.

Project Structure

cmd/server/              Go entrypoint, graceful shutdown
internal/
  broker/                Core pub/sub — topic tree, subscriber fan-out, rate tracking
  delivery/              Retry engine, exponential backoff, DLQ routing
  storage/               Postgres WAL layer, migrations, delivery tracking
  registry/              Redis subscriber registry, node heartbeats
  hashing/               Consistent hash ring (CRC32, 150 virtual nodes)
  auth/                  API key management, tenant namespace isolation
  api/                   REST handlers, WebSocket handler, auth middleware
  models/                Shared types (Message, Delivery, Stats, NodeInfo)
frontend/                React + TypeScript + Vite + Tailwind dashboard
  src/pages/             Overview, Topics, Messages, Nodes, Playground
  src/hooks/             WebSocket hook
  src/components/        Reusable UI components
tests/
  integration/           Docker Compose + Go integration tests
  load/                  k6 scripts (sustained, fan-out, node-failure)
.github/workflows/       CI/CD (PR checks, deploy, weekly benchmarks)
docs/                    Architecture diagrams

CI/CD

Three GitHub Actions workflows:

  • PR Checks (pr-checks.yml): go vet, race-detector tests with coverage, and a frontend TypeScript check and build. Runs on every PR to main.
  • Deploy (deploy.yml): runs the tests, then deploys to Railway on push to main. Uses the RAILWAY_TOKEN secret (generate it from the Railway dashboard: Account Settings → Tokens). DATABASE_URL and REDIS_URL are injected automatically when the Postgres and Redis plugins are added in Railway. Set these manually: INTERNAL_SECRET, CORS_ORIGIN, NODE_ADDR, DEFAULT_API_KEY.
  • Weekly Benchmarks (benchmarks.yml): spins up the full stack, runs the k6 sustained-throughput and fan-out tests, and uploads the results as artifacts. Runs every Monday or on manual trigger.

Load Testing

Three k6 scripts cover different load scenarios:

Script            What it measures                                      Target
sustained.js      Sustained publish throughput at a configurable rate   p95 < 100ms, < 1% errors
fanout.js         Publish with concurrent WS subscribers                All subscribers receive messages
node-failure.js   Publish under disruption with health monitoring       < 5% errors during disruption

Throughput Ceiling

There are two ceiling numbers, each found by running sustained.js at escalating rates for 2 minutes per rate. The ceiling is the highest rate where p99 stays under 250ms and the error rate stays at 0%.

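Full-stack (Postgres + Redis): 3,000 msg/sec. Every message is written to the Postgres WAL before the publish response returns, so this is the number that reflects the system with its durability guarantees active. It is a conservative reading of the table below: 4,500/s also keeps p99 under 250 ms with 0% errors, but its p95 of 128.2 ms misses the sustained.js target of p95 < 100 ms.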

Rate       p50       p95        p99 < 250ms   Errors   Actual throughput
3,000/s    12.3 ms   68.3 ms    PASS          0%       2,994/s
4,500/s    18.5 ms   128.2 ms   PASS          0%       4,444/s
5,000/s    58.8 ms   266.2 ms   FAIL          0%       4,736/s
6,000/s    83.1 ms   344.3 ms   FAIL          0%       4,996/s

The bottleneck is Postgres write latency. Each publish performs a synchronous INSERT before responding. At 5,000+ req/s, write contention causes p50 to jump from 18ms to 59ms and p99 crosses 250ms.

In-memory (no Postgres, no Redis): 25,000 msg/sec — This measures the raw Go server throughput without storage I/O — how fast the broker, HTTP handler, and fan-out logic can run.

Rate       p50        p95         Errors   Actual throughput
5,000/s    0.39 ms    0.68 ms     0%       5,000/s
10,000/s   0.40 ms    0.95 ms     0%       10,000/s
15,000/s   0.46 ms    1.09 ms     0%       14,999/s
20,000/s   0.55 ms    1.51 ms     0%       19,993/s
25,000/s   0.61 ms    4.24 ms     0%       24,965/s
30,000/s   12.68 ms   56.4 ms     0%       28,635/s
35,000/s   39.48 ms   104.53 ms   0%       27,800/s

At 30k req/s, p50 latency jumps 20x and the client can no longer sustain the target rate. The bottleneck is goroutine scheduling under extreme concurrency. All numbers are from a local Linux container; Railway deployment results may differ.

Both numbers are meaningful. The in-memory ceiling shows what the Go server can do without I/O constraints — useful for understanding the broker's raw capacity. The full-stack ceiling is the honest number for what the system delivers with durability guarantees, and is the one to cite.

Baseline Results

Metric           sustained (1k/s)   node-failure (500/s)
Total requests   120,001            90,001
p50 latency      0.45 ms            0.44 ms
p95 latency      0.63 ms            0.57 ms
Error rate       0.00%              0.00%
Health checks    -                  180/180 passed

Fan-out

Fan-out was tested at 50 concurrent WebSocket subscribers (not the target 1,000) due to k6 v0.50.0 WebSocket handling limitations that prevent reliable testing at higher connection counts. Publish-side throughput remained clean: 12,001 publishes at 100/s with 0% errors and p50 of 0.52ms. Accurate fan-out measurement at 1,000 subscribers requires either a newer k6 version or a dedicated load testing environment.

Full JSON results and ceiling data are saved in benchmarks/. See benchmarks/CHANGELOG.md for methodology.

# Run locally (server must be running)
k6 run tests/load/sustained.js
k6 run tests/load/fanout.js
k6 run tests/load/node-failure.js

# Find the throughput ceiling (override rate via env)
k6 run -e RATE=25000 -e PRE_VUS=500 -e MAX_VUS=2500 tests/load/sustained.js

What I'd Do Differently at 10x Scale

At 100k+ msg/sec, Postgres becomes the bottleneck — writes to a single append log cannot keep up. The first change would be partitioning the message log by topic hash across multiple Postgres instances (or switching to a purpose-built log like a custom segment-file WAL). The broker fan-out would need to move from per-message iteration to batched delivery with vectorized writes to subscriber channels. Redis would need to be replaced with an embedded gossip protocol (like Serf/memberlist) to eliminate the coordination single point of failure. Connection handling would move from goroutine-per-connection to an io_uring-based event loop on Linux for lower overhead at high connection counts.

Specific bottlenecks and what breaks first

  1. Postgres INSERT throughput — The single-writer WAL pattern tops out around 15-20k inserts/sec on a standard instance. Solution: shard the message table by topic hash, or switch to a segment-file-based log that batches writes to disk.

  2. Subscriber channel backpressure — At high fan-out (1 topic, 10k subscribers), the synchronous iteration in Publish blocks the publisher goroutine for the duration of the fan-out. Solution: async fan-out with a separate goroutine pool and batch channel writes.

  3. Redis registry coordination — TTL-based presence with polling introduces a detection delay of up to 15s for node failures. Solution: replace with a gossip protocol (memberlist) for sub-second failure detection.

  4. WebSocket memory per connection — gorilla/websocket allocates ~4KB per connection for read/write buffers. At 100k connections, that's ~400MB just for buffers. Solution: use a zero-alloc WebSocket library (gobwas/ws) or pool buffers.

  5. JSON serialization in the hot path: encoding/json uses reflection. At extreme throughput, this becomes measurable. Solution: code-generated serializers (easyjson) or switch to a binary protocol (protobuf).
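The async fan-out suggested in item 2 could take the following shape. This sketch assumes a fixed worker pool where each worker owns a FIFO queue and shard k of the subscriber list is always pinned to worker k mod n. Pinning matters: handing shards to whichever worker is free would let one subscriber receive a later message before an earlier one. All names are hypothetical:

```go
package main

import (
	"fmt"
	"sync"
)

// job is one message destined for one shard of a topic's subscribers.
type job struct {
	msg  string
	subs []chan string
}

// FanOut is a pool of delivery workers, each with its own FIFO queue.
type FanOut struct {
	queues []chan job
	wg     sync.WaitGroup
}

func NewFanOut(workers, queueLen int) *FanOut {
	f := &FanOut{queues: make([]chan job, workers)}
	for i := range f.queues {
		q := make(chan job, queueLen)
		f.queues[i] = q
		f.wg.Add(1)
		go func() {
			defer f.wg.Done()
			for j := range q {
				for _, ch := range j.subs {
					select {
					case ch <- j.msg:
					default: // buffer full: a real broker would hand this to the retry path
					}
				}
			}
		}()
	}
	return f
}

// Publish splits subs into shards of size batch and enqueues one job per shard.
// Shard k always goes to worker k % workers, so with a fixed subscriber list
// and serialized calls per topic, every subscriber still sees publish order.
func (f *FanOut) Publish(msg string, subs []chan string, batch int) {
	if batch <= 0 {
		batch = len(subs)
	}
	for k, start := 0, 0; start < len(subs); k, start = k+1, start+batch {
		end := start + batch
		if end > len(subs) {
			end = len(subs)
		}
		f.queues[k%len(f.queues)] <- job{msg: msg, subs: subs[start:end]}
	}
}

// Close stops the workers after they drain their queues.
func (f *FanOut) Close() {
	for _, q := range f.queues {
		close(q)
	}
	f.wg.Wait()
}

func main() {
	subs := make([]chan string, 1000)
	for i := range subs {
		subs[i] = make(chan string, 256)
	}
	f := NewFanOut(8, 64)
	for i := 0; i < 3; i++ {
		f.Publish(fmt.Sprintf("msg-%d", i), subs, 100) // returns once every shard is queued
	}
	f.Close()
	fmt.Println("last subscriber buffered:", len(subs[999]))
}
```

The publisher's cost now grows with the number of shards rather than the number of subscribers. The trade-off is an extra queue hop per shard and the requirement that Publish calls for one topic stay serialized.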


Resume bullet: Built a distributed real-time event delivery system in Go from scratch — multi-tenant pub/sub with at-least-once delivery, WAL-backed persistence, cross-node message forwarding, and consistent hash routing. Sustained 3k msg/sec with full WAL durability (Postgres + Redis); 25k msg/sec raw throughput on isolated Go server.

About

Conduit is a distributed, multi-tenant real-time pub/sub platform in Go with WebSocket streaming, REST APIs, Postgres durability, Redis-based node discovery, and a React ops dashboard.
