-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-36753][runtime] Adaptive Scheduler actively triggers a Checkpoint after all resources are ready #27921
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import org.apache.flink.api.common.JobStatus; | ||
| import org.apache.flink.core.execution.SavepointFormatType; | ||
| import org.apache.flink.runtime.JobException; | ||
| import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; | ||
| import org.apache.flink.runtime.checkpoint.CheckpointScheduling; | ||
| import org.apache.flink.runtime.checkpoint.CheckpointStatsListener; | ||
| import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; | ||
|
|
@@ -64,6 +65,7 @@ class Executing extends StateWithExecutionGraph | |
|
|
||
| private final StateTransitionManager stateTransitionManager; | ||
| private final int rescaleOnFailedCheckpointCount; | ||
| private final boolean activeCheckpointTriggerEnabled; | ||
| // null indicates that there was no change event observed, yet | ||
| @Nullable private AtomicInteger failedCheckpointCountdown; | ||
|
|
||
|
|
@@ -77,7 +79,8 @@ class Executing extends StateWithExecutionGraph | |
| List<ExceptionHistoryEntry> failureCollection, | ||
| Function<StateTransitionManager.Context, StateTransitionManager> | ||
| stateTransitionManagerFactory, | ||
| int rescaleOnFailedCheckpointCount) { | ||
| int rescaleOnFailedCheckpointCount, | ||
| boolean activeCheckpointTriggerEnabled) { | ||
| super( | ||
| context, | ||
| executionGraph, | ||
|
|
@@ -96,6 +99,7 @@ class Executing extends StateWithExecutionGraph | |
| rescaleOnFailedCheckpointCount > 0, | ||
| "The rescaleOnFailedCheckpointCount should be larger than 0."); | ||
| this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount; | ||
| this.activeCheckpointTriggerEnabled = activeCheckpointTriggerEnabled; | ||
| this.failedCheckpointCountdown = null; | ||
|
|
||
| recordRescaleForJobIntoExecuting(logger, context); | ||
|
|
@@ -182,6 +186,11 @@ public ScheduledFuture<?> scheduleOperation(Runnable callback, Duration delay) { | |
| return context.runIfState(this, callback, delay); | ||
| } | ||
|
|
||
| @Override | ||
| public void requestActiveCheckpointTrigger() { | ||
| triggerCheckpointForRescale(); | ||
| } | ||
|
|
||
| @Override | ||
| public void transitionToSubsequentState() { | ||
| Optional<VertexParallelism> availableVertexParallelism = | ||
|
|
@@ -300,6 +309,69 @@ private void initializeFailedCheckpointCountdownIfUnset() { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Actively triggers a checkpoint to expedite rescaling. Without this, the scheduler would | ||
| * passively wait for the next periodic checkpoint, which could delay rescaling significantly | ||
| * when checkpoint intervals are large. | ||
| * | ||
| * <p>Guard conditions: | ||
| * | ||
| * <ul> | ||
| * <li>Checkpointing must be configured | ||
| * <li>Parallelism must have actually changed | ||
| * <li>No checkpoint must be currently in progress or being triggered | ||
| * </ul> | ||
| */ | ||
| private void triggerCheckpointForRescale() { | ||
| if (!activeCheckpointTriggerEnabled) { | ||
| return; | ||
| } | ||
|
|
||
| final CheckpointCoordinator checkpointCoordinator = | ||
| getExecutionGraph().getCheckpointCoordinator(); | ||
|
|
||
| if (checkpointCoordinator == null | ||
| || !checkpointCoordinator.isPeriodicCheckpointingConfigured()) { | ||
| getLogger() | ||
| .debug( | ||
| "Skipping active checkpoint trigger for rescale: checkpointing not configured."); | ||
| return; | ||
| } | ||
|
|
||
| if (!parallelismChanged()) { | ||
| getLogger() | ||
| .debug( | ||
| "Skipping active checkpoint trigger for rescale: parallelism unchanged."); | ||
| return; | ||
| } | ||
|
|
||
| if (checkpointCoordinator.getNumberOfPendingCheckpoints() > 0 | ||
| || checkpointCoordinator.isTriggering()) { | ||
| getLogger() | ||
| .debug( | ||
| "Skipping active checkpoint trigger for rescale: checkpoint already in progress."); | ||
| return; | ||
| } | ||
|
|
||
| getLogger().info("Actively triggering checkpoint to expedite rescaling."); | ||
| checkpointCoordinator | ||
| .triggerCheckpoint(false) | ||
| .whenComplete( | ||
| (completedCheckpoint, throwable) -> { | ||
| if (throwable != null) { | ||
| getLogger() | ||
| .warn( | ||
| "Active checkpoint trigger for rescale failed.", | ||
| throwable); | ||
| } else { | ||
| getLogger() | ||
| .info( | ||
| "Active checkpoint for rescale completed successfully: {}.", | ||
| completedCheckpoint.getCheckpointID()); | ||
| } | ||
| }); | ||
|
Comment on lines
+357
to
+372
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Shouldn't we wrap this with |
||
| } | ||
|
|
||
| CompletableFuture<String> stopWithSavepoint( | ||
| @Nullable final String targetDirectory, | ||
| boolean terminate, | ||
|
|
@@ -399,6 +471,7 @@ static class Factory implements StateFactory<Executing> { | |
| private final Function<StateTransitionManager.Context, StateTransitionManager> | ||
| stateTransitionManagerFactory; | ||
| private final int rescaleOnFailedCheckpointCount; | ||
| private final boolean activeCheckpointTriggerEnabled; | ||
|
|
||
| Factory( | ||
| ExecutionGraph executionGraph, | ||
|
|
@@ -410,7 +483,8 @@ static class Factory implements StateFactory<Executing> { | |
| List<ExceptionHistoryEntry> failureCollection, | ||
| Function<StateTransitionManager.Context, StateTransitionManager> | ||
| stateTransitionManagerFactory, | ||
| int rescaleOnFailedCheckpointCount) { | ||
| int rescaleOnFailedCheckpointCount, | ||
| boolean activeCheckpointTriggerEnabled) { | ||
| this.context = context; | ||
| this.log = log; | ||
| this.executionGraph = executionGraph; | ||
|
|
@@ -420,6 +494,7 @@ static class Factory implements StateFactory<Executing> { | |
| this.failureCollection = failureCollection; | ||
| this.stateTransitionManagerFactory = stateTransitionManagerFactory; | ||
| this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount; | ||
| this.activeCheckpointTriggerEnabled = activeCheckpointTriggerEnabled; | ||
| } | ||
|
|
||
| public Class<Executing> getStateClass() { | ||
|
|
@@ -436,7 +511,8 @@ public Executing getState() { | |
| userCodeClassLoader, | ||
| failureCollection, | ||
| stateTransitionManagerFactory, | ||
| rescaleOnFailedCheckpointCount); | ||
| rescaleOnFailedCheckpointCount, | ||
| activeCheckpointTriggerEnabled); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -175,4 +175,59 @@ void testRescaleOnCheckpoint( | |
| restClusterClient.cancel(jobGraph.getJobID()).join(); | ||
| } | ||
| } | ||
|
|
||
| @Test | ||
| void testRescaleWithActiveCheckpointTrigger( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Have you made sure that this test is failing without your change? Also, I don't see this test enabling |
||
| @InjectMiniCluster MiniCluster miniCluster, | ||
| @InjectClusterClient RestClusterClient<?> restClusterClient) | ||
| throws Exception { | ||
| final Configuration config = new Configuration(); | ||
|
|
||
| final StreamExecutionEnvironment env = | ||
| StreamExecutionEnvironment.getExecutionEnvironment(config); | ||
| env.setParallelism(BEFORE_RESCALE_PARALLELISM); | ||
| env.enableCheckpointing(Duration.ofHours(1).toMillis()); | ||
| env.fromSequence(0, Integer.MAX_VALUE).sinkTo(new DiscardingSink<>()); | ||
|
|
||
| final JobGraph jobGraph = env.getStreamGraph().getJobGraph(); | ||
| final Iterator<JobVertex> jobVertexIterator = jobGraph.getVertices().iterator(); | ||
| assertThat(jobVertexIterator.hasNext()).isTrue(); | ||
| final JobVertexID jobVertexId = jobVertexIterator.next().getID(); | ||
|
|
||
| final JobResourceRequirements jobResourceRequirements = | ||
| JobResourceRequirements.newBuilder() | ||
| .setParallelismForJobVertex(jobVertexId, 1, AFTER_RESCALE_PARALLELISM) | ||
| .build(); | ||
|
|
||
| restClusterClient.submitJob(jobGraph).join(); | ||
|
|
||
| final JobID jobId = jobGraph.getJobID(); | ||
| try { | ||
| LOG.info( | ||
| "Waiting for job {} to reach parallelism of {} for vertex {}.", | ||
| jobId, | ||
| BEFORE_RESCALE_PARALLELISM, | ||
| jobVertexId); | ||
| waitForRunningTasks(restClusterClient, jobId, BEFORE_RESCALE_PARALLELISM); | ||
|
|
||
| LOG.info( | ||
| "Updating job {} resource requirements: parallelism {} -> {}.", | ||
| jobId, | ||
| BEFORE_RESCALE_PARALLELISM, | ||
| AFTER_RESCALE_PARALLELISM); | ||
| restClusterClient.updateJobResourceRequirements(jobId, jobResourceRequirements).join(); | ||
| LOG.info( | ||
| "Waiting for job {} to rescale to parallelism {} via active checkpoint trigger.", | ||
| jobId, | ||
| AFTER_RESCALE_PARALLELISM); | ||
| waitForRunningTasks(restClusterClient, jobId, AFTER_RESCALE_PARALLELISM); | ||
| final int expectedFreeSlotCount = NUMBER_OF_SLOTS - AFTER_RESCALE_PARALLELISM; | ||
| LOG.info( | ||
| "Waiting for {} slot(s) to become available after scale down.", | ||
| expectedFreeSlotCount); | ||
| waitForAvailableSlots(restClusterClient, expectedFreeSlotCount); | ||
|
Comment on lines
+223
to
+228
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If I understand your test, it would still pass after 1h, after the regular periodic checkpoint is triggered after 1h, even with your new option disabled, right? I think you should make sure that the timeout in |
||
| } finally { | ||
| restClusterClient.cancel(jobGraph.getJobID()).join(); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a downside to using this option? If we expect this to be a generally positive change, and you disable it by default only as a precaution/for backward compatibility, I would actually be fine with setting it to
true by default.