[dvc][compat] Paused-SIT: DVC subscribes to future version immediately, paused in non-target regions until targetRegionPromoted #2879

Draft
misyel wants to merge 15 commits into linkedin:main from misyel:mkwong/dvc-paused-sit-phases-2-3

@misyel misyel commented Jun 19, 2026

Contributor

Problem Statement

Under target-region push + deferred swap, DVC clients in non-target regions today do not subscribe to the future version until targetRegionPromoted=true arrives. This means they miss the Start-Of-Push control message entirely, so the push monitor never registers them as expected reporters — VPJ exits as soon as server-side ingestion completes without waiting for non-target DVC clients.

The desired behavior: DVC clients in every region subscribe immediately at version creation, consume Start-Of-Push (registering as push reporters), then pause Kafka consumption in non-target regions until targetRegionPromoted=true. Once the signal arrives, they unpause and ingest in parallel. VPJ naturally waits for all DVCs everywhere before completing.

Solution

Phase 2: SIT pause/resume primitives

  • Add futureSlotPaused flag to PartitionConsumptionState (mirrors storeLevelPaused)
  • Add pauseAfterStartOfPush flag + pausePartitionForFutureSlot / resumeFromFutureSlotPause / isFutureSlotPaused helpers to StoreIngestionTask
  • Trigger pause in processStartOfPush after reportStarted (so push monitor sees this DVC as an expected reporter before pausing)
  • Guard quota resumeConsumption so it cannot physically resume a future-slot-paused partition
  • Gate blob transfer off for paused SITs (data not needed until after promotion)
  • Plumb createPaused through IngestionBackend -> DefaultIngestionBackend -> KafkaStoreIngestionService with DaVinci-only assertion
  • Add createPaused to VersionBackend.subscribe; add resume() and isPaused()
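The pause/resume semantics listed above can be sketched with a simplified model. The method names mirror those in this description, but the bodies are assumptions: the `Pcs` class and the `consuming` boolean are hypothetical stand-ins for PartitionConsumptionState and for the physical pause/resume that the real code delegates to AggKafkaConsumerService.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the future-slot pause helpers; not the actual StoreIngestionTask code. */
class FutureSlotPauseSketch {
  /** Simplified stand-in for PartitionConsumptionState; only the pause flags are modeled. */
  static class Pcs {
    volatile boolean storeLevelPaused;
    volatile boolean futureSlotPaused;
    // Stand-in for the physical Kafka consumer state of this partition (assumption).
    volatile boolean consuming = true;
  }

  private final List<Pcs> partitions = new ArrayList<>();

  Pcs addPartition() {
    Pcs pcs = new Pcs();
    partitions.add(pcs);
    return pcs;
  }

  void pausePartitionForFutureSlot(Pcs pcs) {
    pcs.futureSlotPaused = true;
    pcs.consuming = false;
  }

  void resumeFromFutureSlotPause() {
    for (Pcs pcs : partitions) {
      // Clear the flag only when the partition can physically resume, so the
      // flag stays consistent with the consumer state under a store-level pause.
      if (pcs.futureSlotPaused && !pcs.storeLevelPaused) {
        pcs.futureSlotPaused = false;
        pcs.consuming = true;
      }
    }
  }

  boolean isFutureSlotPaused() {
    for (Pcs pcs : partitions) {
      if (pcs.futureSlotPaused) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    FutureSlotPauseSketch task = new FutureSlotPauseSketch();
    Pcs free = task.addPartition();
    Pcs held = task.addPartition();
    task.pausePartitionForFutureSlot(free);
    task.pausePartitionForFutureSlot(held);
    held.storeLevelPaused = true; // a store-level pause still holds this partition
    task.resumeFromFutureSlotPause();
    System.out.println("free consuming=" + free.consuming + ", held consuming=" + held.consuming);
  }
}
```

In this model a partition held by a store-level pause keeps its futureSlotPaused flag after a resume attempt, which is why isFutureSlotPaused() can still return true afterwards.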

Phase 3: StoreBackend wiring

  • Add shouldCreatePaused() helper: false for no-target-region push, false if this region IS the target, true for non-target when targetRegionPromoted=false
  • trySubscribeDaVinciFutureVersion() now always subscribes (passing createPaused from shouldCreatePaused) instead of skipping non-target regions
  • Add maybeResumeDaVinciFutureVersion(): if the future version is paused and shouldCreatePaused now returns false, call resume()
  • DaVinciBackend.handleStoreChanged() calls maybeResumeDaVinciFutureVersion() after trySwapDaVinciCurrentVersion and before trySubscribeDaVinciFutureVersion
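The decision made by shouldCreatePaused() can be sketched as a pure function. The signature below is hypothetical: the helper in this PR takes a Version, whereas this sketch passes the target regions, the local region, and the targetRegionPromoted flag explicitly, and assumes an empty target-region set means the push is not a target-region push.

```java
import java.util.Collections;
import java.util.Set;

/** Illustrative decision helper; the PR's shouldCreatePaused takes a Version instead. */
class ShouldCreatePausedSketch {
  static boolean shouldCreatePaused(Set<String> targetRegions, String localRegion, boolean targetRegionPromoted) {
    if (targetRegions.isEmpty()) {
      return false; // not a target-region push: subscribe active, as before
    }
    if (targetRegions.contains(localRegion)) {
      return false; // this region is the target: ingest immediately
    }
    return !targetRegionPromoted; // non-target region: stay paused until promotion
  }

  public static void main(String[] args) {
    Set<String> target = Collections.singleton("dc-0"); // region names are made up
    System.out.println(shouldCreatePaused(Collections.<String>emptySet(), "dc-1", false));
    System.out.println(shouldCreatePaused(target, "dc-0", false));
    System.out.println(shouldCreatePaused(target, "dc-1", false));
    System.out.println(shouldCreatePaused(target, "dc-1", true));
  }
}
```

Because the same function returns false once targetRegionPromoted flips, a resume check of the kind described for maybeResumeDaVinciFutureVersion() can reuse it rather than tracking promotion separately.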

Code changes

  • Added new code behind a config.
  • Introduced new log lines.
    • Confirmed if logs need to be rate limited to avoid excessive logging.

Concurrency-Specific Checks

  • No race conditions. pausePartitionForFutureSlot and resumeFromFutureSlotPause operate on PCS flags + AggKafkaConsumerService which is thread-safe. maybeResumeDaVinciFutureVersion and trySubscribeDaVinciFutureVersion are both synchronized on StoreBackend.
  • futureSlotPaused is volatile, matching the storeLevelPaused pattern.
  • resumeFromFutureSlotPause only clears the flag and physically resumes when storeLevelPaused is false.
  • No blocking calls inside critical sections.
  • No new collections introduced.

How was this PR tested?

  • New unit tests added.
  • New integration tests added.
  • Modified or extended existing tests.
  • Verified backward compatibility. Stores without targetSwapRegion set are unaffected (shouldCreatePaused returns false, SIT created active as before).

Unit tests (StoreIngestionTaskTest):

  • SIT created with createPaused=true consumes SOP and fires reportStarted, then pauses
  • resumeFromFutureSlotPause only clears flag and physically resumes when storeLevelPaused is false
  • Quota resumeConsumption does not override future-slot pause
  • Blob transfer suppressed when pauseAfterStartOfPush is set

Unit tests (StoreBackendTest):

  • testCreatesPausedSITInNonTargetRegion: non-target region creates paused SIT
  • testCreatesActiveSITInTargetRegion: target region is active
  • testResumePausedSITOnTargetPromotion: paused SIT resumes when targetRegionPromoted flips

Integration test (TestDeferredVersionSwapWithSequentialRolloutWithDvc): existing test passes, verifying targetRegionPromoted propagates and v2 data is readable after promotion.

Does this PR introduce any user-facing or breaking changes?

  • No. DVC behavior is unchanged for stores without targetSwapRegion set. For stores with deferred swap, VPJ duration will increase (VPJ now waits for all DVC clients everywhere to complete, not just server-side ingestion).

misyel and others added 14 commits June 16, 2026 14:42
VersionStatus.ONLINE

Non-target-region DVC clients previously waited for the version to reach
VersionStatus.ONLINE before subscribing (startIngestionInNonTargetRegion
condition). With the paused-SIT deferred swap design (linkedin#2812), the version
swap in non-target regions is intentionally deferred — ONLINE never arrives
until after DVC has already ingested the data — creating a deadlock.

Replace the ONLINE gate with targetRegionPromoted=true, set by
DeferredVersionSwapService once the target region's push completes and
propagated to child controllers via UpdateStore admin message.

The skip-subscribe log message now reports targetRegionPromoted instead
of VersionStatus, so it accurately reflects the actual gating condition.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Add `pauseAfterStartOfPush` flag and three public methods:
- `pausePartitionForFutureSlot(int)` — sets PCS.futureSlotPaused and
  physically pauses Kafka consumption via aggKafkaConsumerService
- `resumeFromFutureSlotPause()` — clears futureSlotPaused on all
  partitions and resumes consumption, skipping any partition still held
  by a store-level pause
- `isFutureSlotPaused()` — returns true if any partition has the flag set

Covers both paths with unit tests in StoreIngestionTaskTest.
pauseAfterStartOfPush test

- Move pcs.setFutureSlotPaused(false) inside the !isStoreLevelPaused branch in
  resumeFromFutureSlotPause so the flag stays true when store-level pause blocks the
  physical resume, keeping flag state consistent with physical consumer state.
- Update testFutureSlotResumeDoesNotPhysicallyResumeWhenStoreLevelPauseActive to assert
  isFutureSlotPaused() remains true (not false) when store-level pause is active.
- Add public isPauseAfterStartOfPush()/setPauseAfterStartOfPush() getter/setter.
- Add testPauseAfterStartOfPushFieldSetAndRead as a placeholder until Task 4 wires
  the SOP hook.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
A disk-quota resume callback could physically un-pause a partition that
was intentionally held back by pausePartitionForFutureSlot.  Add an
explicit isFutureSlotPaused() check in resumeConsumption so future-slot
pauses take the same precedence as store-level pauses.  Covered by the
new testQuotaResumeDoesNotOverrideFutureSlotPause unit test.
…ngle &&

- StoreIngestionTask.resumeConsumption: widened from private to
  package-private so the test can call it directly via doCallRealMethod();
  merged the two sequential store-level-pause / future-slot-pause if-blocks
  into a single combined && guard (single logQuotaCallbackSuppressed call).
- StoreIngestionTaskTest.testQuotaResumeDoesNotOverrideFutureSlotPause:
  removed getDeclaredMethod/setAccessible reflection; now calls
  task.resumeConsumption() directly. Moved pubSubTopicRepository
  injection into buildMinimalSitForFutureSlotTests (alongside the
  existing field injections) and added doCallRealMethod() wiring for
  resumeConsumption there.
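A minimal sketch of the combined guard, assuming the merged condition has the shape "resume only when neither pause is active". The class, fields, and log text are hypothetical; the list stands in for the single logQuotaCallbackSuppressed call.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the quota resume guard; not the actual StoreIngestionTask code. */
class QuotaResumeGuardSketch {
  boolean storeLevelPaused;
  boolean futureSlotPaused;
  boolean consuming;
  // Stand-in for the suppressed-callback log line (assumption).
  final List<String> suppressedLog = new ArrayList<>();

  void resumeConsumption() {
    // Single && guard: both intentional pauses take precedence over the quota callback.
    if (!storeLevelPaused && !futureSlotPaused) {
      consuming = true;
    } else {
      suppressedLog.add("quota resume suppressed");
    }
  }

  public static void main(String[] args) {
    QuotaResumeGuardSketch task = new QuotaResumeGuardSketch();
    task.futureSlotPaused = true;
    task.resumeConsumption();
    System.out.println("consuming while future-slot paused: " + task.consuming);
    task.futureSlotPaused = false;
    task.resumeConsumption();
    System.out.println("consuming after pause cleared: " + task.consuming);
  }
}
```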
pauseAfterStartOfPush=true

Adds the pause hook inside processStartOfPush, immediately after
reportStarted fires, so the push monitor registers the DVC as an
expected reporter before the partition is paused. Also widens
processStartOfPush and beginBatchWrite from private to package-private
to allow direct invocation in unit tests.

Includes testProcessStartOfPushPausesPartitionWhenFlagSet verifying
that reportStarted is called before the pause and that the partition
stays unpaused when the flag is false.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
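The ordering constraint in this commit (report first, pause second) can be illustrated with a small model that records events in call order. Everything here is a hypothetical stand-in: the real processStartOfPush has a different signature and does considerably more work.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the SOP hook ordering; not the actual processStartOfPush. */
class StartOfPushOrderSketch {
  final List<String> events = new ArrayList<>();
  private final boolean pauseAfterStartOfPush;

  StartOfPushOrderSketch(boolean pauseAfterStartOfPush) {
    this.pauseAfterStartOfPush = pauseAfterStartOfPush;
  }

  void processStartOfPush(int partition) {
    // Report first, so the push monitor counts this client as an expected reporter.
    events.add("reportStarted:" + partition);
    if (pauseAfterStartOfPush) {
      // Pause only after the report has been recorded.
      events.add("pause:" + partition);
    }
  }

  public static void main(String[] args) {
    StartOfPushOrderSketch paused = new StartOfPushOrderSketch(true);
    paused.processStartOfPush(0);
    System.out.println(paused.events);
  }
}
```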
infrastructure

Add @VisibleForTesting getters for aggKafkaConsumerService and storageMetadataService,
and refactor pausePartitionForFutureSlot, resumeFromFutureSlotPause, isFutureSlotPaused,
resumeConsumption, and processStartOfPush to use getter methods instead of direct field
access. This lets the 5 future-slot tests use doReturn() stubs instead of
getDeclaredField/setAccessible reflection.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
A paused SIT does not need SST files from local peers — the data will
be ingested via Kafka after the slot is resumed. Short-circuit
shouldStartBlobTransfer when pauseAfterStartOfPush=true to avoid
wasting memory and bandwidth.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
fkaStoreIngestionService

- IngestionBackend: add default 5-arg startConsumption(createPaused) that
  throws UnsupportedOperationException for backends that don't support it
- DefaultIngestionBackend: override the 5-arg overload, mirroring the
  existing 4-arg body but delegating to the new KafkaStoreIngestionService
  4-arg overload with createPaused passed through
- KafkaStoreIngestionService: add 4-arg startConsumption(createPaused) that
  guards DaVinci-only semantics, calls the existing 3-arg path, then flags
  the resulting StoreIngestionTask with setPauseAfterStartOfPush(true)
- KafkaStoreIngestionServiceTest: add testCreatePausedThrowsOnNonDaVinciConfig
  verifying VeniceException is thrown when createPaused=true on a server config
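The overload pattern described in this commit can be sketched as follows. The argument lists are shortened for illustration (the real methods take four and five arguments), all type names are hypothetical, and IllegalStateException stands in for the VeniceException thrown by the DaVinci-only guard.

```java
/** Hypothetical sketch of the createPaused overload plumbing; arities are reduced for brevity. */
class CreatePausedOverloadSketch {
  interface Backend {
    void startConsumption(String topic, int partition);

    // Default overload: backends that do not support paused creation fail loudly.
    default void startConsumption(String topic, int partition, boolean createPaused) {
      throw new UnsupportedOperationException("createPaused is not supported by this backend");
    }
  }

  /** Backend that supports paused creation, but only in a DaVinci client. */
  static class PausableBackend implements Backend {
    private final boolean isDaVinciClient;
    boolean lastCreatePaused;

    PausableBackend(boolean isDaVinciClient) {
      this.isDaVinciClient = isDaVinciClient;
    }

    @Override
    public void startConsumption(String topic, int partition) {
      startConsumption(topic, partition, false); // existing callers stay active
    }

    @Override
    public void startConsumption(String topic, int partition, boolean createPaused) {
      if (createPaused && !isDaVinciClient) {
        throw new IllegalStateException("createPaused is only valid for DaVinci clients");
      }
      lastCreatePaused = createPaused;
    }
  }

  /** Backend that only implements the original overload. */
  static class LegacyBackend implements Backend {
    @Override
    public void startConsumption(String topic, int partition) {
      // no-op for the sketch
    }
  }

  public static void main(String[] args) {
    PausableBackend dvc = new PausableBackend(true);
    dvc.startConsumption("store_v2", 0, true);
    System.out.println("created paused: " + dvc.lastCreatePaused);
  }
}
```

Keeping the new overload as a default method means existing implementations compile unchanged and only fail if a caller actually asks for paused creation.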
…sPaused

Adds a boolean createPaused parameter to VersionBackend.subscribe() and
threads it through to the 5-arg IngestionBackend.startConsumption(). Adds
package-private resume() and isPaused() methods that delegate to
StoreIngestionTask.resumeFromFutureSlotPause() / isFutureSlotPaused() via
backend.getIngestionService().getStoreIngestionTask(topicName). All
existing call sites in StoreBackend and VersionBackendTest pass false.
Update VersionBackendTest.testRecordTransformerSubscribe to match the
new 5-arg startConsumption(config, partition, pubSubPos, replicaId,
createPaused) signature added in Phase 2. Use anyBoolean() for the new
primitive boolean parameter instead of any() to avoid NPE on unboxing.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
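The unboxing failure mentioned in this commit can be reproduced without a mocking library. In the sketch below, nullReturningMatcher() is a hypothetical stand-in for a generic object matcher that returns null, and primitiveMatcher() stands in for a boolean-specific matcher that returns false; neither is a Mockito API.

```java
/** Plain-Java illustration of why a null-returning matcher breaks a primitive boolean parameter. */
class UnboxingSketch {
  // Stand-in for a generic matcher whose placeholder return value is null (assumption).
  static <T> T nullReturningMatcher() {
    return null;
  }

  // Stand-in for a boolean matcher whose placeholder return value is a primitive.
  static boolean primitiveMatcher() {
    return false;
  }

  // Shortened stand-in for a method with a primitive createPaused parameter.
  static String startConsumption(String topic, boolean createPaused) {
    return topic + ":" + createPaused;
  }

  public static void main(String[] args) {
    System.out.println(startConsumption("store_v2", primitiveMatcher()));
    try {
      // The null Boolean is auto-unboxed at the call site, before the method body runs.
      startConsumption("store_v2", UnboxingSketch.<Boolean>nullReturningMatcher());
    } catch (NullPointerException e) {
      System.out.println("NullPointerException while unboxing a null Boolean");
    }
  }
}
```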
maybeResumeDaVinciFutureVersion

StoreBackend.trySubscribeDaVinciFutureVersion now always subscribes to
the future version slot. For non-target-region target-region pushes the
subscription is created paused (createPaused=true via shouldCreatePaused),
replacing the old skip-subscribe gate. maybeResumeDaVinciFutureVersion
checks whether a paused future version should now be resumed (when
targetRegionPromoted flips to true) and is wired into
DaVinciBackend.handleStoreChanged between tryDeleteInvalidDaVinciFutureVersion
and trySwapDaVinciCurrentVersion.

Removed: local
targetRegions/isTargetRegionEnabled/startIngestionOnTargetRegionPromoted
variables and the old if/else skip-subscribe block.
Added: private shouldCreatePaused(Version), synchronized
maybeResumeDaVinciFutureVersion().

Tests: updated testSubscribeWithDelayedIngestionEnabled for always-subscribe
semantics; added testCreatesPausedSITInNonTargetRegion,
testCreatesActiveSITInTargetRegion, testResumePausedSITOnTargetPromotion.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings June 19, 2026 00:26

Copilot AI left a comment

Copilot reviewed 13 out of 13 changed files in this pull request and generated no comments.


@misyel misyel marked this pull request as draft June 19, 2026 00:38
… topic lock

Previously pauseAfterStartOfPush was set after startConsumption returned, which
left a window where the SIT could poll and process SOP with the flag still false.

Fix: pass createPaused into createStoreIngestionTask so the flag is set on the
SIT inside the topic lock, before ingestionExecutorService.submit() and before
subscribePartition() is called. Since the SIT cannot process any messages until
a partition is subscribed, the flag is guaranteed to be visible before SOP can
be consumed.

3-arg startConsumption now delegates to the 4-arg path with createPaused=false.

Co-Authored-By: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com>
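The fix in this commit amounts to fixing the flag at construction time, before the task is handed to the executor. A minimal sketch under that assumption follows; the class names, the lock object, and the single-thread executor are illustrative and not the actual KafkaStoreIngestionService code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch: set the pause flag before the task can run, not after it is started. */
class CreatePausedOrderingSketch {
  /** Stand-in for an ingestion task; call() returns the flag value it observes. */
  static class Task implements Callable<Boolean> {
    private final boolean pauseAfterStartOfPush;

    Task(boolean pauseAfterStartOfPush) {
      this.pauseAfterStartOfPush = pauseAfterStartOfPush;
    }

    @Override
    public Boolean call() {
      return pauseAfterStartOfPush;
    }
  }

  private final Object topicLock = new Object();
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  Future<Boolean> startConsumption(boolean createPaused) {
    synchronized (topicLock) {
      // The flag is fixed in the constructor, so no window exists in which the
      // running task could observe a stale value.
      Task task = new Task(createPaused);
      return executor.submit(task);
    }
  }

  /** Runs one task and returns the flag value it saw. */
  static boolean flagSeenByTask(boolean createPaused) {
    CreatePausedOrderingSketch service = new CreatePausedOrderingSketch();
    try {
      return service.startConsumption(createPaused).get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      service.executor.shutdown();
    }
  }

  public static void main(String[] args) {
    System.out.println("task saw createPaused=" + flagSeenByTask(true));
  }
}
```

Setting the flag through a setter after submit() would reintroduce the window described above, since the worker thread may already be running by the time the setter is called.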

2 participants