[FLINK-36753][runtime]Adaptive Scheduler actively triggers a Checkpoint after all resources are ready#27921

Open
Samrat002 wants to merge 2 commits into apache:master from Samrat002:FLINK-36753

@Samrat002
Contributor

What is the purpose of the change

FLIP-461 introduced checkpoint-synchronized rescaling where the Adaptive Scheduler waits for a checkpoint to complete before rescaling. However, it passively waits for the next periodic checkpoint, which can delay rescaling significantly when checkpoint intervals are large (e.g., 10 minutes).
This PR makes the Adaptive Scheduler actively trigger a checkpoint when resources change and rescaling is desired. The trigger fires when the DefaultStateTransitionManager enters the Stabilizing or Stabilized phase, i.e., when the resource gate is open and the scheduler is waiting for the checkpoint gate. The feature is controlled by a new configuration option jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled (default: false).

The feature respects execution.checkpointing.min-pause, skips if a checkpoint is already in progress, and only fires when parallelism has actually changed.
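The guard conditions described above can be pictured with a small, self-contained model. This is only a sketch under assumptions: the class name, method name, and parameters are hypothetical, and how the PR actually enforces execution.checkpointing.min-pause is not shown in this description, so it is modeled here as a plain elapsed-time comparison.

```java
import java.time.Duration;

/** Hypothetical, simplified model of the guard conditions; not the PR's code. */
public class ActiveTriggerGuard {

    /** Returns true only when every precondition named in the description holds. */
    static boolean shouldTrigger(
            boolean featureEnabled,
            boolean checkpointingConfigured,
            boolean parallelismChanged,
            boolean checkpointInProgress,
            Duration sinceLastCheckpoint,
            Duration minPause) {
        if (!featureEnabled || !checkpointingConfigured) {
            return false; // option is off or the job has no checkpointing
        }
        if (!parallelismChanged) {
            return false; // nothing to rescale, so no checkpoint is needed
        }
        if (checkpointInProgress) {
            return false; // the running checkpoint will open the gate
        }
        // Assumption: min-pause is honored by comparing elapsed time.
        return sinceLastCheckpoint.compareTo(minPause) >= 0;
    }

    public static void main(String[] args) {
        boolean fire =
                shouldTrigger(
                        true, true, true, false,
                        Duration.ofSeconds(30), Duration.ofSeconds(10));
        System.out.println("trigger checkpoint: " + fire);
    }
}
```

Each early return corresponds to one of the skip cases listed above, which keeps the trigger a no-op unless a rescale is actually pending.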

Brief change log

  • Added requestActiveCheckpointTrigger() to StateTransitionManager.Context interface
  • DefaultStateTransitionManager calls requestActiveCheckpointTrigger() when entering Stabilizing, on onChange during Stabilizing, and when entering Stabilized
  • Executing implements the callback with guard conditions (config enabled, checkpointing configured, parallelism changed, no checkpoint in progress)
  • Added config option jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled wired through AdaptiveScheduler.Settings
  • Added integration test proving rescale happens without periodic checkpoints or manual triggers
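The three call sites named in the change log can be sketched with a miniature state machine. Everything here apart from the method name requestActiveCheckpointTrigger() is a hypothetical stand-in: the Context interface is reduced to the one new callback, and MiniTransitionManager only tracks a phase rather than reproducing DefaultStateTransitionManager.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class TriggerPointsSketch {

    /** Stand-in for StateTransitionManager.Context; only the new callback is modeled. */
    interface Context {
        void requestActiveCheckpointTrigger();
    }

    /** Simplified phase set, assumed for illustration. */
    enum Phase { IDLING, STABILIZING, STABILIZED }

    /** Hypothetical miniature of the manager: tracks a phase and notifies the context. */
    static final class MiniTransitionManager {
        private final Context context;
        private Phase phase = Phase.IDLING;

        MiniTransitionManager(Context context) {
            this.context = context;
        }

        /** Entering Stabilizing or Stabilized requests a checkpoint. */
        void enter(Phase next) {
            phase = next;
            if (next == Phase.STABILIZING || next == Phase.STABILIZED) {
                context.requestActiveCheckpointTrigger();
            }
        }

        /** A resource change requests a checkpoint only while Stabilizing. */
        void onChange() {
            if (phase == Phase.STABILIZING) {
                context.requestActiveCheckpointTrigger();
            }
        }
    }

    /** Runs one pass through the phases and returns how often the callback fired. */
    static int runScenario() {
        AtomicInteger requests = new AtomicInteger();
        MiniTransitionManager manager = new MiniTransitionManager(requests::incrementAndGet);
        manager.onChange();                 // Idling: no request
        manager.enter(Phase.STABILIZING);   // request 1
        manager.onChange();                 // request 2
        manager.enter(Phase.STABILIZED);    // request 3
        manager.onChange();                 // Stabilized: no request in this model
        return requests.get();
    }

    public static void main(String[] args) {
        System.out.println("requests: " + runScenario());
    }
}
```

Because the callback may fire several times for a single rescale, the guard conditions in Executing are what keep repeated requests from starting redundant checkpoints.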

Verifying this change

This change added tests and can be verified as follows:

  • Added RescaleOnCheckpointITCase#testRescaleWithActiveCheckpointTrigger, which starts a job with a checkpointing interval of 1 hour, maxTriggerDelay set to infinity, and no manual triggerCheckpoint() call.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

@flinkbot
Collaborator

flinkbot commented Apr 12, 2026

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@Samrat002 Samrat002 marked this pull request as ready for review April 13, 2026 17:20
@Samrat002
Contributor Author

@1996fanrui PTAL whenever you have time.

Contributor

@pnowojski pnowojski left a comment

Thanks for the contribution. I've left a couple of comments; however, I don't have the context to review whether this is properly integrated with AdaptiveScheduler and DefaultStateTransitionManager. It would be great for someone else to take a look as well.

public static final ConfigOption<Boolean> SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED =
        key("jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled")
                .booleanType()
                .defaultValue(false)
Contributor

Is there a downside to using this option? If we expect this to be a generally positive change, and you disabled it by default only as a precaution or for backward compatibility, I would actually be fine with setting the default to true.

Comment on lines +357 to +372
checkpointCoordinator
        .triggerCheckpoint(false)
        .whenComplete(
                (completedCheckpoint, throwable) -> {
                    if (throwable != null) {
                        getLogger()
                                .warn(
                                        "Active checkpoint trigger for rescale failed.",
                                        throwable);
                    } else {
                        getLogger()
                                .info(
                                        "Active checkpoint for rescale completed successfully: {}.",
                                        completedCheckpoint.getCheckpointID());
                    }
                });
Contributor

Shouldn't we wrap this with FutureUtils.assertNoException?
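One way to read this suggestion: the future returned by whenComplete is discarded, so an exception thrown inside the callback itself would go unnoticed. The sketch below illustrates that failure mode with plain CompletableFuture and a hypothetical reportIfFailed helper. It is a stand-in for the idea only, not Flink's FutureUtils.assertNoException, which is understood to forward such failures to a fatal error handler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class UncaughtFutureSketch {

    /** Hypothetical stand-in: forwards any exceptional completion to a handler. */
    static void reportIfFailed(CompletableFuture<?> future, Consumer<Throwable> handler) {
        future.whenComplete(
                (ignored, throwable) -> {
                    if (throwable != null) {
                        handler.accept(throwable);
                    }
                });
    }

    /** Returns the failure observed by the handler, or null if none was seen. */
    static Throwable demo() {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        // Placeholder for a successfully completed checkpoint id.
        CompletableFuture<Long> trigger = CompletableFuture.completedFuture(42L);
        // A bug inside the callback makes the derived future fail,
        // even though the source future completed normally.
        CompletableFuture<Long> logged =
                trigger.whenComplete(
                        (id, t) -> {
                            throw new IllegalStateException("bug in callback");
                        });
        // Without this line the failure would be dropped silently.
        reportIfFailed(logged, seen::set);
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("observed failure: " + demo());
    }
}
```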

Comment on lines +223 to +228
waitForRunningTasks(restClusterClient, jobId, AFTER_RESCALE_PARALLELISM);
final int expectedFreeSlotCount = NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM;
LOG.info(
        "Waiting for {} slot(s) to become available after scale down.",
        expectedFreeSlotCount);
waitForAvailableSlots(restClusterClient, expectedFreeSlotCount);
Contributor

If I understand your test correctly, it would still pass once the regular periodic checkpoint is triggered after 1h, even with your new option disabled, right?

I think you should make sure that the checkpointing interval set via env.enableCheckpointing(Duration.ofHours(1).toMillis()); is longer than the timeout in waitForRunningTasks and waitForAvailableSlots (or the CI timeout of 4h). So either decrease the waiting timeout to < 30 minutes, or increase the checkpointing interval to 24h (CI will be killed after 4h AFAIR).
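The timing constraint in this comment can be stated as a tiny check. The class and method names below are hypothetical and the durations are the ones mentioned in the comment. The point is only that a pass within the wait timeout is evidence for the active trigger when the timeout is strictly shorter than the periodic checkpoint interval.

```java
import java.time.Duration;

public class TestTimingSketch {

    /**
     * True when a pass within waitTimeout cannot be explained by a periodic checkpoint,
     * because the first periodic checkpoint would only fire after the wait has expired.
     */
    static boolean provesActiveTrigger(Duration checkpointInterval, Duration waitTimeout) {
        return waitTimeout.compareTo(checkpointInterval) < 0;
    }

    public static void main(String[] args) {
        // 1h interval with a wait shorter than 30 minutes: conclusive.
        System.out.println(provesActiveTrigger(Duration.ofHours(1), Duration.ofMinutes(29)));
        // 1h interval bounded only by the 4h CI timeout: inconclusive.
        System.out.println(provesActiveTrigger(Duration.ofHours(1), Duration.ofHours(4)));
        // 24h interval bounded by the 4h CI timeout: conclusive.
        System.out.println(provesActiveTrigger(Duration.ofHours(24), Duration.ofHours(4)));
    }
}
```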

}

@Test
void testRescaleWithActiveCheckpointTrigger(
Contributor

Have you made sure that this test is failing without your change?

Also, I don't see this test enabling SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED anywhere? How is it passing right now?

3 participants