From 2e1c79f3b4783b7d54873fcb700e918fc34b4323 Mon Sep 17 00:00:00 2001 From: Lalit Yadav Date: Wed, 17 Jun 2026 21:35:27 -0500 Subject: [PATCH] Add drain states to PipelineResult --- .../flink/FlinkDetachedRunnerResult.java | 4 +-- .../runners/flink/FlinkRunnerResultTest.java | 12 +++---- .../runners/dataflow/DataflowPipelineJob.java | 1 + .../runners/dataflow/util/MonitoringUtil.java | 6 ++-- .../dataflow/DataflowPipelineJobTest.java | 2 +- .../dataflow/util/MonitoringUtilTest.java | 4 +-- .../runners/jobsubmission/JobInvocation.java | 15 ++++++-- .../jobsubmission/JobInvocationTest.java | 36 +++++++++++++++++++ .../portability/JobServicePipelineResult.java | 7 ++-- .../portability/PortableRunnerTest.java | 21 +++++++++++ .../org/apache/beam/sdk/PipelineResult.java | 6 ++++ .../beam/sdk/nexmark/NexmarkLauncher.java | 2 ++ 12 files changed, 96 insertions(+), 20 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java index b26e865526dd..d7c8912ded01 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkDetachedRunnerResult.java @@ -120,11 +120,11 @@ public synchronized State drain() throws IOException { private State getDrainState(CompletableFuture drainFuture) throws IOException { if (!drainFuture.isDone()) { - return State.RUNNING; + return State.DRAINING; } try { drainFuture.get(); - return State.DONE; + return State.DRAINED; } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException("Failed to drain Flink job", e); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerResultTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerResultTest.java index 908d940f5efb..09ceede9d586 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerResultTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerResultTest.java @@ -66,18 +66,18 @@ public void testDrainDoneResultDoesNotThrowAnException() throws Exception { } @Test - public void testDetachedDrainReturnsRunningThenDone() throws Exception { + public void testDetachedDrainReturnsDrainingThenDrained() throws Exception { JobClient jobClient = mock(JobClient.class); CompletableFuture drainFuture = new CompletableFuture<>(); when(jobClient.stopWithSavepoint(true, null, SavepointFormatType.DEFAULT)) .thenReturn(drainFuture); FlinkDetachedRunnerResult result = new FlinkDetachedRunnerResult(jobClient, 1); - assertThat(result.drain(), is(PipelineResult.State.RUNNING)); - assertThat(result.getState(), is(PipelineResult.State.RUNNING)); + assertThat(result.drain(), is(PipelineResult.State.DRAINING)); + assertThat(result.getState(), is(PipelineResult.State.DRAINING)); drainFuture.complete("savepoint"); - assertThat(result.getState(), is(PipelineResult.State.DONE)); + assertThat(result.getState(), is(PipelineResult.State.DRAINED)); verify(jobClient).stopWithSavepoint(true, null, SavepointFormatType.DEFAULT); } @@ -132,11 +132,11 @@ public void testDetachedDrainRetriesAfterFailure() throws Exception { result.drain(); fail("Expected IOException"); } catch (IOException expected) { - assertThat(result.drain(), is(PipelineResult.State.RUNNING)); + assertThat(result.drain(), is(PipelineResult.State.DRAINING)); } retryDrainFuture.complete("savepoint"); - assertThat(result.getState(), is(PipelineResult.State.DONE)); + assertThat(result.getState(), is(PipelineResult.State.DRAINED)); verify(jobClient, times(2)).stopWithSavepoint(true, null, SavepointFormatType.DEFAULT); } } diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 0d7e5eaf68d9..278dace6ef48 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -365,6 +365,7 @@ private void logTerminalState(State state) { switch (state) { case DONE: case CANCELLED: + case DRAINED: LOG.info("Job {} finished with status {}.", getJobId(), state); break; case UPDATED: diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java index d117cf786129..0e25bbd265ae 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/MonitoringUtil.java @@ -221,17 +221,19 @@ public static State toState(@Nullable String stateName) { return State.CANCELLED; case "JOB_STATE_UPDATED": return State.UPDATED; + case "JOB_STATE_DRAINING": + return State.DRAINING; + case "JOB_STATE_DRAINED": + return State.DRAINED; case "JOB_STATE_RUNNING": case "JOB_STATE_PENDING": // Job has not yet started; closest mapping is RUNNING - case "JOB_STATE_DRAINING": // Job is still active; the closest mapping is RUNNING case "JOB_STATE_CANCELLING": // Job is still active; the closest mapping is RUNNING case "JOB_STATE_PAUSING": // Job is still active; the closest mapping is RUNNING case "JOB_STATE_RESOURCE_CLEANING_UP": // Job is still active; the closest mapping is RUNNING return State.RUNNING; case "JOB_STATE_DONE": - case "JOB_STATE_DRAINED": // Job has successfully terminated; closest mapping is DONE return State.DONE; default: LOG.warn( diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java index 4b088eb41a7d..be7ad2e6e110 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineJobTest.java @@ -417,7 +417,7 @@ public void testDrainUnterminatedJobThatSucceeds() throws IOException { DataflowPipelineJob job = new DataflowPipelineJob(DataflowClient.create(options), JOB_ID, options, null); - assertEquals(State.RUNNING, job.drain()); + assertEquals(State.DRAINING, job.drain()); Job content = new Job(); content.setProjectId(PROJECT_ID); content.setId(JOB_ID); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java index 5f76b6750ffa..35af1be4cdfc 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/util/MonitoringUtilTest.java @@ -100,9 +100,9 @@ public void testToStateNormal() { // Non-trivially mapped cases assertEquals(State.STOPPED, MonitoringUtil.toState("JOB_STATE_PAUSED")); - assertEquals(State.RUNNING, MonitoringUtil.toState("JOB_STATE_DRAINING")); + assertEquals(State.DRAINING, MonitoringUtil.toState("JOB_STATE_DRAINING")); assertEquals(State.RUNNING, MonitoringUtil.toState("JOB_STATE_PAUSING")); - assertEquals(State.DONE, MonitoringUtil.toState("JOB_STATE_DRAINED")); + assertEquals(State.DRAINED, MonitoringUtil.toState("JOB_STATE_DRAINED")); } @Test diff --git a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvocation.java b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvocation.java index 9da3bf38e575..3bd0a1311b04 100644 --- a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvocation.java +++ b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobInvocation.java @@ -116,6 +116,12 @@ public void onSuccess(PortablePipelineResult pipelineResult) { case RUNNING: setState(JobState.Enum.RUNNING); break; + case DRAINING: + setState(JobState.Enum.DRAINING); + break; + case DRAINED: + setState(JobState.Enum.DRAINED); + break; case CANCELLED: setState(JobState.Enum.CANCELLED); break; @@ -169,9 +175,12 @@ public synchronized void cancel() { new FutureCallback() { @Override public void onSuccess(PortablePipelineResult pipelineResult) { - // Do not cancel when we are already done. - if (pipelineResult != null - && pipelineResult.getState() != PipelineResult.State.DONE) { + // Do not cancel when the runner has already successfully finished. + if (pipelineResult != null) { + PipelineResult.State state = pipelineResult.getState(); + if (state == PipelineResult.State.DONE || state == PipelineResult.State.DRAINED) { + return; + } try { pipelineResult.cancel(); setState(JobState.Enum.CANCELLED); diff --git a/runners/java-job-service/src/test/java/org/apache/beam/runners/jobsubmission/JobInvocationTest.java b/runners/java-job-service/src/test/java/org/apache/beam/runners/jobsubmission/JobInvocationTest.java index 31380ace43c3..7100e57717be 100644 --- a/runners/java-job-service/src/test/java/org/apache/beam/runners/jobsubmission/JobInvocationTest.java +++ b/runners/java-job-service/src/test/java/org/apache/beam/runners/jobsubmission/JobInvocationTest.java @@ -80,6 +80,28 @@ public void testStateAfterCompletion() throws Exception { awaitJobState(jobInvocation, JobApi.JobState.Enum.DONE); } + @Test(timeout = 10_000) + public void testStateAfterDrainCompleted() throws Exception { + jobInvocation.start(); + assertThat(jobInvocation.getState(), is(JobApi.JobState.Enum.RUNNING)); + + TestPipelineResult pipelineResult = new TestPipelineResult(PipelineResult.State.DRAINED); + runner.setResult(pipelineResult); + + awaitJobState(jobInvocation, JobApi.JobState.Enum.DRAINED); + } + + @Test(timeout = 10_000) + public void testStateAfterDrainStarted() throws Exception { + jobInvocation.start(); + assertThat(jobInvocation.getState(), is(JobApi.JobState.Enum.RUNNING)); + + TestPipelineResult pipelineResult = new TestPipelineResult(PipelineResult.State.DRAINING); + runner.setResult(pipelineResult); + + awaitJobState(jobInvocation, JobApi.JobState.Enum.DRAINING); + } + @Test(timeout = 10_000) public void testStateAfterCompletionWithoutResult() throws Exception { jobInvocation.start(); @@ -128,6 +150,20 @@ public void testNoCancellationWhenDone() throws Exception { assertThat(pipelineResult.cancelLatch.getCount(), is(1L)); } + @Test(timeout = 10_000) + public void testNoCancellationWhenDrained() throws Exception { + jobInvocation.start(); + assertThat(jobInvocation.getState(), is(JobApi.JobState.Enum.RUNNING)); + + TestPipelineResult pipelineResult = new TestPipelineResult(PipelineResult.State.DRAINED); + runner.setResult(pipelineResult); + awaitJobState(jobInvocation, JobApi.JobState.Enum.DRAINED); + + jobInvocation.cancel(); + assertThat(jobInvocation.getState(), is(JobApi.JobState.Enum.DRAINED)); + assertThat(pipelineResult.cancelLatch.getCount(), is(1L)); + } + @Test(timeout = 10_000) public void testReturnsMetricsFromJobInvocationAfterSuccess() throws Exception { JobApi.MetricResults expectedMonitoringInfos = JobApi.MetricResults.newBuilder().build(); diff --git a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/JobServicePipelineResult.java b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/JobServicePipelineResult.java index 0f3d3b6d1910..7c72a1039c0e 100644 --- a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/JobServicePipelineResult.java +++ b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/JobServicePipelineResult.java @@ -160,7 +160,7 @@ private void waitForTerminalState() { } private void propagateErrors() { - if (terminalState != State.DONE) { + if (terminalState != State.DONE && terminalState != State.DRAINED) { JobMessagesRequest messageStreamRequest = JobMessagesRequest.newBuilder().setJobIdBytes(jobId).build(); Iterator messageStreamIterator = @@ -196,10 +196,9 @@ private static State getJavaState(JobApi.JobState.Enum protoState) { case UPDATED: return State.UPDATED; case DRAINING: - // TODO: Determine the correct mappings for the states below. - return State.UNKNOWN; + return State.DRAINING; case DRAINED: - return State.UNKNOWN; + return State.DRAINED; case STARTING: return State.RUNNING; case CANCELLING: diff --git a/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java b/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java index 68f3f6eae396..068d70b0d250 100644 --- a/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java +++ b/runners/portability/java/src/test/java/org/apache/beam/runners/portability/PortableRunnerTest.java @@ -109,6 +109,27 @@ public void stagesAndRunsJob() throws Exception { assertThat(state, is(State.DONE)); } + @Test + public void mapsDrainingJobState() throws Exception { + createJobServer(JobState.Enum.DRAINING, JobApi.MetricResults.getDefaultInstance()); + PortableRunner runner = PortableRunner.create(options, ManagedChannelFactory.createInProcess()); + PipelineResult result = runner.run(p); + + try { + assertThat(result.getState(), is(State.DRAINING)); + } finally { + ((AutoCloseable) result).close(); + } + } + + @Test + public void mapsDrainedJobState() throws Exception { + createJobServer(JobState.Enum.DRAINED, JobApi.MetricResults.getDefaultInstance()); + PortableRunner runner = PortableRunner.create(options, ManagedChannelFactory.createInProcess()); + State state = runner.run(p).waitUntilFinish(); + assertThat(state, is(State.DRAINED)); + } + @Test public void extractsMetrics() throws Exception { JobApi.MetricResults metricResults = generateMetricResults(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java index 46cca7833e52..eff031157e12 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/PipelineResult.java @@ -107,6 +107,12 @@ enum State { /** The job has been updated. */ UPDATED(true, true), + /** The job is draining: no longer accepting new input while finishing in-flight work. */ + DRAINING(false, false), + + /** The job has finished draining. */ + DRAINED(true, false), + /** The job state reported by a runner cannot be interpreted by the SDK. */ UNRECOGNIZED(false, false); diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java index 7e4e5da0d853..aaf63705ad2a 100644 --- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java +++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/NexmarkLauncher.java @@ -515,9 +515,11 @@ private void invokeBuilderForPublishOnlyPipeline(PipelineBuilder case UNRECOGNIZED: case STOPPED: case RUNNING: + case DRAINING: // Keep going. break; case DONE: + case DRAINED: // All done. running = false; break;