74 changes: 21 additions & 53 deletions AGENT.md
@@ -126,59 +126,27 @@ The 3-phase CDC algorithm is eventually consistent under these assumptions:
- Within `catalog.begin_transaction()`: chunked deletes first, then upsert
- Crash mid-apply → transaction rolled back, no partial state

**Phase 3 fast path: `append_at_least_once`** (per-destination opt-in)

`DestinationConfig.append_at_least_once: bool` (default `false`). When `true`
AND `_is_pure_insert_batch(batch)` (every row's `change_type == "insert"`),
`_apply_changes` calls `tbl.append(rows)` instead of `tbl.upsert(rows, join_cols=key_columns)`.

Motivation: `pyducklake.Table.upsert()` runs `self.scan().count()` twice (before
and after the MERGE, just to populate `UpsertResult.rows_updated/rows_inserted`)
and the MERGE itself joins source keys against the target table — both scale
with destination size, and both are wasted work when the batch contains no
updates. For an insert-only events workload with UUID keys (no min/max stat
pruning helps), the join reads the whole target every flush to confirm zero
matches. `tbl.append()` skips all of that — pure write.

Contract change to flag carefully. The upsert path doesn't just "make apply
idempotent on retry" — it also silently collapses **upstream at-least-once
duplicates** (a CDC redelivery of the same source snapshot range arrives as
the same input batch; MERGE WHEN MATCHED reduces it to one destination row).
The fast path does neither. Both apply-retry duplicates AND upstream CDC
duplicates now physically materialize in the destination table and propagate
to every downstream consumer (queries, exports, lakehouse aggregations) —
they no longer stop at viaduck. The end-to-end "at-least-once" contract
remains identical from upstream → viaduck → downstream, but the previously-
hidden deduplication side-effect of upsert is gone. Enable only when **every
consumer of the destination table** can tolerate per-key duplicates, not
just the immediate consumer of viaduck.

Safety net: the check is per-batch, not config-only. A non-insert row anywhere
in the batch (a delete, an `update_postimage`) falls back to the upsert path
transparently, so a future schema/CDC change that introduces updates doesn't
silently corrupt the destination. `_dedupe_upserts_last_write_wins` still
runs on the fast path so within-batch duplicate keys collapse to one row
(the "duplicates only on retry" contract isn't weakened by the fast path
itself — only by retries).
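
The per-batch check described above can be sketched as follows. This is a minimal illustration, assuming rows are plain dicts carrying a `change_type` key; `is_pure_insert_batch` and `choose_apply_path` are hypothetical stand-ins for the helpers named in the text, not viaduck's actual implementation.

```python
from typing import Any

Row = dict[str, Any]


def is_pure_insert_batch(batch: list[Row]) -> bool:
    """Return True only when every row in the batch is an insert."""
    return all(row["change_type"] == "insert" for row in batch)


def choose_apply_path(batch: list[Row], append_at_least_once: bool) -> str:
    """Return "append" for the fast path, otherwise "upsert".

    The config flag alone is not sufficient: a single delete or
    update_postimage sends the whole batch down the upsert path.
    """
    if append_at_least_once and is_pure_insert_batch(batch):
        return "append"
    return "upsert"
```

Because the decision is made per batch, turning the flag on for a destination never disables the upsert path outright; it only skips it for batches that provably contain no updates or deletes.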

Metric implication: `viaduck_dest_upsert_matched_total` (`metrics.dest_upsert_matched_total`)
stays silent on the fast path — `tbl.append()` has no "matched" concept, and
at-least-once semantics make the question ill-defined. A destination running
in fast-path mode that has zero scrapes for this counter is consistent with
the configuration, not a bug.

TLA+ spec coverage: `tla/Viaduck.tla` models destination contents as sets,
not bags — physical row duplicates introduced by the fast path are not
observable in the spec's safety invariants (`EventualConsistency` uses set
equality; `NoPhantomWhenCurrent` only requires every dest row to trace back
to a source row, which duplicates still do). The current spec therefore
neither breaks under the fast path nor verifies it; coverage is incidental.
A future spec update would need bag/multiset semantics to model the
duplicate-count semantic difference and re-run TLC. Note that moving cursor
advance into the same transaction as the apply (the cleanest way to restore
exactly-once and re-tighten the spec) is not achievable in the current
architecture: cursor lives in source-side Postgres, apply lives in the
destination DuckLake catalog, and there is no two-phase commit between them.
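
Why set semantics hide the fast path's duplicates can be seen in a few lines of Python. This is only an analogy for the spec's behavior, with made-up row identifiers; it is not derived from `tla/Viaduck.tla`.

```python
from collections import Counter

source_rows = ["evt-1", "evt-2"]
# Hypothetical destination after an apply retry: evt-2 was appended twice.
dest_rows = ["evt-1", "evt-2", "evt-2"]

# Set semantics: duplicates collapse, so the two sides compare equal.
set_equal = set(source_rows) == set(dest_rows)

# Bag (multiset) semantics: per-row counts are compared, so the duplicate shows.
bag_equal = Counter(source_rows) == Counter(dest_rows)
```

An invariant phrased over sets therefore holds with or without the duplicate, which is why a bag-based model would be needed to distinguish the two paths.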
**Pipeline mode** (`routing.mode`): the operator picks the entire shape of
the pipeline at config time:

- `mode: append_only`: read the source via `ducklake_table_insertions`
(inserts only, so compaction-induced file `end_snapshot` churn produces
no delete stream), skip Phase 1 and Phase 2 entirely, and write each
flush via `tbl.append(rows)`. Requires `key_columns: []` (the apply path
doesn't use them). The posthog/team-2 events pipeline runs in this mode.
- `mode: full_cdc`: read the source via `ducklake_table_changes` (inserts +
deletes + update preimages/postimages), run Phase 1 preimage resolution
and Phase 2 conflict resolution, and apply via `tbl.upsert(rows,
join_cols=key_columns)`. Requires `key_columns` to be non-empty.

Both modes are validated in `RoutingConfig.__post_init__`; a
misconfiguration fails at startup with an operator-actionable error rather
than silently selecting the wrong path. This replaced an earlier derivation
that inferred the mode from `len(key_columns) > 0`, which was a
silent-misconfiguration hazard: an empty list flipped the entire pipeline
shape with no operator-visible signal. An earlier attempt to optimize the
`mode: full_cdc` apply path via a per-destination `append_at_least_once`
flag also proved redundant for posthog, which had been on `append_only`
the whole time.
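
The startup validation described above might look roughly like the sketch below. The field names `mode` and `key_columns` come from the text; the `ConfigError` base class, the `VALID_MODES` constant, and the error messages are assumptions made for illustration, and the real `RoutingConfig` carries more fields than shown here.

```python
from dataclasses import dataclass, field


class ConfigError(ValueError):
    """Raised at startup for an invalid routing configuration (assumed base class)."""


VALID_MODES = ("append_only", "full_cdc")  # hypothetical constant


@dataclass
class RoutingConfig:
    mode: str  # required: no default, so omitting it fails at construction
    key_columns: list[str] = field(default_factory=list)

    def __post_init__(self) -> None:
        if self.mode not in VALID_MODES:
            raise ConfigError(
                f"routing.mode must be one of {VALID_MODES}, got {self.mode!r}"
            )
        if self.mode == "full_cdc" and not self.key_columns:
            raise ConfigError("mode=full_cdc requires non-empty key_columns")
        if self.mode == "append_only" and self.key_columns:
            raise ConfigError("mode=append_only requires key_columns: []")
```

Validating in `__post_init__` means an invalid combination can never exist as a constructed object, so later code can branch on `mode` without re-checking `key_columns`.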

CDC batches are processed as unordered sets. This is sound because each
flush covers the union of adjacent half-open snapshot ranges
24 changes: 15 additions & 9 deletions README.md
@@ -84,12 +84,14 @@ Core modules:

## Two Modes

Viaduck operates in one of two modes based on the `key_columns` configuration:
Viaduck operates in one of two modes, selected by the **required** `routing.mode` field:

| Mode | Config | CDC API | Destination writes | Use case |
|------|--------|---------|-------------------|----------|
| **Append-only** | `key_columns` omitted or `[]` | `table_insertions()` | `append()` | Append-only tables, no primary key |
| **Full CDC** | `key_columns: [event_id]` | `table_changes()` | `delete()` + `upsert()` | Tables with primary keys, full replication |
| **Append-only** | `mode: append_only` + `key_columns: []` | `table_insertions()` | `append()` | Insert-only sources (events, append-only fact tables) |
| **Full CDC** | `mode: full_cdc` + non-empty `key_columns` | `table_changes()` | `delete()` + `upsert()` | Sources that emit deletes/updates; the join columns are the upsert keys |

`routing.mode` is required (no default). A misconfiguration (`mode` unset, an unknown value, `full_cdc` with empty `key_columns`, or `append_only` with non-empty `key_columns`) fails fast at startup with an actionable `ConfigError`. Earlier viaduck versions inferred the mode from `len(key_columns) > 0`, which silently flipped the entire pipeline shape when an operator mistyped or accidentally emptied the list.

## Poll Cycle

@@ -208,9 +210,10 @@ source:

routing:
  field: "company"              # column in source table to route on
  key_columns: ["event_id"]     # primary key for delete/update replication
                                # omit or [] for append-only mode
  seed_mode: "scan"             # "scan" (default) or "cdc_replay"
  mode: "full_cdc"              # required: "full_cdc" or "append_only"
  key_columns: ["event_id"]     # required iff mode=full_cdc (upsert join keys)
                                # forbidden if mode=append_only
  seed_mode: "scan"             # "scan" (default), "earliest", or "latest"
  seed_truncate: true           # REPLACE-semantics seeding (truncate a cursor-0, non-empty dest)

destinations:
@@ -298,16 +301,19 @@ State is keyed by `(destination_id, instance_id)`, enabling multiple viaduck ins

## New Destination Seeding

When a new destination is added to the config, it needs the current source data. Two modes are available via `routing.seed_mode`:
When a new destination is added to the config, it needs the current source data. Three modes are available via `routing.seed_mode`:

| Mode | How it works | When to use |
|------|-------------|-------------|
| `scan` (default) | Reads current source state via filtered `table.scan()`, bulk-loads the destination, sets cursor to current snapshot | Most use cases. Fast — reads one snapshot, not historical CDC. |
| `cdc_replay` | Starts cursor at snapshot 0, replays entire CDC history on first poll | Audit trails where you need to process every historical change |
| `latest` | Skip backfill: cursor starts at the source head; only events from now onward are replicated | High-volume sources where historical backfill is infeasible (e.g. the PostHog events_nrt pipeline) |
| `earliest` | Cursor starts at the source's minimum snapshot; CDC catches up forward from there | Replicating a freshly-provisioned source where you need every event but no destination state existed |

`cdc_replay` was removed in v0.0.28 — use `earliest` if you need a full historical replay.
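
As a rough illustration of how the three seed modes differ, the sketch below picks the starting cursor for a new destination. The function `initial_cursor` and its parameters are hypothetical; it models only the cursor choice from the table above, not the bulk load that `scan` performs before the cursor is set.

```python
def initial_cursor(seed_mode: str, min_snapshot: int, head_snapshot: int) -> int:
    """Pick the snapshot a new destination's cursor starts from (illustrative)."""
    if seed_mode in ("scan", "latest"):
        # scan bulk-loads the current source state first; latest skips the
        # backfill. Either way, CDC resumes from the source head.
        return head_snapshot
    if seed_mode == "earliest":
        # CDC catches up forward from the source's minimum snapshot.
        return min_snapshot
    if seed_mode == "cdc_replay":
        raise ValueError("cdc_replay was removed in v0.0.28; use 'earliest'")
    raise ValueError(f"unknown seed_mode: {seed_mode!r}")
```

The only difference between `scan` and `latest` in this model is whether the destination is populated before polling begins; both leave the historical CDC stream unread.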

With `scan` mode, adding a new tenant is instant regardless of source history depth. The scan is pinned to the snapshot captured at startup, so there is no race condition between the scan and cursor advancement ([`main.py:_seed_new_destinations`](viaduck/main.py)).

When `key_columns` is configured, seeding uses `upsert` for idempotency — safe if the process crashes mid-seed and re-seeds on restart. Without `key_columns`, seeding uses `append` (at-least-once semantics apply).
When `mode: full_cdc`, seeding uses `upsert` for idempotency — safe if the process crashes mid-seed and re-seeds on restart. When `mode: append_only`, seeding uses `append` (at-least-once semantics apply).

Seeding has **REPLACE semantics**: a destination at cursor 0 with existing rows can only be a crashed prior seed (single-master assumption), so it is truncated — with a loud WARNING — before the seed streams. Crash mid-seed leaves the cursor at 0 and the next attempt re-truncates: convergent, and it also fixes the historical append-mode re-seed duplication. `routing.seed_truncate: false` switches the behavior to refuse-loudly, protecting a misconfigured destination pointed at a populated table. The truncate is scoped to the destination's routing value, so even two destinations misconfigured onto one physical table cannot wipe each other. Downstream readers of destination lakes should gate on cursor > 0 (the seed may leave the table briefly empty during a re-seed retry).
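
The REPLACE-semantics decision can be summarized as a small decision function. This is a sketch under the assumptions stated in the paragraph above (single master, cursor 0 meaning "not yet seeded"); `seed_action` and its return labels are hypothetical names, not part of viaduck.

```python
def seed_action(cursor: int, dest_row_count: int, seed_truncate: bool) -> str:
    """Decide what seeding should do for one destination (illustrative)."""
    if cursor > 0:
        # Already seeded: normal CDC polling applies, nothing to seed.
        return "skip"
    if dest_row_count == 0:
        # Fresh destination: seed directly.
        return "seed"
    # Cursor 0 with existing rows: treated as a crashed prior seed.
    if seed_truncate:
        return "truncate_then_seed"  # logged with a loud WARNING in the text
    return "refuse"
```

Because a crash mid-seed leaves the cursor at 0, re-running this decision on restart yields `truncate_then_seed` again, which is what makes the retry convergent.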
