From 6b7afb1a2e3b9adfc03ba03df665ab80352addfe Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Sun, 12 Apr 2026 22:59:06 +0530 Subject: [PATCH 1/2] [FLINK-36753][runtime]Adaptive Scheduler actively triggers a Checkpoint after all resources are ready --- .../configuration/JobManagerOptions.java | 18 ++++ .../scheduler/adaptive/AdaptiveScheduler.java | 16 +++- .../DefaultStateTransitionManager.java | 7 ++ .../runtime/scheduler/adaptive/Executing.java | 82 ++++++++++++++++++- .../adaptive/StateTransitionManager.java | 9 ++ .../scheduler/adaptive/ExecutingTest.java | 16 +++- .../scheduling/RescaleOnCheckpointITCase.java | 55 +++++++++++++ 7 files changed, 194 insertions(+), 9 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java index 065de7e63d320..03be10d88e1d8 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java @@ -745,6 +745,24 @@ public InlineElement getDescription() { .key())) .build()); + @Documentation.Section({ + Documentation.Sections.EXPERT_SCHEDULING, + Documentation.Sections.ALL_JOB_MANAGER + }) + public static final ConfigOption SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED = + key("jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled") + .booleanType() + .defaultValue(false) + .withDescription( + Description.builder() + .text( + "When enabled, the Adaptive Scheduler actively triggers a checkpoint when resources change and rescaling is desired, " + + "rather than waiting for the next periodic checkpoint. " + + "This reduces rescaling latency, especially when checkpoint intervals are large. " + + "The active trigger respects %s and will not trigger if a checkpoint is already in progress.", + text("execution.checkpointing.min-pause")) + .build()); + /** * @deprecated Use {@link * JobManagerOptions#SCHEDULER_SUBMISSION_RESOURCE_STABILIZATION_TIMEOUT}. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 70c5b62ce90ad..b75f0d09f29e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -314,7 +314,9 @@ public static Settings of( SCHEDULER_RESCALE_TRIGGER_MAX_DELAY, maximumDelayForRescaleTriggerDefault), rescaleOnFailedCheckpointsCount, - configuration.get(WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE)); + configuration.get(WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE), + configuration.get( + JobManagerOptions.SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED)); } private final SchedulerExecutionMode executionMode; @@ -326,6 +328,7 @@ public static Settings of( private final Duration maximumDelayForTriggeringRescale; private final int rescaleOnFailedCheckpointCount; private final int rescaleHistoryMax; + private final boolean activeCheckpointTriggerEnabled; private Settings( SchedulerExecutionMode executionMode, @@ -336,7 +339,8 @@ private Settings( Duration executingResourceStabilizationTimeout, Duration maximumDelayForTriggeringRescale, int rescaleOnFailedCheckpointCount, - int rescaleHistoryMax) { + int rescaleHistoryMax, + boolean activeCheckpointTriggerEnabled) { this.executionMode = executionMode; this.submissionResourceWaitTimeout = submissionResourceWaitTimeout; this.submissionResourceStabilizationTimeout = submissionResourceStabilizationTimeout; @@ -346,6 +350,7 @@ private Settings( this.maximumDelayForTriggeringRescale = maximumDelayForTriggeringRescale; this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount; this.rescaleHistoryMax = rescaleHistoryMax; + this.activeCheckpointTriggerEnabled = activeCheckpointTriggerEnabled; } public SchedulerExecutionMode getExecutionMode() { @@ -384,6 +389,10 @@ public int getRescaleHistoryMax() { return rescaleHistoryMax; } + public boolean isActiveCheckpointTriggerEnabled() { + return activeCheckpointTriggerEnabled; + } + public JobRescaleConfigInfo toJobRescaleConfigInfo() { return new JobRescaleConfigInfo( rescaleHistoryMax, @@ -1311,7 +1320,8 @@ public void goToExecuting( userCodeClassLoader, failureCollection, this::createExecutingStateTransitionManager, - settings.getRescaleOnFailedCheckpointCount())); + settings.getRescaleOnFailedCheckpointCount(), + settings.isActiveCheckpointTriggerEnabled())); } private StateTransitionManager createExecutingStateTransitionManager( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java index 87f810ae784d3..dae2812d63484 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java @@ -158,10 +158,16 @@ private void progressToStabilizing(Temporal firstChangeEventTimestamp) { resourceStabilizationTimeout, firstChangeEventTimestamp, maxTriggerDelay)); + transitionContext.requestActiveCheckpointTrigger(); } private void progressToStabilized(Temporal firstChangeEventTimestamp) { progressToPhase(new Stabilized(clock, this, firstChangeEventTimestamp, maxTriggerDelay)); + transitionContext.requestActiveCheckpointTrigger(); + } + + void requestActiveCheckpointTrigger() { + transitionContext.requestActiveCheckpointTrigger(); } private void triggerTransitionToSubsequentState() { @@ -370,6 +376,7 @@ void onChange(boolean newResourceDriven) { // event was already handled by a onTrigger callback with a no-op onChangeEventTimestamp = now(); scheduleTransitionEvaluation(); + context().requestActiveCheckpointTrigger(); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java index deae52856dc41..56daf28acd4b2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobStatus; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointScheduling; import org.apache.flink.runtime.checkpoint.CheckpointStatsListener; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; @@ -64,6 +65,7 @@ class Executing extends StateWithExecutionGraph private final StateTransitionManager stateTransitionManager; private final int rescaleOnFailedCheckpointCount; + private final boolean activeCheckpointTriggerEnabled; // null indicates that there was no change event observed, yet @Nullable private AtomicInteger failedCheckpointCountdown; @@ -77,7 +79,8 @@ class Executing extends StateWithExecutionGraph List failureCollection, Function stateTransitionManagerFactory, - int rescaleOnFailedCheckpointCount) { + int rescaleOnFailedCheckpointCount, + boolean activeCheckpointTriggerEnabled) { super( context, executionGraph, @@ -96,6 +99,7 @@ class Executing extends StateWithExecutionGraph rescaleOnFailedCheckpointCount > 0, "The rescaleOnFailedCheckpointCount should be larger than 0."); this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount; + this.activeCheckpointTriggerEnabled = activeCheckpointTriggerEnabled; this.failedCheckpointCountdown = null; recordRescaleForJobIntoExecuting(logger, context); @@ -182,6 +186,11 @@ public ScheduledFuture scheduleOperation(Runnable callback, Duration delay) { return context.runIfState(this, callback, delay); } + @Override + public void requestActiveCheckpointTrigger() { + triggerCheckpointForRescale(); + } + @Override public void transitionToSubsequentState() { Optional availableVertexParallelism = @@ -300,6 +309,69 @@ private void initializeFailedCheckpointCountdownIfUnset() { } } + /** + * Actively triggers a checkpoint to expedite rescaling. Without this, the scheduler would + * passively wait for the next periodic checkpoint, which could delay rescaling significantly + * when checkpoint intervals are large. + * + *

Guard conditions: + * + *

    + *
  • Checkpointing must be configured + *
  • Parallelism must have actually changed + *
  • No checkpoint must be currently in progress or being triggered + *
+ */ + private void triggerCheckpointForRescale() { + if (!activeCheckpointTriggerEnabled) { + return; + } + + final CheckpointCoordinator checkpointCoordinator = + getExecutionGraph().getCheckpointCoordinator(); + + if (checkpointCoordinator == null + || !checkpointCoordinator.isPeriodicCheckpointingConfigured()) { + getLogger() + .debug( + "Skipping active checkpoint trigger for rescale: checkpointing not configured."); + return; + } + + if (!parallelismChanged()) { + getLogger() + .debug( + "Skipping active checkpoint trigger for rescale: parallelism unchanged."); + return; + } + + if (checkpointCoordinator.getNumberOfPendingCheckpoints() > 0 + || checkpointCoordinator.isTriggering()) { + getLogger() + .debug( + "Skipping active checkpoint trigger for rescale: checkpoint already in progress."); + return; + } + + getLogger().info("Actively triggering checkpoint to expedite rescaling."); + checkpointCoordinator + .triggerCheckpoint(false) + .whenComplete( + (completedCheckpoint, throwable) -> { + if (throwable != null) { + getLogger() + .warn( + "Active checkpoint trigger for rescale failed.", + throwable); + } else { + getLogger() + .info( + "Active checkpoint for rescale completed successfully: {}.", + completedCheckpoint.getCheckpointID()); + } + }); + } + CompletableFuture stopWithSavepoint( @Nullable final String targetDirectory, boolean terminate, @@ -399,6 +471,7 @@ static class Factory implements StateFactory { private final Function stateTransitionManagerFactory; private final int rescaleOnFailedCheckpointCount; + private final boolean activeCheckpointTriggerEnabled; Factory( ExecutionGraph executionGraph, @@ -410,7 +483,8 @@ static class Factory implements StateFactory { List failureCollection, Function stateTransitionManagerFactory, - int rescaleOnFailedCheckpointCount) { + int rescaleOnFailedCheckpointCount, + boolean activeCheckpointTriggerEnabled) { this.context = context; this.log = log; this.executionGraph = executionGraph; @@ -420,6 +494,7 @@ static class Factory implements StateFactory { this.failureCollection = failureCollection; this.stateTransitionManagerFactory = stateTransitionManagerFactory; this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount; + this.activeCheckpointTriggerEnabled = activeCheckpointTriggerEnabled; } public Class getStateClass() { @@ -436,7 +511,8 @@ public Executing getState() { userCodeClassLoader, failureCollection, stateTransitionManagerFactory, - rescaleOnFailedCheckpointCount); + rescaleOnFailedCheckpointCount, + activeCheckpointTriggerEnabled); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java index 98229a9afd3e3..62dbfd85499c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java @@ -89,5 +89,14 @@ interface Context extends RescaleContext { * @return the {@link JobID} of the job */ JobID getJobId(); + + /** + * Requests the context to actively trigger a checkpoint to expedite rescaling. Called when + * the {@link DefaultStateTransitionManager} enters a phase that is ready to accept {@link + * #onTrigger()} events (i.e., {@link DefaultStateTransitionManager.Stabilizing}). The + * implementation decides whether to actually trigger based on its own guard conditions + * (e.g., checkpointing enabled, no checkpoint in progress, config flag). + */ + default void requestActiveCheckpointTrigger() {} } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java index f593082f6c75b..b139474e0a4c7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java @@ -160,7 +160,8 @@ void testNoDeploymentCallOnEnterWhenVertexRunning() throws Exception { ClassLoader.getSystemClassLoader(), new ArrayList<>(), (context) -> TestingStateTransitionManager.withNoOp(), - 1); + 1, + false); assertThat(mockExecutionVertex.isDeployCalled()).isFalse(); } } @@ -186,7 +187,8 @@ void testIllegalStateExceptionOnNotRunningExecutionGraph() { ClassLoader.getSystemClassLoader(), new ArrayList<>(), context -> TestingStateTransitionManager.withNoOp(), - 1); + 1, + false); } }) .isInstanceOf(IllegalStateException.class); @@ -691,6 +693,7 @@ private final class ExecutingStateBuilder { private Function stateTransitionManagerFactory = context -> TestingStateTransitionManager.withNoOp(); private int rescaleOnFailedCheckpointCount = 1; + private boolean activeCheckpointTriggerEnabled = false; private ExecutingStateBuilder() throws JobException, JobExecutionException { operatorCoordinatorHandler = new TestingOperatorCoordinatorHandler(); @@ -733,7 +736,8 @@ private Executing build(MockExecutingContext ctx) { ClassLoader.getSystemClassLoader(), new ArrayList<>(), stateTransitionManagerFactory::apply, - rescaleOnFailedCheckpointCount); + rescaleOnFailedCheckpointCount, + activeCheckpointTriggerEnabled); } finally { Preconditions.checkState( !ctx.hadStateTransition, @@ -1029,6 +1033,12 @@ public boolean updateState(TaskExecutionStateTransition state) { public Iterable getVerticesTopologically() { return getVerticesTopologicallySupplier.get(); } + + @Nullable + @Override + public CheckpointCoordinator getCheckpointCoordinator() { + return null; + } } private static class FinishingMockExecutionGraph extends StateTrackingMockExecutionGraph { diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java index 6a24600f1ace1..b058ac1016f38 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/RescaleOnCheckpointITCase.java @@ -175,4 +175,59 @@ void testRescaleOnCheckpoint( restClusterClient.cancel(jobGraph.getJobID()).join(); } } + + @Test + void testRescaleWithActiveCheckpointTrigger( + @InjectMiniCluster MiniCluster miniCluster, + @InjectClusterClient RestClusterClient restClusterClient) + throws Exception { + final Configuration config = new Configuration(); + + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(config); + env.setParallelism(BEFORE_RESCALE_PARALLELISM); + env.enableCheckpointing(Duration.ofHours(1).toMillis()); + env.fromSequence(0, Integer.MAX_VALUE).sinkTo(new DiscardingSink<>()); + + final JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + final Iterator jobVertexIterator = jobGraph.getVertices().iterator(); + assertThat(jobVertexIterator.hasNext()).isTrue(); + final JobVertexID jobVertexId = jobVertexIterator.next().getID(); + + final JobResourceRequirements jobResourceRequirements = + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex(jobVertexId, 1, AFTER_RESCALE_PARALLELISM) + .build(); + + restClusterClient.submitJob(jobGraph).join(); + + final JobID jobId = jobGraph.getJobID(); + try { + LOG.info( + "Waiting for job {} to reach parallelism of {} for vertex {}.", + jobId, + BEFORE_RESCALE_PARALLELISM, + jobVertexId); + waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM); + + LOG.info( + "Updating job {} resource requirements: parallelism {} -> {}.", + jobId, + BEFORE_RESCALE_PARALLELISM, + AFTER_RESCALE_PARALLELISM); + restClusterClient.updateJobResourceRequirements(jobId, jobResourceRequirements).join(); + LOG.info( + "Waiting for job {} to rescale to parallelism {} via active checkpoint trigger.", + jobId, + AFTER_RESCALE_PARALLELISM); + waitForRunningTasks(restClusterClient, jobId, AFTER_RESCALE_PARALLELISM); + final int expectedFreeSlotCount = NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM; + LOG.info( + "Waiting for {} slot(s) to become available after scale down.", + expectedFreeSlotCount); + waitForAvailableSlots(restClusterClient, expectedFreeSlotCount); + } finally { + restClusterClient.cancel(jobGraph.getJobID()).join(); + } + } } From 59a76f93a0ada5252e62a6d78b170c84b4563d76 Mon Sep 17 00:00:00 2001 From: Samrat002 Date: Mon, 13 Apr 2026 22:48:32 +0530 Subject: [PATCH 2/2] [FLINK-36753][runtime] Doc update --- .../shortcodes/generated/all_jobmanager_section.html | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/layouts/shortcodes/generated/all_jobmanager_section.html b/docs/layouts/shortcodes/generated/all_jobmanager_section.html index 87d8875a6df2f..a9b1720823e8d 100644 --- a/docs/layouts/shortcodes/generated/all_jobmanager_section.html +++ b/docs/layouts/shortcodes/generated/all_jobmanager_section.html @@ -50,6 +50,12 @@ Boolean This parameter defines whether the adaptive scheduler prioritizes using the minimum number of TaskManagers when scheduling tasks.
Note, this parameter is suitable if execution.state-recovery.from-local is not enabled. More details about this configuration are available at FLINK-33977. + +
jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled
+ false + Boolean + When enabled, the Adaptive Scheduler actively triggers a checkpoint when resources change and rescaling is desired, rather than waiting for the next periodic checkpoint. This reduces rescaling latency, especially when checkpoint intervals are large. The active trigger respects execution.checkpointing.min-pause and will not trigger if a checkpoint is already in progress. +
jobmanager.adaptive-scheduler.rescale-trigger.max-checkpoint-failures
2