[FLINK-36753][runtime]Adaptive Scheduler actively triggers a Checkpoint after all resources are ready#27921
[FLINK-36753][runtime]Adaptive Scheduler actively triggers a Checkpoint after all resources are ready#27921Samrat002 wants to merge 2 commits intoapache:masterfrom
Conversation
…nt after all resources are ready
|
@1996fanrui PTAL whenever time. |
pnowojski
left a comment
There was a problem hiding this comment.
Thanks for the contribution. I've left a couple of comments, however I don't have context to review whether this is properly integrated with AdatpiveScheduler and DefaultStateTransitionManager. Would be great for someone else to take a look as well.
| public static final ConfigOption<Boolean> SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED = | ||
| key("jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled") | ||
| .booleanType() | ||
| .defaultValue(false) |
There was a problem hiding this comment.
Is there a downside of using this option? If we expect this to be generally positive change, and you disable it by default only as a pre-caution/for backward compatibility, I would be actually fine setting it by default to true.
| checkpointCoordinator | ||
| .triggerCheckpoint(false) | ||
| .whenComplete( | ||
| (completedCheckpoint, throwable) -> { | ||
| if (throwable != null) { | ||
| getLogger() | ||
| .warn( | ||
| "Active checkpoint trigger for rescale failed.", | ||
| throwable); | ||
| } else { | ||
| getLogger() | ||
| .info( | ||
| "Active checkpoint for rescale completed successfully: {}.", | ||
| completedCheckpoint.getCheckpointID()); | ||
| } | ||
| }); |
There was a problem hiding this comment.
Shouldn't we wrap this with FutureUtils.assertNoException?
| waitForRunningTasks(restClusterClient, jobId, AFTER_RESCALE_PARALLELISM); | ||
| final int expectedFreeSlotCount = NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM; | ||
| LOG.info( | ||
| "Waiting for {} slot(s) to become available after scale down.", | ||
| expectedFreeSlotCount); | ||
| waitForAvailableSlots(restClusterClient, expectedFreeSlotCount); |
There was a problem hiding this comment.
If I understand your test, it would still pass after 1h, after the regular periodic checkpoint is triggered after 1h, even with your new option disabled, right?
I think you should make sure that the timeout in waitForRunningTasks waitForAvailableSlots (or CI timeout 4h) is longer than env.enableCheckpointing(Duration.ofHours(1).toMillis());. So either, decrease the timeout in waiting to < 30 minutes, or increase checkpointing interval to 24h (CI will be killed after 4h AFAIR).
| } | ||
|
|
||
| @Test | ||
| void testRescaleWithActiveCheckpointTrigger( |
There was a problem hiding this comment.
Have you made sure that this test is failing without your change?
Also, I don't see this test enabling SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED anywhere? How is it passing right now?
What is the purpose of the change
FLIP-461 introduced checkpoint-synchronized rescaling where the Adaptive Scheduler waits for a checkpoint to complete before rescaling. However, it passively waits for the next periodic checkpoint, which can delay rescaling significantly when checkpoint intervals are large (e.g., 10 minutes).
This PR makes the Adaptive Scheduler actively trigger a checkpoint when resources change and rescaling is desired. The trigger fires at the right time. ie, when the
DefaultStateTransitionManagerenters the Stabilizing or Stabilized phase (i.e., when the resource gate is open and the scheduler is waiting for the checkpoint gate). The feature is controlled by a new configuration optionjobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled(default: false).The feature respects
execution.checkpointing.min-pause, skips if a checkpoint is already in progress, and only fires when parallelism has actually changed.Brief change log
Verifying this change
This change added tests and can be verified as follows:
RescaleOnCheckpointITCase#testRescaleWithActiveCheckpointTriggerthat starts a job with checkpointing interval of 1 hour, maxTriggerDelay set to infinity, and no manual triggerCheckpoint() call.Does this pull request potentially affect one of the following parts:
Documentation