[dvc][compat] Paused-SIT: DVC subscribes to future version immediately, paused in non-target regions until targetRegionPromoted #2879
Draft
misyel wants to merge 15 commits into
VersionStatus.ONLINE

Non-target-region DVC clients previously waited for the version to reach VersionStatus.ONLINE before subscribing (the startIngestionInNonTargetRegion condition). With the paused-SIT deferred swap design (linkedin#2812), the version swap in non-target regions is intentionally deferred, so ONLINE never arrives until after DVC has already ingested the data. Waiting for ONLINE before subscribing therefore deadlocks.

Replace the ONLINE gate with targetRegionPromoted=true, which DeferredVersionSwapService sets once the target region's push completes and which is propagated to child controllers via an UpdateStore admin message.

The skip-subscribe log message now reports targetRegionPromoted instead of VersionStatus, so it reflects the actual gating condition.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Add `pauseAfterStartOfPush` flag and three public methods:

- `pausePartitionForFutureSlot(int)`: sets PCS.futureSlotPaused and physically pauses Kafka consumption via aggKafkaConsumerService
- `resumeFromFutureSlotPause()`: clears futureSlotPaused on all partitions and resumes consumption, skipping any partition still held by a store-level pause
- `isFutureSlotPaused()`: returns true if any partition has the flag set

Covers both paths with unit tests in StoreIngestionTaskTest.
pauseAfterStartOfPush test

- Move pcs.setFutureSlotPaused(false) inside the !isStoreLevelPaused branch in resumeFromFutureSlotPause so the flag stays true when store-level pause blocks the physical resume, keeping flag state consistent with physical consumer state.
- Update testFutureSlotResumeDoesNotPhysicallyResumeWhenStoreLevelPauseActive to assert isFutureSlotPaused() remains true (not false) when store-level pause is active.
- Add public isPauseAfterStartOfPush()/setPauseAfterStartOfPush() getter/setter.
- Add testPauseAfterStartOfPushFieldSetAndRead as a placeholder until Task 4 wires the SOP hook.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
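The flag semantics described in the two commits above can be sketched roughly as follows. This is a simplified in-memory model, not the Venice `StoreIngestionTask`: the class `FutureSlotPauseModel`, the `physicallyPaused` set (standing in for aggKafkaConsumerService), and the single `storeLevelPaused` boolean are all hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the future-slot pause primitives.
// Method names mirror the commit text; everything else is an assumption.
class FutureSlotPauseModel {
  // Stand-in for the per-partition PCS.futureSlotPaused flag.
  private final Map<Integer, Boolean> futureSlotPaused = new HashMap<>();
  // Stand-in for partitions whose Kafka consumption is physically paused.
  private final Set<Integer> physicallyPaused = new HashSet<>();
  // Stand-in for a store-level pause, which takes precedence over resume.
  private boolean storeLevelPaused = false;

  void setStoreLevelPaused(boolean paused) {
    this.storeLevelPaused = paused;
  }

  void pausePartitionForFutureSlot(int partition) {
    futureSlotPaused.put(partition, true);
    physicallyPaused.add(partition);
  }

  void resumeFromFutureSlotPause() {
    for (Map.Entry<Integer, Boolean> entry : futureSlotPaused.entrySet()) {
      if (!entry.getValue()) {
        continue; // partition was never paused for the future slot
      }
      // Clear the flag only when the physical resume actually happens,
      // so flag state and consumer state cannot drift apart.
      if (!storeLevelPaused) {
        entry.setValue(false);
        physicallyPaused.remove(entry.getKey());
      }
    }
  }

  boolean isFutureSlotPaused() {
    return futureSlotPaused.containsValue(true);
  }

  boolean isPhysicallyPaused(int partition) {
    return physicallyPaused.contains(partition);
  }
}
```

In this model, calling `resumeFromFutureSlotPause()` while the store-level pause is active leaves both the flag and the physical pause in place; a later call after the store-level pause clears releases both together.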
A disk-quota resume callback could physically un-pause a partition that was intentionally held back by pausePartitionForFutureSlot. Add an explicit isFutureSlotPaused() check in resumeConsumption so future-slot pauses take the same precedence as store-level pauses. Covered by the new testQuotaResumeDoesNotOverrideFutureSlotPause unit test.
…ngle &&

- StoreIngestionTask.resumeConsumption: widened from private to package-private so the test can call it directly via doCallRealMethod(); merged the two sequential store-level-pause / future-slot-pause if-blocks into a single combined && guard (single logQuotaCallbackSuppressed call).
- StoreIngestionTaskTest.testQuotaResumeDoesNotOverrideFutureSlotPause: removed getDeclaredMethod/setAccessible reflection; now calls task.resumeConsumption() directly. Moved pubSubTopicRepository injection into buildMinimalSitForFutureSlotTests (alongside the existing field injections) and added doCallRealMethod() wiring for resumeConsumption there.
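A minimal sketch of what a single combined guard of this kind might look like is shown below. It assumes the guard has the shape "resume only if neither pause is active, otherwise log once"; the class `QuotaResumeGuardSketch`, its fields, and the log text are hypothetical and do not reproduce the actual `resumeConsumption` body.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a quota-resume callback guarded by two pause reasons.
class QuotaResumeGuardSketch {
  boolean storeLevelPaused;   // stand-in for the store-level pause state
  boolean futureSlotPaused;   // stand-in for isFutureSlotPaused()
  boolean consuming = false;  // stand-in for the physical consumer state
  final List<String> suppressedLog = new ArrayList<>();

  void resumeConsumption(int partition) {
    // One combined && guard: both pause reasons must be clear to resume.
    if (!storeLevelPaused && !futureSlotPaused) {
      consuming = true;
    } else {
      // Single suppression log entry, whichever pause reason applies.
      suppressedLog.add("quota resume suppressed for partition " + partition);
    }
  }
}
```

With either flag set, the callback leaves `consuming` false and records exactly one suppression entry, which is the behavior testQuotaResumeDoesNotOverrideFutureSlotPause is described as covering.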
pauseAfterStartOfPush=true

Adds the pause hook inside processStartOfPush, immediately after reportStarted fires, so the push monitor registers the DVC as an expected reporter before the partition is paused. Also widens processStartOfPush and beginBatchWrite from private to package-private to allow direct invocation in unit tests.

Includes testProcessStartOfPushPausesPartitionWhenFlagSet, verifying that reportStarted is called before the pause and that the partition stays unpaused when the flag is false.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
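The ordering constraint in this commit (report first, pause second) can be illustrated with a small sketch. The class `StartOfPushHookSketch` and its event strings are hypothetical; the real processStartOfPush does considerably more work than shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model that records the order of side effects on Start-Of-Push.
class StartOfPushHookSketch {
  final List<String> events = new ArrayList<>();
  private final boolean pauseAfterStartOfPush;

  StartOfPushHookSketch(boolean pauseAfterStartOfPush) {
    this.pauseAfterStartOfPush = pauseAfterStartOfPush;
  }

  void processStartOfPush(int partition) {
    // Report first, so the push monitor sees this client as an expected reporter.
    events.add("reportStarted:" + partition);
    // Pause only afterwards, and only when the task was created paused.
    if (pauseAfterStartOfPush) {
      events.add("pausePartitionForFutureSlot:" + partition);
    }
  }
}
```

A test in the spirit of testProcessStartOfPushPausesPartitionWhenFlagSet would then assert on the order of `events` for the flag-set case and on the absence of a pause event for the flag-unset case.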
infrastructure

Add @VisibleForTesting getters for aggKafkaConsumerService and storageMetadataService, and refactor pausePartitionForFutureSlot, resumeFromFutureSlotPause, isFutureSlotPaused, resumeConsumption, and processStartOfPush to use getter methods instead of direct field access. This lets the 5 future-slot tests use doReturn() stubs instead of getDeclaredField/setAccessible reflection.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
A paused SIT does not need SST files from local peers: the data will be ingested via Kafka after the slot is resumed. Short-circuit shouldStartBlobTransfer when pauseAfterStartOfPush=true to avoid wasting memory and bandwidth.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
fkaStoreIngestionService

- IngestionBackend: add a default 5-arg startConsumption(createPaused) that throws UnsupportedOperationException for backends that don't support it
- DefaultIngestionBackend: override the 5-arg overload, mirroring the existing 4-arg body but delegating to the new KafkaStoreIngestionService 4-arg overload with createPaused passed through
- KafkaStoreIngestionService: add a 4-arg startConsumption(createPaused) that guards DaVinci-only semantics, calls the existing 3-arg path, then flags the resulting StoreIngestionTask with setPauseAfterStartOfPush(true)
- KafkaStoreIngestionServiceTest: add testCreatePausedThrowsOnNonDaVinciConfig, verifying that VeniceException is thrown when createPaused=true on a server config
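The overload pattern in this commit (a default method that throws, overridden only by backends that support the new parameter) might look roughly like the sketch below. The argument lists are shortened for readability and the type names `IngestionBackendSketch`, `LegacyBackendSketch`, and `PausingBackendSketch` are hypothetical. `IllegalStateException` stands in for VeniceException so that the example depends only on the JDK.

```java
// Hypothetical, shortened version of the overload pattern; not the Venice interfaces.
interface IngestionBackendSketch {
  String startConsumption(String topic, int partition);

  // Default overload: backends that do not override it reject createPaused.
  default String startConsumption(String topic, int partition, boolean createPaused) {
    throw new UnsupportedOperationException("createPaused is not supported by this backend");
  }
}

// A backend that predates createPaused and inherits the throwing default.
class LegacyBackendSketch implements IngestionBackendSketch {
  @Override
  public String startConsumption(String topic, int partition) {
    return topic + "-" + partition + ":active";
  }
}

// A backend that supports createPaused, but only for Da Vinci clients.
class PausingBackendSketch implements IngestionBackendSketch {
  private final boolean isDaVinciClient;

  PausingBackendSketch(boolean isDaVinciClient) {
    this.isDaVinciClient = isDaVinciClient;
  }

  @Override
  public String startConsumption(String topic, int partition) {
    // The shorter overload delegates with createPaused=false.
    return startConsumption(topic, partition, false);
  }

  @Override
  public String startConsumption(String topic, int partition, boolean createPaused) {
    if (createPaused && !isDaVinciClient) {
      // Stand-in for the VeniceException mentioned in the commit message.
      throw new IllegalStateException("createPaused is only valid for Da Vinci clients");
    }
    return topic + "-" + partition + (createPaused ? ":pauseAfterStartOfPush" : ":active");
  }
}
```

The default method keeps existing implementations source-compatible while making misuse fail loudly instead of silently ignoring the flag.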
…sPaused

Adds a boolean createPaused parameter to VersionBackend.subscribe() and threads it through to the 5-arg IngestionBackend.startConsumption(). Adds package-private resume() and isPaused() methods that delegate to StoreIngestionTask.resumeFromFutureSlotPause() / isFutureSlotPaused() via backend.getIngestionService().getStoreIngestionTask(topicName). All existing call sites in StoreBackend and VersionBackendTest pass false.
Update VersionBackendTest.testRecordTransformerSubscribe to match the new 5-arg startConsumption(config, partition, pubSubPos, replicaId, createPaused) signature added in Phase 2. Use anyBoolean() for the new primitive boolean parameter instead of any() to avoid NPE on unboxing.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
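The unboxing problem behind this change can be reproduced without Mockito. A generic matcher such as Mockito's `any()` returns null at the call site, and passing a null `Boolean` where a primitive `boolean` is expected throws on unboxing, whereas `anyBoolean()` returns a primitive `false`. The sketch below models only the Java behavior; `UnboxingSketch` and its methods are hypothetical.

```java
// Hypothetical demonstration of why a null-returning matcher breaks primitive parameters.
class UnboxingSketch {
  // Stand-in for a method with a primitive boolean parameter such as createPaused.
  static boolean startConsumption(boolean createPaused) {
    return createPaused;
  }

  // Models a generic matcher: the call-site value is a null Boolean.
  static boolean nullArgumentThrows() {
    Boolean matcherResult = null;
    try {
      startConsumption(matcherResult); // auto-unboxing null throws here
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  // Models a primitive-aware matcher: the call-site value is a primitive false.
  static boolean primitiveArgumentIsSafe() {
    boolean matcherResult = false;
    return !startConsumption(matcherResult);
  }
}
```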
maybeResumeDaVinciFutureVersion

StoreBackend.trySubscribeDaVinciFutureVersion now always subscribes to the future version slot. For target-region pushes, the subscription in non-target regions is created paused (createPaused=true via shouldCreatePaused), replacing the old skip-subscribe gate.

maybeResumeDaVinciFutureVersion checks whether a paused future version should now be resumed (when targetRegionPromoted flips to true). It is wired into DaVinciBackend.handleStoreChanged between tryDeleteInvalidDaVinciFutureVersion and trySwapDaVinciCurrentVersion.

- Removed: local targetRegions/isTargetRegionEnabled/startIngestionOnTargetRegionPromoted variables and the old if/else skip-subscribe block.
- Added: private shouldCreatePaused(Version), synchronized maybeResumeDaVinciFutureVersion().
- Tests: updated testSubscribeWithDelayedIngestionEnabled for always-subscribe semantics; added testCreatesPausedSITInNonTargetRegion, testCreatesActiveSITInTargetRegion, testResumePausedSITOnTargetPromotion.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
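One way to picture the two decisions described here is as a pair of pure functions. The sketch below is an assumption about their shape, not the actual StoreBackend code: the parameter lists, the region names used in the usage note, and in particular the choice to skip pausing when targetRegionPromoted is already true are all hypothetical.

```java
import java.util.Set;

// Hypothetical decision helpers; the real shouldCreatePaused takes a Version object.
class PausedSubscribeDecisionSketch {
  static boolean shouldCreatePaused(
      Set<String> targetRegions, String localRegion, boolean targetRegionPromoted) {
    if (targetRegions.isEmpty()) {
      return false; // not a target-region push: subscribe and ingest normally
    }
    if (targetRegions.contains(localRegion)) {
      return false; // target region: ingest immediately
    }
    // Assumption: nothing to wait for once promotion has already happened.
    return !targetRegionPromoted;
  }

  static boolean shouldResume(boolean futureVersionPaused, boolean targetRegionPromoted) {
    // Resume only a version that is actually paused, and only after promotion.
    return futureVersionPaused && targetRegionPromoted;
  }
}
```

For example, with a hypothetical target region set of `{"us-west"}`, a client in `"us-east"` would be created paused and then resumed on the store-change notification that carries targetRegionPromoted=true.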
trySwapDaVinciCurrentVersion
Copilot reviewed 13 out of 13 changed files in this pull request and generated no comments.
… topic lock

Previously pauseAfterStartOfPush was set after startConsumption returned, which left a window where the SIT could poll and process SOP with the flag still false.

Fix: pass createPaused into createStoreIngestionTask so the flag is set on the SIT inside the topic lock, before ingestionExecutorService.submit() and before subscribePartition() is called. Since the SIT cannot process any messages until a partition is subscribed, the flag is guaranteed to be visible before SOP can be consumed.

The 3-arg startConsumption now delegates to the 4-arg path with createPaused=false.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
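The ordering fix can be illustrated with a small concurrency sketch: the flag is written inside the lock and before the task is handed to the executor, so the task thread is guaranteed to observe it. Everything here is a hypothetical stand-in (`CreatePausedOrderingSketch`, `Task`, `TOPIC_LOCK`); the only property being shown is that a write made before `ExecutorService.submit()` is visible to the submitted task.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model of "set the flag before the task can run".
class CreatePausedOrderingSketch {
  private static final Object TOPIC_LOCK = new Object();

  // Stand-in for the ingestion task; call() returns the flag value it observes,
  // which models what the task would see when it processes Start-Of-Push.
  static class Task implements Callable<Boolean> {
    private volatile boolean pauseAfterStartOfPush;

    void setPauseAfterStartOfPush(boolean value) {
      this.pauseAfterStartOfPush = value;
    }

    @Override
    public Boolean call() {
      return pauseAfterStartOfPush;
    }
  }

  static boolean flagSeenByTask(boolean createPaused) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      Future<Boolean> seen;
      synchronized (TOPIC_LOCK) {
        Task task = new Task();
        // Set the flag first; only then make the task runnable.
        task.setPauseAfterStartOfPush(createPaused);
        seen = executor.submit(task);
      }
      return seen.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
  }
}
```

Setting the flag after `submit()` returns, as in the earlier version described above, would let the task run with the default value in the window between the two calls.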
Problem Statement
Under target-region push + deferred swap, DVC clients in non-target regions today do not subscribe to the future version until targetRegionPromoted=true arrives. This means they miss the Start-Of-Push control message entirely, so the push monitor never registers them as expected reporters — VPJ exits as soon as server-side ingestion completes without waiting for non-target DVC clients.
The desired behavior: DVC clients in every region subscribe immediately at version creation, consume Start-Of-Push (registering as push reporters), then pause Kafka consumption in non-target regions until targetRegionPromoted=true. Once the signal arrives, they unpause and ingest in parallel. VPJ naturally waits for all DVCs everywhere before completing.
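The reporter-registration argument above can be made concrete with a toy push monitor. This sketch assumes the monitor treats a push as complete once every registered reporter has reported completion; the class `PushMonitorSketch`, its methods, and the reporter ids in the usage note are hypothetical and far simpler than the real push monitor.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical push monitor: completion is judged only against registered reporters.
class PushMonitorSketch {
  private final Set<String> expectedReporters = new HashSet<>();
  private final Set<String> completedReporters = new HashSet<>();

  // Called when a reporter consumes Start-Of-Push.
  void reportStarted(String reporterId) {
    expectedReporters.add(reporterId);
  }

  // Called when a reporter finishes ingesting the version.
  void reportCompleted(String reporterId) {
    completedReporters.add(reporterId);
  }

  boolean isPushComplete() {
    // A reporter that never registered is never waited for.
    return completedReporters.containsAll(expectedReporters);
  }
}
```

If a non-target DVC never calls `reportStarted`, `isPushComplete()` turns true as soon as the servers finish, which mirrors the early VPJ exit described above. If the DVC registers at Start-Of-Push and then pauses, the push stays incomplete until that DVC resumes and reports completion.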
Solution
Phase 2: SIT pause/resume primitives
Phase 3: StoreBackend wiring
Code changes
Concurrency-Specific Checks
How was this PR tested?
Unit tests (StoreIngestionTaskTest):
Unit tests (StoreBackendTest):
Integration test (TestDeferredVersionSwapWithSequentialRolloutWithDvc): existing test passes, verifying targetRegionPromoted propagates and v2 data is readable after promotion.
Does this PR introduce any user-facing or breaking changes?