diff --git a/AGENT.md b/AGENT.md index a7a8421..5962a41 100644 --- a/AGENT.md +++ b/AGENT.md @@ -126,59 +126,27 @@ The 3-phase CDC algorithm is eventually consistent under these assumptions: - Within `catalog.begin_transaction()`: chunked deletes first, then upsert - Crash mid-apply → transaction rolled back, no partial state -**Phase 3 fast path: `append_at_least_once`** (per-destination opt-in) - -`DestinationConfig.append_at_least_once: bool` (default `false`). When `true` -AND `_is_pure_insert_batch(batch)` (every row's `change_type == "insert"`), -`_apply_changes` calls `tbl.append(rows)` instead of `tbl.upsert(rows, join_cols=key_columns)`. - -Motivation: `pyducklake.Table.upsert()` runs `self.scan().count()` twice (before -and after the MERGE, just to populate `UpsertResult.rows_updated/rows_inserted`) -and the MERGE itself joins source keys against the target table — both scale -with destination size, and both are wasted work when the batch contains no -updates. For an insert-only events workload with UUID keys (no min/max stat -pruning helps), the join reads the whole target every flush to confirm zero -matches. `tbl.append()` skips all of that — pure write. - -Contract change to flag carefully. The upsert path doesn't just "make apply -idempotent on retry" — it also silently collapses **upstream at-least-once -duplicates** (a CDC redelivery of the same source snapshot range arrives as -the same input batch; MERGE WHEN MATCHED reduces it to one destination row). -The fast path does neither. Both apply-retry duplicates AND upstream CDC -duplicates now physically materialize in the destination table and propagate -to every downstream consumer (queries, exports, lakehouse aggregations) — -they no longer stop at viaduck. The end-to-end "at-least-once" contract -remains identical from upstream → viaduck → downstream, but the previously- -hidden deduplication side-effect of upsert is gone. Enable only when **every -consumer of the destination table** can tolerate per-key duplicates, not -just the immediate consumer of viaduck. - -Safety net: the check is per-batch, not config-only. A non-insert row anywhere -in the batch (a delete, an `update_postimage`) falls back to the upsert path -transparently, so a future schema/CDC change that introduces updates doesn't -silently corrupt the destination. `_dedupe_upserts_last_write_wins` still -runs on the fast path so within-batch duplicate keys collapse to one row -(the "duplicates only on retry" contract isn't weakened by the fast path -itself — only by retries). - -Metric implication: `viaduck_dest_upsert_matched_total` (`metrics.dest_upsert_matched_total`) -stays silent on the fast path — `tbl.append()` has no "matched" concept, and -at-least-once semantics make the question ill-defined. A destination running -in fast-path mode that has zero scrapes for this counter is consistent with -the configuration, not a bug. - -TLA+ spec coverage: `tla/Viaduck.tla` models destination contents as sets, -not bags — physical row duplicates introduced by the fast path are not -observable in the spec's safety invariants (`EventualConsistency` uses set -equality; `NoPhantomWhenCurrent` only requires every dest row to trace back -to a source row, which duplicates still do). The current spec therefore -neither breaks under the fast path nor verifies it; coverage is incidental. -A future spec update would need bag/multiset semantics to model the -duplicate-count semantic difference and re-run TLC. Note that moving cursor -advance into the same transaction as the apply (the cleanest way to restore -exactly-once and re-tighten the spec) is not achievable in the current -architecture: cursor lives in source-side Postgres, apply lives in the -destination DuckLake catalog, and there is no two-phase commit between them. +**Pipeline mode** (`routing.mode`): the operator picks the entire shape of +the pipeline at config time: + +- `mode: append_only` — read source via `ducklake_table_insertions` (inserts + only, no delete stream from compaction-induced file end_snapshot churn), + skip Phase 1 and Phase 2 entirely, write each flush via `tbl.append(rows)`. + Requires `key_columns: []` (the apply path doesn't use them). The + posthog/team-2 events pipeline runs in this mode. +- `mode: full_cdc` — read source via `ducklake_table_changes` (inserts + + deletes + update preimages/postimages), run Phase 1 preimage resolution + and Phase 2 conflict resolution, apply via `tbl.upsert(rows, + join_cols=key_columns)`. Requires `key_columns` non-empty. + +Both validated in `RoutingConfig.__post_init__`; a misconfig fails at +startup with the operator-actionable error rather than silently selecting +the wrong path. This replaced an earlier "infer mode from +`len(key_columns) > 0`" derivation which was a silent-misconfig hazard +(an empty list flipped the entire pipeline shape with no operator-visible +signal — and an earlier attempt to optimize the `mode: full_cdc` apply +path via a per-destination `append_at_least_once` flag was redundant for +posthog, which had been on `append_only` the whole time). CDC batches are processed as unordered sets. This is sound because each flush covers the union of adjacent half-open snapshot ranges diff --git a/README.md b/README.md index a08fd72..f550ca6 100644 --- a/README.md +++ b/README.md @@ -84,12 +84,14 @@ Core modules: ## Two Modes -Viaduck operates in one of two modes based on the `key_columns` configuration: +Viaduck operates in one of two modes, selected by the **required** `routing.mode` field: | Mode | Config | CDC API | Destination writes | Use case | |------|--------|---------|-------------------|----------| -| **Append-only** | `key_columns` omitted or `[]` | `table_insertions()` | `append()` | Append-only tables, no primary key | -| **Full CDC** | `key_columns: [event_id]` | `table_changes()` | `delete()` + `upsert()` | Tables with primary keys, full replication | +| **Append-only** | `mode: append_only` + `key_columns: []` | `table_insertions()` | `append()` | Insert-only sources (events, append-only fact tables) | +| **Full CDC** | `mode: full_cdc` + non-empty `key_columns` | `table_changes()` | `delete()` + `upsert()` | Sources that emit deletes/updates; the join columns are the upsert keys | + +`routing.mode` is required (no default). Misconfiguration (`mode` unset, unknown value, `full_cdc` with empty `key_columns`, `append_only` with non-empty `key_columns`) fails fast at startup with an actionable `ConfigError` — earlier viaduck versions inferred the mode from `len(key_columns) > 0`, which silently flipped the entire pipeline shape when an operator typo'd or accidentally emptied the list. ## Poll Cycle @@ -208,9 +210,10 @@ source: routing: field: "company" # column in source table to route on - key_columns: ["event_id"] # primary key for delete/update replication - # omit or [] for append-only mode - seed_mode: "scan" # "scan" (default) or "cdc_replay" + mode: "full_cdc" # required: "full_cdc" or "append_only" + key_columns: ["event_id"] # required iff mode=full_cdc (upsert join keys) + # forbidden if mode=append_only + seed_mode: "scan" # "scan" (default), "earliest", or "latest" seed_truncate: true # REPLACE-semantics seeding (truncate a cursor-0, non-empty dest) destinations: @@ -298,16 +301,19 @@ State is keyed by `(destination_id, instance_id)`, enabling multiple viaduck ins ## New Destination Seeding -When a new destination is added to the config, it needs the current source data. Two modes are available via `routing.seed_mode`: +When a new destination is added to the config, it needs the current source data. Three modes are available via `routing.seed_mode`: | Mode | How it works | When to use | |------|-------------|-------------| | `scan` (default) | Reads current source state via filtered `table.scan()`, bulk-loads the destination, sets cursor to current snapshot | Most use cases. Fast — reads one snapshot, not historical CDC. | -| `cdc_replay` | Starts cursor at snapshot 0, replays entire CDC history on first poll | Audit trails where you need to process every historical change | +| `latest` | Skip backfill: cursor starts at the source head; only events from now onward are replicated | High-volume sources where historical backfill is infeasible (e.g. the PostHog events_nrt pipeline) | +| `earliest` | Cursor starts at the source's minimum snapshot; CDC catches up forward from there | Replicating a freshly-provisioned source where you need every event but no destination state existed | + +`cdc_replay` was removed in v0.0.28 — use `earliest` if you need a full historical replay. With `scan` mode, adding a new tenant is instant regardless of source history depth. The scan is pinned to the snapshot captured at startup, so no race condition between the scan and cursor advancement ([`main.py:_seed_new_destinations`](viaduck/main.py)). -When `key_columns` is configured, seeding uses `upsert` for idempotency — safe if the process crashes mid-seed and re-seeds on restart. Without `key_columns`, seeding uses `append` (at-least-once semantics apply). +When `mode: full_cdc`, seeding uses `upsert` for idempotency — safe if the process crashes mid-seed and re-seeds on restart. When `mode: append_only`, seeding uses `append` (at-least-once semantics apply). Seeding has **REPLACE semantics**: a destination at cursor 0 with existing rows can only be a crashed prior seed (single-master assumption), so it is truncated — with a loud WARNING — before the seed streams. Crash mid-seed leaves the cursor at 0 and the next attempt re-truncates: convergent, and it also fixes the historical append-mode re-seed duplication. `routing.seed_truncate: false` switches the behavior to refuse-loudly, protecting a misconfigured destination pointed at a populated table. The truncate is scoped to the destination's routing value, so even two destinations misconfigured onto one physical table cannot wipe each other. Downstream readers of destination lakes should gate on cursor > 0 (the seed may leave the table briefly empty during a re-seed retry). diff --git a/tests/integration/test_append_at_least_once_integration.py b/tests/integration/test_append_at_least_once_integration.py deleted file mode 100644 index ce80230..0000000 --- a/tests/integration/test_append_at_least_once_integration.py +++ /dev/null @@ -1,334 +0,0 @@ -"""Integration locks for the append_at_least_once fast path. - -These tests drive _apply_changes end-to-end against a real pyducklake catalog -(local DuckDB, no Postgres/Docker) to validate the three properties the -contract advertises: - - 1. Pure-insert batches actually land via tbl.append() — rows are present in - the destination table after the call. - 2. The per-batch safety net works under real pyducklake: a mixed batch - (any non-insert row) transparently uses the upsert path, with both the - deletes and the updates applied correctly. - 3. The documented duplicate window is real: replaying the same pure-insert - batch under the fast path produces duplicates in the destination, which - is the contract the flag advertises. This guards against a future - re-introduction of dedup-against-existing-rows masking the tradeoff. -""" - -from __future__ import annotations - -import os - -import pyarrow as pa -import pytest -from pyducklake import Catalog, Schema -from pyducklake.types import IntegerType, NestedField, StringType - -from viaduck import metrics -from viaduck.apply import _apply_changes - -pytestmark = pytest.mark.integration - - -def setup_module(): - metrics.init("integration_test") - - -def _make_catalog(tmp_path, name: str) -> Catalog: - base = tmp_path / name - meta_db = str(base / "meta.duckdb") - data_dir = str(base / "data") - os.makedirs(data_dir, exist_ok=True) - return Catalog(name, meta_db, data_path=data_dir) - - -SCHEMA = Schema( - NestedField(field_id=1, name="event_id", field_type=IntegerType(), required=True), - NestedField(field_id=2, name="region", field_type=StringType()), - NestedField(field_id=3, name="value", field_type=IntegerType()), -) - - -def _insert_batch(event_ids: list[int], *, snapshot_id: int = 1, start_rowid: int = 0) -> pa.Table: - n = len(event_ids) - return pa.table( - { - "event_id": pa.array(event_ids, type=pa.int32()), - "region": pa.array([f"r{i % 3}" for i in range(n)], type=pa.string()), - "value": pa.array([i * 10 for i in range(n)], type=pa.int32()), - "change_type": pa.array(["insert"] * n, type=pa.string()), - "snapshot_id": pa.array([snapshot_id] * n, type=pa.int64()), - "rowid": pa.array(list(range(start_rowid, start_rowid + n)), type=pa.int64()), - } - ) - - -@pytest.fixture() -def dest(tmp_path): - catalog = _make_catalog(tmp_path, "dest") - table = catalog.create_table("events", SCHEMA) - yield catalog, table - catalog.close() - - -def test_fast_path_inserts_land_in_destination(dest): - """Pure-insert batch + flag on: rows are present after the call. Smoke - test that tbl.append() is actually wired and the rows aren't dropped - on the floor.""" - catalog, table = dest - batch = _insert_batch([1, 2, 3]) - - counts = _apply_changes(catalog, table, batch, ["event_id"], append_at_least_once=True) - - assert counts == {"deleted": 0, "upserted": 3, "upsert_matched": 0, "used_append": True} - rows = catalog.load_table("events").scan().to_arrow() - assert rows.num_rows == 3 - assert sorted(rows.column("event_id").to_pylist()) == [1, 2, 3] - - -def test_fast_path_replay_produces_duplicates(dest): - """The documented tradeoff: applying the same pure-insert batch twice - under the fast path lands the rows twice. This is the contract the - flag advertises — at-least-once delivery into the destination. - - If a future refactor silently re-introduces dedupe-against-existing-rows - on the fast path, this test will fail, which is exactly the surface we - want to fail loudly.""" - catalog, table = dest - batch = _insert_batch([1, 2]) - - _apply_changes(catalog, table, batch, ["event_id"], append_at_least_once=True) - _apply_changes(catalog, table, batch, ["event_id"], append_at_least_once=True) - - rows = catalog.load_table("events").scan().to_arrow() - assert rows.num_rows == 4 - assert sorted(rows.column("event_id").to_pylist()) == [1, 1, 2, 2] - - -def test_upsert_path_replay_is_idempotent(dest): - """Counterpoint to the above: with the flag OFF, replaying the same - batch is idempotent (MERGE WHEN MATCHED collapses dupes). This is the - semantics callers are giving up by enabling the flag.""" - catalog, table = dest - batch = _insert_batch([1, 2]) - - _apply_changes(catalog, table, batch, ["event_id"], append_at_least_once=False) - _apply_changes(catalog, table, batch, ["event_id"], append_at_least_once=False) - - rows = catalog.load_table("events").scan().to_arrow() - assert rows.num_rows == 2 - assert sorted(rows.column("event_id").to_pylist()) == [1, 2] - - -def test_mixed_batch_with_delete_falls_back_to_upsert(dest): - """Safety net under real pyducklake: flag on, batch contains a delete. - The delete actually deletes the existing row; remaining inserts land - via upsert. None of this should silently take the append path and - corrupt the destination.""" - catalog, table = dest - seed = _insert_batch([1, 2, 3]) - _apply_changes(catalog, table, seed, ["event_id"], append_at_least_once=False) - - n = 1 - mixed = pa.table( - { - "event_id": pa.array([1, 4], type=pa.int32()), - "region": pa.array(["r0", "rN"], type=pa.string()), - "value": pa.array([0, 40], type=pa.int32()), - "change_type": pa.array(["delete", "insert"], type=pa.string()), - "snapshot_id": pa.array([2, 2], type=pa.int64()), - "rowid": pa.array([100, 101], type=pa.int64()), - } - ) - - counts = _apply_changes(catalog, table, mixed, ["event_id"], append_at_least_once=True) - assert counts["deleted"] == n - assert counts["upserted"] == 1 - - rows = catalog.load_table("events").scan().to_arrow() - assert sorted(rows.column("event_id").to_pylist()) == [2, 3, 4] - - -def test_mixed_batch_with_update_postimage_falls_back_to_upsert(dest): - """Safety net for the schema-evolution case: a future CDC change introduces - update_postimage rows. The flag must not turn those into duplicate - rows via append — they must overwrite the existing key via MERGE.""" - catalog, table = dest - seed = _insert_batch([1, 2]) - _apply_changes(catalog, table, seed, ["event_id"], append_at_least_once=False) - - update = pa.table( - { - "event_id": pa.array([1], type=pa.int32()), - "region": pa.array(["updated"], type=pa.string()), - "value": pa.array([999], type=pa.int32()), - "change_type": pa.array(["update_postimage"], type=pa.string()), - "snapshot_id": pa.array([2], type=pa.int64()), - "rowid": pa.array([1], type=pa.int64()), - } - ) - - _apply_changes(catalog, table, update, ["event_id"], append_at_least_once=True) - rows = catalog.load_table("events").scan().to_arrow().sort_by([("event_id", "ascending")]) - # event_id=1 should have been updated, not duplicated. - assert rows.num_rows == 2 - event_1 = rows.filter(pa.compute.equal(rows.column("event_id"), pa.scalar(1, type=pa.int32()))) - assert event_1.column("value").to_pylist() == [999] - assert event_1.column("region").to_pylist() == ["updated"] - - -def test_fast_path_cursor_advance_failure_replays_through_delivery(tmp_path): - """The stated retry scenario in production is: destination apply commits, - then cursor advance fails (PG blip, etc.), and the next poll re-reads - the same source range and presents the same batch to apply again. This - test drives that scenario through DeliveryManager._flush (not just two - direct _apply_changes calls): - - 1. First flush: apply commits, cursor advance is mocked to raise. - 2. Second flush: same range buffered again (simulating the re-read), - apply commits a second time. - - Fast path: 4 rows in destination (2 from each apply). - Upsert path (counterpoint): 2 rows (MERGE collapsed the replay). - - Defends against a refactor that silently introduces dedup-against-existing - on the fast path, or that catches the cursor advance exception silently - and never replays. - """ - from unittest.mock import MagicMock - - from viaduck.config import DeliveryConfig - from viaduck.delivery import DeliveryManager - - catalog = _make_catalog(tmp_path, "dest") - table = catalog.create_table("events", SCHEMA) - - pool = MagicMock() - pool.get.return_value = (catalog, table) - pool.release = MagicMock() - pool.evict = MagicMock() - - state = MagicMock() - state.load_cursors.return_value = {} - # Raise once, then succeed. The 3-attempt retry inside - # _advance_cursor_with_retry burns through all 3 attempts here so the - # whole _flush call surfaces an exception, which is the exact production - # behavior we want to simulate (cursor-blip → range re-read next cycle). - advance_calls = {"n": 0} - - def advance_cursor_raises_first_time(*args, **kwargs): - advance_calls["n"] += 1 - if advance_calls["n"] <= 3: - raise RuntimeError("simulated PG cursor advance failure") - - state.advance_cursor.side_effect = advance_cursor_raises_first_time - - mgr = DeliveryManager( - DeliveryConfig(workers=1, flush_interval_seconds=0.0), - state, - pool, - ["event_id"], - ["d1"], - append_at_least_once_by_dest={"d1": True}, - ) - - batch = _insert_batch([1, 2]) - - # Apply round 1: commits to destination, cursor advance fails 3x → flush raises. - mgr.buffer("d1", batch, through_snapshot=7) - mgr.maybe_flush() - mgr.wait_idle() - - # Reset side_effect so round 2 lets cursor advance through. - state.advance_cursor.side_effect = None - - # Round 2: same range re-presented (production: poll re-reads because - # cursor didn't move). New buffer, same batch. - mgr.buffer("d1", batch, through_snapshot=7) - mgr.maybe_flush() - mgr.wait_idle() - - rows = catalog.load_table("events").scan().to_arrow() - assert rows.num_rows == 4, "fast path must duplicate the replayed batch (no MERGE collapse)" - assert sorted(rows.column("event_id").to_pylist()) == [1, 1, 2, 2] - - -def test_upsert_path_cursor_advance_failure_replays_idempotently(tmp_path): - """Counterpoint to the above: flag OFF, same scenario, the replayed - batch is collapsed by MERGE WHEN MATCHED — only 2 rows in destination. - This is the property the fast path explicitly trades away.""" - from unittest.mock import MagicMock - - from viaduck.config import DeliveryConfig - from viaduck.delivery import DeliveryManager - - catalog = _make_catalog(tmp_path, "dest") - table = catalog.create_table("events", SCHEMA) - - pool = MagicMock() - pool.get.return_value = (catalog, table) - pool.release = MagicMock() - pool.evict = MagicMock() - - state = MagicMock() - state.load_cursors.return_value = {} - advance_calls = {"n": 0} - - def advance_cursor_raises_first_time(*args, **kwargs): - advance_calls["n"] += 1 - if advance_calls["n"] <= 3: - raise RuntimeError("simulated PG cursor advance failure") - - state.advance_cursor.side_effect = advance_cursor_raises_first_time - - mgr = DeliveryManager( - DeliveryConfig(workers=1, flush_interval_seconds=0.0), - state, - pool, - ["event_id"], - ["d1"], - # No append_at_least_once_by_dest → defaults to upsert path - ) - - batch = _insert_batch([1, 2]) - - mgr.buffer("d1", batch, through_snapshot=7) - mgr.maybe_flush() - mgr.wait_idle() - - state.advance_cursor.side_effect = None - - mgr.buffer("d1", batch, through_snapshot=7) - mgr.maybe_flush() - mgr.wait_idle() - - rows = catalog.load_table("events").scan().to_arrow() - assert rows.num_rows == 2, "upsert path must collapse the replayed batch via MERGE WHEN MATCHED" - assert sorted(rows.column("event_id").to_pylist()) == [1, 2] - - -def test_fast_path_within_batch_dupes_collapse_to_winner(dest): - """Within-batch dedup runs on the fast path too: three copies of the - same key in one batch land as one row, not three. The deterministic - winner (highest snapshot_id then highest rowid) is the version that - lands.""" - catalog, table = dest - batch = pa.table( - { - "event_id": pa.array([1, 1, 1], type=pa.int32()), - "region": pa.array(["old", "newer", "newest"], type=pa.string()), - "value": pa.array([10, 20, 30], type=pa.int32()), - "change_type": pa.array(["insert", "insert", "insert"], type=pa.string()), - "snapshot_id": pa.array([1, 2, 3], type=pa.int64()), - "rowid": pa.array([1, 2, 3], type=pa.int64()), - } - ) - - counts = _apply_changes(catalog, table, batch, ["event_id"], append_at_least_once=True) - assert counts["upserted"] == 1 - - rows = catalog.load_table("events").scan().to_arrow() - assert rows.num_rows == 1 - assert rows.column("event_id").to_pylist() == [1] - assert rows.column("value").to_pylist() == [30] # highest snapshot wins - assert rows.column("region").to_pylist() == ["newest"] diff --git a/tests/integration/test_buffered_delivery.py b/tests/integration/test_buffered_delivery.py index 5a145aa..9f923b3 100644 --- a/tests/integration/test_buffered_delivery.py +++ b/tests/integration/test_buffered_delivery.py @@ -99,7 +99,9 @@ def _read_dest(pool: DestinationPool, dest_id: str) -> pa.Table: def test_buffered_flush_end_to_end(tmp_path, state): pool = _make_pool(tmp_path, ["d1"]) state.initialize_destinations(["d1"]) - mgr = DeliveryManager(DeliveryConfig(workers=2, flush_interval_seconds=0.0), state, pool, ["event_id"], ["d1"]) + mgr = DeliveryManager( + DeliveryConfig(workers=2, flush_interval_seconds=0.0), state, pool, ["event_id"], ["d1"], mode="full_cdc" + ) # Two buffered reads over adjacent ranges, flushed as one batch. mgr.buffer("d1", _cdc_batch("acme", [1, 2]), through_snapshot=7) @@ -124,7 +126,9 @@ def test_cross_read_conflict_resolution_at_flush(tmp_path, state): the row never appears in the destination.""" pool = _make_pool(tmp_path, ["d1"]) state.initialize_destinations(["d1"]) - mgr = DeliveryManager(DeliveryConfig(workers=1, flush_interval_seconds=0.0), state, pool, ["event_id"], ["d1"]) + mgr = DeliveryManager( + DeliveryConfig(workers=1, flush_interval_seconds=0.0), state, pool, ["event_id"], ["d1"], mode="full_cdc" + ) mgr.buffer("d1", _cdc_batch("acme", [1, 2], "insert"), through_snapshot=7) mgr.buffer("d1", _cdc_batch("acme", [2], "delete"), through_snapshot=8) @@ -141,7 +145,9 @@ def test_flush_failure_leaves_cursor_and_recovers(tmp_path, state): pool = _make_pool(tmp_path, ["d-bad"], broken={"d-bad"}) state.initialize_destinations(["d-bad"]) state.advance_cursor("d-bad", snapshot_id=3) - mgr = DeliveryManager(DeliveryConfig(workers=1, flush_interval_seconds=0.0), state, pool, ["event_id"], ["d-bad"]) + mgr = DeliveryManager( + DeliveryConfig(workers=1, flush_interval_seconds=0.0), state, pool, ["event_id"], ["d-bad"], mode="full_cdc" + ) mgr.buffer("d-bad", _cdc_batch("acme", [1]), through_snapshot=7) mgr.maybe_flush() @@ -160,7 +166,9 @@ def test_concurrent_multi_destination_flushes(tmp_path, state): dest_ids = [f"d{i}" for i in range(6)] pool = _make_pool(tmp_path, dest_ids) state.initialize_destinations(dest_ids) - mgr = DeliveryManager(DeliveryConfig(workers=4, flush_interval_seconds=0.0), state, pool, ["event_id"], dest_ids) + mgr = DeliveryManager( + DeliveryConfig(workers=4, flush_interval_seconds=0.0), state, pool, ["event_id"], dest_ids, mode="full_cdc" + ) for i, d in enumerate(dest_ids): mgr.buffer(d, _cdc_batch(d, list(range(1, 4 + i))), through_snapshot=5) @@ -180,7 +188,9 @@ def test_drain_flushes_buffered_data_on_shutdown(tmp_path, state): pool = _make_pool(tmp_path, ["d1"]) state.initialize_destinations(["d1"]) # Hour-long interval: nothing would flush without the shutdown trigger. - mgr = DeliveryManager(DeliveryConfig(workers=1, flush_interval_seconds=3600.0), state, pool, ["event_id"], ["d1"]) + mgr = DeliveryManager( + DeliveryConfig(workers=1, flush_interval_seconds=3600.0), state, pool, ["event_id"], ["d1"], mode="full_cdc" + ) mgr.buffer("d1", _cdc_batch("acme", [1, 2]), through_snapshot=4) mgr.drain(timeout_s=60) @@ -196,7 +206,9 @@ def test_multi_update_window_no_duplicate_keys(tmp_path, state): not duplicate rows from a duplicate-join-key upsert.""" pool = _make_pool(tmp_path, ["d1"]) state.initialize_destinations(["d1"]) - mgr = DeliveryManager(DeliveryConfig(workers=1, flush_interval_seconds=0.0), state, pool, ["event_id"], ["d1"]) + mgr = DeliveryManager( + DeliveryConfig(workers=1, flush_interval_seconds=0.0), state, pool, ["event_id"], ["d1"], mode="full_cdc" + ) insert = _cdc_batch("acme", [1]) update1 = pa.table( @@ -236,7 +248,9 @@ def test_phantom_heal_after_commit_cursor_gap(tmp_path, state): pool = _make_pool(tmp_path, ["d1"]) state.initialize_destinations(["d1"]) - mgr = DeliveryManager(DeliveryConfig(workers=1, flush_interval_seconds=0.0), state, pool, ["event_id"], ["d1"]) + mgr = DeliveryManager( + DeliveryConfig(workers=1, flush_interval_seconds=0.0), state, pool, ["event_id"], ["d1"], mode="full_cdc" + ) # Normal flush through snapshot 7: row 1 delivered, cursor = 7. mgr.buffer("d1", _cdc_batch("acme", [1]), through_snapshot=7) @@ -289,7 +303,9 @@ def test_phantom_heal_with_key_reuse_in_replay(tmp_path, state): pool = _make_pool(tmp_path, ["d1"]) state.initialize_destinations(["d1"]) - mgr = DeliveryManager(DeliveryConfig(workers=1, flush_interval_seconds=0.0), state, pool, ["event_id"], ["d1"]) + mgr = DeliveryManager( + DeliveryConfig(workers=1, flush_interval_seconds=0.0), state, pool, ["event_id"], ["d1"], mode="full_cdc" + ) crashed = pa.table( { @@ -328,7 +344,9 @@ def test_applied_counters_accumulate_by_change_type(tmp_path, state): and cumulative buffered rows accumulate on successful flushes.""" pool = _make_pool(tmp_path, ["d1"]) state.initialize_destinations(["d1"]) - mgr = DeliveryManager(DeliveryConfig(workers=1, flush_interval_seconds=0.0), state, pool, ["event_id"], ["d1"]) + mgr = DeliveryManager( + DeliveryConfig(workers=1, flush_interval_seconds=0.0), state, pool, ["event_id"], ["d1"], mode="full_cdc" + ) mgr.buffer("d1", _cdc_batch("acme", [1, 2]), through_snapshot=7) # 2 inserts mgr.buffer("d1", _cdc_batch("acme", [2], "delete"), through_snapshot=8) # 1 delete diff --git a/tests/integration/test_replication_integration.py b/tests/integration/test_replication_integration.py index 3ec6eb1..c2ccc84 100644 --- a/tests/integration/test_replication_integration.py +++ b/tests/integration/test_replication_integration.py @@ -99,7 +99,7 @@ def dest_table_b(dest_catalog_b): @pytest.fixture() def router(): - return Router(RoutingConfig(field="company", key_columns=["event_id"])) + return Router(RoutingConfig(field="company", mode="full_cdc", key_columns=["event_id"])) # --------------------------------------------------------------------------- @@ -288,7 +288,7 @@ def test_full_cdc_insert_then_delete_same_key(source_catalog, source_table, dest # Apply: tombstone delete against a destination that never saw the # insert — idempotent no-op on the data, counted as one delete op. counts = _apply_changes(dest_catalog_a, dest_table_a, resolved, ["event_id"]) - assert counts == {"deleted": 1, "upserted": 0, "upsert_matched": 0, "used_append": False} + assert counts == {"deleted": 1, "upserted": 0, "upsert_matched": 0} # Destination should be empty dest_scan = _read_all(dest_table_a) diff --git a/tests/perf/test_fanout_perf.py b/tests/perf/test_fanout_perf.py index 114a3c7..23fcaf1 100644 --- a/tests/perf/test_fanout_perf.py +++ b/tests/perf/test_fanout_perf.py @@ -133,7 +133,7 @@ def _make_conflict_batch(num_rows: int, conflict_pct: float = 0.05) -> pa.Table: @pytest.mark.perf def test_router_split_100_destinations(perf_timer): """10K rows, 100 routing values.""" - cfg = RoutingConfig(field="company") + cfg = RoutingConfig(field="company", mode="append_only") router = Router(cfg) table = _make_routing_table(10_000, 100) routing_values = [f"dest_{i}" for i in range(100)] @@ -149,7 +149,7 @@ def test_router_split_100_destinations(perf_timer): @pytest.mark.perf def test_router_split_1000_destinations(perf_timer): """100K rows, 1000 routing values.""" - cfg = RoutingConfig(field="company") + cfg = RoutingConfig(field="company", mode="append_only") router = Router(cfg) table = _make_routing_table(100_000, 1000) routing_values = [f"dest_{i}" for i in range(1000)] @@ -165,7 +165,7 @@ def test_router_split_1000_destinations(perf_timer): @pytest.mark.perf def test_router_split_1m_rows_10k_destinations(perf_timer): """1M rows, 10K routing values (production scale).""" - cfg = RoutingConfig(field="company") + cfg = RoutingConfig(field="company", mode="append_only") router = Router(cfg) table = _make_routing_table(1_000_000, 10_000) routing_values = [f"dest_{i}" for i in range(10_000)] @@ -375,6 +375,7 @@ def test_delivery_fanout(perf_timer, tmp_path, n_dests): pool, ["event_id"], dest_ids, + mode="full_cdc", ) for d in dest_ids: diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index ef0d2ff..f4ff3de 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -26,6 +26,7 @@ routing: field: company + mode: append_only destinations: - id: quacksworth-lake @@ -103,7 +104,7 @@ def test_load_no_destinations(tmp_path: Path): p = tmp_path / "bad.yaml" p.write_text( "source:\n name: s\n postgres_uri_env: X\n data_path: s3://s/\n table: t\n" - "routing:\n field: x\ndestinations: []\n" + "routing:\n field: x\n mode: append_only\ndestinations: []\n" ) with pytest.raises(ConfigError, match="At least one destination"): load(p) @@ -148,58 +149,96 @@ def test_destination_custom_table(tmp_path: Path): assert cfg.destinations[0].table == "custom_events" -def test_destination_append_at_least_once_defaults_false(config_file: Path): - """Field defaults to False — opt-in by destination.""" +def test_routing_mode_required(config_file: Path): + """Loading the MINIMAL_YAML (which sets mode: append_only) parses fine — + this is the baseline the matrix tests below mutate.""" cfg = load(config_file) - assert cfg.destinations[0].append_at_least_once is False + assert cfg.routing.mode == "append_only" -def test_destination_append_at_least_once_true(tmp_path: Path): - p = tmp_path / "cfg.yaml" - p.write_text( - MINIMAL_YAML.replace( - " data_path: s3://quacksworth/", " data_path: s3://quacksworth/\n append_at_least_once: true" - ) - ) - cfg = load(p) - assert cfg.destinations[0].append_at_least_once is True +def test_routing_mode_unset_rejected(tmp_path: Path): + """No mode → ConfigError. We don't want to fall back to inferring from + key_columns presence; that was the old silent-misconfig hazard. + A missing key resolves to the dataclass default `mode=""`, which the + empty-case branch matches with a "required, no default" message.""" + raw = MINIMAL_YAML.replace("\n mode: append_only", "") + p = tmp_path / "no_mode.yaml" + p.write_text(raw) + with pytest.raises(ConfigError, match=r"routing.mode is required, no default"): + load(p) -def test_destination_append_at_least_once_false_explicit(tmp_path: Path): - p = tmp_path / "cfg.yaml" - p.write_text( - MINIMAL_YAML.replace( - " data_path: s3://quacksworth/", " data_path: s3://quacksworth/\n append_at_least_once: false" - ) - ) + +def test_routing_mode_unknown_value_rejected(tmp_path: Path): + """Typos or stale legacy values fail loudly with the enum listed in the + error so the operator can self-correct without grepping source.""" + raw = MINIMAL_YAML.replace("mode: append_only", "mode: cdc_replay") + p = tmp_path / "bad_mode.yaml" + p.write_text(raw) + with pytest.raises(ConfigError, match=r"routing.mode must be one of \['full_cdc', 'append_only'\]"): + load(p) + + +def test_routing_mode_full_cdc_requires_key_columns(tmp_path: Path): + """The whole point of full_cdc is the upsert join keys — empty + key_columns is operator misconfig (the old derivation flipped silently + to append_only here).""" + raw = MINIMAL_YAML.replace("mode: append_only", "mode: full_cdc") + p = tmp_path / "full_cdc_no_keys.yaml" + p.write_text(raw) + with pytest.raises(ConfigError, match="full_cdc.*requires.*key_columns"): + load(p) + + +def test_routing_mode_full_cdc_with_key_columns_ok(tmp_path: Path): + raw = MINIMAL_YAML.replace("mode: append_only", "mode: full_cdc\n key_columns: [uuid]") + p = tmp_path / "full_cdc_with_keys.yaml" + p.write_text(raw) cfg = load(p) - assert cfg.destinations[0].append_at_least_once is False + assert cfg.routing.mode == "full_cdc" + assert cfg.routing.key_columns == ["uuid"] + + +def test_routing_mode_yaml_bool_coerced_rejected(tmp_path: Path): + """YAML 1.1 coerces `mode: yes` to Python True (PyYAML behavior). The + validator catches the type mismatch with a quote-hint before the enum + check fires with a confusing `got True` message.""" + raw = MINIMAL_YAML.replace("mode: append_only", "mode: yes") + p = tmp_path / "yaml_bool.yaml" + p.write_text(raw) + with pytest.raises(ConfigError, match=r"routing.mode must be a string"): + load(p) -def test_destination_append_at_least_once_rejects_string(tmp_path: Path): - """YAML scalar coercion is loose — an unquoted "true" might land as a bool - but a quoted "true" would land as a string. Reject non-bool loudly so a - config typo can't silently disable the optimization (or worse, enable it - on a destination that needs strict idempotency).""" - p = tmp_path / "cfg.yaml" - p.write_text( - MINIMAL_YAML.replace( - " data_path: s3://quacksworth/", " data_path: s3://quacksworth/\n append_at_least_once: 'true'" - ) - ) - with pytest.raises(ConfigError, match="append_at_least_once.*boolean"): +def test_routing_mode_empty_string_distinguished_from_unknown(tmp_path: Path): + """`mode:` (empty value) is operator-omission, not a typo. Distinct + error message tells them to set it rather than guess the typo.""" + raw = MINIMAL_YAML.replace("mode: append_only", "mode: ''") + p = tmp_path / "empty_mode.yaml" + p.write_text(raw) + with pytest.raises(ConfigError, match=r"routing.mode is required, no default"): load(p) -def test_destination_append_at_least_once_rejects_int(tmp_path: Path): - """Reject `: 1` — Python's bool is an int subclass, but YAML 1 is an int.""" - p = tmp_path / "cfg.yaml" - p.write_text( - MINIMAL_YAML.replace( - " data_path: s3://quacksworth/", " data_path: s3://quacksworth/\n append_at_least_once: 1" - ) - ) - with pytest.raises(ConfigError, match="append_at_least_once.*boolean"): +def test_routing_mode_null_value_routes_to_required_message(tmp_path: Path): + """`mode:` (key present, no value) parses to Python None via PyYAML. + The loader coerces None→"" before construction so the operator gets the + "is required" guidance instead of the isinstance "quote the value if + YAML coerced it" hint — which would point them at the wrong fix.""" + raw = MINIMAL_YAML.replace("mode: append_only", "mode:") + p = tmp_path / "null_mode.yaml" + p.write_text(raw) + with pytest.raises(ConfigError, match=r"routing.mode is required, no default"): + load(p) + + +def test_routing_mode_append_only_forbids_key_columns(tmp_path: Path): + """append_only's apply path doesn't use key_columns at all; a non-empty + list is misconfig the operator must clear or switch modes for.""" + raw = MINIMAL_YAML.replace("mode: append_only", "mode: append_only\n key_columns: [uuid]") + p = tmp_path / "append_only_with_keys.yaml" + p.write_text(raw) + with pytest.raises(ConfigError, match="append_only.*forbids.*key_columns"): load(p) @@ -269,6 +308,7 @@ def test_partition_explicit(tmp_path: Path, monkeypatch): table: events routing: field: company + mode: append_only destinations: - id: a routing_value: acme @@ -416,6 +456,7 @@ def test_full_config(tmp_path: Path, monkeypatch): routing: field: company + mode: append_only destinations: - id: quacksworth-lake @@ -465,33 +506,27 @@ def test_load_non_mapping_yaml(tmp_path: Path): def test_load_with_key_columns(tmp_path: Path): - """YAML with key_columns: [event_id, company] parses correctly.""" + """YAML with key_columns: [event_id, company] parses correctly under full_cdc.""" p = tmp_path / "cfg.yaml" - p.write_text(MINIMAL_YAML.replace(" field: company", " field: company\n key_columns: [event_id, company]")) + p.write_text(MINIMAL_YAML.replace("mode: append_only", "mode: full_cdc\n key_columns: [event_id, company]")) cfg = load(p) assert cfg.routing.key_columns == ["event_id", "company"] def test_load_without_key_columns_defaults_empty(config_file: Path): - """YAML without key_columns defaults to [].""" + """YAML without key_columns defaults to []. The MINIMAL_YAML config is + mode: append_only, so this implicitly also asserts append_only doesn't + require the field.""" cfg = load(config_file) assert cfg.routing.key_columns == [] -def test_load_key_columns_empty_list(tmp_path: Path): - """Explicit key_columns: [] is valid.""" - p = tmp_path / "cfg.yaml" - p.write_text(MINIMAL_YAML.replace(" field: company", " field: company\n key_columns: []")) - cfg = load(p) - assert cfg.routing.key_columns == [] - - def test_routing_config_has_key_columns(): """RoutingConfig dataclass has key_columns field.""" - rc = RoutingConfig(field="company", key_columns=["event_id", "company"]) + rc = RoutingConfig(field="company", mode="full_cdc", key_columns=["event_id", "company"]) assert rc.key_columns == ["event_id", "company"] - rc_default = RoutingConfig(field="company") + rc_default = RoutingConfig(field="company", mode="append_only") assert rc_default.key_columns == [] @@ -500,30 +535,30 @@ def test_routing_config_has_key_columns(): def test_seed_mode_default_is_scan(): """RoutingConfig defaults seed_mode to 'scan'.""" - rc = RoutingConfig(field="company") + rc = RoutingConfig(field="company", mode="append_only") assert rc.seed_mode == "scan" def test_seed_mode_earliest(): - rc = RoutingConfig(field="company", seed_mode="earliest") + rc = RoutingConfig(field="company", mode="append_only", seed_mode="earliest") assert rc.seed_mode == "earliest" def test_seed_mode_latest(): - rc = RoutingConfig(field="company", seed_mode="latest") + rc = RoutingConfig(field="company", mode="append_only", seed_mode="latest") assert rc.seed_mode == "latest" def test_seed_mode_invalid(): """seed_mode='bogus' raises ConfigError.""" with pytest.raises(ConfigError, match="seed_mode"): - RoutingConfig(field="company", seed_mode="bogus") + RoutingConfig(field="company", mode="append_only", seed_mode="bogus") def test_seed_mode_cdc_replay_invalid(): """cdc_replay is no longer valid; use earliest or latest.""" with pytest.raises(ConfigError, match="seed_mode"): - RoutingConfig(field="company", seed_mode="cdc_replay") + RoutingConfig(field="company", mode="append_only", seed_mode="cdc_replay") def test_load_with_seed_mode(tmp_path: Path): @@ -653,21 +688,16 @@ def test_log_summary_destination_fields_per_index(config_file, caplog): assert "config: destinations.count=1" in lines assert any("config: destinations[0].id='quacksworth-lake'" == line for line in lines) assert any("config: destinations[0].routing_value='quacksworth'" == line for line in lines) - assert any("config: destinations[0].append_at_least_once=False" == line for line in lines) -def test_log_summary_logs_append_at_least_once_true(tmp_path: Path, caplog): - """The flag value flows through verbatim — defends against a "redact bools" - refactor or a hardcoded False that would hide an enabled fast path.""" - p = tmp_path / "cfg.yaml" - p.write_text( - MINIMAL_YAML.replace( - " data_path: s3://quacksworth/", " data_path: s3://quacksworth/\n append_at_least_once: true" - ) - ) - cfg = load(p) +def test_log_summary_routing_mode_logged(config_file, caplog): + """The new routing.mode field must show up in the per-leaf log dump so + operators can grep `config: routing.mode=` to confirm what the live pod + is actually configured for. Defends against a future drop-from-log + refactor that hides the field that determines the entire pipeline shape.""" + cfg = load(config_file) lines = _captured_log_lines(cfg, caplog) - assert any("config: destinations[0].append_at_least_once=True" == line for line in lines) + assert any("config: routing.mode='append_only'" == line for line in lines) def test_log_summary_does_not_log_resolved_postgres_uri(config_file, caplog, monkeypatch): diff --git a/tests/unit/test_delivery.py b/tests/unit/test_delivery.py index fc7d391..4798d3c 100644 --- a/tests/unit/test_delivery.py +++ b/tests/unit/test_delivery.py @@ -11,7 +11,7 @@ import pytest from viaduck import metrics -from viaduck.config import DeliveryConfig +from viaduck.config import ConfigError, DeliveryConfig from viaduck.delivery import DeliveryManager @@ -49,13 +49,16 @@ def _fake(dest, tables, through, trigger): return _fake, calls -def _manager(dests=("d1",), cursors=None, **cfg_overrides): +def _manager(dests=("d1",), cursors=None, mode="append_only", key_columns=None, **cfg_overrides): + """Default to append_only (key_columns=[]) since that's the production posthog + shape; tests that need the full_cdc path pass mode="full_cdc" and a + non-empty key_columns.""" defaults = dict(workers=2, flush_interval_seconds=3600.0) defaults.update(cfg_overrides) cfg = DeliveryConfig(**defaults) sm = _state_mgr(cursors or {d: 0 for d in dests}) pool = MagicMock() - mgr = DeliveryManager(cfg, sm, pool, [], list(dests)) + mgr = DeliveryManager(cfg, sm, pool, key_columns or [], list(dests), mode=mode) return mgr, sm, pool @@ -272,14 +275,11 @@ def test_flush_failure_then_recovery(): def test_full_cdc_flush_concats_buffered_reads(): """Multiple buffered reads are concatenated before Phase 2 — cross-read conflicts resolve exactly like within-read ones.""" - mgr, _, _ = _manager(flush_interval_seconds=0.0) - mgr._full_cdc = True - mgr._key_columns = ["value"] + mgr, _, _ = _manager(flush_interval_seconds=0.0, mode="full_cdc", key_columns=["value"]) captured = {} - def fake_apply(pool, dest, batch, key_columns, append_at_least_once=False): + def fake_apply(pool, dest, batch, key_columns): captured["rows"] = batch.num_rows - captured["aalo"] = append_at_least_once return batch.num_rows with patch("viaduck.delivery.apply_full_cdc", side_effect=fake_apply): @@ -290,142 +290,16 @@ def fake_apply(pool, dest, batch, key_columns, append_at_least_once=False): assert captured["rows"] == 5 -def test_full_cdc_flush_threads_append_at_least_once_per_destination(): - """Per-destination flags survive the buffer → worker → apply path, - keyed by dest_id. Two destinations with different flag values must - each see their own value at apply time.""" - cfg = DeliveryConfig(workers=2, flush_interval_seconds=0.0) - sm = _state_mgr({"d-fast": 0, "d-slow": 0}) - pool = MagicMock() - mgr = DeliveryManager( - cfg, - sm, - pool, - ["value"], - ["d-fast", "d-slow"], - append_at_least_once_by_dest={"d-fast": True, "d-slow": False}, - ) - - seen: dict[str, bool] = {} - - def fake_apply(pool, dest, batch, key_columns, append_at_least_once=False): - seen[dest] = append_at_least_once - return batch.num_rows - - with patch("viaduck.delivery.apply_full_cdc", side_effect=fake_apply): - mgr.buffer("d-fast", _table(2), 5) - mgr.buffer("d-slow", _table(2), 5) - mgr.maybe_flush() - assert mgr.wait_idle() - - assert seen == {"d-fast": True, "d-slow": False} - - -def test_full_cdc_flush_defaults_append_at_least_once_to_false(): - """No append_at_least_once_by_dest arg → every destination behaves as - before. Defends against accidentally flipping default behaviour for - every existing deployment.""" - cfg = DeliveryConfig(workers=1, flush_interval_seconds=0.0) - sm = _state_mgr({"d1": 0}) - pool = MagicMock() - mgr = DeliveryManager(cfg, sm, pool, ["value"], ["d1"]) # no kwarg - - seen: dict[str, bool] = {} - - def fake_apply(pool, dest, batch, key_columns, append_at_least_once=False): - seen[dest] = append_at_least_once - return batch.num_rows - - with patch("viaduck.delivery.apply_full_cdc", side_effect=fake_apply): - mgr.buffer("d1", _table(2), 5) - mgr.maybe_flush() - assert mgr.wait_idle() - - assert seen == {"d1": False} - - -def test_apply_mode_gauge_set_for_each_destination_at_init(): - """The dest_apply_mode gauge is the operator-visible answer to "is this - dest on the fast path?". Verify both states (on/off) are reflected per - destination at constructor time, before any flush runs. - - Mock keys on .labels() returning a single shared child mock, so the .set() - calls land on it regardless of which destination label was passed in. We - then read the two .set() args and verify both 0 and 1 appeared (some - destination got each).""" - cfg = DeliveryConfig(workers=1, flush_interval_seconds=3600.0) - sm = _state_mgr({"d-fast": 0, "d-slow": 0}) - pool = MagicMock() - - with patch("viaduck.delivery.metrics.dest_apply_mode") as gauge: - DeliveryManager( - cfg, - sm, - pool, - ["value"], - ["d-fast", "d-slow"], - append_at_least_once_by_dest={"d-fast": True, "d-slow": False}, - ) - - gauge.labels.assert_any_call(destination="d-fast") - gauge.labels.assert_any_call(destination="d-slow") - assert gauge.labels.call_count == 2 - - set_values = [c.args[0] for c in gauge.labels.return_value.set.call_args_list] - assert sorted(set_values) == [0, 1] - - -def test_init_logs_for_fast_path_destinations(caplog): - """A startup log line per fast-path destination so the operator-visible - signal is plain in the deploy log, not just in /metrics.""" +def test_delivery_manager_rejects_unknown_mode(): + """The mode arg is a closed enum at the DeliveryManager layer too. + Operators interact with the YAML config (which goes through + RoutingConfig.__post_init__'s enum check); this guard catches direct- + instantiation misuse in tests / future call sites.""" cfg = DeliveryConfig(workers=1, flush_interval_seconds=3600.0) - sm = _state_mgr({"d-fast": 0, "d-slow": 0}) - pool = MagicMock() - - with caplog.at_level("INFO", logger="viaduck.delivery"): - DeliveryManager( - cfg, - sm, - pool, - ["value"], - ["d-fast", "d-slow"], - append_at_least_once_by_dest={"d-fast": True, "d-slow": False}, - ) - - fast_path_log_lines = [r.message for r in caplog.records if "append_at_least_once" in r.message] - assert any("d-fast" in m for m in fast_path_log_lines) - assert not any("d-slow" in m for m in fast_path_log_lines) - - -def test_full_cdc_flush_unknown_destination_defaults_to_false(): - """A destination present in assigned_ids but absent from the per-dest - flag dict must default to False — never promote to fast path on a - missing key. Catches future plumbing bugs where a new destination - is added but the dict isn't kept in sync.""" - cfg = DeliveryConfig(workers=1, flush_interval_seconds=0.0) sm = _state_mgr({"d1": 0}) pool = MagicMock() - mgr = DeliveryManager( - cfg, - sm, - pool, - ["value"], - ["d1"], - append_at_least_once_by_dest={"some-other-dest": True}, - ) - - seen: dict[str, bool] = {} - - def fake_apply(pool, dest, batch, key_columns, append_at_least_once=False): - seen[dest] = append_at_least_once - return batch.num_rows - - with patch("viaduck.delivery.apply_full_cdc", side_effect=fake_apply): - mgr.buffer("d1", _table(2), 5) - mgr.maybe_flush() - assert mgr.wait_idle() - - assert seen == {"d1": False} + with pytest.raises(ConfigError, match="must be 'full_cdc' or 'append_only'"): + DeliveryManager(cfg, sm, pool, [], ["d1"], mode="weird") # --------------------------------------------------------------------------- @@ -587,7 +461,7 @@ def test_on_flush_success_fires_for_data_not_for_idle_persists(): hits = [] cfg = DeliveryConfig(workers=1, flush_interval_seconds=0.0) sm = _state_mgr({"d1": 0}) - mgr = DeliveryManager(cfg, sm, MagicMock(), [], ["d1"], on_flush_success=lambda: hits.append(1)) + mgr = DeliveryManager(cfg, sm, MagicMock(), [], ["d1"], mode="append_only", on_flush_success=lambda: hits.append(1)) # Idle position-only persist: no success signal. mgr.advance_position("d1", 3) diff --git a/tests/unit/test_main.py b/tests/unit/test_main.py index a47931c..10edec1 100644 --- a/tests/unit/test_main.py +++ b/tests/unit/test_main.py @@ -13,6 +13,7 @@ _build_delete_filter, _resolve_conflicts, _write_with_retry, + append_only, ) from viaduck.main import ( _derive_dest_status, @@ -196,7 +197,7 @@ def test_poll_cycle_no_snapshots(): cfg = _make_cfg([]) with patch("viaduck.main.source.current_snapshot_id", return_value=None): - _poll_cycle(MagicMock(), delivery, MagicMock(), router, cfg, [], {}, key_columns=[], full_cdc=False) + _poll_cycle(MagicMock(), delivery, MagicMock(), router, cfg, [], {}, key_columns=[], mode="append_only") delivery.read_plan.assert_not_called() delivery.maybe_flush.assert_called_once() @@ -218,7 +219,7 @@ def test_poll_cycle_all_caught_up(): ["dest-1"], {"quacksworth": "dest-1"}, key_columns=[], - full_cdc=False, + mode="append_only", ) router.build_filter_expr.assert_not_called() @@ -249,7 +250,7 @@ def test_poll_cycle_routes_and_buffers(): ["dest-1"], {"quacksworth": "dest-1"}, key_columns=[], - full_cdc=False, + mode="append_only", ) delivery.buffer.assert_called_once_with("dest-1", arrow_data, 10, epoch=0) @@ -277,7 +278,7 @@ def test_poll_cycle_empty_changeset_advances_positions(): ["dest-1", "dest-2"], {"a": "dest-1", "b": "dest-2"}, key_columns=[], - full_cdc=False, + mode="append_only", ) assert delivery.advance_position.call_count == 2 @@ -309,7 +310,7 @@ def test_poll_cycle_routing_error_breaks_gracefully(): ["dest-1"], {"quacksworth": "dest-1"}, key_columns=[], - full_cdc=False, + mode="append_only", ) delivery.buffer.assert_not_called() @@ -346,7 +347,7 @@ def fake_read_cdc(src_table, *, after_snapshot, end_snapshot, filter_expr=None): ["dest-1"], {"quacksworth": "dest-1"}, key_columns=[], - full_cdc=False, + mode="append_only", ) assert read_calls == [(0, 5), (5, 10)], f"expected two chunk reads, got {read_calls}" @@ -390,7 +391,7 @@ def fake_read(src_table, *, after_snapshot, end_snapshot, filter_expr=None): ["dest-1", "dest-2"], {"a": "dest-1", "b": "dest-2"}, key_columns=[], - full_cdc=False, + mode="append_only", ) # First chunk (0→3): dest-1 buffered at 3, dest-2 advanced to 3 @@ -424,7 +425,7 @@ def test_poll_cycle_multi_chunk_all_empty_flushes_per_chunk(): ["dest-1"], {"quacksworth": "dest-1"}, key_columns=[], - full_cdc=False, + mode="append_only", ) # cursor advanced to each chunk_end incrementally, not directly to 10 @@ -457,7 +458,7 @@ def test_poll_cycle_advances_no_data_destinations(): ["dest-1", "dest-2"], {"a": "dest-1", "b": "dest-2"}, key_columns=[], - full_cdc=False, + mode="append_only", ) delivery.buffer.assert_called_once_with("dest-1", arrow_data, 10, epoch=0) @@ -483,7 +484,7 @@ def test_poll_cycle_pauses_reads_at_watermark(): ["dest-1"], {"quacksworth": "dest-1"}, key_columns=[], - full_cdc=False, + mode="append_only", ) delivery.read_plan.assert_not_called() @@ -516,7 +517,7 @@ def test_poll_cycle_mid_chunk_watermark_flushes_completed_chunks(): ["dest-1"], {"quacksworth": "dest-1"}, key_columns=[], - full_cdc=False, + mode="append_only", ) # chunk 0→5 completed: cursor advanced and flushed @@ -541,7 +542,7 @@ def test_poll_cycle_snapshot_at_zero(): ["dest-1"], {"quacksworth": "dest-1"}, key_columns=[], - full_cdc=False, + mode="append_only", ) router.build_filter_expr.assert_not_called() @@ -1054,7 +1055,7 @@ def test_apply_changes_empty(): catalog, dest_table, txn, txn_table = _mock_catalog_and_table() batch = _cdc_table([]) counts = _apply_changes(catalog, dest_table, batch, ["company"]) - assert counts == {"deleted": 0, "upserted": 0, "upsert_matched": 0, "used_append": False} + assert counts == {"deleted": 0, "upserted": 0, "upsert_matched": 0} catalog.begin_transaction.assert_not_called() @@ -1102,336 +1103,66 @@ def test_apply_changes_strips_metadata(): # --------------------------------------------------------------------------- -# _apply_changes: append_at_least_once fast path +# append_only: boundary check for accidentally-routed CDC batches # --------------------------------------------------------------------------- -def test_apply_changes_aalo_flag_off_uses_upsert_on_pure_insert_batch(): - """Default (flag off): pure-insert batch still goes through tbl.upsert(), - proving the fast path is opt-in.""" - catalog, dest_table, txn, txn_table = _mock_catalog_and_table() - batch = _cdc_table( - [ - {"company": "acme", "value": 1, "change_type": "insert", "snapshot_id": 1, "rowid": 100}, - {"company": "acme", "value": 2, "change_type": "insert", "snapshot_id": 1, "rowid": 200}, - ] - ) - counts = _apply_changes(catalog, dest_table, batch, ["value"]) - txn_table.upsert.assert_called_once() - txn_table.append.assert_not_called() - assert counts["upserted"] == 2 - - -def test_apply_changes_aalo_flag_on_pure_insert_uses_append(): - """Flag on + every row is an insert: tbl.append() instead of tbl.upsert(). - Validates the (a) branch of the fast-path decision. - - Also asserts append is called with no kwargs — a future refactor that - accidentally passes join_cols= to append() would silently work in tests - that only check assert_called_once(). Locking the call shape catches it.""" - catalog, dest_table, txn, txn_table = _mock_catalog_and_table() - batch = _cdc_table( - [ - {"company": "acme", "value": 1, "change_type": "insert", "snapshot_id": 1, "rowid": 100}, - {"company": "acme", "value": 2, "change_type": "insert", "snapshot_id": 1, "rowid": 200}, - ] - ) - counts = _apply_changes(catalog, dest_table, batch, ["value"], append_at_least_once=True) - txn_table.append.assert_called_once() - assert txn_table.append.call_args.kwargs == {}, "append must NOT receive join_cols= or other kwargs" - assert len(txn_table.append.call_args.args) == 1, "append takes the Arrow table positionally; nothing else" - txn_table.upsert.assert_not_called() - txn_table.delete.assert_not_called() - # No "matched" concept on append — the counter source stays 0 so the - # dest_upsert_matched_total gauge in apply_full_cdc never gets incremented. - assert counts == {"deleted": 0, "upserted": 2, "upsert_matched": 0, "used_append": True} - - -def test_apply_changes_aalo_flag_on_with_delete_falls_back_to_upsert(): - """Flag on + any delete row: must NOT use append (append can't delete). - Verifies the per-batch safety net rejects mixed batches even when the - upsert candidates within them are pure inserts.""" - catalog, dest_table, txn, txn_table = _mock_catalog_and_table() - batch = _cdc_table( - [ - {"company": "acme", "value": 1, "change_type": "delete", "snapshot_id": 1, "rowid": 100}, - {"company": "acme", "value": 2, "change_type": "insert", "snapshot_id": 1, "rowid": 200}, - ] - ) - counts = _apply_changes(catalog, dest_table, batch, ["value"], append_at_least_once=True) - txn_table.delete.assert_called_once() - txn_table.upsert.assert_called_once() - txn_table.append.assert_not_called() - assert counts["deleted"] == 1 - assert counts["upserted"] == 1 - - -def test_apply_changes_aalo_flag_on_with_update_postimage_falls_back_to_upsert(): - """Flag on + any update_postimage row: must NOT use append. Updates need - MERGE-WHEN-MATCHED to overwrite the existing row; append would create - a duplicate at the key. This is the future-proofing case — a schema - change that introduces an update_postimage path must NOT silently - corrupt the destination.""" - catalog, dest_table, txn, txn_table = _mock_catalog_and_table() - batch = _cdc_table( - [ - {"company": "acme", "value": 1, "change_type": "insert", "snapshot_id": 1, "rowid": 100}, - {"company": "acme", "value": 2, "change_type": "update_postimage", "snapshot_id": 1, "rowid": 200}, - ] - ) - counts = _apply_changes(catalog, dest_table, batch, ["value"], append_at_least_once=True) - txn_table.upsert.assert_called_once() - txn_table.append.assert_not_called() - assert counts["upserted"] == 2 - - -def test_apply_changes_aalo_flag_on_within_batch_dupe_keys_still_deduped(): - """Within-batch duplicate keys (same key, multiple snapshot IDs) must be - collapsed to last-write-wins even on the append path; otherwise the - flag promotes "duplicates only on retry" to "duplicates within a single - apply", which is a stronger break than the contract advertises.""" - catalog, dest_table, txn, txn_table = _mock_catalog_and_table() - batch = _cdc_table( - [ - {"company": "acme", "value": 1, "change_type": "insert", "snapshot_id": 1, "rowid": 100}, - {"company": "acme", "value": 1, "change_type": "insert", "snapshot_id": 2, "rowid": 100}, - {"company": "acme", "value": 1, "change_type": "insert", "snapshot_id": 3, "rowid": 100}, - ] - ) - counts = _apply_changes(catalog, dest_table, batch, ["value"], append_at_least_once=True) - txn_table.append.assert_called_once() - written_table = txn_table.append.call_args[0][0] - assert written_table.num_rows == 1 # 3 candidates collapsed to 1 winner - assert counts["upserted"] == 1 - - -def test_apply_changes_aalo_flag_on_empty_batch_is_noop(): - """Empty batch + flag on: still a no-op (no transaction opened).""" - catalog, dest_table, txn, txn_table = _mock_catalog_and_table() - batch = _cdc_table([]) - counts = _apply_changes(catalog, dest_table, batch, ["company"], append_at_least_once=True) - assert counts == {"deleted": 0, "upserted": 0, "upsert_matched": 0, "used_append": False} - catalog.begin_transaction.assert_not_called() - - -def test_apply_changes_aalo_flag_on_uses_transaction(): - """Even on the fast path the write goes through begin_transaction — - a future caller that adds more work inside the txn must stay atomic.""" - catalog, dest_table, txn, txn_table = _mock_catalog_and_table() - batch = _cdc_table( - [ - {"company": "acme", "value": 1, "change_type": "insert", "snapshot_id": 1, "rowid": 100}, - ] - ) - _apply_changes(catalog, dest_table, batch, ["value"], append_at_least_once=True) - catalog.begin_transaction.assert_called_once() - - -def test_apply_changes_aalo_flag_on_append_raises_propagates(): - """Symmetric to test_apply_changes_transaction_rollback_on_failure (upsert - path): if tbl.append() raises inside the transaction context, the - exception propagates so the context manager can roll back. Defends - against a future change that wraps the append in try/except (silently - swallowing failures and leaving the destination in inconsistent state - relative to the cursor).""" - catalog, dest_table, txn, txn_table = _mock_catalog_and_table() - txn_table.append.side_effect = Exception("append write error") - batch = _cdc_table( - [ - {"company": "acme", "value": 1, "change_type": "insert", "snapshot_id": 1, "rowid": 100}, - ] - ) - with pytest.raises(Exception, match="append write error"): - _apply_changes(catalog, dest_table, batch, ["value"], append_at_least_once=True) - txn_table.upsert.assert_not_called() - - -def test_apply_changes_aalo_flag_on_strips_metadata(): - """change_type, snapshot_id, rowid stripped from the append payload too.""" - catalog, dest_table, txn, txn_table = _mock_catalog_and_table() - batch = _cdc_table( - [ - {"company": "acme", "value": 1, "change_type": "insert", "snapshot_id": 1, "rowid": 100}, - ] - ) - _apply_changes(catalog, dest_table, batch, ["value"], append_at_least_once=True) - written_table = txn_table.append.call_args[0][0] - assert "change_type" not in written_table.column_names - assert "snapshot_id" not in written_table.column_names - assert "rowid" not in written_table.column_names - assert "company" in written_table.column_names - assert "value" in written_table.column_names - - -def test_is_pure_insert_batch_pure_insert_true(): - """Helper: all inserts → True.""" - from viaduck.apply import _is_pure_insert_batch - - batch = _cdc_table( - [ - {"company": "a", "value": 1, "change_type": "insert", "snapshot_id": 1, "rowid": 1}, - {"company": "a", "value": 2, "change_type": "insert", "snapshot_id": 1, "rowid": 2}, - ] - ) - assert _is_pure_insert_batch(batch) is True - - -def test_is_pure_insert_batch_with_delete_false(): - from viaduck.apply import _is_pure_insert_batch - - batch = _cdc_table( - [ - {"company": "a", "value": 1, "change_type": "insert", "snapshot_id": 1, "rowid": 1}, - {"company": "a", "value": 2, "change_type": "delete", "snapshot_id": 1, "rowid": 2}, - ] - ) - assert _is_pure_insert_batch(batch) is False - - -def test_is_pure_insert_batch_with_update_postimage_false(): - from viaduck.apply import _is_pure_insert_batch - - batch = _cdc_table( - [ - {"company": "a", "value": 1, "change_type": "insert", "snapshot_id": 1, "rowid": 1}, - {"company": "a", "value": 2, "change_type": "update_postimage", "snapshot_id": 1, "rowid": 2}, - ] - ) - assert _is_pure_insert_batch(batch) is False - - -def test_is_pure_insert_batch_empty_false(): - """Empty batch is not "pure insert" — there's nothing to insert. Returning - True here would push an empty batch into the fast path, which is harmless - today but couples the helper's meaning to a downstream branch's tolerance - of empty input.""" - from viaduck.apply import _is_pure_insert_batch - - batch = _cdc_table([]) - assert _is_pure_insert_batch(batch) is False - - -def test_is_pure_insert_batch_with_null_change_type_false(): - """Null change_type must force False — defends against a regression of the - null-aware gate. With pc.all's default skip_nulls=True, a batch like - [insert, NULL, insert] silently returns True; the null-typed row would - then be dropped by tbl.append's column projection without any operator- - visible signal. The gate must trip the safety net into upsert fallback.""" - from viaduck.apply import _is_pure_insert_batch - +def test_append_only_accepts_insert_only_batch(): + """The expected source.read_cdc shape (ducklake_table_insertions output) + has no `change_type` column — those rows flow straight through to + tbl.append(). Sanity-check that the defensive guard doesn't reject the + happy path.""" + pool = MagicMock() + pool.get.return_value = (MagicMock(), MagicMock()) batch = pa.table( { - "company": ["a", "a", "a"], - "value": [1, 2, 3], - "change_type": pa.array(["insert", None, "insert"]), - "snapshot_id": [1, 1, 1], - "rowid": [1, 2, 3], + "company": ["acme"], + "value": [1], + # NOTE: snapshot_id/rowid are present (read_cdc returns them) but + # `change_type` is NOT — that's the insert-only contract from + # ducklake_table_insertions. + "snapshot_id": [1], + "rowid": [100], } ) - assert _is_pure_insert_batch(batch) is False + written = append_only(pool, "dest-1", batch) + assert written == 1 -def test_is_pure_insert_batch_all_null_change_type_false(): - """A batch where every change_type is NULL is degenerate but must still - return False — refusing to fast-path is the safe choice (the upsert - path would also drop these rows, but at least it doesn't claim "every - row is an insert").""" - from viaduck.apply import _is_pure_insert_batch - +def test_append_only_rejects_batch_with_change_type_column(): + """If a future viaduck change accidentally routes a ducklake_table_changes + batch (which carries `change_type`) into the append path, the destination + would silently land deletes and update_postimages as plain inserts. The + boundary guard catches this and refuses to write.""" + pool = MagicMock() + pool.get.return_value = (MagicMock(), MagicMock()) batch = pa.table( { - "company": ["a", "a"], + "company": ["acme", "acme"], "value": [1, 2], - "change_type": pa.array([None, None], type=pa.string()), + "change_type": ["insert", "delete"], "snapshot_id": [1, 1], - "rowid": [1, 2], + "rowid": [100, 200], } ) - assert _is_pure_insert_batch(batch) is False + with pytest.raises(RuntimeError, match="change_type"): + append_only(pool, "dest-1", batch) -def test_apply_changes_aalo_flag_on_with_null_change_type_falls_back_to_upsert(): - """End-to-end: flag on + a null change_type row in the batch must NOT - take the append path. The null row gets dropped by upsert_mask too, but - the remaining inserts must land via tbl.upsert (idempotent on retry), - not tbl.append (duplicates on retry).""" - catalog, dest_table, txn, txn_table = _mock_catalog_and_table() +def test_append_only_empty_batch_is_noop_even_with_change_type(): + """The fast no-op exit for empty batches runs BEFORE the boundary check. + An empty batch with a `change_type` column (degenerate but possible from + a filter that masked everything) is still a no-op rather than a noisy + error — nothing to misclassify, nothing to write.""" + pool = MagicMock() batch = pa.table( { - "company": ["a", "a"], - "value": [1, 2], - "change_type": pa.array(["insert", None], type=pa.string()), - "snapshot_id": pa.array([1, 1], type=pa.int64()), - "rowid": pa.array([1, 2], type=pa.int64()), + "company": pa.array([], type=pa.string()), + "change_type": pa.array([], type=pa.string()), } ) - _apply_changes(catalog, dest_table, batch, ["value"], append_at_least_once=True) - txn_table.upsert.assert_called_once() - txn_table.append.assert_not_called() - - -# --------------------------------------------------------------------------- -# apply_full_cdc: metric increments on the fast path -# --------------------------------------------------------------------------- - - -def test_apply_full_cdc_fast_path_increments_fast_path_counter(): - """When apply_full_cdc takes the fast path, the per-batch counter - increments and the upsert-matched counter does NOT (the latter is silent - by design). Asserts the documented metric implication directly so a - refactor that silently disables either signal fails loudly.""" - from unittest.mock import patch - - from viaduck.apply import apply_full_cdc - - catalog, dest_table, txn, txn_table = _mock_catalog_and_table() - pool = MagicMock() - pool.get.return_value = (catalog, dest_table) - - batch = _cdc_table( - [ - {"company": "acme", "value": 1, "change_type": "insert", "snapshot_id": 1, "rowid": 100}, - {"company": "acme", "value": 2, "change_type": "insert", "snapshot_id": 1, "rowid": 200}, - ] - ) - - with ( - patch("viaduck.apply.metrics.dest_apply_fast_path_batches_total") as fast_path_counter, - patch("viaduck.apply.metrics.dest_upsert_matched_total") as matched_counter, - patch("viaduck.apply.metrics.dest_rows_upserted_total") as upserted_counter, - ): - ops = apply_full_cdc(pool, "team-2", batch, ["value"], append_at_least_once=True) - - assert ops == 2 - fast_path_counter.labels.assert_called_once_with(destination="team-2") - fast_path_counter.labels.return_value.inc.assert_called_once_with() - matched_counter.labels.assert_not_called() # silent on fast path - upserted_counter.labels.assert_called_once_with(destination="team-2") - upserted_counter.labels.return_value.inc.assert_called_once_with(2) - - -def test_apply_full_cdc_upsert_path_does_not_increment_fast_path_counter(): - """Counterpoint: flag off (default) → fast-path counter stays silent - even on a pure-insert batch. Defends against an accidental - inc() in the upsert branch.""" - from unittest.mock import patch - - from viaduck.apply import apply_full_cdc - - catalog, dest_table, txn, txn_table = _mock_catalog_and_table() - pool = MagicMock() - pool.get.return_value = (catalog, dest_table) - - batch = _cdc_table( - [ - {"company": "acme", "value": 1, "change_type": "insert", "snapshot_id": 1, "rowid": 100}, - ] - ) - - with patch("viaduck.apply.metrics.dest_apply_fast_path_batches_total") as fast_path_counter: - apply_full_cdc(pool, "team-2", batch, ["value"]) - - fast_path_counter.labels.assert_not_called() + assert append_only(pool, "dest-1", batch) == 0 + pool.get.assert_not_called() # --------------------------------------------------------------------------- @@ -1439,14 +1170,20 @@ def test_apply_full_cdc_upsert_path_does_not_increment_fast_path_counter(): # --------------------------------------------------------------------------- -def _make_real_delivery(state_mgr, dest_pool, key_columns, assigned_ids): +def _make_real_delivery(state_mgr, dest_pool, key_columns, assigned_ids, *, mode): """Real DeliveryManager in flush-every-cycle mode so end-to-end poll - tests keep their write coverage. wait_idle() joins the single worker.""" + tests keep their write coverage. wait_idle() joins the single worker. + + mode is keyword-only and required — the helper deliberately does NOT + derive it from key_columns presence (that was the silent-misconfig + hazard this PR removes from production; allowing it back in via a test + helper would let a future "test the validation matrix" use case sneak + through inverted).""" from viaduck.config import DeliveryConfig from viaduck.delivery import DeliveryManager dcfg = DeliveryConfig(workers=1, flush_interval_seconds=0.0) - return DeliveryManager(dcfg, state_mgr, dest_pool, key_columns, assigned_ids) + return DeliveryManager(dcfg, state_mgr, dest_pool, key_columns, assigned_ids, mode=mode) def _txn_catalog(): @@ -1498,7 +1235,7 @@ def test_poll_cycle_full_cdc_routes_and_writes(): mock_dest_table.scan.return_value.count.return_value = 0 # empty dest: no truncate dest_pool.get.return_value = (mock_catalog, mock_dest_table) - delivery = _make_real_delivery(state_mgr, dest_pool, ["value"], ["dest-1"]) + delivery = _make_real_delivery(state_mgr, dest_pool, ["value"], ["dest-1"], mode="full_cdc") with ( patch("viaduck.main.source.current_snapshot_id", return_value=10), patch("viaduck.main.source.read_cdc_changes", return_value=arrow_data), @@ -1512,7 +1249,7 @@ def test_poll_cycle_full_cdc_routes_and_writes(): ["dest-1"], {"quacksworth": "dest-1"}, key_columns=["value"], - full_cdc=True, + mode="full_cdc", ) assert delivery.wait_idle() @@ -1536,7 +1273,7 @@ def test_poll_cycle_append_only_unchanged(): mock_dest_table = MagicMock() dest_pool.get.return_value = (MagicMock(), mock_dest_table) - delivery = _make_real_delivery(state_mgr, dest_pool, [], ["dest-1"]) + delivery = _make_real_delivery(state_mgr, dest_pool, [], ["dest-1"], mode="append_only") with ( patch("viaduck.main.source.current_snapshot_id", return_value=10), patch("viaduck.main.source.read_cdc", return_value=arrow_data) as mock_read_cdc, @@ -1551,7 +1288,7 @@ def test_poll_cycle_append_only_unchanged(): ["dest-1"], {"quacksworth": "dest-1"}, key_columns=[], - full_cdc=False, + mode="append_only", ) assert delivery.wait_idle() @@ -1585,7 +1322,7 @@ def test_poll_cycle_cdc_delete_only_changeset(): mock_dest_table.scan.return_value.count.return_value = 0 # empty dest: no truncate dest_pool.get.return_value = (mock_catalog, mock_dest_table) - delivery = _make_real_delivery(state_mgr, dest_pool, ["company"], ["dest-1"]) + delivery = _make_real_delivery(state_mgr, dest_pool, ["company"], ["dest-1"], mode="full_cdc") with ( patch("viaduck.main.source.current_snapshot_id", return_value=10), patch("viaduck.main.source.read_cdc_changes", return_value=arrow_data), @@ -1599,7 +1336,7 @@ def test_poll_cycle_cdc_delete_only_changeset(): ["dest-1"], {"quacksworth": "dest-1"}, key_columns=["company"], - full_cdc=True, + mode="full_cdc", ) assert delivery.wait_idle() @@ -1634,7 +1371,7 @@ def test_poll_cycle_cdc_write_failure_isolation(): mock_dest_table.scan.return_value.count.return_value = 0 # empty dest: no truncate dest_pool.get.return_value = (mock_catalog, mock_dest_table) - delivery = _make_real_delivery(state_mgr, dest_pool, ["company"], ["dest-1"]) + delivery = _make_real_delivery(state_mgr, dest_pool, ["company"], ["dest-1"], mode="full_cdc") with ( patch("viaduck.main.source.current_snapshot_id", return_value=10), patch("viaduck.main.source.read_cdc_changes", return_value=arrow_data), @@ -1649,7 +1386,7 @@ def test_poll_cycle_cdc_write_failure_isolation(): ["dest-1"], {"quacksworth": "dest-1"}, key_columns=["company"], - full_cdc=True, + mode="full_cdc", ) assert delivery.wait_idle() @@ -1698,7 +1435,7 @@ def test_poll_cycle_cdc_routing_value_mutation(): mock_dest_table.scan.return_value.count.return_value = 0 # empty dest: no truncate dest_pool.get.return_value = (mock_catalog, mock_dest_table) - delivery = _make_real_delivery(state_mgr, dest_pool, ["company"], ["dest-1", "dest-2"]) + delivery = _make_real_delivery(state_mgr, dest_pool, ["company"], ["dest-1", "dest-2"], mode="full_cdc") with ( patch("viaduck.main.source.current_snapshot_id", return_value=10), patch("viaduck.main.source.read_cdc_changes", return_value=raw_data), @@ -1712,7 +1449,7 @@ def test_poll_cycle_cdc_routing_value_mutation(): ["dest-1", "dest-2"], {"quacksworth": "dest-1", "mallardine": "dest-2"}, key_columns=["company"], - full_cdc=True, + mode="full_cdc", ) assert delivery.wait_idle() @@ -1745,7 +1482,7 @@ def test_poll_cycle_branches_on_key_columns(): ["dest-1"], {"quacksworth": "dest-1"}, key_columns=[], - full_cdc=False, + mode="append_only", ) mock_read_cdc.assert_called_once() mock_read_changes.assert_not_called() @@ -1771,7 +1508,7 @@ def test_poll_cycle_branches_on_key_columns(): ["dest-1"], {"quacksworth": "dest-1"}, key_columns=["company"], - full_cdc=True, + mode="full_cdc", ) mock_read_changes2.assert_called_once() mock_read_cdc2.assert_not_called() @@ -2007,7 +1744,7 @@ def test_cdc_batch_rows_metric_observed(): ["dest-1"], {"quacksworth": "dest-1"}, key_columns=[], - full_cdc=False, + mode="append_only", ) mock_batch_metric.observe.assert_called_once_with(3) diff --git a/tests/unit/test_metrics.py b/tests/unit/test_metrics.py index 555329f..b786056 100644 --- a/tests/unit/test_metrics.py +++ b/tests/unit/test_metrics.py @@ -73,23 +73,3 @@ def test_init_binds_cdc_orphaned_preimages_metric(): init("test-pipeline") assert hasattr(metrics.cdc_orphaned_preimages_total, "inc") - - -def test_init_binds_apply_mode_gauge(): - """After init(), dest_apply_mode should accept .labels(destination=).set(). - Operator-visible signal that a destination is on the append fast path.""" - from viaduck import metrics - - init("test-pipeline") - - assert hasattr(metrics.dest_apply_mode, "labels") - - -def test_init_binds_apply_fast_path_batches_counter(): - """After init(), dest_apply_fast_path_batches_total should accept - .labels(destination=).inc(). Increments per batch that took the fast path.""" - from viaduck import metrics - - init("test-pipeline") - - assert hasattr(metrics.dest_apply_fast_path_batches_total, "labels") diff --git a/tests/unit/test_phase_equivalence.py b/tests/unit/test_phase_equivalence.py index 136e9bb..0ddca8e 100644 --- a/tests/unit/test_phase_equivalence.py +++ b/tests/unit/test_phase_equivalence.py @@ -249,7 +249,7 @@ def test_resolve_conflicts_equivalence(seed): def test_split_and_count_equivalence(seed, with_nulls): rng = random.Random(seed + 2000) batch = _random_batch(rng, 200, with_mutations=True, with_nulls=with_nulls) - router = Router(RoutingConfig(field="company", key_columns=["val"], seed_mode="scan")) + router = Router(RoutingConfig(field="company", mode="full_cdc", key_columns=["val"], seed_mode="scan")) actual_routed, actual_unrouted = router.split_and_count(batch, ROUTING_VALUES) expected_routed, expected_unrouted = _oracle_split_and_count(router, batch, ROUTING_VALUES) assert actual_unrouted == expected_unrouted @@ -369,7 +369,7 @@ def test_mutation_direction_null_asymmetry(): ) def test_split_and_count_typed_columns_equivalence(column, values): table = pa.table({"company": column, "v": pa.array(range(len(column)), type=pa.int64())}) - router = Router(RoutingConfig(field="company", key_columns=[], seed_mode="scan")) + router = Router(RoutingConfig(field="company", mode="append_only", key_columns=[], seed_mode="scan")) actual_routed, actual_unrouted = router.split_and_count(table, values) expected_routed, expected_unrouted = _oracle_split_and_count(router, table, values) assert actual_unrouted == expected_unrouted @@ -385,7 +385,7 @@ def test_split_and_count_rejects_converted_value_collision(): from viaduck.router import RoutingError as RErr table = pa.table({"company": pa.array([1, 2], type=pa.int64())}) - router = Router(RoutingConfig(field="company", key_columns=[], seed_mode="scan")) + router = Router(RoutingConfig(field="company", mode="append_only", key_columns=[], seed_mode="scan")) with pytest.raises(RErr, match="collide"): router.split_and_count(table, ["1", "01"]) diff --git a/tests/unit/test_router.py b/tests/unit/test_router.py index 78a5393..25d2cd8 100644 --- a/tests/unit/test_router.py +++ b/tests/unit/test_router.py @@ -11,7 +11,7 @@ @pytest.fixture() def router() -> Router: - return Router(RoutingConfig(field="company")) + return Router(RoutingConfig(field="company", mode="append_only")) @pytest.fixture() @@ -65,7 +65,7 @@ def test_split_string_preserves_columns(router: Router, sample_table: pa.Table): def test_split_integer_basic(int_table: pa.Table): - router = Router(RoutingConfig(field="team_id")) + router = Router(RoutingConfig(field="team_id", mode="append_only")) routed, unrouted = router.split_and_count(int_table, ["123", "456"]) assert routed["123"].num_rows == 2 assert routed["456"].num_rows == 2 @@ -74,7 +74,7 @@ def test_split_integer_basic(int_table: pa.Table): def test_split_integer_auto_detects_type(int_table: pa.Table): """Router should auto-detect integer column and cast routing values.""" - router = Router(RoutingConfig(field="team_id")) + router = Router(RoutingConfig(field="team_id", mode="append_only")) routed, _ = router.split_and_count(int_table, ["123"]) assert routed["123"].num_rows == 2 @@ -167,7 +167,7 @@ def test_split_all_routed(router: Router, sample_table: pa.Table): def test_split_invalid_integer_routing_value(int_table: pa.Table): """Non-numeric routing value for integer column should raise RoutingError.""" - router = Router(RoutingConfig(field="team_id")) + router = Router(RoutingConfig(field="team_id", mode="append_only")) with pytest.raises(RoutingError, match="Cannot convert"): router.split_and_count(int_table, ["not_a_number"]) @@ -175,7 +175,7 @@ def test_split_invalid_integer_routing_value(int_table: pa.Table): def test_split_invalid_float_routing_value(): """Non-numeric routing value for float column should raise RoutingError.""" table = pa.table({"score": pa.array([1.5, 2.5], type=pa.float64())}) - router = Router(RoutingConfig(field="score")) + router = Router(RoutingConfig(field="score", mode="append_only")) with pytest.raises(RoutingError, match="Cannot convert"): router.split_and_count(table, ["not_a_float"]) @@ -190,7 +190,7 @@ def test_split_float_column(): "value": [1, 2, 3, 4], } ) - router = Router(RoutingConfig(field="score")) + router = Router(RoutingConfig(field="score", mode="append_only")) routed, unrouted = router.split_and_count(table, ["0.5", "1.5"]) assert routed["0.5"].num_rows == 2 assert routed["1.5"].num_rows == 1 diff --git a/viaduck/apply.py b/viaduck/apply.py index e71dea9..cb8aebd 100644 --- a/viaduck/apply.py +++ b/viaduck/apply.py @@ -282,36 +282,7 @@ def _dedupe_upserts_last_write_wins(upsert_rows: pa.Table, key_columns: list[str return ordered.take(winners.column("__idx_max")).drop(["__idx"]) -def _is_pure_insert_batch(batch: pa.Table) -> bool: - """True iff every row in the batch is a CDC insert. The check runs on - the raw batch (before delete/upsert split) so a single non-insert row - forces the upsert path. - - Null change_type forces False. pc.all defaults to skip_nulls=True, which - would silently classify [insert, NULL, insert] as pure-insert — and the - null row would then be dropped by tbl.append's column filter without ever - being detected, identical to how the upsert path's masks (both Kleene- - aware) silently swallow it. The upsert path's drop is incidental and not - a contract we want the fast path to inherit; gate explicitly on - null_count==0 so a malformed batch trips the safety net into the - upsert fallback rather than landing rows blindly. - """ - if batch.num_rows == 0: - return False - ct_col = batch.column("change_type") - if ct_col.null_count != 0: - return False - return bool(pc.all(pc.equal(ct_col, pa.scalar("insert"))).as_py()) - - -def _apply_changes( - catalog, - dest_table, - batch: pa.Table, - key_columns: list[str], - *, - append_at_least_once: bool = False, -) -> dict[str, int | bool]: +def _apply_changes(catalog, dest_table, batch: pa.Table, key_columns: list[str]) -> dict[str, int]: """Apply CDC changes to a destination table atomically. Deletes are applied first, then upserts, within a single catalog transaction. @@ -323,28 +294,11 @@ def _apply_changes( same result. This enables safe at-least-once retry on crash recovery. Destinations must not be written to by other sources. - When ``append_at_least_once`` is true AND the batch contains only - change_type="insert" rows, the upsert is replaced with tbl.append() — - pyducklake's upsert otherwise does a MERGE-with-target-join plus two full - count(*) scans on every call, all of which are wasted work on an insert-only - batch. The tradeoff is that apply-committed-but-cursor-not-advanced retries - produce duplicate rows (no MERGE WHEN MATCHED to collapse them). End-to-end - semantics are unchanged because CDC delivery into viaduck is already - at-least-once; the destination contract widens but doesn't break. The - per-batch check means a non-insert row anywhere in the batch transparently - falls back to the upsert path, so a future schema change that introduces - updates doesn't silently corrupt the destination. - - Returns dict of counts: - ``{"deleted": N, "upserted": N, "upsert_matched": N, "used_append": bool}``. + Returns dict of counts: {"deleted": N, "upserted": N, "upsert_matched": N}. - deleted: rows sent to delete (input count; delete API doesn't return affected count) - upserted: rows sent to upsert (input count) - - upsert_matched: rows that matched existing rows during upsert (from UpsertResult.rows_updated). - Always 0 on the append fast path — pyducklake doesn't surface a "matched" concept on append, - and at-least-once semantics make the question ill-defined anyway. - - used_append: true iff the fast path actually took the tbl.append() branch on this batch. - Distinguishes "flag on AND batch eligible" from "flag on but batch had a non-insert row". + - upsert_matched: rows that matched existing rows during upsert (from UpsertResult.rows_updated) """ ct_col = batch.column("change_type") @@ -357,19 +311,13 @@ def _apply_changes( pc.equal(ct_col, pa.scalar("update_postimage")), ) # Winner(k) BEFORE stripping meta — the dedup orders by snapshot_id/rowid. - # The dedup runs on the fast path too: within-batch duplicate keys (the same - # UUID emitted twice in one CDC read range) would otherwise materialize as - # destination duplicates *within a single apply*, which is a stronger break - # than the "duplicates on retry only" contract the flag advertises. upsert_rows = strip_meta(_dedupe_upserts_last_write_wins(batch.filter(upsert_mask), key_columns)) - counts: dict[str, int | bool] = {"deleted": 0, "upserted": 0, "upsert_matched": 0, "used_append": False} + counts = {"deleted": 0, "upserted": 0, "upsert_matched": 0} if delete_rows.num_rows == 0 and upsert_rows.num_rows == 0: return counts - use_append = append_at_least_once and delete_rows.num_rows == 0 and _is_pure_insert_batch(batch) - with catalog.begin_transaction() as txn: tbl = txn.load_table(dest_table.identifier) @@ -385,14 +333,9 @@ def _apply_changes( counts["deleted"] = delete_rows.num_rows if upsert_rows.num_rows > 0: - if use_append: - tbl.append(upsert_rows) - counts["upserted"] = upsert_rows.num_rows - counts["used_append"] = True - else: - upsert_result = tbl.upsert(upsert_rows, join_cols=key_columns) - counts["upserted"] = upsert_rows.num_rows - counts["upsert_matched"] = upsert_result.rows_updated + upsert_result = tbl.upsert(upsert_rows, join_cols=key_columns) + counts["upserted"] = upsert_rows.num_rows + counts["upsert_matched"] = upsert_result.rows_updated return counts @@ -402,14 +345,7 @@ def _apply_changes( # --------------------------------------------------------------------------- -def apply_full_cdc( - dest_pool, - dest_id: str, - batch: pa.Table, - key_columns: list[str], - *, - append_at_least_once: bool = False, -) -> int: +def apply_full_cdc(dest_pool, dest_id: str, batch: pa.Table, key_columns: list[str]) -> int: """Phase 2 + Phase 3 for one destination flush. Returns ops applied.""" resolved = _resolve_conflicts(batch) if resolved.num_rows == 0: @@ -417,23 +353,47 @@ def apply_full_cdc( counts = _write_with_retry( dest_pool, dest_id, - lambda cat, tbl: _apply_changes(cat, tbl, resolved, key_columns, append_at_least_once=append_at_least_once), + lambda cat, tbl: _apply_changes(cat, tbl, resolved, key_columns), ) if counts["deleted"] > 0: metrics.dest_rows_deleted_total.labels(destination=dest_id).inc(counts["deleted"]) if counts["upserted"] > 0: metrics.dest_rows_upserted_total.labels(destination=dest_id).inc(counts["upserted"]) - if counts["used_append"]: - metrics.dest_apply_fast_path_batches_total.labels(destination=dest_id).inc() if counts["upsert_matched"] > 0: metrics.dest_upsert_matched_total.labels(destination=dest_id).inc(counts["upsert_matched"]) return counts["deleted"] + counts["upserted"] def append_only(dest_pool, dest_id: str, batch: pa.Table) -> int: - """Append-only mode (no key_columns): one append per flush.""" + """Append-only mode (mode=append_only): one tbl.append() per flush. + + Precondition: the operator declared the source insert-only via + `routing.mode=append_only`. The batch is expected to come from + `source.read_cdc` (which calls `ducklake_table_insertions`) and is + therefore structurally insert-only — `ducklake_table_insertions` does + NOT synthesize a `change_type` column, so the boundary check below + treats the presence of that column as proof that the batch came from + `read_cdc_changes` / `ducklake_table_changes` instead (a viaduck + routing bug that would otherwise silently misclassify deletes and + update_postimages as inserts in the destination). + + The check is cheap (O(1) — just a column-name lookup) and runs once + per flush. It does NOT catch the case where DuckLake's + `ducklake_table_insertions` macro itself starts returning a + `change_type` column — that's a contract change we'd want to know + about anyway. + """ if batch.num_rows == 0: return 0 + if "change_type" in batch.column_names: + raise RuntimeError( + f"append_only got a batch with a 'change_type' column for destination {dest_id!r}; " + "this batch came from ducklake_table_changes (CDC stream including deletes / update_*) " + "rather than ducklake_table_insertions. mode=append_only assumes the source-read path " + "is insert-only — either the dispatch in _poll_cycle is routing the wrong batch into " + "append_only(), or DuckLake's table_insertions contract changed. Refusing to write " + "potentially-misclassified rows." + ) _write_with_retry(dest_pool, dest_id, lambda cat, tbl, b=batch: tbl.append(b)) metrics.dest_rows_written_total.labels(destination=dest_id).inc(batch.num_rows) return batch.num_rows diff --git a/viaduck/config.py b/viaduck/config.py index c6cb478..aab340c 100644 --- a/viaduck/config.py +++ b/viaduck/config.py @@ -54,14 +54,6 @@ def _validate_int(value: object, ctx: str) -> int: return value -def _validate_bool(value: object, ctx: str) -> bool: - """Validate that a YAML node is a real bool — reject ints and strings so - `append_at_least_once: 1` or `: "true"` can't silently shadow the intent.""" - if not isinstance(value, bool): - raise ConfigError(f"{ctx} must be a boolean (got {type(value).__name__})") - return value - - def _resolve_env_properties(props: dict[str, str]) -> dict[str, str]: """Resolve properties: keys ending in _env have their values read from env vars.""" resolved = {} @@ -108,9 +100,29 @@ def resolved_properties(self) -> dict[str, str]: return _resolve_env_properties(self.properties) +_VALID_ROUTING_MODES = ("full_cdc", "append_only") + + @dataclass(frozen=True) class RoutingConfig: field: str + # Source-read + apply mode for the pipeline: + # full_cdc — read source via ducklake_table_changes (inserts + + # deletes + update preimages/postimages), run Phase 1 + # preimage resolution and Phase 2 conflict resolution, + # apply via tbl.upsert(rows, join_cols=key_columns). + # Requires non-empty key_columns. + # append_only — read source via ducklake_table_insertions (inserts + # only), skip Phase 1 and Phase 2 entirely, apply + # via tbl.append(rows). Requires empty key_columns + # (none of the apply-path machinery uses them, so a + # non-empty value would be silent misconfiguration). + # Required: no default — the previous "infer full_cdc from len(key_columns) + # > 0" derivation was a silent misconfig hazard (a typo'd or accidentally- + # empty list flipped the entire pipeline shape with no operator-visible + # signal). Explicit mode + a startup error on mismatch keeps the operator + # honest. + mode: str = "" key_columns: list[str] = field(default_factory=list) seed_mode: str = "scan" # "scan", "earliest", or "latest" # REPLACE-semantics seeding: a destination at cursor 0 with existing @@ -122,6 +134,37 @@ class RoutingConfig: seed_truncate: bool = True def __post_init__(self): + if not isinstance(self.mode, str): + # YAML 1.1 coerces bare `yes`/`no`/`on`/`off` to bools, and `1`/`0` + # to ints, so a typo'd `mode: yes` lands here as Python True and + # the enum check below would print `got True` — operator-confusing. + raise ConfigError( + f"routing.mode must be a string, got {type(self.mode).__name__} ({self.mode!r}). " + 'Quote the value if YAML coerced it (e.g. `mode: "append_only"`).' + ) + if not self.mode: + raise ConfigError( + f"routing.mode is required, no default. Set it to one of {list(_VALID_ROUTING_MODES)}: " + f"'full_cdc' for sources that emit deletes/updates (requires key_columns), " + f"'append_only' for insert-only sources (key_columns must be empty)." + ) + if self.mode not in _VALID_ROUTING_MODES: + raise ConfigError( + f"routing.mode must be one of {list(_VALID_ROUTING_MODES)}, got {self.mode!r}. " + f"Use 'full_cdc' for sources that emit deletes/updates (requires key_columns), " + f"'append_only' for insert-only sources (key_columns must be empty)." + ) + if self.mode == "full_cdc" and not self.key_columns: + raise ConfigError( + "routing.mode='full_cdc' requires a non-empty routing.key_columns list " + "(the upsert join keys for the apply path)." + ) + if self.mode == "append_only" and self.key_columns: + raise ConfigError( + "routing.mode='append_only' forbids routing.key_columns; the append path " + f"does not use them and a non-empty value indicates misconfiguration. " + f"Got key_columns={self.key_columns!r}; remove them or switch to mode='full_cdc'." + ) if self.seed_mode not in ("scan", "earliest", "latest"): raise ConfigError(f"routing.seed_mode must be 'scan', 'earliest', or 'latest', got {self.seed_mode!r}") @@ -135,25 +178,6 @@ class DestinationConfig: data_path: str table: str properties: dict[str, str] = field(default_factory=dict) - # Opt-in fast path for insert-only CDC: when true, _apply_changes uses - # tbl.append() instead of tbl.upsert() on batches that contain only - # change_type="insert" rows. Skips pyducklake's MERGE planning + the two - # bookkeeping count(*) scans. - # - # Semantic change to flag carefully: today the upsert path silently - # collapses upstream-CDC at-least-once duplicates because MERGE WHEN - # MATCHED reduces them to one destination row. The append path does NOT. - # Both upstream-redelivery duplicates AND apply-committed-but-cursor- - # not-advanced replay duplicates now physically materialize in the - # destination table — they don't stop at viaduck. Enable ONLY when every - # consumer of the destination table (queries, downstream pipelines, - # exports) can tolerate per-key duplicates, not just the immediate - # downstream consumer of viaduck. - # - # Mixed batches still fall through to upsert via a per-batch check - # (_is_pure_insert_batch), so a future CDC schema change that introduces - # update_postimage doesn't silently corrupt the destination. - append_at_least_once: bool = False @property def postgres_uri(self) -> str: @@ -325,6 +349,7 @@ def log_summary(self, log: logging.Logger) -> None: log.info("config: source.properties=%r", self.source.properties) log.info("config: routing.field=%r", self.routing.field) + log.info("config: routing.mode=%r", self.routing.mode) log.info("config: routing.key_columns=%r", self.routing.key_columns) log.info("config: routing.seed_mode=%r", self.routing.seed_mode) log.info("config: routing.seed_truncate=%s", self.routing.seed_truncate) @@ -361,7 +386,6 @@ def log_summary(self, log: logging.Logger) -> None: log.info("config: destinations[%d].data_path=%r", i, d.data_path) log.info("config: destinations[%d].table=%r", i, d.table) log.info("config: destinations[%d].properties=%r", i, d.properties) - log.info("config: destinations[%d].append_at_least_once=%s", i, d.append_at_least_once) def assigned_destination_ids(self) -> list[str]: """Return destination IDs assigned to this instance based on partition config.""" @@ -425,6 +449,10 @@ def load(path: str | Path) -> ViaduckConfig: raise ConfigError("routing.key_columns entries must all be strings") routing = RoutingConfig( field=_require_non_empty(rt.get("field", ""), "routing.field"), + # `or ""` so YAML `mode:` (key present, no value → Python None) routes + # to the "is required, no default" branch rather than the isinstance + # type-error branch (which would tell the operator to quote `None`). + mode=rt.get("mode") or "", key_columns=raw_key_cols, seed_mode=rt.get("seed_mode", "scan"), seed_truncate=bool(rt.get("seed_truncate", True)), @@ -441,7 +469,6 @@ def load(path: str | Path) -> ViaduckConfig: _validate_string_dict(d.get("properties", {}), f"destinations[{i}].properties"), default_props, ) - raw_aaolo = d.get("append_at_least_once", False) destinations.append( DestinationConfig( id=_require_non_empty(str(d.get("id", "")), f"destinations[{i}].id"), @@ -453,7 +480,6 @@ def load(path: str | Path) -> ViaduckConfig: data_path=_require_non_empty(d.get("data_path", ""), f"destinations[{i}].data_path"), table=d.get("table", source.table), properties=dest_props, - append_at_least_once=_validate_bool(raw_aaolo, f"destinations[{i}].append_at_least_once"), ) ) diff --git a/viaduck/delivery.py b/viaduck/delivery.py index 2156de5..c8c7f01 100644 --- a/viaduck/delivery.py +++ b/viaduck/delivery.py @@ -42,6 +42,7 @@ from viaduck import metrics from viaduck.apply import append_only, apply_full_cdc +from viaduck.config import ConfigError if TYPE_CHECKING: from viaduck.config import DeliveryConfig @@ -95,38 +96,30 @@ def __init__( dest_pool: DestinationPool, key_columns: list[str], assigned_ids: list[str], + *, + mode: str, on_flush_success=None, - append_at_least_once_by_dest: dict[str, bool] | None = None, ): self._cfg = cfg self._state = state_mgr self._pool = dest_pool self._key_columns = key_columns - self._full_cdc = len(key_columns) > 0 + # Apply mode comes from explicit routing.mode (validated upstream in + # config.RoutingConfig.__post_init__): full_cdc → upsert path with + # delete+upsert under one txn, append_only → straight tbl.append(). + # The previous "infer full_cdc from len(key_columns) > 0" derivation + # was a silent misconfig hazard. mode is keyword-only so a positional + # caller (test, future external) fails loudly at the call site rather + # than running an unrelated string past the enum check. + if mode not in ("full_cdc", "append_only"): + raise ConfigError( + f"DeliveryManager mode must be 'full_cdc' or 'append_only', got {mode!r}. " + "This should have been caught by RoutingConfig.__post_init__ at config load; " + "if you're seeing this, the caller bypassed config validation." + ) + self._mode = mode + self._full_cdc = mode == "full_cdc" self._on_flush_success = on_flush_success - # Defaults to empty dict → every destination uses the upsert path. - # The dict-of-bool shape (not a single flag on cfg) is deliberate: - # the apply mode is a per-destination contract with its downstream, - # not a transport-level knob shared across destinations. Constructed - # once in __init__ (single-threaded before the worker pool starts) - # and never mutated thereafter — workers read it via .get() under - # the GIL without holding self._lock. - self._append_at_least_once_by_dest: dict[str, bool] = dict(append_at_least_once_by_dest or {}) - - # Observability: a labeled gauge per assigned destination so dashboards - # can answer "is this dest on the fast path" without grepping config, - # and a startup log line per fast-path destination so the operator- - # visible signal is plain in the deploy log. - for dest_id in assigned_ids: - mode_on = self._append_at_least_once_by_dest.get(dest_id, False) - metrics.dest_apply_mode.labels(destination=dest_id).set(1 if mode_on else 0) - if mode_on: - log.info( - "Destination %s configured with append_at_least_once=true; " - "insert-only batches will skip the upsert path and may produce " - "duplicate rows on apply-commit / cursor-advance failure retries", - dest_id, - ) self._lock = threading.Lock() self._buffers: dict[str, _Buffer] = {d: _Buffer() for d in assigned_ids} @@ -386,13 +379,7 @@ def _flush(self, dest_id: str, tables: list[pa.Table], through: int, trigger: st if tables: batch = tables[0] if len(tables) == 1 else pa.concat_tables(tables, promote_options="default") if self._full_cdc: - ops_count = apply_full_cdc( - self._pool, - dest_id, - batch, - self._key_columns, - append_at_least_once=self._append_at_least_once_by_dest.get(dest_id, False), - ) + ops_count = apply_full_cdc(self._pool, dest_id, batch, self._key_columns) else: ops_count = append_only(self._pool, dest_id, batch) # Cursor persist AFTER the destination commit (the gap is the diff --git a/viaduck/main.py b/viaduck/main.py index 406514b..580347c 100644 --- a/viaduck/main.py +++ b/viaduck/main.py @@ -702,7 +702,7 @@ def run(cfg: config.ViaduckConfig) -> None: _seed_new_destinations(src_table, state_mgr, dest_pool, cfg, assigned_ids) key_columns = cfg.routing.key_columns - full_cdc = len(key_columns) > 0 + mode = cfg.routing.mode # Buffered delivery: per-destination buffers + flush worker pool # (constructed AFTER seeding so positions initialize from the @@ -714,8 +714,8 @@ def run(cfg: config.ViaduckConfig) -> None: dest_pool, key_columns, assigned_ids, + mode=mode, on_flush_success=health.record_replication, - append_at_least_once_by_dest={d.id: d.append_at_least_once for d in cfg.destinations}, ) log.info( @@ -723,7 +723,7 @@ def run(cfg: config.ViaduckConfig) -> None: cfg.source.name, cfg.source.table, cfg.routing.field, - "full_cdc" if full_cdc else "append_only", + mode, len(assigned_ids), cfg.instance.id, ) @@ -740,7 +740,7 @@ def _signal_handler(signum, frame): while not shutdown: try: - _poll_cycle(src_table, delivery, dest_pool, router, cfg, assigned_ids, rv_to_dest, key_columns, full_cdc) + _poll_cycle(src_table, delivery, dest_pool, router, cfg, assigned_ids, rv_to_dest, key_columns, mode) except Exception: log.exception("Fatal error in poll cycle") break @@ -769,7 +769,11 @@ def _signal_handler(signum, frame): log.info("Shutdown complete") -def _poll_cycle(src_table, delivery, dest_pool, router, cfg, assigned_ids, rv_to_dest, key_columns, full_cdc): +def _poll_cycle(src_table, delivery, dest_pool, router, cfg, assigned_ids, rv_to_dest, key_columns, mode): + # Local boolean so the existing branch sites stay terse. Threading mode + # (not full_cdc) through the call signature avoids reconstructing the + # original config value from a derived bool later (status payload). + full_cdc = mode == "full_cdc" """One poll cycle: read CDC from each position group into buffers, advance in-memory positions, evaluate flush triggers. @@ -948,7 +952,7 @@ def _poll_cycle(src_table, delivery, dest_pool, router, cfg, assigned_ids, rv_to status.update( source_table=f"{cfg.source.name}.{cfg.source.table}", source_snapshot=snap_now, - mode="full_cdc" if full_cdc else "append_only", + mode=mode, poll_interval=cfg.poll.interval_seconds, flush_interval=cfg.delivery.flush_interval_seconds, delivery_config={ diff --git a/viaduck/metrics.py b/viaduck/metrics.py index 69e6e02..b2a3994 100644 --- a/viaduck/metrics.py +++ b/viaduck/metrics.py @@ -106,18 +106,6 @@ def labels(self, **kwargs): "Rows that matched existing rows during upsert (updated, not inserted)", ["pipeline", "destination"], ) -_dest_apply_mode = Gauge( - "viaduck_dest_apply_mode", - "Configured destination apply mode: 1 if append_at_least_once is enabled, 0 if the upsert path is in effect. " - "Set once at startup; the per-batch safety net may still take the upsert path for any batch containing a " - "non-insert row even when this gauge is 1.", - ["pipeline", "destination"], -) -_dest_apply_fast_path_batches_total = Counter( - "viaduck_dest_apply_fast_path_batches_total", - "Flush batches that took the append fast path (skipped pyducklake's MERGE+count scans).", - ["pipeline", "destination"], -) _cdc_routing_mutations_total = Counter( "viaduck_cdc_routing_mutations_total", "Cross-tenant routing value changes detected in updates", @@ -194,8 +182,6 @@ def labels(self, **kwargs): dest_rows_deleted_total = _dest_rows_deleted_total dest_rows_upserted_total = _dest_rows_upserted_total dest_upsert_matched_total = _dest_upsert_matched_total -dest_apply_mode = _dest_apply_mode -dest_apply_fast_path_batches_total = _dest_apply_fast_path_batches_total cdc_routing_mutations_total = _cdc_routing_mutations_total cdc_conflicts_resolved_total = _cdc_conflicts_resolved_total cdc_tombstones_emitted_total = _cdc_tombstones_emitted_total @@ -218,7 +204,6 @@ def init(pipeline: str): global errors_total global cdc_batch_rows global dest_rows_deleted_total, dest_rows_upserted_total, dest_upsert_matched_total - global dest_apply_mode, dest_apply_fast_path_batches_total global cdc_routing_mutations_total, cdc_conflicts_resolved_total, cdc_orphaned_preimages_total global cdc_tombstones_emitted_total global delivery_buffer_rows, delivery_buffer_bytes, delivery_buffer_total_bytes @@ -238,8 +223,6 @@ def init(pipeline: str): dest_rows_deleted_total = _AutoPipelineLabels(_dest_rows_deleted_total, pipeline) dest_rows_upserted_total = _AutoPipelineLabels(_dest_rows_upserted_total, pipeline) dest_upsert_matched_total = _AutoPipelineLabels(_dest_upsert_matched_total, pipeline) - dest_apply_mode = _AutoPipelineLabels(_dest_apply_mode, pipeline) - dest_apply_fast_path_batches_total = _AutoPipelineLabels(_dest_apply_fast_path_batches_total, pipeline) # Metrics with no other labels — pre-label to get direct .inc()/.set()/.observe() polls_total = _polls_total.labels(pipeline=pipeline)