A standalone Python app that replicates data from a source DuckLake table to N destination DuckLake tables using CDC (Change Data Capture). Supports inserts, deletes, and updates. Single thread, single poll loop, no framework.
Contents: Naming | What | Why | Architecture | Two Modes | Poll Cycle | CDC Operations | Failure Modes | Configuration | State Tracking | Seeding | Connection Management | Horizontal Scaling | Metrics | Setup | Development | Formal Verification | Deployment | Error Handling
viaduct (noun): a long bridge-like structure carrying a road or railroad across a valley or other low ground.
Viaduck carries data across DuckLakes. The name is a portmanteau of viaduct and duck, because DuckLake and because Why A Duck?.
- CDC replication from one source DuckLake table to N destination DuckLake tables
- Routes rows by a configurable field (e.g.
company) to per-tenant destinations - Full CDC: inserts, deletes, and updates (not just append-only)
- 3-phase algorithm: preimage resolution, conflict resolution, atomic transactional apply
- Buffered delivery: CDC reads at poll cadence, destination writes at flush cadence (1–5 min latency), flushed by a concurrent worker pool
- Scan-based seeding for new destinations (no full history replay)
- YAML config with env var indirection for credentials
- Per-destination error isolation — one broken destination doesn't block others
- LRU connection pool for high fanout (local bench: ~43 destinations/s flat through 1000 destinations)
- Persistent cursor tracking on plain Postgres
- Horizontal scaling via hash or explicit destination partitioning
- 27 Prometheus metrics, health checks (
/healthz,/readyz), live status web UI - At-least-once delivery with documented failure modes
- TLA+ formally verified: 7 safety invariants checked across 22.6M states with NO crash conditioning — eventual consistency, phantom-freedom, no-data-loss, and partition correctness all hold through every modeled crash and failure window
Multi-tenant DuckLake architectures need per-tenant table isolation — separate catalogs, separate S3 paths, separate Postgres metadata stores. Viaduck reads CDC changes from a shared source table, routes rows by a configurable field (e.g. company), and writes each partition to the correct tenant's DuckLake. Think of it as a data viaduct with N exits.
Source: docs/overview.d2
poll thread (every interval_seconds):
current_snapshot() on source
group destinations by read position
for each group:
table_changes(after, current, filter_expr) → CDC read (inserts + deletes + updates)
resolve preimages → handle cross-tenant migration
split by routing field → Arrow routing
buffer per destination → advance in-memory position
evaluate flush triggers → submit eligible flushes to workers
flush worker (per destination, concurrent):
resolve conflicts → cancel insert+delete pairs across buffered reads
dedupe upserts per key (last write wins) → Winner(k)
delete + upsert (in transaction) → atomic apply
advance_cursor() → persist to Postgres
One poll thread reads and buffers; delivery.workers threads flush. DuckLake snapshots are the cursor. State lives on plain Postgres. Designed for high fanout (100s-1000s of destinations).
Source: docs/architecture.d2
Each viaduck instance consumes from one source table and writes to N destination DuckLakes. The source and every destination are independent catalogs with their own Postgres metadata store and S3 data path. No cross-catalog transactions are possible.
Core modules:
| Module | Responsibility | Source |
|---|---|---|
main.py |
Poll loop, Phase 1 preimage resolution, seeding, signal handling | Entry point |
delivery.py |
Per-destination buffers, flush triggers, flush worker pool | Buffered delivery |
apply.py |
Phase 2 conflict resolution, Phase 3 atomic delete/upsert, write retry | Destination apply |
config.py |
YAML parsing, env var resolution, validation | Configuration |
source.py |
Source catalog connection, CDC reading (table_changes / table_insertions) |
CDC |
router.py |
Arrow splitting by routing field | Routing |
destination.py |
LRU connection pool for destination catalogs, lease pinning | Connections |
state.py |
Per-destination cursors on plain Postgres | State tracking |
arrowutil.py |
Shared Arrow kernel helpers | Vectorization |
metrics.py |
Prometheus metric definitions | Observability |
server.py |
HTTP /metrics, /healthz, /readyz, status web UI |
Health checks |
Viaduck operates in one of two modes, selected by the required routing.mode field:
| Mode | Config | CDC API | Destination writes | Use case |
|---|---|---|---|---|
| Append-only | mode: append_only + key_columns: [] |
table_insertions() |
append() |
Insert-only sources (events, append-only fact tables) |
| Full CDC | mode: full_cdc + non-empty key_columns |
table_changes() |
delete() + upsert() |
Sources that emit deletes/updates; the join columns are the upsert keys |
routing.mode is required (no default). Misconfiguration (mode unset, unknown value, full_cdc with empty key_columns, append_only with non-empty key_columns) fails fast at startup with an actionable ConfigError — earlier viaduck versions inferred the mode from len(key_columns) > 0, which silently flipped the entire pipeline shape when an operator typo'd or accidentally emptied the list.
Source: docs/poll-cycle.d2
CDC reads happen at poll cadence; destination writes happen at flush cadence. The poll thread reads and buffers; a worker pool flushes. The decoupling is the buffered-delivery design verified in tla/Viaduck.tla.
Each poll cycle (main.py:_poll_cycle):
- Snapshot check —
current_snapshot()on the source table. If no snapshots exist, sleep and retry. - Read plan — atomic snapshot of each destination's in-memory read position + epoch from the delivery manager (
delivery.py:read_plan). Positions start at the persisted cursors and advance in memory as reads land. - Group by position — destinations at the same position share a single CDC read (
main.py:_group_by_cursor). - CDC read — full CDC mode:
table_changes(inserts, deletes, update pre/post images); append-only mode:table_insertions. The range is half-open(position, current]— the position snapshot was already delivered (source.py). Routing values are pushed down as a SQLINfilter. - Phase 1: Preimage Resolution (full CDC only) — pair
update_preimagewithupdate_postimagerows byrowid(Arrow hash join). Cross-tenant mutations convert the preimage to a delete on the old destination; same-tenant preimages drop; orphaned preimages convert to deletes (main.py:_resolve_preimages). - Route —
split_and_count()partitions the Arrow table by routing value in a single vectorized pass (router.py:split_and_count). - Buffer — routed batches accumulate in per-destination buffers; the read position advances in memory. Destinations with no routed rows advance position only — zero writes for idle destinations (
delivery.py:buffer). - Flush triggers — buffer age ≥
flush_interval_seconds, rows ≥flush_max_rows, bytes ≥flush_max_bytes, global watermark ≥buffer_total_max_bytes(largest-first), or shutdown. Eligible buffers are swapped out and submitted to the worker pool (delivery.py:maybe_flush). - Lag metrics + status — per-destination snapshot lag from the delivery manager's authoritative in-memory view.
Each flush (worker thread, delivery.py:_flush):
- Phase 2: Conflict Resolution (full CDC only) — on the concatenation of all buffered reads: cancel
insert + deletepairs for the samerowid, dropupdate_postimageshadowed by a delete, dropinsertshadowed by a postimage (apply.py:_resolve_conflicts). Sound across reads: the union of adjacent half-open ranges behaves exactly like one wide read. - Phase 3: Apply (full CDC only) — per-key last-write-wins dedup of upsert candidates (
Winner(k)), then chunked deletes followed by upsert inside a single destinationbegin_transaction()(apply.py:_apply_changes). Append-only mode: oneappend(). - Advance cursor — upsert the persisted cursor in Postgres, with a short in-process retry since the destination commit already landed (
state.py:advance_cursor).
At most one flush per destination is in flight (the in-flight guard); flushes for different destinations run concurrently on delivery.workers threads. A failed flush drops the destination's buffers, resets its read position to the persisted cursor, and lets the next cycle re-read the range — at-least-once, no poison-buffer growth.
| Operation | Append-only mode | Full CDC mode | Notes |
|---|---|---|---|
| INSERT | Yes | Yes | Core use case. Rows inserted between snapshots are read and routed. |
| DELETE | No | Yes | Deleted rows are replicated via table.delete(filter) on destinations. Requires key_columns. |
| UPDATE | No | Yes | DuckLake models updates as preimage + postimage. Viaduck upserts postimages via table.upsert(df, key_columns). |
| Cross-tenant UPDATE | No | Yes | When an update changes the routing value (e.g. company), the row is deleted from the old destination and inserted to the new one. Metricked via viaduck_cdc_routing_mutations_total. |
| Schema evolution | Partial | Partial | New source columns included in CDC reads. Destination schemas not auto-evolved. Restart viaduck for new destinations to get updated schema. |
| Scenario | Behavior | Tested |
|---|---|---|
| No snapshots on source | Poll returns early, sleeps | test_poll_cycle_no_snapshots |
| All destinations caught up | No CDC reads, no writes | test_poll_cycle_all_caught_up |
| Empty changeset | Cursors advanced without writing | test_poll_cycle_empty_changeset_advances_cursors |
| Destinations at different snapshots | Grouped CDC reads — one per distinct cursor position | test_group_by_cursor_mixed |
| New destination (no prior state) | Initialized at snapshot 0, replays full history | test_initialize_destinations_creates_new |
| Rows with no matching destination | Counted as unrouted, metricked, silently dropped | test_split_string_no_match |
| NULL values in routing column | Not routed, counted as unrouted | test_split_null_values_not_routed |
| Routing field missing from schema | RoutingError raised, group processing halted |
test_poll_cycle_routing_error_breaks_gracefully |
| Insert then delete same row in range | Insert dropped; the delete survives as a tombstone (idempotent no-op normally; heals commit/cursor-gap phantoms on replay) | test_resolve_conflicts_insert_delete_keeps_tombstone, test_torture_insert_update_delete_same_key |
| Update then delete same row | Postimage dropped, delete kept — row removed | test_resolve_conflicts_update_delete_keeps_delete |
| Same key, different logical rows | NOT cancelled — uses rowid to distinguish |
test_resolve_conflicts_same_key_different_rowid_no_cancel |
| Cross-tenant row migration | Preimage → delete on old dest, postimage → upsert on new dest | test_resolve_preimages_cross_tenant_converts_to_delete, test_torture_routing_value_mutation_cross_tenant |
| Orphaned preimage (no postimage) | Converted to delete (defensive) | test_resolve_preimages_orphaned_converts_to_delete |
| NULL in key column | Delete filter uses IS NULL |
test_build_delete_filter_null_in_key, test_torture_delete_filter_null_composite_key |
| Delete-only changeset | Only deletes applied, cursor advanced | test_apply_changes_deletes_only |
At-least-once. There is no cross-catalog transaction support in DuckLake. The write-then-advance-cursor pattern means:
- Buffered data is in-memory only. A crash loses all buffers and read positions; the next start re-reads everything after the persisted cursors. Nothing buffered is ever the only copy — the source snapshot history is the durable log.
- If the process crashes after a destination commit but before the cursor persists, the next start re-reads the same range and re-applies it. Full CDC mode re-applies idempotently (delete + upsert); append-only mode duplicates rows.
- In full CDC mode, deletes + upserts on each destination are applied within a single transaction. A crash mid-apply rolls back both — no partial state.
- A failed flush drops the destination's buffers and leaves its cursor untouched; the range is re-read next cycle. Other destinations are unaffected.
- The commit/cursor-gap window heals itself. If a row is committed to a destination, the cursor fails to persist (crash, or a PG outage outlasting the in-process retry), and the source deletes that row before the re-read — the replayed delete survives conflict resolution as a tombstone and removes the committed row. (Earlier revisions cancelled the insert+delete pair, making such phantoms permanent; the tombstone rule retired that limitation, machine-checked by TLC with no crash conditioning.)
- Seeding is REPLACE. A destination at cursor 0 with existing rows (only legitimate cause: a crashed prior seed) is truncated before the seed streams, so re-seeds are full repairs in both CDC modes. Set
routing.seed_truncate: falseto refuse loudly instead.
There is no data loss path — machine-checked, not just argued: every consistency invariant (including phantom-freedom) holds with no crash conditioning at all. The source DuckLake's snapshot history is the durable log.
| Failure | Impact | Recovery | Isolation |
|---|---|---|---|
| Source catalog unavailable | CDC read fails | Poll loop exits, buffered data is drained to destinations, process exits cleanly; K8s (restartPolicy: Always) restarts it and it resumes from the persisted cursors |
All destinations blocked |
| Destination catalog unavailable | Flush fails | 3 retries with backoff (apply.py:_write_with_retry), then error recorded, connection evicted, buffers dropped, read position reset to the persisted cursor. Range re-read next cycle. |
Per-destination — other destinations unaffected |
| Crash with data buffered | Buffers and read positions lost | Re-read from persisted cursors on restart. No data loss. | All destinations re-read |
| Crash after destination commit, before cursor persist | Destination has data, cursor stale | At-least-once: range re-read and re-applied idempotently (full CDC); tombstones remove any since-deleted rows the crashed commit wrote. | Per-destination |
| Cursor persist failure (PG down) | Destination commit already landed | 3 in-process retries (delivery.py:_advance_cursor_with_retry); on exhaustion, same path as flush failure — range re-read, idempotent re-apply. |
Per-destination |
| Destination apply failure (full CDC) | Delete + upsert transaction rolled back | No partial state on destination. Buffer dropped, range re-read. | Per-destination |
| SIGTERM with data buffered | Shutdown drain | drain() flushes everything buffered (trigger=shutdown), bounded by a 60s deadline; anything abandoned is re-read on restart. Note: the 60s deadline exceeds K8s's default 30s terminationGracePeriodSeconds — raise the grace period or expect SIGKILL to cut the drain short (safe, just re-read). |
— |
| Global buffer watermark exceeded | Memory pressure | Largest buffers force-flushed first; CDC reads pause if every buffering destination is already in flight. | All destinations |
| Routing field missing from source | RoutingError halts group processing |
Error metricked, logged. Requires config or schema fix. | All destinations in group |
| Connection pool eviction storm | Frequent close/reopen | Automatic via LRU; pinned (in-use) connections are never evicted mid-transaction. Increase delivery.pool_max_open if thrashing. |
Performance, not correctness |
- Dynamic destination discovery — destinations are defined statically in YAML. No runtime discovery from a DuckLake table.
- Schema evolution propagation — source schema changes are not automatically applied to existing destination tables. Requires viaduck restart.
- Halted destination state — permanent per-destination failures (e.g. schema mismatch) retry forever with growing re-read ranges; no schema pre-flight check or consecutive-failure circuit breaker yet (see Error states).
- Exactly-once delivery — at-least-once only. No deduplication layer.
- PostHog events integration — operational events (flush failures, seeds, drains, halted destinations) emitted as PostHog events for product-side observability.
- Rowid-reuse tolerance — DuckLake reuses a rowid when an upsert re-creates a deleted key; if the re-create lands in the same flush window as its predecessor's delete, Phase 2 mistakes them for one row and the new row is lost (pre-existing; both old and new conflict rules affected; append-only mode immune). Fix in design: snapshot-ordered latest-event-wins conflict resolution.
- E2E tests — the docker-compose stack supports a manual kill-sequence soak (SIGKILL/SIGTERM + convergence diff); not yet automated in CI.
Config via YAML file (default: viaduck.yaml). Credentials are never in the YAML — use _env suffix to reference environment variables.
source:
name: "source"
postgres_uri_env: "SOURCE_POSTGRES_URI" # env var containing the connection string
data_path: "s3://source-bucket/data"
table: "events"
properties:
s3_endpoint: "minio:9000"
s3_access_key_id_env: "S3_ACCESS_KEY_ID" # _env suffix → read from env var
s3_secret_access_key_env: "S3_SECRET_ACCESS_KEY"
s3_use_ssl: "false"
s3_url_style: "path"
routing:
field: "company" # column in source table to route on
mode: "full_cdc" # required: "full_cdc" or "append_only"
key_columns: ["event_id"] # required iff mode=full_cdc (upsert join keys)
# forbidden if mode=append_only
seed_mode: "scan" # "scan" (default), "earliest", or "latest"
seed_truncate: true # REPLACE-semantics seeding (truncate a cursor-0, non-empty dest)
destinations:
- id: "quacksworth-lake" # internal identifier (state tracking, logs)
routing_value: "quacksworth" # rows where company='quacksworth' go here
name: "quacksworth_catalog"
postgres_uri_env: "DEST_QUACKSWORTH_POSTGRES_URI"
data_path: "s3://quacksworth-data/"
table: "events" # defaults to source table name if omitted
- id: "mallardine-lake"
routing_value: "mallardine"
name: "mallardine_catalog"
postgres_uri_env: "DEST_MALLARDINE_POSTGRES_URI"
data_path: "s3://mallardine-data/"
defaults:
properties: # inherited by all destinations
s3_endpoint: "minio:9000"
s3_use_ssl: "false"
s3_url_style: "path"
poll:
interval_seconds: 5 # how often to poll for new snapshots
state:
table: "viaduck_state" # Postgres cursor table
schema: "viaduck" # dedicated schema, never pollutes ducklake's namespace
# postgres_uri_env: "STATE_POSTGRES_URI" # defaults to the source catalog's URI
delivery:
workers: 8 # flush worker threads
flush_interval_seconds: 120 # per-destination max buffer age
flush_max_rows: 500000 # per-destination row trigger
flush_max_bytes: 268435456 # per-destination byte trigger (256 MiB)
buffer_total_max_bytes: 1073741824 # global force-flush watermark (1 GiB)
pool_max_open: 100 # destination connection pool size
server:
port: 8000 # metrics, health checks, status UI
web:
enabled: true # live status web UI at /ui (same port as server)
instance:
id: "viaduck-0" # unique per scaling instance
partition:
mode: "all" # all | explicit | hash
# explicit: include: ["quacksworth-lake", "mallardine-lake"]
# hash: total: 4, ordinal: 0Config parsing and validation: config.py, tests: test_config.py.
Routing values can be strings or integers (YAML unquoted integers are coerced to strings). The router detects the source column's Arrow type at runtime and casts accordingly (router.py:_make_scalar).
The routing column must not be updated. Viaduck assumes the routing field value is immutable for the lifetime of a row. This is a design constraint that enables efficient CDC filter pushdown — the filter_expr in table_changes() only includes the current destination routing values, which would miss preimages whose routing value was changed to a value outside the current set.
If a routing column mutation is detected (e.g. via an UPDATE that changes company), viaduck handles it defensively (delete from old destination, upsert to new) and logs an ERROR with the viaduck_cdc_routing_mutations_total metric. However, data integrity is not guaranteed in this case — other preimages may have been dropped by the filter pushdown.
When using full CDC mode (key_columns configured), the source table must not use the column names change_type, snapshot_id, or rowid. These are metadata columns injected by DuckLake's ducklake_table_changes() function and are stripped before writing to destinations.
Viaduck stores per-destination replication cursors in a plain Postgres table (viaduck.viaduck_state by default), created with CREATE SCHEMA / CREATE TABLE IF NOT EXISTS at startup (state.py). By default it lives in the same Postgres database that hosts the source DuckLake's metadata, but in a dedicated viaduck schema so it never cohabits with ducklake's catalog tables — and it is NOT a DuckLake table: a cursor advance must not create catalog snapshots, or idle destinations would generate CDC work forever (the snapshot treadmill).
The default reuses the source catalog's credentials. For production hardening, point state.postgres_uri_env at a scoped-down user with USAGE on the viaduck schema plus table grants only (logged future work).
| Column | Type | Purpose |
|---|---|---|
destination_id |
TEXT | Which destination (e.g. "quacksworth-lake") |
instance_id |
TEXT | Which viaduck instance owns this cursor |
last_snapshot_id |
BIGINT | Last successfully flushed source snapshot |
rows_replicated |
BIGINT | Cumulative operations applied to this destination |
last_error |
TEXT | Last error message (NULL if healthy) |
last_error_at / last_replicated_at / updated_at |
TIMESTAMPTZ | Timestamps |
Cursor advances are single INSERT ... ON CONFLICT DO UPDATE upserts with a monotonicity guard (WHERE last_snapshot_id <= EXCLUDED.last_snapshot_id) — a stale writer can never move a cursor backwards. The connection is lock-guarded with reconnect-on-failure; cursor writes happen on the flush cadence, so PG write rate is per-flush, not per-poll.
State is keyed by (destination_id, instance_id), enabling multiple viaduck instances to independently track their assigned destinations without conflicts.
When a new destination is added to the config, it needs the current source data. Three modes are available via routing.seed_mode:
| Mode | How it works | When to use |
|---|---|---|
scan (default) |
Reads current source state via filtered table.scan(), bulk-loads the destination, sets cursor to current snapshot |
Most use cases. Fast — reads one snapshot, not historical CDC. |
latest |
Skip backfill: cursor starts at the source head; only events from now onward are replicated | High-volume sources where historical backfill is infeasible (e.g. the PostHog events_nrt pipeline) |
earliest |
Cursor starts at the source's minimum snapshot; CDC catches up forward from there | Replicating a freshly-provisioned source where you need every event but no destination state existed |
cdc_replay was removed in v0.0.28 — use earliest if you need a full historical replay.
With scan mode, adding a new tenant is instant regardless of source history depth. The scan is pinned to the snapshot captured at startup, so no race condition between the scan and cursor advancement (main.py:_seed_new_destinations).
When mode: full_cdc, seeding uses upsert for idempotency — safe if the process crashes mid-seed and re-seeds on restart. When mode: append_only, seeding uses append (at-least-once semantics apply).
Seeding has REPLACE semantics: a destination at cursor 0 with existing rows can only be a crashed prior seed (single-master assumption), so it is truncated — with a loud WARNING — before the seed streams. Crash mid-seed leaves the cursor at 0 and the next attempt re-truncates: convergent, and it also fixes the historical append-mode re-seed duplication. routing.seed_truncate: false switches the behavior to refuse-loudly, protecting a misconfigured destination pointed at a populated table. The truncate is scoped to the destination's routing value, so even two destinations misconfigured onto one physical table cannot wipe each other. Downstream readers of destination lakes should gate on cursor > 0 (the seed may leave the table briefly empty during a re-seed retry).
The seed scan also verifies key_columns uniqueness within the partition (DuckLake has no unique constraints, so the key contract is otherwise unenforceable). Duplicate keys would silently corrupt the destination — delete-by-key over-deletes, duplicate-key upserts duplicate — so a violation fails the seed loudly before the cursor advances. The check piggybacks on the scan the seed already does (zero extra I/O, memory bounded by the partition's distinct-key count). Rows inserted after seeding are not re-verified; the contract still applies.
Verified by TLC: the SeedDestination action in the TLA+ spec produces the same invariant-satisfying state as a full CDC replay (tla/Viaduck.tla).
With 1000s of destinations, holding all connections open is not viable (~20-30MB per DuckDB catalog connection). Viaduck uses an LRU pool (destination.py:DestinationPool):
- Lazy initialization — connections created on first access, outside the pool lock so concurrent workers create in parallel
- LRU eviction — when at capacity (
delivery.pool_max_open, default 100), the least recently used connection is closed - Lease pinning — a flush worker's
get()pins the connection untilrelease(); eviction never closes a catalog another worker is mid-transaction on (a pinned entry evicted by error handling is closed at final release) - Error eviction — failed connections are evicted immediately and recreated on next access
- Schema caching — source table schema fetched once at startup, reused for all destination table creation
Metrics: viaduck_pool_open_connections, viaduck_pool_evictions_total, viaduck_pool_creates_total.
Tests: test_destination.py — LRU ordering, eviction, error handling, max_open validation.
Three partition modes (config.py:assigned_destination_ids):
| Mode | How it works | Use case |
|---|---|---|
all (default) |
Single instance handles all destinations | Small deployments |
explicit |
YAML lists destination IDs for this instance | Operator-controlled assignment |
hash |
md5(destination_id) % total == ordinal |
Automatic, no coordination |
Each instance only processes its assigned destinations. State rows are keyed by instance_id, so instances don't conflict.
The web UI (/ui) and status API (/status) report a per-destination operational status. Raw flush lag is NOT the signal — between flushes the cursor always trails the source; that's the buffering design working:
| Status | Meaning |
|---|---|
healthy |
Flushed through the current source snapshot — nothing pending |
buffering |
Reads are current; changes are in the in-memory buffer awaiting the next flush (normal steady state) |
flushing |
A flush is writing this destination right now |
lagging |
CDC reads are behind the source — data exists that viaduck has not yet read |
error |
The last flush failed; the buffered range was dropped and will be re-read (see Error states) |
27 Prometheus metrics exposed on GET /metrics (port 8000). Pipeline label auto-injected (metrics.py):
| Metric | Type | Labels | Description |
|---|---|---|---|
viaduck_polls_total |
Counter | — | Poll cycles executed |
viaduck_cdc_read_seconds |
Histogram | — | Time to read CDC from source |
viaduck_cdc_rows_read_total |
Counter | — | Total rows read from source |
viaduck_cdc_batch_rows |
Histogram | — | Rows per CDC read (monitors batch sizes; large values indicate poll interval too slow) |
viaduck_source_snapshot_id |
Gauge | — | Current source snapshot ID |
viaduck_dest_write_seconds |
Histogram | destination | Time per destination write |
viaduck_dest_rows_written_total |
Counter | destination | Rows appended (append-only mode) |
viaduck_dest_rows_deleted_total |
Counter | destination | Rows deleted via CDC |
viaduck_dest_rows_upserted_total |
Counter | destination | Rows sent to upsert (input count) |
viaduck_dest_upsert_matched_total |
Counter | destination | Rows that matched existing rows during upsert (updates vs inserts) |
viaduck_dest_last_snapshot_id |
Gauge | destination | Last replicated snapshot |
viaduck_dest_lag_snapshots |
Gauge | destination | Snapshot lag per destination |
viaduck_unrouted_rows_total |
Counter | — | Rows with no matching destination |
viaduck_pool_open_connections |
Gauge | — | Open destination connections |
viaduck_pool_evictions_total |
Counter | — | LRU evictions |
viaduck_pool_creates_total |
Counter | — | Connections created |
viaduck_delivery_buffer_rows |
Gauge | destination | Rows buffered awaiting flush |
viaduck_delivery_buffer_bytes |
Gauge | destination | Bytes buffered awaiting flush |
viaduck_delivery_buffer_total_bytes |
Gauge | — | Total buffered + in-flight bytes (watermark input) |
viaduck_delivery_flushes_total |
Counter | destination, trigger | Flushes by trigger (interval/rows/bytes/memory/shutdown) |
viaduck_delivery_flush_seconds |
Histogram | destination | Flush duration (data flushes only) |
viaduck_delivery_buffers_dropped_total |
Counter | destination | Buffers dropped on flush failure |
viaduck_cdc_routing_mutations_total |
Counter | — | Cross-tenant routing value changes |
viaduck_cdc_conflicts_resolved_total |
Counter | — | Rowid-level conflicts resolved in Phase 2 |
viaduck_cdc_tombstones_emitted_total |
Counter | — | Deletes surviving from insert+delete pairs (write cost of phantom healing; churn signal) |
viaduck_cdc_orphaned_preimages_total |
Counter | — | Preimages with no matching postimage |
viaduck_errors_total |
Counter | type, destination | Errors by type |
just sync # install dependencies
just run -- -c my.yaml # run with config filejust ci requires semgrep for security scanning. Install:
brew install semgrepjust fmt # format code
just lint # lint code
just test # run unit tests
just test-integration # run integration tests (local DuckDB)
just test-perf # run performance benchmarks
just test-perf-json # benchmarks + JSON output to perf-results.json
just ci # full CI: lock-check, format, lint, tests, semgrep, Docker build
just docs # render d2 diagrams to SVG
just docs-check # verify all README links are valid
just up # start docker-compose stack
just demo # stack + live producer traffic, wait for health, open the web UI
just down # stop docker-compose stack11 benchmarks in tests/perf/test_fanout_perf.py exercise the hot path at scale (M2 MacBook, single run):
| Benchmark | Scale | Budget | Typical |
|---|---|---|---|
| Router split | 10K rows, 100 dests | <1s | ~16ms |
| Router split | 100K rows, 1000 dests | <5s | ~8ms |
| Router split | 1M rows, 10K dests | <60s | ~134ms |
| Preimage resolution | 50K CDC rows | <2s | ~5ms |
| Preimage resolution | 50K rows, matched pairs | <2s | ~5ms |
| Conflict resolution | 50K rows, ~5% conflicts | <1s | ~5ms |
| Delete filter (single key) | 1000 rows | <1s | ~25ms |
| Delete filter (composite key) | 500 rows, 3-col key | <2s | ~2ms |
| Delivery fanout (end-to-end) | 200 dests × 100 rows | <600s | 4.8s (42 dests/s) |
| Delivery fanout (end-to-end) | 500 dests × 100 rows | <600s | 11.8s (42 dests/s) |
| Delivery fanout (end-to-end) | 1000 dests × 100 rows | <600s | 24.2s (41 dests/s) |
The fanout benchmark is the honest end-to-end number: full-CDC flushes through the DeliveryManager worker pool (workers=8) against real local DuckLake catalogs, with every flush paying catalog creation (fanout exceeds pool_max_open, so it measures continuous-eviction worst case). Scaling is flat — ~42 destinations/s at 200, 500, and 1000 — so a 1000-destination deployment drains a full flush cycle in ~25s, comfortably inside the default 120s flush interval. Cursor persistence and object-store latency are excluded (per-flush costs, not per-destination-count costs); see the benchmark docstring.
just test-perf-json writes results to perf-results.json for CI regression tracking.
delivery.workers controls flush parallelism, but two process-global thread pools sit underneath every worker: Arrow's compute pool (Phase 2/3 kernels) and DuckDB's per-connection threads (destination commits). Workers spend most of a flush inside those pools with the GIL released, so worker count is effectively a concurrency knob (overlapping I/O and commit latency across destinations), not a CPU multiplier — raising workers well past the core count buys queueing, not throughput. The default (8, ≈ core count) is a reasonable starting point; a measured worker-count sweep hasn't been done, so treat per-deployment tuning as empirical.
A Grafana dashboard covering the metrics is included. Available at http://localhost:3000/d/viaduck/viaduck when running just up.
kubectl apply -f k8s/service.yaml
kubectl apply -f k8s/pdb.yaml
kubectl apply -f k8s/deployment.yamlViaduck runs as a K8s Deployment (not StatefulSet — no ordinal-based identity needed). For horizontal scaling, deploy multiple instances with different instance.partition configs. See k8s/deployment.yaml for manifests.
| Operation | Attempts | Backoff | On exhaustion |
|---|---|---|---|
| Destination write (flush) | 3 | 1s, 2s | Error recorded, connection evicted, buffers dropped, read position reset. Range re-read next cycle. Other destinations unaffected. |
| Cursor persist (after destination commit) | 3 | 0.5s, 1s | Falls back to the flush-failure path: range re-read, idempotent re-apply. |
| Source CDC read | 1 | — | Poll loop exits; buffered data drains, process exits cleanly, K8s restarts, resume from cursors. |
Every error state and how it resolves:
| Error | Where | Resolution |
|---|---|---|
Flush failure (errors_total{type="dest_write"}) |
Destination write fails after retries — catalog down, transaction failure, schema mismatch | Automatic: connection evicted, buffer dropped, read position reset to the persisted cursor, range re-read next cycle. last_error durable in PG and shown in the UI (error status); cleared on the next successful flush |
| Cursor persist failure after destination commit | PG outage outlasting the in-process retry | Same path as flush failure; the re-read re-applies idempotently and tombstones heal any phantom (the spec's FlushCommitNoCursor window) |
Routing error (errors_total{type="routing"}) |
Key column missing from CDC data (Phase 1) or routing field missing from the source schema | Group processing halts for the cycle, retried every poll. Needs an operator fix (config or source schema); affects all destinations in the read group |
| Source CDC read failure | Any poll-cycle exception | Poll loop exits, buffered data drains, clean process exit; K8s restarts and resumes from persisted cursors |
| Seed key-uniqueness violation | key_columns not unique in the source partition |
Fatal before the cursor advances; fix the source data or key_columns, the seed re-runs on restart |
| Partial seed (error or crash mid-seed) | Seed scan/write interrupted | Cursor stays at 0 → full re-seed on restart (idempotent with key_columns) |
| State-store write failure while recording an error | PG down during failure bookkeeping | Caught and logged; the position/buffer reset has already happened, so recovery is unaffected |
| Pool connection create/close failure | Destination catalog connect | Evicted and recreated on next access; close failures are log-warnings only |
| Escaped flush-worker exception | Bug guard | CRITICAL log; in-flight state cleared in finally so the destination isn't wedged |
Permanent failures retry forever. A destination that can never succeed (e.g. schema mismatch) loops through the flush-failure path at flush cadence, and because its cursor never advances, its re-read range — and the source I/O spent on it — grows every cycle. The failure is fully visible (error status, last_error, climbing dest_lag_snapshots, delivery_buffers_dropped_total) but never escalates. A halted destination state (schema pre-flight check / consecutive-failure circuit breaker) is logged future work.
Why exit on source failure? The source catalog is the single point of truth. If it's unreachable, there's nothing to do. The exit path runs the normal shutdown drain (already-buffered data still flushes), then lets K8s handle the restart. DuckLake snapshot history ensures no data loss.
MIT License. Copyright (c) 2026 PostHog, Inc.
