[vpj] Snapshot-at-T: new VPJ mode (rewind knobs + offline RT merge)#2851
sixpluszero wants to merge 9 commits into
Adds per-push-job knobs that let a hybrid full push ship a shortened rewind
window ([T, now]) instead of the store's full configured rewind, gated by a
threshold so it only triggers when the store's effective rewind is large
enough to be worth it.
New configs (VenicePushJobConstants):
- snapshot.at.t.rewind.enabled (master switch, default false)
- snapshot.at.t.min.rewind.threshold.seconds (skip when the store's effective
  rewind is below this; default 1 day)
- snapshot.at.t.cutoff.epoch.seconds (the cutoff T; default = start of push)
- snapshot.at.t.rewind.buffer.seconds (gap-safety buffer; default 60)
The gate (maybeApplySnapshotAtTRewindOverride) sets the existing
rewindTimeInSecondsOverride to (now - T) + buffer and requires REWIND_FROM_SOP,
reusing the controller's handleRewindTimeOverride path. It is skipped for
incremental/KIF-repush jobs, non-hybrid stores, when an explicit rewind
override is already set, or when the effective rewind is below the threshold.
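The gate described above can be sketched as a pure function. This is a minimal illustration, not the code in VenicePushJob: the class name, the parameter list, the single `eligibleHybridFullPush` flag (standing in for the hybrid / non-incremental / non-repush checks) and the `-1` value chosen for `NOT_SET` are all assumptions made for the example.

```java
import java.util.OptionalLong;

/** Hypothetical stand-alone model of the snapshot-at-T rewind gate. */
final class SnapshotAtTGateSketch {
  /** Assumed sentinel for "no value supplied"; the real constant's value is not shown in the PR. */
  static final long NOT_SET = -1L;

  /**
   * Returns the shortened rewind, (now - T) + buffer, or empty when the gate is skipped.
   */
  static OptionalLong rewindOverrideSeconds(
      boolean enabled,
      boolean eligibleHybridFullPush,
      long explicitOverrideSeconds,
      long effectiveRewindSeconds,
      long thresholdSeconds,
      long nowSeconds,
      long cutoffEpochSeconds,
      long bufferSeconds) {
    if (!enabled || !eligibleHybridFullPush) {
      return OptionalLong.empty(); // master switch off, or job/store not eligible
    }
    if (explicitOverrideSeconds != NOT_SET) {
      return OptionalLong.empty(); // an explicit rewind override takes precedence
    }
    if (effectiveRewindSeconds < thresholdSeconds) {
      return OptionalLong.empty(); // rewind too short to be worth shortening
    }
    // The cutoff T defaults to the start of the push.
    long cutoff = cutoffEpochSeconds == NOT_SET ? nowSeconds : cutoffEpochSeconds;
    if (cutoff > nowSeconds) {
      throw new IllegalArgumentException("snapshot-at-T cutoff cannot be in the future");
    }
    return OptionalLong.of((nowSeconds - cutoff) + bufferSeconds);
  }
}
```

With a 3-day store rewind, a 1-day threshold, a cutoff 1000 seconds before the push start and a 60-second buffer, the sketch yields a 1060-second rewind.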
Control plane only: shortening the rewind is correct only when the batch
dataset already incorporates real-time data up to T (the offline merge data
plane, follow-up). The master flag defaults off and must remain off in
production until that data plane lands, otherwise the shortened rewind would
skip nearline writes between the offline cutoff and T (a data gap).
Tests: unit coverage for the gate (applied / below-threshold / boundary /
ineligible jobs / future-cutoff) plus an integration test asserting the
shortened rewind reaches the controller's requestTopicForWrites.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…th RMD
The snapshot-at-T data plane's correctness core. Folds real-time PUT/UPDATE/DELETE records onto a batch base value by reusing the server's own MergeConflictResolver (da-vinci-client), and emits the merged value AND its Replication Metadata (RMD) so the merged result is identical to what a server produces by replaying the same RT during a hybrid rewind.
- seedFromBatch establishes the batch-sentinel RMD (timestamp 0) exactly as a server stores batch data, so any real RT write folded in afterwards wins.
- applyPut/applyUpdate/applyDelete fold RT records via the resolver; finalizeRecord emits (value, serialized RMD, valueSchemaId, rmdProtocolVersion, isDelete).
- Handles value-level and field-level (write-compute / partial-update) RMD; the rmdUseFieldLevelTimestamp flag must match the store config.
- Value is stored as byte[] (position-safe) so the resolver advancing a ByteBuffer's position while reading inputs cannot corrupt a later operation.
This is the merge engine only; wiring it into the multi-region RT reader + reducer + writer pipeline is the remaining data-plane integration.
Tests: value-level PUT (sentinel -> RT-wins -> stale-ignored), DELETE tombstone with RMD, and write-compute field-level partial update (per-field RMD timestamp).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
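The value-level sequence that commit tests (sentinel -> RT-wins -> stale-ignored, then a DELETE tombstone that keeps its timestamp) can be modelled with a toy last-writer-wins cell. This is only a sketch under stated assumptions: it does not call MergeConflictResolver, it ignores field-level RMD and colo ids, and keeping the existing value on an equal timestamp is a simplification chosen for the example rather than the resolver's documented tie-break. The class and method names are hypothetical.

```java
/** Toy value-level merge cell; NOT the server's MergeConflictResolver. */
final class ValueLevelMergeSketch {
  private byte[] value;   // null means the key is deleted (tombstone)
  private long timestamp; // value-level "RMD" timestamp

  /** Batch data carries the sentinel timestamp 0, so any real RT write is newer. */
  void seedFromBatch(byte[] batchValue) {
    this.value = batchValue == null ? null : batchValue.clone();
    this.timestamp = 0L;
  }

  /** Applies a PUT only if it is strictly newer than what is stored. */
  void applyPut(byte[] newValue, long writeTimestamp) {
    if (writeTimestamp > timestamp) {
      value = newValue.clone(); // defensive copy, mirroring the byte[] storage choice
      timestamp = writeTimestamp;
    }
  }

  /** Applies a DELETE only if it is strictly newer; the tombstone keeps its timestamp. */
  void applyDelete(long writeTimestamp) {
    if (writeTimestamp > timestamp) {
      value = null;
      timestamp = writeTimestamp;
    }
  }

  boolean isDelete() {
    return value == null;
  }

  byte[] value() {
    return value == null ? null : value.clone();
  }

  long timestamp() {
    return timestamp;
  }
}
```

Retaining the timestamp on a delete is what lets a later stale PUT be rejected, which is why the merged output carries RMD even for deletes.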
merge-and-produce executor, 2-region A/A integration test
Adds the data-plane components that produce the snapshot-at-T merged version, plus an end-to-end multi-region integration test proving correctness.
- SnapshotAtTRtReader: reads a region's RT topic from start to the current tail (bounded by the start->end record count, with optional event-time cutoff), normalizing each PUT/UPDATE/DELETE for the merge and tagging it with the region's colo id. Bytes are copied out of the consumed message immediately so a pooled consumer buffer cannot corrupt a record processed later.
- SnapshotAtTRtRecord: normalized RT record (op, key, payload, value/update schema ids, write timestamp, colo id).
- SnapshotAtTPushExecutor: per key, seeds the merger from the batch value, folds all regions' RT records, and writes the merged value+RMD (or an RMD-carrying delete) via a VeniceWriter, which partitions each key itself.
- SnapshotAtTPushIntegrationTest (2-region A/A cluster): produces RT to BOTH regions with logical timestamps (including a cross-region conflict key), runs the new-mode push (request a version with a short rewind override -> read both regions' RT -> merge -> produce value+RMD -> SOP/EOP), and asserts every key's served value in both regions equals the correct merged result (region 1's newer write wins the conflict key).
Wiring this data path into VenicePushJob.run() as a first-class mode (in-process batch read + region-broker discovery) is the remaining productionization; the merge correctness and multi-region read are fully validated here.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
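The "copy bytes out immediately" point in the reader description can be illustrated with standard java.nio calls. The helper name below is hypothetical and the snippet is only a sketch of the general technique, not the reader's actual code: it copies the readable bytes through a duplicate view, so the source buffer's position is left untouched and later reuse of the backing array cannot change the copy.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper showing a position-safe copy out of a (possibly pooled) buffer. */
final class BufferCopySketch {
  /** Copies the bytes between position and limit without moving the source's position. */
  static byte[] copyRemaining(ByteBuffer source) {
    ByteBuffer view = source.duplicate(); // shares content, but has its own position/limit
    byte[] copy = new byte[view.remaining()];
    view.get(copy);                       // advances only the duplicate's position
    return copy;
  }
}
```

Holding the result as byte[] rather than as another ByteBuffer view is the same choice the merger makes for stored values.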
…command, end to end)
When the snapshot-at-T gate triggers, VenicePushJob.run() now produces the merged version itself instead of the data-writer job: it reads the offline batch input in-process, reads each region's RT (per snapshot.at.t.rt.region.brokers) up to the cutoff, merges per key (value+RMD) via SnapshotAtTRecordMerger, and produces the result through a single writer that owns SOP+data+EOP (so Data-Integrity-Validation stays consistent; the controller skips SOP for this mode).
- New config snapshot.at.t.rt.region.brokers ("coloId=broker;..." per region).
- SnapshotAtTSchemaRepository: a minimal ReadOnlySchemaRepository built from the controller's value / RMD / derived schemas, so the merge runs with the store's real schemas.
- Batch input read via VeniceAvroFileIterator; version data written via a byte[] VeniceWriter with the store partitioner + chunking.
Integration test testSnapshotAtTViaVenicePushJob: a single VenicePushJob run with snapshot.at.t.enabled=true and a 0s threshold (so the mode triggers on the store's 3600s rewind) produces RT to two regions, merges with the batch Avro input, and serves the correct merged result in both regions. Passes.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…, fix CI (spotbugs + coverage)
- Fail fast in the snapshot-at-T mode unless the store is hybrid AND Active-Active enabled, rather than relying on auto-discovery of the RT topology.
- Compress merged values to the version's compression strategy before producing, and carry the dictionary in the Start-Of-Push, so ZSTD_WITH_DICT stores work. The single-command VPJ integration test now runs for both NO_OP and ZSTD_WITH_DICT.
- Remove an unused field flagged by SpotBugs (StaticAnalysis check).
- Add unit tests for SnapshotAtTPushExecutor, SnapshotAtTSchemaRepository, and the region-broker config parser to satisfy diff-coverage.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…isolate the merge
The test set a 0s threshold so the mode triggers, but it did not prove it did: a normal push to this hybrid A/A store would, via the server's 3600s rewind, merge the same RT and serve the same values, and the short rewind could even re-apply the RT and mask a broken merge. Now:
- Assert v2's rewind was overridden to the short snapshot window (well below the store's 3600s), which only happens when the snapshot-at-T gate fires and the snapshot data path runs.
- Age the RT past that short rewind window before the push, so the served data reflects only the offline merge and is not silently fixed up by the server's post-EOP rewind. A broken merge (RT not folded in) now fails the test instead of passing.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…test shard
New integration tests land in the catch-all IntegrationTests_99 bucket until the next automated rebalance. That bucket was already near its 15-minute job timeout from other unassigned tests, so adding this ~3-minute class tipped it over: the class itself passed (suite SUCCESS in ~200s), but the bucket was then cancelled at the timeout. Move it to a dedicated shard (86) so bucket 99 stays under the timeout and the test runs with headroom. A later rebalance_test_shards.py run (with CI timing) will fold it in optimally.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Pull request overview
This PR introduces a new “snapshot-at-T” Venice Push Job (VPJ) mode intended to shorten hybrid store rewind time by (1) applying per-push rewind override knobs and (2) optionally performing an offline RT→batch merge that produces value + RMD so the version can come online after replaying only a short tail.
Changes:
- Added per-VPJ config knobs and gating logic to apply a rewind-time override for hybrid full pushes (“snapshot-at-T” rewind shortening).
- Added snapshot-at-T data-plane implementation: read batch input + read RT from multiple regions + merge via server MergeConflictResolver + write merged value+RMD with SOP/EOP.
- Added unit/integration tests and CI sharding to cover the new mode.
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| internal/venice-test-common/test-shard-assignments.json | Adds new integration test shard assignment for snapshot-at-T integration tests. |
| internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/snapshot/SnapshotAtTPushIntegrationTest.java | New multi-region end-to-end integration tests for snapshot-at-T merge + shortened rewind. |
| clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/VenicePushJobTest.java | Adds unit/integration-style tests for the rewind override gate and parsing region brokers. |
| clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTSchemaRepositoryTest.java | New unit tests for the controller-backed schema repository used by the merge path. |
| clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTRecordMergerTest.java | New unit tests for merge correctness (PUT/DELETE/update + RMD assertions). |
| clients/venice-push-job/src/test/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTPushExecutorTest.java | New unit tests for executing the merge and producing puts/deletes. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/vpj/VenicePushJobConstants.java | Adds snapshot-at-T config keys + defaults and documents behavior. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/VenicePushJob.java | Implements gate + controller request wiring + snapshot-at-T merge push execution path. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTSchemaRepository.java | New minimal schema repository loaded from controller responses for merge resolver usage. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTRtRecord.java | New RT record normalization container used by reader/merger/executor. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTRtReader.java | New RT reader that deterministically reads partition ranges and normalizes RT messages. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTRecordMerger.java | New offline merge core using MergeConflictResolver to produce merged value + RMD. |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/snapshot/SnapshotAtTPushExecutor.java | New executor that merges per key and writes merged records (put/delete with RMD). |
| clients/venice-push-job/src/main/java/com/linkedin/venice/hadoop/PushJobSetting.java | Adds snapshot-at-T settings fields to PushJobSetting. |
| .github/workflows/VeniceCI-E2ETests.yml | Adds a new integration test shard job (86) and wires it into the failure aggregation job. |
    Map<ByteBuffer, ByteBuffer> batchValues = readBatchInputForSnapshot();
    String realTimeTopicName = Utils.getRealTimeTopicName(storeInfo);
    long cutoffMs = setting.snapshotAtTCutoffEpochSeconds > 0 ? setting.snapshotAtTCutoffEpochSeconds * 1000 : 0L;
    SnapshotAtTRtReader rtReader = new SnapshotAtTRtReader();
    List<SnapshotAtTRtRecord> rtRecords = new ArrayList<>();
    for (Map.Entry<Integer, String> region: setting.snapshotAtTRtRegionBrokers.entrySet()) {
      rtRecords.addAll(
          rtReader.readRegion(
              snapshotConsumerProps(region.getValue()),
Acknowledged — this is a known scalability limitation of the current single-process merge, tracked as a follow-up. Plan: make the merge distributed — read the batch (HDFS) and per-region RT as input splits (reusing the KafkaInput/KIF infrastructure), shuffle by key, and cogroup batch+RT per key through the same SnapshotAtTRecordMerger in reducers (memory bounded per partition, spillable), writing out via the existing partitioned VeniceWriter. Until that lands, the master flag snapshot.at.t.rewind.enabled stays off by default, so this path does not run in production.
… address review
Fixes correctness gaps in the snapshot-at-T offline merge and addresses the Copilot review comments on linkedin#2851.
- sepRT: the offline merge now reads BOTH the regular RT and the separate RT topic per region (mirroring the server, which subscribes to both during a hybrid rewind), so separate-RT / incremental-push writes are no longer dropped. Adds snapshotAtTRtTopicNames() + unit test.
- Rewind correctness (review #5): stamp the snapshot-at-T Start-Of-Push with the job start time so the server's REWIND_FROM_SOP rewind start is pinned to (cutoff - buffer) regardless of how long the offline merge takes, instead of drifting past the snapshot's coverage on a long merge and silently dropping the real-time writes in the gap. Adds a sentinel-guarded broadcastStartOfPush overload to VeniceWriter that carries a caller-supplied SOP messageTimestamp (existing callers keep using the writer clock).
- Review robustness fixes:
  - readBatchInputForSnapshot skips null (delete-marker) batch values instead of NPE-ing.
  - parseSnapshotAtTRegionBrokers rejects duplicate coloId and empty broker.
  - SnapshotAtTSchemaRepository fails fast on empty value schemas and on RMD / derived schema fetch errors.
  - SnapshotAtTRtReader aborts on an incomplete RT read (no silent data loss) and skips a not-yet-created separate RT topic.
- Tests: cross-colo out-of-order PUT/UPDATE/DELETE order-independence; SOP timestamp carried + rewind-start pinned at/before cutoff; schema-repo failure cases; region-broker parse validation.
linkedin#6 (the in-memory batch+RT merge is not yet streaming/distributed) remains a tracked follow-up; the master flag snapshot.at.t.rewind.enabled stays off by default until that data plane lands.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
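The rewind-correctness fix in that commit is easiest to see as arithmetic. With REWIND_FROM_SOP the rewind start is the SOP timestamp minus the rewind, and the rewind override is (jobStart - cutoff) + buffer, so stamping the SOP with the job start time gives jobStart - ((jobStart - cutoff) + buffer) = cutoff - buffer, independent of merge duration. The sketch below only works through that arithmetic; the class and method names are hypothetical and all times are plain epoch seconds.

```java
/** Hypothetical arithmetic model of the REWIND_FROM_SOP rewind start. */
final class RewindStartSketch {
  /** The per-push override computed by the gate: (jobStart - cutoff) + buffer. */
  static long rewindOverrideSeconds(long jobStartSeconds, long cutoffSeconds, long bufferSeconds) {
    return (jobStartSeconds - cutoffSeconds) + bufferSeconds;
  }

  /** Under REWIND_FROM_SOP, replay starts this far before the SOP timestamp. */
  static long rewindStartSeconds(long sopTimestampSeconds, long rewindSeconds) {
    return sopTimestampSeconds - rewindSeconds;
  }

  /** RT writes in [cutoff, rewindStart) are covered by neither the snapshot nor the replay. */
  static long uncoveredGapSeconds(long cutoffSeconds, long rewindStartSeconds) {
    return Math.max(0L, rewindStartSeconds - cutoffSeconds);
  }
}
```

For example, with jobStart = 10000, cutoff = 9400 and buffer = 60 the override is 660 seconds. A SOP pinned to the job start replays from 9340 (cutoff - buffer, no gap), whereas a SOP stamped after a 3600-second merge would replay from 12940 and leave 3540 seconds of real-time writes uncovered.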
…ata path
The snapshot-at-T SOP-timestamp change (f7d95ab) moved the envelope-building body into a new 7-arg getKafkaMessageEnvelope and rewired the data/control send path to call it directly. That bypassed any subclass override of the public 6-arg getKafkaMessageEnvelope, breaking ConsumerIntegrationTest forward-compatibility: VeniceWriterWithNewerProtocol overrides the 6-arg method to emit a newer-protocol envelope, and with the override skipped it silently degraded to a normal writer ("expected value2 but found value1"). The tests passed on the prior commit e48990f and on main, confirming this is a regression from f7d95ab.
Reverse the delegation: the overridable 6-arg method keeps the body; the 7-arg method builds via it (so subclass overrides fire on every send path) and only pins the producer messageTimestamp when a caller supplied a non-sentinel value. Adds a unit regression test asserting a subclass's 6-arg override is invoked on the put() path.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
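The delegation direction that commit restores can be shown with a much smaller toy. Everything here is hypothetical (class names, the one- and two-argument overloads standing in for the 6-arg and 7-arg getKafkaMessageEnvelope, and the `-1` sentinel); the point is only that the overridable method keeps the body and the timestamp-carrying overload builds through it, so a subclass override fires on every send path.

```java
/** Toy model of "overridable method keeps the body, extended overload delegates to it". */
final class EnvelopeDelegationSketch {
  /** Assumed sentinel meaning "caller did not supply a timestamp". */
  static final long NO_TIMESTAMP = -1L;

  static final class Envelope {
    String protocol;
    String payload;
    long messageTimestamp;
  }

  static class Writer {
    /** Stand-in for the overridable 6-arg method: it owns the envelope-building body. */
    protected Envelope buildEnvelope(String payload) {
      Envelope envelope = new Envelope();
      envelope.protocol = "current";
      envelope.payload = payload;
      envelope.messageTimestamp = System.currentTimeMillis(); // writer clock by default
      return envelope;
    }

    /** Stand-in for the 7-arg method: builds via the overridable one, then pins the timestamp. */
    final Envelope buildEnvelope(String payload, long pinnedTimestamp) {
      Envelope envelope = buildEnvelope(payload); // subclass overrides are honoured here
      if (pinnedTimestamp != NO_TIMESTAMP) {
        envelope.messageTimestamp = pinnedTimestamp;
      }
      return envelope;
    }

    Envelope put(String payload) {
      return buildEnvelope(payload, NO_TIMESTAMP);
    }

    Envelope startOfPush(long sopTimestamp) {
      return buildEnvelope("SOP", sopTimestamp);
    }
  }

  /** Mirrors the role of a test writer that emits a newer-protocol envelope. */
  static class NewerProtocolWriter extends Writer {
    @Override
    protected Envelope buildEnvelope(String payload) {
      Envelope envelope = super.buildEnvelope(payload);
      envelope.protocol = "newer";
      return envelope;
    }
  }
}
```

Had the body lived in the two-argument overload with `put` calling it directly, `NewerProtocolWriter` would silently produce "current" envelopes, which is the shape of the regression described above.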
    long nowSeconds = setting.jobStartTimeMs / 1000;
    long cutoffEpochSeconds =
        setting.snapshotAtTCutoffEpochSeconds == NOT_SET ? nowSeconds : setting.snapshotAtTCutoffEpochSeconds;
    if (cutoffEpochSeconds > nowSeconds) {
      throw new VeniceException(
          String.format(
              "Provided '%d' for %s; the snapshot-at-T cutoff cannot be a timestamp in the future (now=%ds).",
              cutoffEpochSeconds,
              SNAPSHOT_AT_T_CUTOFF_EPOCH_SECONDS,
              nowSeconds));
    }
    long finalRewindSeconds = (nowSeconds - cutoffEpochSeconds) + setting.snapshotAtTRewindBufferSeconds;
    setting.rewindTimeInSecondsOverride = finalRewindSeconds;

    List<String> realTimeTopicNames = snapshotAtTRtTopicNames(storeInfo);
    long cutoffMs = setting.snapshotAtTCutoffEpochSeconds > 0 ? setting.snapshotAtTCutoffEpochSeconds * 1000 : 0L;
    SnapshotAtTRtReader rtReader = new SnapshotAtTRtReader();

    int coloId = Integer.parseInt(trimmed.substring(0, separator).trim());
    String brokerAddress = trimmed.substring(separator + 1).trim();
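The two parsing lines above, together with the validation listed in the review fixes (reject a duplicate coloId and an empty broker), suggest a parser for the "coloId=broker;..." format along the following lines. This is a sketch, not parseSnapshotAtTRegionBrokers itself: the class name, the exception type, and the choice to skip blank entries (for example after a trailing semicolon) are assumptions made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for a "coloId=broker;coloId=broker" region-broker string. */
final class RegionBrokerParseSketch {
  static Map<Integer, String> parse(String config) {
    Map<Integer, String> brokersByColo = new LinkedHashMap<>();
    for (String entry: config.split(";")) {
      String trimmed = entry.trim();
      if (trimmed.isEmpty()) {
        continue; // assumption: tolerate blank entries such as a trailing ';'
      }
      int separator = trimmed.indexOf('=');
      if (separator <= 0) {
        throw new IllegalArgumentException("Expected coloId=broker but got: " + trimmed);
      }
      // NumberFormatException (an IllegalArgumentException) surfaces a non-numeric colo id.
      int coloId = Integer.parseInt(trimmed.substring(0, separator).trim());
      String brokerAddress = trimmed.substring(separator + 1).trim();
      if (brokerAddress.isEmpty()) {
        throw new IllegalArgumentException("Empty broker for coloId " + coloId);
      }
      if (brokersByColo.putIfAbsent(coloId, brokerAddress) != null) {
        throw new IllegalArgumentException("Duplicate coloId " + coloId);
      }
    }
    return brokersByColo;
  }
}
```

Only the first '=' is treated as the separator, so a broker address that itself contains '=' would be preserved as-is in this sketch.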
Problem Statement
A full (BATCH) push to a hybrid store is not servable when the batch data finishes loading. After End-Of-Push, the new version replays the real-time topic from (SOP or EOP) - rewindTimeInSeconds up to the live tail before it can go online. For stores with a multi-day rewind, this replay dominates the push wall-clock and sits on the version-readiness critical path. On a representative A/A + write-compute store with a ~3-day rewind, the rewind measured at ~7 hours, ~84% of each push's wall-clock (two pushes × three fabrics).
A push that fixes a cutoff timestamp T and ships a batch dataset already current as of T would only need to rewind the short [T, now] tail. This PR adds the per-VPJ control knobs for that rewind-shortening. The offline merge that produces the combined dataset is a follow-up (see Solution).
Solution
Adds per-push-job knobs that, for a hybrid full push, set the existing rewindTimeInSecondsOverride to (now - T) + buffer (reusing the controller's handleRewindTimeOverride path and REWIND_FROM_SOP semantics), gated by a threshold so it only triggers when the store's effective rewind is large enough to be worth it.
The gate (maybeApplySnapshotAtTRewindOverride) is skipped for incremental/KIF-repush jobs, non-hybrid stores, when an explicit rewind override is already set, or when the store's effective rewind is below the threshold.
Shortening the rewind is correct only when the batch dataset already incorporates real-time data up to T. The master flag defaults off and must remain off in production until the merge below is wired end to end, otherwise the shortened rewind would skip nearline writes between the offline cutoff and T (a data gap).
Data plane: offline RT-to-batch merge core
SnapshotAtTRecordMerger is the correctness core that produces the merged dataset the shortened rewind relies on. It folds RT PUT/UPDATE/DELETE records onto the batch base by reusing the server's own MergeConflictResolver (da-vinci-client is already a venice-push-job dependency), and emits the merged value and its RMD, so the offline result equals what a server produces by replaying the same RT during rewind, including Active-Active field-level timestamps and write-compute partial updates.
seedFromBatch establishes the batch-sentinel RMD (timestamp 0) exactly as a server stores batch data, so any real RT write folded in afterwards wins. Values are held as byte[] so a ByteBuffer position the resolver advances while reading inputs cannot corrupt a later operation.
The data plane is built around this engine:
- SnapshotAtTRtReader reads a region's RT topic from start to the current tail and normalizes each PUT/UPDATE/DELETE, tagging it with the region's colo id (so the merger does cross-region conflict resolution). Bytes are copied out of the consumed message immediately, since pooled consumer buffers are reused across polls.
- SnapshotAtTPushExecutor seeds the merger from the batch value per key, folds in all regions' RT records, and writes the merged value+RMD (or an RMD-carrying delete) via a VeniceWriter, which partitions each key itself.
This is wired into VenicePushJob.run() as a first-class mode: when the gate triggers, the job verifies the store is hybrid + Active-Active, reads the offline batch input in-process, reads each region's RT (per snapshot.at.t.rt.region.brokers), merges, compresses to the version's compression strategy, and produces the merged version through a single writer that owns SOP+data+EOP (so Data-Integrity-Validation stays consistent; the controller skips SOP for this mode).
SnapshotAtTSchemaRepository supplies the store's real value/RMD/derived schemas to the merge. So a single VenicePushJob run performs the whole new-mode push, for NO_OP and ZSTD_WITH_DICT stores alike.
The remaining productionization is auto-discovering the per-region RT brokers from the controller's Active-Active source-fabric config (today they're passed via snapshot.at.t.rt.region.brokers).
Code changes
- snapshot.at.t.rewind.enabled (boolean, default false): master switch
- snapshot.at.t.min.rewind.threshold.seconds (long, default 1 day = 86400): skip when the store's effective rewind is below this
- snapshot.at.t.cutoff.epoch.seconds (long, default = start of push): the cutoff T
- snapshot.at.t.rewind.buffer.seconds (long, default 60): gap-safety buffer
Concurrency-Specific Checks
PushJobSetting, run once during job setup; no shared mutable state.
How was this PR tested?
testSnapshotAtTRewindOverrideReachesControllerRequest exercises config → gate → createNewStoreVersion and asserts the shortened rewind reaches requestTopicForWrites.
SnapshotAtTPushIntegrationTest produces RT to two regions with logical timestamps (including a cross-region conflict key) and asserts every key's served value in both regions equals the correct merged result. testSnapshotAtTMergeAcrossTwoRegions drives the data plane directly (request short-rewind version → read both regions' RT → merge → produce value+RMD → SOP/EOP); testSnapshotAtTViaVenicePushJob drives it through a single VenicePushJob run with snapshot.at.t.enabled=true and a 0s threshold (so the mode triggers on the store's 3600s rewind) plus a batch Avro input.
./gradlew :clients:venice-push-job:test --tests "*VenicePushJobTest" --tests "*SnapshotAtTRecordMergerTest" and ./gradlew :internal:venice-test-common:integrationTest --tests "*SnapshotAtTPushIntegrationTest".
Does this PR introduce any user-facing or breaking changes?
snapshot.at.t.rewind.enabled, which defaults to false; existing pushes are unchanged.
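For orientation, the knobs listed under Code changes could be assembled as ordinary java.util.Properties entries, as in the sketch below. The key names and the 86400 / 60 defaults come from this description; the class and method names, the broker addresses, and the idea of turning the master switch on (which the description says must stay off in production for now) are illustrative assumptions for a test setup only.

```java
import java.util.Properties;

/** Hypothetical helper assembling the snapshot-at-T knobs for a test push. */
final class SnapshotAtTConfigSketch {
  static Properties snapshotAtTProps() {
    Properties props = new Properties();
    // Master switch: defaults to false; enabled here only to illustrate a test environment.
    props.setProperty("snapshot.at.t.rewind.enabled", "true");
    // Skip the mode when the store's effective rewind is below 1 day.
    props.setProperty("snapshot.at.t.min.rewind.threshold.seconds", "86400");
    // Gap-safety buffer added on top of (now - T).
    props.setProperty("snapshot.at.t.rewind.buffer.seconds", "60");
    // Placeholder broker addresses in the "coloId=broker;..." format; not real endpoints.
    props.setProperty("snapshot.at.t.rt.region.brokers", "0=broker-a:9092;1=broker-b:9092");
    // snapshot.at.t.cutoff.epoch.seconds is left unset, so T defaults to the start of the push.
    return props;
  }
}
```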