From eb0ed03324d853f891b207ac76337c3da3307e07 Mon Sep 17 00:00:00 2001 From: Manan Mangal Date: Tue, 16 Jun 2026 00:27:00 -0700 Subject: [PATCH 1/2] Add Flink 2.2.1 runner support --- .../test-properties.json | 2 +- ...am_PostCommit_Java_PVR_Flink_Streaming.yml | 4 +- ..._PostCommit_Java_ValidatesRunner_Flink.yml | 4 +- .../beam_Publish_Docker_Snapshots.yml | 4 +- .../run_rc_validation_java_quickstart.yml | 2 +- gradle.properties | 2 +- release/build.gradle.kts | 2 +- runners/flink/2.2/build.gradle | 58 + .../2.2/job-server-container/build.gradle | 26 + runners/flink/2.2/job-server/build.gradle | 31 + .../flink/FlinkExecutionEnvironments.java | 507 +++++ .../wrappers/streaming/DoFnOperator.java | 1786 +++++++++++++++++ .../flink/FlinkPipelineOptionsTest.java | 208 ++ .../apache_beam/options/pipeline_options.py | 2 +- .../src/apache_beam/runners/flink.ts | 2 +- .../content/en/documentation/runners/flink.md | 5 + 16 files changed, 2633 insertions(+), 12 deletions(-) create mode 100644 runners/flink/2.2/build.gradle create mode 100644 runners/flink/2.2/job-server-container/build.gradle create mode 100644 runners/flink/2.2/job-server/build.gradle create mode 100644 runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java create mode 100644 runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java create mode 100644 runners/flink/2.2/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java diff --git a/.github/actions/setup-default-test-properties/test-properties.json b/.github/actions/setup-default-test-properties/test-properties.json index f06de5174e6c..29ec686d87d5 100644 --- a/.github/actions/setup-default-test-properties/test-properties.json +++ b/.github/actions/setup-default-test-properties/test-properties.json @@ -14,7 +14,7 @@ }, "JavaTestProperties": { "SUPPORTED_VERSIONS": ["8", "11", "17", "21", "25"], - "FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20", "2.0"], + "FLINK_VERSIONS": ["1.17", "1.18", "1.19", "1.20", "2.0", "2.2"], "SPARK_VERSIONS": ["3"] }, "GoTestProperties": { diff --git a/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml index d5f7edeb11d0..f2a3803b7719 100644 --- a/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml +++ b/.github/workflows/beam_PostCommit_Java_PVR_Flink_Streaming.yml @@ -56,11 +56,11 @@ jobs: runs-on: [self-hosted, ubuntu-24.04, main] timeout-minutes: 120 strategy: - matrix: + matrix: job_name: [beam_PostCommit_Java_PVR_Flink_Streaming] job_phrase: [Run Java Flink PortableValidatesRunner Streaming] # every major version - flink_version: [ '1.20', '2.0' ] + flink_version: [ '1.20', '2.0', '2.2'] if: | github.event_name == 'workflow_dispatch' || github.event_name == 'pull_request_target' || diff --git a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml index febec1afa592..6fc0b1475fa9 100644 --- a/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml +++ b/.github/workflows/beam_PostCommit_Java_ValidatesRunner_Flink.yml @@ -49,7 +49,7 @@ env: GRADLE_ENTERPRISE_CACHE_PASSWORD: ${{ secrets.GE_CACHE_PASSWORD }} jobs: - beam_PostCommit_Java_ValidatesRunner_Flink: + beam_PostCommit_Java_ValidatesRunner_Flink: name: ${{ matrix.job_name }} (${{ matrix.flink_version }}) runs-on: [self-hosted, ubuntu-24.04, main] timeout-minutes: 100 @@ -58,7 +58,7 @@ jobs: job_name: [beam_PostCommit_Java_ValidatesRunner_Flink] job_phrase: [Run Flink ValidatesRunner] # every major version - flink_version: ['1.20', '2.0'] + flink_version: ['1.20', '2.0', '2.2'] if: | github.event_name == 'workflow_dispatch' || github.event_name == 'pull_request_target' || diff --git a/.github/workflows/beam_Publish_Docker_Snapshots.yml b/.github/workflows/beam_Publish_Docker_Snapshots.yml index 5e4412b2650c..0895c226d479 100644 --- a/.github/workflows/beam_Publish_Docker_Snapshots.yml +++ b/.github/workflows/beam_Publish_Docker_Snapshots.yml @@ -84,10 +84,10 @@ jobs: -Pdocker-repository-root=gcr.io/apache-beam-testing/beam_portability \ -Pdocker-tag-list=${{ github.sha }}${LATEST_TAG} \ -Pdocker-pull-licenses - - name: run Publish Docker Snapshots script for Flink 2.0 + - name: run Publish Docker Snapshots script for Flink 2.2 uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :runners:flink:2.0:job-server-container:dockerPush + gradle-command: :runners:flink:2.2:job-server-container:dockerPush arguments: | -Pdocker-repository-root=gcr.io/apache-beam-testing/beam_portability \ -Pdocker-tag-list=${{ github.sha }}${LATEST_TAG} \ diff --git a/.github/workflows/run_rc_validation_java_quickstart.yml b/.github/workflows/run_rc_validation_java_quickstart.yml index dce9b7f3fedb..0675d33c3181 100644 --- a/.github/workflows/run_rc_validation_java_quickstart.yml +++ b/.github/workflows/run_rc_validation_java_quickstart.yml @@ -88,7 +88,7 @@ jobs: - name: Run QuickStart Java Flink Runner uses: ./.github/actions/gradle-command-self-hosted-action with: - gradle-command: :runners:flink:2.0:runQuickstartJavaFlinkLocal + gradle-command: :runners:flink:2.2:runQuickstartJavaFlinkLocal arguments: | -Prepourl=${{ env.APACHE_REPO_URL }} \ -Pver=${{ env.RELEASE_VERSION }} diff --git a/gradle.properties b/gradle.properties index 95e50105a494..7c5c370f120c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -39,7 +39,7 @@ docker_image_default_repo_root=apache docker_image_default_repo_prefix=beam_ # supported flink versions -flink_versions=1.17,1.18,1.19,1.20,2.0 +flink_versions=1.17,1.18,1.19,1.20,2.0,2.2 # supported spark versions spark_versions=3,4 # supported python versions diff --git a/release/build.gradle.kts b/release/build.gradle.kts index 5be707428605..54165dc49654 100644 --- a/release/build.gradle.kts +++ b/release/build.gradle.kts @@ -39,7 +39,7 @@ task("runJavaExamplesValidationTask") { dependsOn(":runners:direct-java:runQuickstartJavaDirect") dependsOn(":runners:google-cloud-dataflow-java:runQuickstartJavaDataflow") dependsOn(":runners:spark:3:runQuickstartJavaSpark") - dependsOn(":runners:flink:2.0:runQuickstartJavaFlinkLocal") + dependsOn(":runners:flink:2.2:runQuickstartJavaFlinkLocal") dependsOn(":runners:direct-java:runMobileGamingJavaDirect") if (project.hasProperty("ver") || !project.version.toString().endsWith("SNAPSHOT")) { // only run one variant of MobileGaming on Dataflow for nightly diff --git a/runners/flink/2.2/build.gradle b/runners/flink/2.2/build.gradle new file mode 100644 index 000000000000..0deb9163d196 --- /dev/null +++ b/runners/flink/2.2/build.gradle @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Flink 2.2 API changes addressed in this module's source overrides: +// - CheckpointingMode moved (Flink 2.2): org.apache.flink.streaming.api.CheckpointingMode +// is now @Deprecated; canonical path is org.apache.flink.core.execution.CheckpointingMode. +// FlinkExecutionEnvironments.java and FlinkPipelineOptionsTest.java are overridden here +// to use the non-deprecated import. +// +// Flink 2.2 breaking changes that are SQL/cluster-layer only (no runner code change needed): +// - TLS cipher suite default changed (FLINK-39022): JDK 11.0.30+/17.0.18+/21.0.10+/24+ +// disabled TLS_RSA_* ciphers; new default uses ECDHE suites. Verify cipher support +// on TLS-enabled clusters before upgrading. +// - StreamingMultiJoinOperator state format change (FLINK-38209): savepoints from Flink 2.1 +// with table.optimizer.multi-join.enabled=true are NOT compatible with 2.2. +// - SQL row NOT NULL now enforced (FLINK-38181): use table.legacy-nested-row-nullability +// to restore prior (silent-ignore) behavior if needed. + +project.ext { + flink_major = '2.2' + flink_version = '2.2.1' + excluded_files = [ + 'main': [ + // Used by DataSet API only + "org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapter.java", + "org/apache/beam/runners/flink/FlinkBatchPipelineTranslator.java", + "org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java", + "org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java", + "org/apache/beam/runners/flink/translation/functions/FlinkNonMergingReduceFunction.java", + // Moved to org.apache.flink.runtime.state.StateBackendFactory + "org/apache/beam/runners/flink/FlinkStateBackendFactory.java", + ], + 'test': [ + // Used by DataSet API only + "org/apache/beam/runners/flink/adapter/BeamFlinkDataSetAdapterTest.java", + "org/apache/beam/runners/flink/batch/NonMergingGroupByKeyTest.java", + "org/apache/beam/runners/flink/batch/ReshuffleTest.java", + ] + ] +} + +// Load the main build script which contains all build logic. +apply from: "../flink_runner.gradle" diff --git a/runners/flink/2.2/job-server-container/build.gradle b/runners/flink/2.2/job-server-container/build.gradle new file mode 100644 index 000000000000..afdb68a0fc91 --- /dev/null +++ b/runners/flink/2.2/job-server-container/build.gradle @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server-container' + +project.ext { + resource_path = basePath +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server_container.gradle" diff --git a/runners/flink/2.2/job-server/build.gradle b/runners/flink/2.2/job-server/build.gradle new file mode 100644 index 000000000000..527245329211 --- /dev/null +++ b/runners/flink/2.2/job-server/build.gradle @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +def basePath = '../../job-server' + +project.ext { + // Look for the source code in the parent module + main_source_dirs = ["$basePath/src/main/java"] + test_source_dirs = ["$basePath/src/test/java"] + main_resources_dirs = ["$basePath/src/main/resources"] + test_resources_dirs = ["$basePath/src/test/resources"] + archives_base_name = 'beam-runners-flink-2.2-job-server' +} + +// Load the main build script which contains all build logic. +apply from: "$basePath/flink_job_server.gradle" diff --git a/runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java b/runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java new file mode 100644 index 000000000000..b340632219a2 --- /dev/null +++ b/runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import static org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getDefaultLocalParallelism; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Streams; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.configuration.ExternalizedCheckpointRetention; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.RestartStrategyOptions; +import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.execution.CheckpointingMode; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; +import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utilities for Flink execution environments. */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +public class FlinkExecutionEnvironments { + + private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutionEnvironments.class); + + private static final ObjectMapper mapper = new ObjectMapper(); + + /** + * If the submitted job is a batch processing job, this method creates the adequate Flink {@link + * org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending on the + * user-specified options. + */ + public static StreamExecutionEnvironment createBatchExecutionEnvironment( + FlinkPipelineOptions options) { + return createBatchExecutionEnvironment( + options, + MoreObjects.firstNonNull(options.getFilesToStage(), Collections.emptyList()), + options.getFlinkConfDir()); + } + + static StreamExecutionEnvironment createBatchExecutionEnvironment( + FlinkPipelineOptions options, List filesToStage, @Nullable String confDir) { + + LOG.info("Creating a Batch Execution Environment."); + + // Although Flink uses Rest, it expects the address not to contain a http scheme + String flinkMasterHostPort = stripHttpSchema(options.getFlinkMaster()); + Configuration flinkConfiguration = getFlinkConfiguration(confDir); + StreamExecutionEnvironment flinkBatchEnv; + + // depending on the master, create the right environment. + if ("[local]".equals(flinkMasterHostPort)) { + setManagedMemoryByFraction(flinkConfiguration); + disableClassLoaderLeakCheck(flinkConfiguration); + flinkBatchEnv = StreamExecutionEnvironment.createLocalEnvironment(flinkConfiguration); + if (!options.getAttachedMode()) { + LOG.warn("Detached mode is only supported in RemoteStreamEnvironment"); + } + } else if ("[collection]".equals(flinkMasterHostPort)) { + throw new UnsupportedOperationException( + "CollectionEnvironment has been removed in Flink 2. Use [local] instead."); + } else if ("[auto]".equals(flinkMasterHostPort)) { + flinkBatchEnv = StreamExecutionEnvironment.getExecutionEnvironment(); + if (flinkBatchEnv instanceof LocalStreamEnvironment) { + disableClassLoaderLeakCheck(flinkConfiguration); + flinkBatchEnv = StreamExecutionEnvironment.createLocalEnvironment(flinkConfiguration); + flinkBatchEnv.setParallelism(getDefaultLocalParallelism()); + } + if (!options.getAttachedMode()) { + LOG.warn("Detached mode is not supported in [auto]."); + } + } else { + int defaultPort = flinkConfiguration.get(RestOptions.PORT); + HostAndPort hostAndPort = + HostAndPort.fromString(flinkMasterHostPort).withDefaultPort(defaultPort); + flinkConfiguration.set(RestOptions.PORT, hostAndPort.getPort()); + if (!options.getAttachedMode()) { + flinkConfiguration.set(DeploymentOptions.ATTACHED, options.getAttachedMode()); + } + flinkBatchEnv = + StreamExecutionEnvironment.createRemoteEnvironment( + hostAndPort.getHost(), + hostAndPort.getPort(), + flinkConfiguration, + filesToStage.toArray(new String[filesToStage.size()])); + LOG.info("Using Flink Master URL {}:{}.", hostAndPort.getHost(), hostAndPort.getPort()); + } + + // Set the execution mode for data exchange. + flinkBatchEnv.setRuntimeMode(RuntimeExecutionMode.BATCH); + + // set the correct parallelism. + if (options.getParallelism() != -1) { + flinkBatchEnv.setParallelism(options.getParallelism()); + } + + // Set the correct parallelism, required by UnboundedSourceWrapper to generate consistent + // splits. + final int parallelism = + determineParallelism( + options.getParallelism(), flinkBatchEnv.getParallelism(), flinkConfiguration); + + flinkBatchEnv.setParallelism(parallelism); + // set parallelism in the options (required by some execution code) + options.setParallelism(parallelism); + + if (options.getObjectReuse()) { + flinkBatchEnv.getConfig().enableObjectReuse(); + } else { + flinkBatchEnv.getConfig().disableObjectReuse(); + } + + applyLatencyTrackingInterval(flinkBatchEnv.getConfig(), options); + + configureWebUIOptions(flinkBatchEnv.getConfig(), options.as(PipelineOptions.class)); + + return flinkBatchEnv; + } + + @VisibleForTesting + static StreamExecutionEnvironment createStreamExecutionEnvironment(FlinkPipelineOptions options) { + return createStreamExecutionEnvironment( + options, + MoreObjects.firstNonNull(options.getFilesToStage(), Collections.emptyList()), + options.getFlinkConfDir()); + } + + /** + * If the submitted job is a stream processing job, this method creates the adequate Flink {@link + * org.apache.flink.streaming.api.environment.StreamExecutionEnvironment} depending on the + * user-specified options. + */ + public static StreamExecutionEnvironment createStreamExecutionEnvironment( + FlinkPipelineOptions options, List filesToStage, @Nullable String confDir) { + + LOG.info("Creating a Streaming Environment."); + + // Although Flink uses Rest, it expects the address not to contain a http scheme + String masterUrl = stripHttpSchema(options.getFlinkMaster()); + Configuration flinkConfiguration = getFlinkConfiguration(confDir); + configureRestartStrategy(options, flinkConfiguration); + configureStateBackend(options, flinkConfiguration); + StreamExecutionEnvironment flinkStreamEnv; + + // depending on the master, create the right environment. + if ("[local]".equals(masterUrl)) { + setManagedMemoryByFraction(flinkConfiguration); + disableClassLoaderLeakCheck(flinkConfiguration); + flinkStreamEnv = + StreamExecutionEnvironment.createLocalEnvironment( + getDefaultLocalParallelism(), flinkConfiguration); + if (!options.getAttachedMode()) { + LOG.warn("Detached mode is only supported in RemoteStreamEnvironment"); + } + } else if ("[auto]".equals(masterUrl)) { + + flinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment(flinkConfiguration); + if (flinkStreamEnv instanceof LocalStreamEnvironment) { + disableClassLoaderLeakCheck(flinkConfiguration); + flinkStreamEnv = + StreamExecutionEnvironment.createLocalEnvironment( + getDefaultLocalParallelism(), flinkConfiguration); + } + if (!options.getAttachedMode()) { + LOG.warn("Detached mode is not only supported in [auto]"); + } + } else { + int defaultPort = flinkConfiguration.get(RestOptions.PORT); + HostAndPort hostAndPort = HostAndPort.fromString(masterUrl).withDefaultPort(defaultPort); + flinkConfiguration.set(RestOptions.PORT, hostAndPort.getPort()); + final SavepointRestoreSettings savepointRestoreSettings; + if (options.getSavepointPath() != null) { + savepointRestoreSettings = + SavepointRestoreSettings.forPath( + options.getSavepointPath(), options.getAllowNonRestoredState()); + } else { + savepointRestoreSettings = SavepointRestoreSettings.none(); + } + if (!options.getAttachedMode()) { + flinkConfiguration.set(DeploymentOptions.ATTACHED, options.getAttachedMode()); + } + flinkStreamEnv = + new RemoteStreamEnvironment( + hostAndPort.getHost(), + hostAndPort.getPort(), + flinkConfiguration, + filesToStage.toArray(new String[filesToStage.size()]), + null, + savepointRestoreSettings); + LOG.info("Using Flink Master URL {}:{}.", hostAndPort.getHost(), hostAndPort.getPort()); + } + + // Set the parallelism, required by UnboundedSourceWrapper to generate consistent splits. + final int parallelism = + determineParallelism( + options.getParallelism(), flinkStreamEnv.getParallelism(), flinkConfiguration); + flinkStreamEnv.setParallelism(parallelism); + if (options.getMaxParallelism() > 0) { + flinkStreamEnv.setMaxParallelism(options.getMaxParallelism()); + } else if (!options.isStreaming()) { + // In Flink maxParallelism defines the number of keyGroups. + // (see + // https://github.com/apache/flink/blob/e9dd4683f758b463d0b5ee18e49cecef6a70c5cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L76) + // The default value (parallelism * 1.5) + // (see + // https://github.com/apache/flink/blob/e9dd4683f758b463d0b5ee18e49cecef6a70c5cf/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java#L137-L147) + // create a lot of skew so we force maxParallelism = parallelism in Batch mode. + LOG.info("Setting maxParallelism to {}", parallelism); + flinkStreamEnv.setMaxParallelism(parallelism); + } + // set parallelism in the options (required by some execution code) + options.setParallelism(parallelism); + + if (options.getObjectReuse()) { + flinkStreamEnv.getConfig().enableObjectReuse(); + } else { + flinkStreamEnv.getConfig().disableObjectReuse(); + } + + if (!options.getOperatorChaining()) { + flinkStreamEnv.disableOperatorChaining(); + } + + configureCheckpointing(options, flinkStreamEnv); + + applyLatencyTrackingInterval(flinkStreamEnv.getConfig(), options); + + if (options.getAutoWatermarkInterval() != null) { + flinkStreamEnv.getConfig().setAutoWatermarkInterval(options.getAutoWatermarkInterval()); + } + configureWebUIOptions(flinkStreamEnv.getConfig(), options.as(PipelineOptions.class)); + configureCustomKryoSerializers(flinkStreamEnv.getConfig()); + + return flinkStreamEnv; + } + + private static void configureWebUIOptions( + ExecutionConfig config, org.apache.beam.sdk.options.PipelineOptions options) { + SerializablePipelineOptions serializablePipelineOptions = + new SerializablePipelineOptions(options); + String optionsAsString = serializablePipelineOptions.toString(); + + try { + JsonNode node = mapper.readTree(optionsAsString); + JsonNode optionsNode = node.get("options"); + Map output = + Streams.stream(optionsNode.fields()) + .filter(entry -> !entry.getValue().isNull()) + .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().asText())); + + config.setGlobalJobParameters(new GlobalJobParametersImpl(output)); + } catch (Exception e) { + LOG.warn("Unable to configure web ui options", e); + } + } + + private static void configureCustomKryoSerializers(ExecutionConfig config) { + SerializerConfigImpl serializerConfig = (SerializerConfigImpl) config.getSerializerConfig(); + // Force Beam schema to use JavaSerializer to fix serialization involving ImmutableMap + serializerConfig.registerTypeWithKryoSerializer( + org.apache.beam.sdk.schemas.Schema.class, + com.esotericsoftware.kryo.serializers.JavaSerializer.class); + } + + private static class GlobalJobParametersImpl extends ExecutionConfig.GlobalJobParameters { + private final Map jobOptions; + + private GlobalJobParametersImpl(Map jobOptions) { + this.jobOptions = jobOptions; + } + + @Override + public Map toMap() { + return jobOptions; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof GlobalJobParametersImpl)) { + return false; + } + + ExecutionConfig.GlobalJobParameters jobParams = (ExecutionConfig.GlobalJobParameters) obj; + return Maps.difference(jobParams.toMap(), this.jobOptions).areEqual(); + } + + @Override + public int hashCode() { + return Objects.hashCode(jobOptions); + } + } + + private static void configureCheckpointing( + FlinkPipelineOptions options, StreamExecutionEnvironment flinkStreamEnv) { + // A value of -1 corresponds to disabled checkpointing (see CheckpointConfig in Flink). + // If the value is not -1, then the validity checks are applied. + // By default, checkpointing is disabled. + long checkpointInterval = options.getCheckpointingInterval(); + if (checkpointInterval != -1) { + if (checkpointInterval < 1) { + throw new IllegalArgumentException("The checkpoint interval must be positive"); + } + flinkStreamEnv.enableCheckpointing( + checkpointInterval, CheckpointingMode.valueOf(options.getCheckpointingMode())); + + if (options.getShutdownSourcesAfterIdleMs() == -1) { + // If not explicitly configured, we never shutdown sources when checkpointing is enabled. + options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE); + } + + if (options.getCheckpointTimeoutMillis() != -1) { + flinkStreamEnv + .getCheckpointConfig() + .setCheckpointTimeout(options.getCheckpointTimeoutMillis()); + } + + boolean externalizedCheckpoint = options.isExternalizedCheckpointsEnabled(); + boolean retainOnCancellation = options.getRetainExternalizedCheckpointsOnCancellation(); + if (externalizedCheckpoint) { + flinkStreamEnv + .getCheckpointConfig() + .setExternalizedCheckpointRetention( + retainOnCancellation + ? ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION + : ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION); + } + + if (options.getUnalignedCheckpointEnabled()) { + flinkStreamEnv.getCheckpointConfig().enableUnalignedCheckpoints(); + } + flinkStreamEnv + .getCheckpointConfig() + .setForceUnalignedCheckpoints(options.getForceUnalignedCheckpointEnabled()); + + long minPauseBetweenCheckpoints = options.getMinPauseBetweenCheckpoints(); + if (minPauseBetweenCheckpoints != -1) { + flinkStreamEnv + .getCheckpointConfig() + .setMinPauseBetweenCheckpoints(minPauseBetweenCheckpoints); + } + if (options.getTolerableCheckpointFailureNumber() != null + && options.getTolerableCheckpointFailureNumber() > 0) { + flinkStreamEnv + .getCheckpointConfig() + .setTolerableCheckpointFailureNumber(options.getTolerableCheckpointFailureNumber()); + } + + flinkStreamEnv + .getCheckpointConfig() + .setMaxConcurrentCheckpoints(options.getNumConcurrentCheckpoints()); + } else { + if (options.getShutdownSourcesAfterIdleMs() == -1) { + // If not explicitly configured, we never shutdown sources when checkpointing is disabled. + options.setShutdownSourcesAfterIdleMs(0L); + } + } + } + + private static void configureStateBackend(FlinkPipelineOptions options, Configuration config) { + if (options.getStateBackend() != null) { + final String storagePath = options.getStateBackendStoragePath(); + Preconditions.checkArgument( + storagePath != null, + "State backend was set to '%s' but no storage path was provided.", + options.getStateBackend()); + + if (options.getStateBackend().equalsIgnoreCase("rocksdb")) { + config.set(StateBackendOptions.STATE_BACKEND, "rocksdb"); + } else if (options.getStateBackend().equalsIgnoreCase("filesystem") + || options.getStateBackend().equalsIgnoreCase("hashmap")) { + config.set(StateBackendOptions.STATE_BACKEND, "hashmap"); + } else { + throw new IllegalArgumentException( + String.format( + "Unknown state backend '%s'. Use 'rocksdb' or 'filesystem' or configure via Flink config file.", + options.getStateBackend())); + } + config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, storagePath); + } else if (options.getStateBackendFactory() != null) { + // Legacy way of setting the state backend + config.set(StateBackendOptions.STATE_BACKEND, options.getStateBackendFactory().getName()); + } + } + + private static void configureRestartStrategy(FlinkPipelineOptions options, Configuration config) { + // for the following 2 parameters, a value of -1 means that Flink will use + // the default values as specified in the configuration. + int numRetries = options.getNumberOfExecutionRetries(); + if (numRetries != -1) { + // setNumberOfExecutionRetries + config.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay"); + config.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, numRetries); + } + long retryDelay = options.getExecutionRetryDelay(); + if (retryDelay != -1) { + config.set( + RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, + java.time.Duration.ofMillis(retryDelay)); + } + } + + /** + * Removes the http:// or https:// schema from a url string. This is commonly used with the + * flink_master address which is expected to be of form host:port but users may specify a URL; + * Python code also assumes a URL which may be passed here. + */ + private static String stripHttpSchema(String url) { + return url.trim().replaceFirst("^http[s]?://", ""); + } + + private static int determineParallelism( + final int pipelineOptionsParallelism, + final int envParallelism, + final Configuration configuration) { + if (pipelineOptionsParallelism > 0) { + return pipelineOptionsParallelism; + } + if (envParallelism > 0) { + // If the user supplies a parallelism on the command-line, this is set on the execution + // environment during creation + return envParallelism; + } + + final int flinkConfigParallelism = + configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).orElse(-1); + if (flinkConfigParallelism > 0) { + return flinkConfigParallelism; + } + LOG.warn( + "No default parallelism could be found. Defaulting to parallelism 1. " + + "Please set an explicit parallelism with --parallelism"); + return 1; + } + + private static Configuration getFlinkConfiguration(@Nullable String flinkConfDir) { + return flinkConfDir == null || flinkConfDir.isEmpty() + ? GlobalConfiguration.loadConfiguration() + : GlobalConfiguration.loadConfiguration(flinkConfDir); + } + + private static void applyLatencyTrackingInterval( + ExecutionConfig config, FlinkPipelineOptions options) { + long latencyTrackingInterval = options.getLatencyTrackingInterval(); + config.setLatencyTrackingInterval(latencyTrackingInterval); + } + + private static void setManagedMemoryByFraction(final Configuration config) { + if (!config.containsKey("taskmanager.memory.managed.size")) { + float managedMemoryFraction = config.get(TaskManagerOptions.MANAGED_MEMORY_FRACTION); + long freeHeapMemory = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag(); + long managedMemorySize = (long) (freeHeapMemory * managedMemoryFraction); + config.setString("taskmanager.memory.managed.size", String.valueOf(managedMemorySize)); + } + } + + /** + * Disables classloader.check-leaked-classloader unless set by the user. See + * https://github.com/apache/beam/issues/20783. + */ + private static void disableClassLoaderLeakCheck(final Configuration config) { + if (!config.containsKey(CoreOptions.CHECK_LEAKED_CLASSLOADER.key())) { + config.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false); + } + } +} diff --git a/runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java new file mode 100644 index 000000000000..020e58a1c663 --- /dev/null +++ b/runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -0,0 +1,1786 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import static org.apache.flink.util.Preconditions.checkArgument; + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.locks.Lock; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.InMemoryBundleFinalizer; +import org.apache.beam.runners.core.NullSideInputReader; +import org.apache.beam.runners.core.ProcessFnRunner; +import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.SideInputHandler; +import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; +import org.apache.beam.runners.core.StatefulDoFnRunner; +import org.apache.beam.runners.core.StepContext; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.adapter.FlinkKey; +import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate; +import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; +import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; +import org.apache.beam.runners.flink.translation.utils.CheckpointStats; +import org.apache.beam.runners.flink.translation.utils.Workarounds; +import org.apache.beam.runners.flink.translation.wrappers.streaming.stableinput.BufferingDoFnRunner; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals; +import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StructuredCoder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; +import org.apache.beam.sdk.transforms.DoFnSchemaInformation; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.util.NoopLock; +import org.apache.beam.sdk.util.WindowedValueMultiReceiver; +import org.apache.beam.sdk.util.WindowedValueReceiver; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.InternalPriorityQueue; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.core.execution.CheckpointingMode; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.InternalTimerServiceImpl; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeService; +import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.function.BiConsumerWithException; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Flink operator for executing {@link DoFn DoFns}. + * + * @param the input type of the {@link DoFn} + * @param the output type of the {@link DoFn} + */ +// We use Flink's lifecycle methods to initialize transient fields +@SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") +@SuppressWarnings({ + "rawtypes", // TODO(https://github.com/apache/beam/issues/20447) + "keyfor", + "nullness" +}) // TODO(https://github.com/apache/beam/issues/20497) +public class DoFnOperator + extends AbstractStreamOperator> + implements OneInputStreamOperator, WindowedValue>, + TwoInputStreamOperator, RawUnionValue, WindowedValue>, + Triggerable { + + private static final Logger LOG = LoggerFactory.getLogger(DoFnOperator.class); + private final boolean isStreaming; + + protected DoFn doFn; + + protected final SerializablePipelineOptions serializedOptions; + + protected final TupleTag mainOutputTag; + protected final List> additionalOutputTags; + + protected final Collection> sideInputs; + protected final Map> sideInputTagMapping; + + protected final WindowingStrategy windowingStrategy; + + protected final OutputManagerFactory outputManagerFactory; + + protected transient DoFnRunner doFnRunner; + protected transient PushbackSideInputDoFnRunner pushbackDoFnRunner; + protected transient BufferingDoFnRunner bufferingDoFnRunner; + + protected transient SideInputHandler sideInputHandler; + + protected transient SideInputReader sideInputReader; + + protected transient BufferedOutputManager outputManager; + + private transient DoFnInvoker doFnInvoker; + + protected transient FlinkStateInternals keyedStateInternals; + protected transient FlinkTimerInternals timerInternals; + + protected final String stepName; + + final Coder> windowedInputCoder; + + final Map, Coder> outputCoders; + + final Coder keyCoder; + + final KeySelector, ?> keySelector; + + final TimerInternals.TimerDataCoderV2 timerCoder; + + /** Max number of elements to include in a bundle. */ + private final long maxBundleSize; + /** Max duration of a bundle. */ + private final long maxBundleTimeMills; + + private final DoFnSchemaInformation doFnSchemaInformation; + + private final Map> sideInputMapping; + + /** If true, we must process elements only after a checkpoint is finished. */ + final boolean requiresStableInput; + + /** + * If both requiresStableInput and this parameter are true, we must flush the buffer during drain + * operation. + */ + final boolean enableStableInputDrain; + + final int numConcurrentCheckpoints; + + private final boolean usesOnWindowExpiration; + + private final boolean finishBundleBeforeCheckpointing; + + /** Stores new finalizations being gathered. */ + private transient InMemoryBundleFinalizer bundleFinalizer; + /** Pending bundle finalizations which have not been acknowledged yet. */ + private transient LinkedHashMap> + pendingFinalizations; + /** + * Keep a maximum of 32 bundle finalizations for {@link + * BundleFinalizer.Callback#onBundleSuccess()}. + */ + private static final int MAX_NUMBER_PENDING_BUNDLE_FINALIZATIONS = 32; + + protected transient InternalTimerService timerService; + // Flink 1.20 moved timeServiceManager to protected scope. No longer need delegate + // private transient InternalTimeServiceManager timeServiceManager; + + private transient PushedBackElementsHandler> pushedBackElementsHandler; + + /** Metrics container for reporting Beam metrics to Flink (null if metrics are disabled). */ + transient @Nullable FlinkMetricContainer flinkMetricContainer; + + /** Helper class to report the checkpoint duration. */ + private transient @Nullable CheckpointStats checkpointStats; + + /** A timer that finishes the current bundle after a fixed amount of time. */ + private transient ScheduledFuture checkFinishBundleTimer; + + /** + * This and the below fields need to be volatile because we use multiple threads to access these. + * (a) the main processing thread (b) a timer thread to finish bundles by a timeout instead of the + * number of element However, we do not need a lock because Flink makes sure to acquire the + * "checkpointing" lock for the main processing but also for timer set via its {@code + * timerService}. + * + *

The volatile flag can be removed once https://issues.apache.org/jira/browse/FLINK-12481 has + * been addressed. + */ + private transient volatile boolean bundleStarted; + /** Number of processed elements in the current bundle. */ + private transient volatile long elementCount; + /** Time that the last bundle was finished (to set the timer). */ + private transient volatile long lastFinishBundleTime; + /** Callback to be executed before the current bundle is started. */ + private transient volatile Runnable preBundleCallback; + /** Callback to be executed after the current bundle was finished. */ + private transient volatile Runnable bundleFinishedCallback; + + // Watermark state. + // Volatile because these can be set in two mutually exclusive threads (see above). + private transient volatile long currentInputWatermark; + private transient volatile long currentSideInputWatermark; + private transient volatile long currentOutputWatermark; + private transient volatile long pushedBackWatermark; + + /** Constructor for DoFnOperator. */ + public DoFnOperator( + @Nullable DoFn doFn, + String stepName, + Coder> inputWindowedCoder, + Map, Coder> outputCoders, + TupleTag mainOutputTag, + List> additionalOutputTags, + OutputManagerFactory outputManagerFactory, + WindowingStrategy windowingStrategy, + Map> sideInputTagMapping, + Collection> sideInputs, + PipelineOptions options, + @Nullable Coder keyCoder, + @Nullable KeySelector, ?> keySelector, + DoFnSchemaInformation doFnSchemaInformation, + Map> sideInputMapping) { + this.doFn = doFn; + this.stepName = stepName; + this.windowedInputCoder = inputWindowedCoder; + this.outputCoders = outputCoders; + this.mainOutputTag = mainOutputTag; + this.additionalOutputTags = additionalOutputTags; + this.sideInputTagMapping = sideInputTagMapping; + this.sideInputs = sideInputs; + this.serializedOptions = new SerializablePipelineOptions(options); + this.isStreaming = serializedOptions.get().as(FlinkPipelineOptions.class).isStreaming(); + this.windowingStrategy = windowingStrategy; + this.outputManagerFactory = outputManagerFactory; + + // API removed in Flink 2.0. setChainingStrategy is now set internally. + // setChainingStrategy(ChainingStrategy.ALWAYS); + + this.keyCoder = keyCoder; + this.keySelector = keySelector; + + this.timerCoder = + TimerInternals.TimerDataCoderV2.of(windowingStrategy.getWindowFn().windowCoder()); + + FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class); + + this.maxBundleSize = flinkOptions.getMaxBundleSize(); + Preconditions.checkArgument(maxBundleSize > 0, "Bundle size must be at least 1"); + this.maxBundleTimeMills = flinkOptions.getMaxBundleTimeMills(); + Preconditions.checkArgument(maxBundleTimeMills > 0, "Bundle time must be at least 1"); + this.doFnSchemaInformation = doFnSchemaInformation; + this.sideInputMapping = sideInputMapping; + + this.requiresStableInput = isRequiresStableInput(doFn); + + this.usesOnWindowExpiration = + doFn != null && DoFnSignatures.getSignature(doFn.getClass()).onWindowExpiration() != null; + + if (requiresStableInput) { + Preconditions.checkState( + CheckpointingMode.valueOf(flinkOptions.getCheckpointingMode()) + == CheckpointingMode.EXACTLY_ONCE, + "Checkpointing mode is not set to exactly once but @RequiresStableInput is used."); + Preconditions.checkState( + flinkOptions.getCheckpointingInterval() > 0, + "No checkpointing configured but pipeline uses @RequiresStableInput"); + LOG.warn( + "Enabling stable input for transform {}. Will only process elements at most every {} milliseconds.", + stepName, + flinkOptions.getCheckpointingInterval() + + Math.max(0, flinkOptions.getMinPauseBetweenCheckpoints())); + } + + this.enableStableInputDrain = flinkOptions.getEnableStableInputDrain(); + + this.numConcurrentCheckpoints = flinkOptions.getNumConcurrentCheckpoints(); + + this.finishBundleBeforeCheckpointing = flinkOptions.getFinishBundleBeforeCheckpointing(); + } + + private boolean isRequiresStableInput(DoFn doFn) { + // WindowDoFnOperator does not use a DoFn + return doFn != null + && DoFnSignatures.getSignature(doFn.getClass()).processElement().requiresStableInput(); + } + + @VisibleForTesting + boolean getRequiresStableInput() { + return requiresStableInput; + } + + // allow overriding this in WindowDoFnOperator because this one dynamically creates + // the DoFn + protected DoFn getDoFn() { + return doFn; + } + + protected Iterable> preProcess(WindowedValue input) { + // Assume Input is PreInputT + return Collections.singletonList((WindowedValue) input); + } + + // allow overriding this, for example SplittableDoFnOperator will not create a + // stateful DoFn runner because ProcessFn, which is used for executing a Splittable DoFn + // doesn't play by the normal DoFn rules and WindowDoFnOperator uses LateDataDroppingDoFnRunner + protected DoFnRunner createWrappingDoFnRunner( + DoFnRunner wrappedRunner, StepContext stepContext) { + + if (keyCoder != null) { + StatefulDoFnRunner.CleanupTimer cleanupTimer = + new StatefulDoFnRunner.TimeInternalsCleanupTimer( + timerInternals, windowingStrategy) { + @Override + public void setForWindow(InputT input, BoundedWindow window) { + if (!window.equals(GlobalWindow.INSTANCE) || usesOnWindowExpiration) { + // Skip setting a cleanup timer for the global window as these timers + // lead to potentially unbounded state growth in the runner, depending on key + // cardinality. Cleanup for global window will be performed upon arrival of the + // final watermark. + // In the case of OnWindowExpiration, we still set the timer. + super.setForWindow(input, window); + } + } + }; + + // we don't know the window type + // @SuppressWarnings({"unchecked", "rawtypes"}) + Coder windowCoder = windowingStrategy.getWindowFn().windowCoder(); + + @SuppressWarnings({"unchecked"}) + StatefulDoFnRunner.StateCleaner stateCleaner = + new StatefulDoFnRunner.StateInternalsStateCleaner<>( + doFn, keyedStateInternals, windowCoder); + + return DoFnRunners.defaultStatefulDoFnRunner( + doFn, + getInputCoder(), + wrappedRunner, + stepContext, + windowingStrategy, + cleanupTimer, + stateCleaner, + true /* requiresTimeSortedInput is supported */); + + } else { + return doFnRunner; + } + } + + @Override + public void setup( + StreamTask containingTask, + StreamConfig config, + Output>> output) { + + // make sure that FileSystems is initialized correctly + FileSystems.setDefaultPipelineOptions(serializedOptions.get()); + + super.setup(containingTask, config, output); + } + + protected boolean shoudBundleElements() { + return isStreaming; + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + ListStateDescriptor> pushedBackStateDescriptor = + new ListStateDescriptor<>( + "pushed-back-elements", + new CoderTypeSerializer<>(windowedInputCoder, serializedOptions)); + + if (keySelector != null) { + pushedBackElementsHandler = + KeyedPushedBackElementsHandler.create( + keySelector, getKeyedStateBackend(), pushedBackStateDescriptor); + } else { + ListState> listState = + getOperatorStateBackend().getListState(pushedBackStateDescriptor); + pushedBackElementsHandler = NonKeyedPushedBackElementsHandler.create(listState); + } + + currentInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + currentSideInputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + currentOutputWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(); + + sideInputReader = NullSideInputReader.of(sideInputs); + + if (!sideInputs.isEmpty()) { + + FlinkBroadcastStateInternals sideInputStateInternals = + new FlinkBroadcastStateInternals<>( + getContainingTask().getIndexInSubtaskGroup(), + getOperatorStateBackend(), + serializedOptions); + + sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals); + sideInputReader = sideInputHandler; + + Stream> pushedBack = pushedBackElementsHandler.getElements(); + long min = + pushedBack.map(v -> v.getTimestamp().getMillis()).reduce(Long.MAX_VALUE, Math::min); + pushedBackWatermark = min; + } else { + pushedBackWatermark = Long.MAX_VALUE; + } + + // StatefulPardo or WindowDoFn + if (keyCoder != null) { + keyedStateInternals = + new FlinkStateInternals<>( + (KeyedStateBackend) getKeyedStateBackend(), + keyCoder, + windowingStrategy.getWindowFn().windowCoder(), + serializedOptions); + + if (timerService == null) { + timerService = + getInternalTimerService( + "beam-timer", new CoderTypeSerializer<>(timerCoder, serializedOptions), this); + } + + timerInternals = new FlinkTimerInternals(timerService); + Preconditions.checkNotNull(getTimeServiceManager(), "Time service manager is not set."); + } + + outputManager = + outputManagerFactory.create( + output, getLockToAcquireForStateAccessDuringBundles(), getOperatorStateBackend()); + } + + /** + * Subclasses may provide a lock to ensure that the state backend is not accessed concurrently + * during bundle execution. + */ + protected Lock getLockToAcquireForStateAccessDuringBundles() { + return NoopLock.get(); + } + + @Override + public void open() throws Exception { + // WindowDoFnOperator need use state and timer to get DoFn. + // So must wait StateInternals and TimerInternals ready. + // This will be called after initializeState() + this.doFn = getDoFn(); + + FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class); + doFnInvoker = DoFnInvokers.tryInvokeSetupFor(doFn, options); + + StepContext stepContext = new FlinkStepContext(); + doFnRunner = + DoFnRunners.simpleRunner( + options, + doFn, + sideInputReader, + outputManager, + mainOutputTag, + additionalOutputTags, + stepContext, + getInputCoder(), + outputCoders, + windowingStrategy, + doFnSchemaInformation, + sideInputMapping); + + doFnRunner = + createBufferingDoFnRunnerIfNeeded(createWrappingDoFnRunner(doFnRunner, stepContext)); + earlyBindStateIfNeeded(); + + if (!options.getDisableMetrics()) { + flinkMetricContainer = new FlinkMetricContainer(getRuntimeContext()); + doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, flinkMetricContainer); + String checkpointMetricNamespace = options.getReportCheckpointDuration(); + if (checkpointMetricNamespace != null) { + MetricName checkpointMetric = + MetricName.named(checkpointMetricNamespace, "checkpoint_duration"); + checkpointStats = + new CheckpointStats( + () -> + flinkMetricContainer + .getMetricsContainer(stepName) + .getDistribution(checkpointMetric)); + } + } + + elementCount = 0L; + lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime(); + + // Schedule timer to check timeout of finish bundle. + long bundleCheckPeriod = Math.max(maxBundleTimeMills / 2, 1); + checkFinishBundleTimer = + getProcessingTimeService() + .scheduleAtFixedRate( + timestamp -> checkInvokeFinishBundleByTime(), bundleCheckPeriod, bundleCheckPeriod); + + if (doFn instanceof SplittableParDoViaKeyedWorkItems.ProcessFn) { + pushbackDoFnRunner = + new ProcessFnRunner<>((DoFnRunner) doFnRunner, sideInputs, sideInputHandler); + } else { + pushbackDoFnRunner = + SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); + } + + bundleFinalizer = new InMemoryBundleFinalizer(); + pendingFinalizations = new LinkedHashMap<>(); + } + + DoFnRunner createBufferingDoFnRunnerIfNeeded( + DoFnRunner wrappedRunner) throws Exception { + + if (requiresStableInput) { + // put this in front of the root FnRunner before any additional wrappers + return this.bufferingDoFnRunner = + BufferingDoFnRunner.create( + wrappedRunner, + "stable-input-buffer", + windowedInputCoder, + windowingStrategy.getWindowFn().windowCoder(), + getOperatorStateBackend(), + getBufferingKeyedStateBackend(), + numConcurrentCheckpoints, + serializedOptions); + } + return wrappedRunner; + } + + /** + * Retrieve a keyed state backend that should be used to buffer elements for + * {@code @RequiresStableInput} functionality. By default this is the default keyed backend, but + * can be override in {@link ExecutableStageDoFnOperator}. + * + * @return the keyed backend to use for element buffering + */ + @Nullable KeyedStateBackend getBufferingKeyedStateBackend() { + return getKeyedStateBackend(); + } + + private void earlyBindStateIfNeeded() throws IllegalArgumentException, IllegalAccessException { + if (keyCoder != null) { + if (doFn != null) { + DoFnSignature signature = DoFnSignatures.getSignature(doFn.getClass()); + FlinkStateInternals.EarlyBinder earlyBinder = + new FlinkStateInternals.EarlyBinder( + getKeyedStateBackend(), + serializedOptions, + windowingStrategy.getWindowFn().windowCoder()); + for (DoFnSignature.StateDeclaration value : signature.stateDeclarations().values()) { + StateSpec spec = + (StateSpec) signature.stateDeclarations().get(value.id()).field().get(doFn); + spec.bind(value.id(), earlyBinder); + } + if (doFnRunner instanceof StatefulDoFnRunner) { + ((StatefulDoFnRunner) doFnRunner) + .getSystemStateTags() + .forEach(tag -> tag.getSpec().bind(tag.getId(), earlyBinder)); + } + } + } + } + + void cleanUp() throws Exception { + Optional.ofNullable(flinkMetricContainer) + .ifPresent(FlinkMetricContainer::registerMetricsForPipelineResult); + Optional.ofNullable(checkFinishBundleTimer).ifPresent(timer -> timer.cancel(true)); + Workarounds.deleteStaticCaches(); + Optional.ofNullable(doFnInvoker).ifPresent(DoFnInvoker::invokeTeardown); + } + + void flushData() throws Exception { + // This is our last change to block shutdown of this operator while + // there are still remaining processing-time timers. Flink will ignore pending + // processing-time timers when upstream operators have shut down and will also + // shut down this operator with pending processing-time timers. + if (numProcessingTimeTimers() > 0) { + timerInternals.processPendingProcessingTimeTimers(); + } + if (numProcessingTimeTimers() > 0) { + throw new RuntimeException( + "There are still " + + numProcessingTimeTimers() + + " processing-time timers left, this indicates a bug"); + } + // make sure we send a +Inf watermark downstream. It can happen that we receive +Inf + // in processWatermark*() but have holds, so we have to re-evaluate here. + processWatermark(new Watermark(Long.MAX_VALUE)); + // Make sure to finish the current bundle + while (bundleStarted) { + invokeFinishBundle(); + } + if (requiresStableInput && enableStableInputDrain) { + // Flush any buffered events here before draining the pipeline. Note that this is best-effort + // and requiresStableInput contract might be violated in cases where buffer processing fails. + bufferingDoFnRunner.checkpointCompleted(Long.MAX_VALUE); + updateOutputWatermark(); + } + if (currentOutputWatermark < Long.MAX_VALUE) { + throw new RuntimeException( + String.format( + "There are still watermark holds left when terminating operator %s Watermark held %d", + getOperatorName(), currentOutputWatermark)); + } + + // sanity check: these should have been flushed out by +Inf watermarks + if (!sideInputs.isEmpty()) { + + List> pushedBackElements = + pushedBackElementsHandler.getElements().collect(Collectors.toList()); + + if (pushedBackElements.size() > 0) { + String pushedBackString = Joiner.on(",").join(pushedBackElements); + throw new RuntimeException( + "Leftover pushed-back data: " + pushedBackString + ". This indicates a bug."); + } + } + } + + @Override + public void finish() throws Exception { + try { + flushData(); + } finally { + super.finish(); + } + } + + @Override + public void close() throws Exception { + try { + cleanUp(); + } finally { + super.close(); + } + } + + protected int numProcessingTimeTimers() { + return getTimeServiceManager() + .map( + manager -> { + if (timeServiceManager instanceof InternalTimeServiceManagerImpl) { + final InternalTimeServiceManagerImpl cast = + (InternalTimeServiceManagerImpl) timeServiceManager; + return cast.numProcessingTimeTimers(); + } else if (timeServiceManager instanceof BatchExecutionInternalTimeServiceManager) { + return 0; + } else { + throw new IllegalStateException( + String.format( + "Unknown implementation of InternalTimerServiceManager. %s", + timeServiceManager)); + } + }) + .orElse(0); + } + + public long getEffectiveInputWatermark() { + // hold back by the pushed back values waiting for side inputs + long combinedPushedBackWatermark = pushedBackWatermark; + if (requiresStableInput) { + combinedPushedBackWatermark = + Math.min(combinedPushedBackWatermark, bufferingDoFnRunner.getOutputWatermarkHold()); + } + return Math.min(combinedPushedBackWatermark, currentInputWatermark); + } + + public long getCurrentOutputWatermark() { + return currentOutputWatermark; + } + + protected final void setPreBundleCallback(Runnable callback) { + this.preBundleCallback = callback; + } + + protected final void setBundleFinishedCallback(Runnable callback) { + this.bundleFinishedCallback = callback; + } + + @Override + public final void processElement(StreamRecord> streamRecord) { + for (WindowedValue e : preProcess(streamRecord.getValue())) { + checkInvokeStartBundle(); + LOG.trace("Processing element {} in {}", streamRecord.getValue().getValue(), doFn.getClass()); + long oldHold = keyCoder != null ? keyedStateInternals.minWatermarkHoldMs() : -1L; + doFnRunner.processElement(e); + checkInvokeFinishBundleByCount(); + emitWatermarkIfHoldChanged(oldHold); + } + } + + @Override + public final void processElement1(StreamRecord> streamRecord) + throws Exception { + for (WindowedValue e : preProcess(streamRecord.getValue())) { + checkInvokeStartBundle(); + Iterable> justPushedBack = + pushbackDoFnRunner.processElementInReadyWindows(e); + + long min = pushedBackWatermark; + for (WindowedValue pushedBackValue : justPushedBack) { + min = Math.min(min, pushedBackValue.getTimestamp().getMillis()); + pushedBackElementsHandler.pushBack(pushedBackValue); + } + pushedBackWatermark = min; + + checkInvokeFinishBundleByCount(); + } + } + + /** + * Add the side input value. Here we are assuming that views have already been materialized and + * are sent over the wire as {@link Iterable}. Subclasses may elect to perform materialization in + * state and receive side input incrementally instead. + * + * @param streamRecord + */ + protected void addSideInputValue(StreamRecord streamRecord) { + @SuppressWarnings("unchecked") + WindowedValue> value = + (WindowedValue>) streamRecord.getValue().getValue(); + + PCollectionView sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag()); + sideInputHandler.addSideInputValue(sideInput, value); + } + + @Override + public final void processElement2(StreamRecord streamRecord) throws Exception { + // we finish the bundle because the newly arrived side-input might + // make a view available that was previously not ready. + // The PushbackSideInputRunner will only reset its cache of non-ready windows when + // finishing a bundle. + invokeFinishBundle(); + checkInvokeStartBundle(); + + // add the side input, which may cause pushed back elements become eligible for processing + addSideInputValue(streamRecord); + + List> newPushedBack = new ArrayList<>(); + + Iterator> it = pushedBackElementsHandler.getElements().iterator(); + + while (it.hasNext()) { + WindowedValue element = it.next(); + // we need to set the correct key in case the operator is + // a (keyed) window operator + if (keySelector != null) { + setCurrentKey(keySelector.getKey(element)); + } + + Iterable> justPushedBack = + pushbackDoFnRunner.processElementInReadyWindows(element); + Iterables.addAll(newPushedBack, justPushedBack); + } + + pushedBackElementsHandler.clear(); + long min = Long.MAX_VALUE; + for (WindowedValue pushedBackValue : newPushedBack) { + min = Math.min(min, pushedBackValue.getTimestamp().getMillis()); + pushedBackElementsHandler.pushBack(pushedBackValue); + } + pushedBackWatermark = min; + + checkInvokeFinishBundleByCount(); + + // maybe output a new watermark + processWatermark1(new Watermark(currentInputWatermark)); + } + + @Override + public final void processWatermark(Watermark mark) throws Exception { + LOG.trace("Processing watermark {} in {}", mark.getTimestamp(), doFn.getClass()); + processWatermark1(mark); + } + + @Override + public final void processWatermark1(Watermark mark) throws Exception { + // Flush any data buffered during snapshotState(). + outputManager.flushBuffer(); + + // We do the check here because we are guaranteed to at least get the +Inf watermark on the + // main input when the job finishes. + if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + // this means we will never see any more side input + // we also do the check here because we might have received the side-input MAX watermark + // before receiving any main-input data + emitAllPushedBackData(); + } + + currentInputWatermark = mark.getTimestamp(); + processInputWatermark(true); + } + + private void processInputWatermark(boolean advanceInputWatermark) throws Exception { + long inputWatermarkHold = applyInputWatermarkHold(getEffectiveInputWatermark()); + if (keyCoder != null && advanceInputWatermark) { + timeServiceManager.advanceWatermark(new Watermark(inputWatermarkHold)); + } + + long potentialOutputWatermark = + applyOutputWatermarkHold( + currentOutputWatermark, computeOutputWatermark(inputWatermarkHold)); + + maybeEmitWatermark(potentialOutputWatermark); + } + + /** + * Allows to apply a hold to the input watermark. By default, just passes the input watermark + * through. + */ + public long applyInputWatermarkHold(long inputWatermark) { + return inputWatermark; + } + + /** + * Allows to apply a hold to the output watermark before it is sent out. Used to apply hold on + * output watermark for delayed (asynchronous or buffered) processing. + * + * @param currentOutputWatermark the current output watermark + * @param potentialOutputWatermark The potential new output watermark which can be adjusted, if + * needed. The input watermark hold has already been applied. + * @return The new output watermark which will be emitted. + */ + public long applyOutputWatermarkHold(long currentOutputWatermark, long potentialOutputWatermark) { + return potentialOutputWatermark; + } + + private long computeOutputWatermark(long inputWatermarkHold) { + final long potentialOutputWatermark; + if (keyCoder == null) { + potentialOutputWatermark = inputWatermarkHold; + } else { + potentialOutputWatermark = + Math.min(keyedStateInternals.minWatermarkHoldMs(), inputWatermarkHold); + } + return potentialOutputWatermark; + } + + private void maybeEmitWatermark(long watermark) { + if (watermark > currentOutputWatermark) { + // Must invoke finishBatch before emit the +Inf watermark otherwise there are some late + // events. + if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + invokeFinishBundle(); + } + + if (bundleStarted) { + // do not update watermark in the middle of bundle, because it might cause + // user-buffered data to be emitted past watermark + return; + } + + LOG.debug("Emitting watermark {} from {}", watermark, getOperatorName()); + currentOutputWatermark = watermark; + output.emitWatermark(new Watermark(watermark)); + + // Check if the final watermark was triggered to perform state cleanup for global window + // TODO: Do we need to do this when OnWindowExpiration is set, since in that case we have a + // cleanup timer? + if (keyedStateInternals != null + && currentOutputWatermark + > adjustTimestampForFlink(GlobalWindow.INSTANCE.maxTimestamp().getMillis())) { + keyedStateInternals.clearGlobalState(); + } + } + } + + @Override + public final void processWatermark2(Watermark mark) throws Exception { + currentSideInputWatermark = mark.getTimestamp(); + if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) { + // this means we will never see any more side input + emitAllPushedBackData(); + + // maybe output a new watermark + processWatermark1(new Watermark(currentInputWatermark)); + } + } + + /** + * Emits all pushed-back data. This should be used once we know that there will not be any future + * side input, i.e. that there is no point in waiting. + */ + private void emitAllPushedBackData() throws Exception { + + Iterator> it = pushedBackElementsHandler.getElements().iterator(); + + while (it.hasNext()) { + checkInvokeStartBundle(); + WindowedValue element = it.next(); + // we need to set the correct key in case the operator is + // a (keyed) window operator + setKeyContextElement1(new StreamRecord<>(element)); + + doFnRunner.processElement(element); + } + + pushedBackElementsHandler.clear(); + pushedBackWatermark = Long.MAX_VALUE; + } + + /** + * Check whether invoke startBundle, if it is, need to output elements that were buffered as part + * of finishing a bundle in snapshot() first. + * + *

In order to avoid having {@link DoFnRunner#processElement(WindowedValue)} or {@link + * DoFnRunner#onTimer(String, String, Object, BoundedWindow, Instant, Instant, TimeDomain)} not + * between StartBundle and FinishBundle, this method needs to be called in each processElement and + * each processWatermark and onProcessingTime. Do not need to call in onEventTime, because it has + * been guaranteed in the processWatermark. + */ + private void checkInvokeStartBundle() { + if (!bundleStarted) { + // Flush any data buffered during snapshotState(). + outputManager.flushBuffer(); + LOG.debug("Starting bundle."); + if (preBundleCallback != null) { + preBundleCallback.run(); + } + pushbackDoFnRunner.startBundle(); + bundleStarted = true; + } + } + + /** Check whether invoke finishBundle by elements count. Called in processElement. */ + @SuppressWarnings("NonAtomicVolatileUpdate") + @SuppressFBWarnings("VO_VOLATILE_INCREMENT") + private void checkInvokeFinishBundleByCount() { + if (!shoudBundleElements()) { + return; + } + // We do not access this statement concurrently, but we want to make sure that each thread + // sees the latest value, which is why we use volatile. See the class field section above + // for more information. + //noinspection NonAtomicOperationOnVolatileField + elementCount++; + if (elementCount >= maxBundleSize) { + invokeFinishBundle(); + updateOutputWatermark(); + } + } + + /** Check whether invoke finishBundle by timeout. */ + private void checkInvokeFinishBundleByTime() { + if (!shoudBundleElements()) { + return; + } + long now = getProcessingTimeService().getCurrentProcessingTime(); + if (now - lastFinishBundleTime >= maxBundleTimeMills) { + invokeFinishBundle(); + scheduleForCurrentProcessingTime(ts -> updateOutputWatermark()); + } + } + + @SuppressWarnings("FutureReturnValueIgnored") + protected void scheduleForCurrentProcessingTime(ProcessingTimeCallback callback) { + // We are scheduling a timer for advancing the watermark, to not delay finishing the bundle + // and temporarily release the checkpoint lock. Otherwise, we could potentially loop when a + // timer keeps scheduling a timer for the same timestamp. + ProcessingTimeService timeService = getProcessingTimeService(); + timeService.registerTimer(timeService.getCurrentProcessingTime(), callback); + } + + void updateOutputWatermark() { + try { + processInputWatermark(false); + } catch (Exception ex) { + failBundleFinalization(ex); + } + } + + protected final void invokeFinishBundle() { + long previousBundleFinishTime = lastFinishBundleTime; + if (bundleStarted) { + LOG.debug("Finishing bundle."); + pushbackDoFnRunner.finishBundle(); + LOG.debug("Finished bundle. Element count: {}", elementCount); + elementCount = 0L; + lastFinishBundleTime = getProcessingTimeService().getCurrentProcessingTime(); + bundleStarted = false; + // callback only after current bundle was fully finalized + // it could start a new bundle, for example resulting from timer processing + if (bundleFinishedCallback != null) { + LOG.debug("Invoking bundle finish callback."); + bundleFinishedCallback.run(); + } + } + try { + if (previousBundleFinishTime - getProcessingTimeService().getCurrentProcessingTime() + > maxBundleTimeMills) { + processInputWatermark(false); + } + } catch (Exception ex) { + LOG.warn("Failed to update downstream watermark", ex); + } + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) { + if (finishBundleBeforeCheckpointing) { + // We finish the bundle and flush any pending data. + // This avoids buffering any data as part of snapshotState() below. + while (bundleStarted) { + invokeFinishBundle(); + } + updateOutputWatermark(); + } + } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + if (checkpointStats != null) { + checkpointStats.snapshotStart(context.getCheckpointId()); + } + + if (requiresStableInput) { + // We notify the BufferingDoFnRunner to associate buffered state with this + // snapshot id and start a new buffer for elements arriving after this snapshot. + bufferingDoFnRunner.checkpoint(context.getCheckpointId()); + } + + int diff = pendingFinalizations.size() - MAX_NUMBER_PENDING_BUNDLE_FINALIZATIONS; + if (diff >= 0) { + for (Iterator iterator = pendingFinalizations.keySet().iterator(); diff >= 0; diff--) { + iterator.next(); + iterator.remove(); + } + } + pendingFinalizations.put(context.getCheckpointId(), bundleFinalizer.getAndClearFinalizations()); + + try { + outputManager.openBuffer(); + // Ensure that no new bundle gets started as part of finishing a bundle + while (bundleStarted) { + invokeFinishBundle(); + } + outputManager.closeBuffer(); + } catch (Exception e) { + failBundleFinalization(e); + } + + super.snapshotState(context); + } + + private void failBundleFinalization(Exception e) { + // https://jira.apache.org/jira/browse/FLINK-14653 + // Any regular exception during checkpointing will be tolerated by Flink because those + // typically do not affect the execution flow. We need to fail hard here because errors + // in bundle execution are application errors which are not related to checkpointing. + throw new Error("Checkpointing failed because bundle failed to finalize.", e); + } + + public BundleFinalizer getBundleFinalizer() { + return bundleFinalizer; + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + if (checkpointStats != null) { + checkpointStats.reportCheckpointDuration(checkpointId); + } + + if (requiresStableInput) { + // We can now release all buffered data which was held back for + // @RequiresStableInput guarantees. + bufferingDoFnRunner.checkpointCompleted(checkpointId); + updateOutputWatermark(); + } + + List finalizations = + pendingFinalizations.remove(checkpointId); + if (finalizations != null) { + // confirm all finalizations that were associated with the checkpoint + for (InMemoryBundleFinalizer.Finalization finalization : finalizations) { + finalization.getCallback().onBundleSuccess(); + } + } + + super.notifyCheckpointComplete(checkpointId); + } + + @Override + public void onEventTime(InternalTimer timer) { + checkInvokeStartBundle(); + fireTimerInternal(timer.getKey(), timer.getNamespace()); + } + + @Override + public void onProcessingTime(InternalTimer timer) { + checkInvokeStartBundle(); + fireTimerInternal(timer.getKey(), timer.getNamespace()); + } + + // allow overriding this in ExecutableStageDoFnOperator to set the key context + protected void fireTimerInternal(FlinkKey key, TimerData timerData) { + long oldHold = keyCoder != null ? keyedStateInternals.minWatermarkHoldMs() : -1L; + fireTimer(timerData); + emitWatermarkIfHoldChanged(oldHold); + } + + void emitWatermarkIfHoldChanged(long currentWatermarkHold) { + if (keyCoder != null) { + long newWatermarkHold = keyedStateInternals.minWatermarkHoldMs(); + if (newWatermarkHold > currentWatermarkHold) { + try { + processInputWatermark(false); + } catch (Exception ex) { + // should not happen + throw new IllegalStateException(ex); + } + } + } + } + + // allow overriding this in WindowDoFnOperator + protected void fireTimer(TimerData timerData) { + LOG.debug( + "Firing timer: {} at {} with output time {}", + timerData.getTimerId(), + timerData.getTimestamp().getMillis(), + timerData.getOutputTimestamp().getMillis()); + StateNamespace namespace = timerData.getNamespace(); + // This is a user timer, so namespace must be WindowNamespace + checkArgument(namespace instanceof WindowNamespace); + BoundedWindow window = ((WindowNamespace) namespace).getWindow(); + timerInternals.onFiredOrDeletedTimer(timerData); + + pushbackDoFnRunner.onTimer( + timerData.getTimerId(), + timerData.getTimerFamilyId(), + keyedStateInternals.getKey(), + window, + timerData.getTimestamp(), + timerData.getOutputTimestamp(), + timerData.getDomain(), + timerData.causedByDrain()); + } + + @SuppressWarnings("unchecked") + Coder getInputCoder() { + return (Coder) Iterables.getOnlyElement(windowedInputCoder.getCoderArguments()); + } + + /** Factory for creating an {@link BufferedOutputManager} from a Flink {@link Output}. */ + interface OutputManagerFactory extends Serializable { + BufferedOutputManager create( + Output>> output, + Lock bufferLock, + OperatorStateBackend operatorStateBackend) + throws Exception; + } + + /** + * A {@link WindowedValueReceiver} that can buffer its outputs. Uses {@link + * PushedBackElementsHandler} to buffer the data. Buffering data is necessary because no elements + * can be emitted during {@code snapshotState} which is called when the checkpoint barrier already + * has been sent downstream. Emitting elements would break the flow of checkpoint barrier and + * violate exactly-once semantics. + * + *

This buffering can be deactived using {@code + * FlinkPipelineOptions#setFinishBundleBeforeCheckpointing(true)}. If activated, we flush out + * bundle data before the barrier is sent downstream. This is done via {@code + * prepareSnapshotPreBarrier}. When Flink supports unaligned checkpoints, this should become the + * default and this class should be removed as in https://github.com/apache/beam/pull/9652. + */ + public static class BufferedOutputManager implements WindowedValueMultiReceiver { + + private final TupleTag mainTag; + private final Map, OutputTag>> tagsToOutputTags; + private final Map, Integer> tagsToIds; + /** + * A lock to be acquired before writing to the buffer. This lock will only be acquired during + * buffering. It will not be acquired during flushing the buffer. + */ + private final Lock bufferLock; + + private final boolean isStreaming; + + private Map> idsToTags; + /** Elements buffered during a snapshot, by output id. */ + @VisibleForTesting + final PushedBackElementsHandler>> pushedBackElementsHandler; + + protected final Output>> output; + + /** Indicates whether we are buffering data as part of snapshotState(). */ + private boolean openBuffer = false; + /** For performance, to avoid having to access the state backend when the buffer is empty. */ + private boolean bufferIsEmpty = false; + + BufferedOutputManager( + Output>> output, + TupleTag mainTag, + Map, OutputTag>> tagsToOutputTags, + Map, Integer> tagsToIds, + Lock bufferLock, + PushedBackElementsHandler>> pushedBackElementsHandler, + boolean isStreaming) { + this.output = output; + this.mainTag = mainTag; + this.tagsToOutputTags = tagsToOutputTags; + this.tagsToIds = tagsToIds; + this.bufferLock = bufferLock; + this.idsToTags = new HashMap<>(); + for (Map.Entry, Integer> entry : tagsToIds.entrySet()) { + idsToTags.put(entry.getValue(), entry.getKey()); + } + this.pushedBackElementsHandler = pushedBackElementsHandler; + this.isStreaming = isStreaming; + } + + void openBuffer() { + this.openBuffer = true; + } + + void closeBuffer() { + this.openBuffer = false; + } + + @Override + public void output(TupleTag tag, WindowedValue value) { + // Don't buffer elements in Batch mode + if (!openBuffer || !isStreaming) { + emit(tag, value); + } else { + buffer(KV.of(tagsToIds.get(tag), value)); + } + } + + private void buffer(KV> taggedValue) { + bufferLock.lock(); + try { + pushedBackElementsHandler.pushBack(taggedValue); + } catch (Exception e) { + throw new RuntimeException("Couldn't pushback element.", e); + } finally { + bufferLock.unlock(); + bufferIsEmpty = false; + } + } + + /** + * Flush elements of bufferState to Flink Output. This method should not be invoked in {@link + * #snapshotState(StateSnapshotContext)} because the checkpoint barrier has already been sent + * downstream; emitting elements at this point would violate the checkpoint barrier alignment. + * + *

The buffer should be flushed before starting a new bundle when the buffer cannot be + * concurrently accessed and thus does not need to be guarded by a lock. + */ + void flushBuffer() { + if (openBuffer || bufferIsEmpty) { + // Checkpoint currently in progress or nothing buffered, do not proceed + return; + } + try { + pushedBackElementsHandler + .getElements() + .forEach( + element -> + emit(idsToTags.get(element.getKey()), (WindowedValue) element.getValue())); + pushedBackElementsHandler.clear(); + bufferIsEmpty = true; + } catch (Exception e) { + throw new RuntimeException("Couldn't flush pushed back elements.", e); + } + } + + private void emit(TupleTag tag, WindowedValue value) { + if (tag.equals(mainTag)) { + // with tagged outputs we can't get around this because we don't + // know our own output type... + @SuppressWarnings("unchecked") + WindowedValue castValue = (WindowedValue) value; + output.collect(new StreamRecord<>(castValue)); + } else { + @SuppressWarnings("unchecked") + OutputTag> outputTag = (OutputTag) tagsToOutputTags.get(tag); + output.collect(outputTag, new StreamRecord<>(value)); + } + } + } + + /** Coder for KV of id and value. It will be serialized in Flink checkpoint. */ + private static class TaggedKvCoder extends StructuredCoder>> { + + private final Map>> idsToCoders; + + TaggedKvCoder(Map>> idsToCoders) { + this.idsToCoders = idsToCoders; + } + + @Override + public void encode(KV> kv, OutputStream out) throws IOException { + Coder> coder = idsToCoders.get(kv.getKey()); + VarIntCoder.of().encode(kv.getKey(), out); + coder.encode(kv.getValue(), out); + } + + @Override + public KV> decode(InputStream in) throws IOException { + Integer id = VarIntCoder.of().decode(in); + Coder> coder = idsToCoders.get(id); + WindowedValue value = coder.decode(in); + return KV.of(id, value); + } + + @Override + public List> getCoderArguments() { + return new ArrayList<>(idsToCoders.values()); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + for (Coder coder : idsToCoders.values()) { + verifyDeterministic(this, "Coder must be deterministic", coder); + } + } + } + + /** + * Implementation of {@link OutputManagerFactory} that creates an {@link BufferedOutputManager} + * that can write to multiple logical outputs by Flink side output. + */ + public static class MultiOutputOutputManagerFactory + implements OutputManagerFactory { + + private final TupleTag mainTag; + private final Map, Integer> tagsToIds; + private final Map, OutputTag>> tagsToOutputTags; + private final Map, Coder>> tagsToCoders; + private final SerializablePipelineOptions pipelineOptions; + private final boolean isStreaming; + + // There is no side output. + @SuppressWarnings("unchecked") + public MultiOutputOutputManagerFactory( + TupleTag mainTag, + Coder> mainCoder, + SerializablePipelineOptions pipelineOptions) { + this( + mainTag, + new HashMap<>(), + ImmutableMap., Coder>>builder() + .put(mainTag, (Coder) mainCoder) + .build(), + ImmutableMap., Integer>builder().put(mainTag, 0).build(), + pipelineOptions); + } + + public MultiOutputOutputManagerFactory( + TupleTag mainTag, + Map, OutputTag>> tagsToOutputTags, + Map, Coder>> tagsToCoders, + Map, Integer> tagsToIds, + SerializablePipelineOptions pipelineOptions) { + this.mainTag = mainTag; + this.tagsToOutputTags = tagsToOutputTags; + this.tagsToCoders = tagsToCoders; + this.tagsToIds = tagsToIds; + this.pipelineOptions = pipelineOptions; + this.isStreaming = pipelineOptions.get().as(FlinkPipelineOptions.class).isStreaming(); + } + + @Override + public BufferedOutputManager create( + Output>> output, + Lock bufferLock, + OperatorStateBackend operatorStateBackend) + throws Exception { + Preconditions.checkNotNull(output); + Preconditions.checkNotNull(bufferLock); + Preconditions.checkNotNull(operatorStateBackend); + + TaggedKvCoder taggedKvCoder = buildTaggedKvCoder(); + ListStateDescriptor>> taggedOutputPushbackStateDescriptor = + new ListStateDescriptor<>( + "bundle-buffer-tag", new CoderTypeSerializer<>(taggedKvCoder, pipelineOptions)); + ListState>> listStateBuffer = + operatorStateBackend.getListState(taggedOutputPushbackStateDescriptor); + PushedBackElementsHandler>> pushedBackElementsHandler = + NonKeyedPushedBackElementsHandler.create(listStateBuffer); + + return new BufferedOutputManager<>( + output, + mainTag, + tagsToOutputTags, + tagsToIds, + bufferLock, + pushedBackElementsHandler, + isStreaming); + } + + private TaggedKvCoder buildTaggedKvCoder() { + ImmutableMap.Builder>> idsToCodersBuilder = + ImmutableMap.builder(); + for (Map.Entry, Integer> entry : tagsToIds.entrySet()) { + idsToCodersBuilder.put(entry.getValue(), tagsToCoders.get(entry.getKey())); + } + return new TaggedKvCoder(idsToCodersBuilder.build()); + } + } + + /** + * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow accessing + * state or timer internals. + */ + protected class FlinkStepContext implements StepContext { + + @Override + public StateInternals stateInternals() { + return keyedStateInternals; + } + + @Override + public TimerInternals timerInternals() { + return timerInternals; + } + + @Override + public BundleFinalizer bundleFinalizer() { + return bundleFinalizer; + } + } + + class FlinkTimerInternals implements TimerInternals { + + private static final String PENDING_TIMERS_STATE_NAME = "pending-timers"; + + /** + * Pending Timers (=not been fired yet) by context id. The id is generated from the state + * namespace of the timer and the timer's id. Necessary for supporting removal of existing + * timers. In Flink removal of timers can only be done by providing id and time of the timer. + * + *

CAUTION: This map is scoped by the current active key. Do not attempt to perform any + * calculations which span across keys. + */ + @VisibleForTesting final MapState pendingTimersById; + + private final InternalTimerService timerService; + + private FlinkTimerInternals(InternalTimerService timerService) throws Exception { + MapStateDescriptor pendingTimersByIdStateDescriptor = + new MapStateDescriptor<>( + PENDING_TIMERS_STATE_NAME, + new StringSerializer(), + new CoderTypeSerializer<>(timerCoder, serializedOptions)); + + this.pendingTimersById = getKeyedStateStore().getMapState(pendingTimersByIdStateDescriptor); + this.timerService = timerService; + populateOutputTimestampQueue(timerService); + } + + /** + * Processes all pending processing timers. This is intended for use during shutdown. From Flink + * 1.10 on, processing timer execution is stopped when the operator is closed. This leads to + * problems for applications which assume all pending timers will be completed. Although Flink + * does drain the remaining timers after close(), this is not sufficient because no new timers + * are allowed to be scheduled anymore. This breaks Beam pipelines which rely on all processing + * timers to be scheduled and executed. + */ + void processPendingProcessingTimeTimers() { + final KeyedStateBackend keyedStateBackend = getKeyedStateBackend(); + final InternalPriorityQueue> processingTimeTimersQueue = + Workarounds.retrieveInternalProcessingTimerQueue(timerService); + + InternalTimer internalTimer; + while ((internalTimer = processingTimeTimersQueue.poll()) != null) { + keyedStateBackend.setCurrentKey(internalTimer.getKey()); + TimerData timer = internalTimer.getNamespace(); + checkInvokeStartBundle(); + fireTimerInternal((FlinkKey) internalTimer.getKey(), timer); + } + } + + private void populateOutputTimestampQueue(InternalTimerService timerService) + throws Exception { + + BiConsumerWithException consumer = + (timerData, stamp) -> + keyedStateInternals.addWatermarkHoldUsage(timerData.getOutputTimestamp()); + if (timerService instanceof InternalTimerServiceImpl) { + timerService.forEachEventTimeTimer(consumer); + timerService.forEachProcessingTimeTimer(consumer); + } + } + + private String constructTimerId(String timerFamilyId, String timerId) { + return timerFamilyId + "+" + timerId; + } + + @Override + public void setTimer( + StateNamespace namespace, + String timerId, + String timerFamilyId, + Instant target, + Instant outputTimestamp, + TimeDomain timeDomain) { + setTimer( + TimerData.of(timerId, timerFamilyId, namespace, target, outputTimestamp, timeDomain)); + } + + /** + * @deprecated use {@link #setTimer(StateNamespace, String, String, Instant, Instant, + * TimeDomain)}. + */ + @Deprecated + @Override + public void setTimer(TimerData timer) { + try { + LOG.debug( + "Setting timer: {} at {} with output time {}", + timer.getTimerId(), + timer.getTimestamp().getMillis(), + timer.getOutputTimestamp().getMillis()); + String contextTimerId = + getContextTimerId( + constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()), + timer.getNamespace()); + @Nullable final TimerData oldTimer = pendingTimersById.get(contextTimerId); + if (!timer.equals(oldTimer)) { + // Only one timer can exist at a time for a given timer id and context. + // If a timer gets set twice in the same context, the second must + // override the first. Thus, we must cancel any pending timers + // before we set the new one. + cancelPendingTimer(oldTimer); + registerTimer(timer, contextTimerId); + } + } catch (Exception e) { + throw new RuntimeException("Failed to set timer", e); + } + } + + private void registerTimer(TimerData timer, String contextTimerId) throws Exception { + LOG.debug("Registering timer {}", timer); + pendingTimersById.put(contextTimerId, timer); + long time = timer.getTimestamp().getMillis(); + switch (timer.getDomain()) { + case EVENT_TIME: + timerService.registerEventTimeTimer(timer, adjustTimestampForFlink(time)); + break; + case PROCESSING_TIME: + case SYNCHRONIZED_PROCESSING_TIME: + timerService.registerProcessingTimeTimer(timer, adjustTimestampForFlink(time)); + break; + default: + throw new UnsupportedOperationException("Unsupported time domain: " + timer.getDomain()); + } + keyedStateInternals.addWatermarkHoldUsage(timer.getOutputTimestamp()); + } + + /** + * Looks up a timer by its id. This is necessary to support canceling existing timers with the + * same id. Flink does not provide this functionality. + * + * @param contextTimerId Timer ID o cancel. + */ + private void cancelPendingTimerById(String contextTimerId) throws Exception { + cancelPendingTimer(pendingTimersById.get(contextTimerId)); + } + + /** + * Cancels a pending timer. + * + * @param timer Timer to cancel. + */ + private void cancelPendingTimer(@Nullable TimerData timer) { + if (timer != null) { + deleteTimerInternal(timer); + } + } + + /** + * Hook which must be called when a timer is fired or deleted to perform cleanup. Note: Make + * sure that the state backend key is set correctly. It is best to run this in the fireTimer() + * method. + */ + void onFiredOrDeletedTimer(TimerData timer) { + try { + pendingTimersById.remove( + getContextTimerId( + constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()), + timer.getNamespace())); + keyedStateInternals.removeWatermarkHoldUsage(timer.getOutputTimestamp()); + } catch (Exception e) { + throw new RuntimeException("Failed to cleanup pending timers state.", e); + } + } + + /** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */ + @Deprecated + @Override + public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) { + throw new UnsupportedOperationException("Canceling of a timer by ID is not yet supported."); + } + + @Override + public void deleteTimer( + StateNamespace namespace, String timerId, String timerFamilyId, TimeDomain timeDomain) { + try { + cancelPendingTimerById(getContextTimerId(timerId, namespace)); + } catch (Exception e) { + throw new RuntimeException("Failed to cancel timer", e); + } + } + + /** @deprecated use {@link #deleteTimer(StateNamespace, String, String, TimeDomain)}. */ + @Override + @Deprecated + public void deleteTimer(TimerData timer) { + deleteTimer( + timer.getNamespace(), + constructTimerId(timer.getTimerFamilyId(), timer.getTimerId()), + timer.getTimerFamilyId(), + timer.getDomain()); + } + + void deleteTimerInternal(TimerData timer) { + long time = timer.getTimestamp().getMillis(); + switch (timer.getDomain()) { + case EVENT_TIME: + timerService.deleteEventTimeTimer(timer, adjustTimestampForFlink(time)); + break; + case PROCESSING_TIME: + case SYNCHRONIZED_PROCESSING_TIME: + timerService.deleteProcessingTimeTimer(timer, adjustTimestampForFlink(time)); + break; + default: + throw new UnsupportedOperationException("Unsupported time domain: " + timer.getDomain()); + } + onFiredOrDeletedTimer(timer); + } + + @Override + public Instant currentProcessingTime() { + return new Instant(timerService.currentProcessingTime()); + } + + @Override + public @Nullable Instant currentSynchronizedProcessingTime() { + return new Instant(timerService.currentProcessingTime()); + } + + @Override + public Instant currentInputWatermarkTime() { + if (timerService instanceof BatchExecutionInternalTimeService) { + // In batch mode, this method will only either return BoundedWindow.TIMESTAMP_MIN_VALUE, + // or BoundedWindow.TIMESTAMP_MAX_VALUE. + // + // For batch execution mode, the currentInputWatermark variable will never be updated + // until all the records are processed. However, every time when a record with a new + // key arrives, the Flink timer service watermark will be set to + // MAX_WATERMARK(LONG.MAX_VALUE) so that all the timers associated with the current + // key can fire. After that the Flink timer service watermark will be reset to + // LONG.MIN_VALUE, so the next key will start from a fresh env as if the previous + // records of a different key never existed. So the watermark is either Long.MIN_VALUE + // or long MAX_VALUE. So we should just use the Flink time service watermark in batch mode. + // + // In Flink the watermark ranges from + // [LONG.MIN_VALUE (-9223372036854775808), LONG.MAX_VALUE (9223372036854775807)] while the + // beam + // watermark range is [BoundedWindow.TIMESTAMP_MIN_VALUE (-9223372036854775), + // BoundedWindow.TIMESTAMP_MAX_VALUE (9223372036854775)]. To ensure the timestamp visible to + // the users follow the Beam convention, we just use the Beam range instead. + return timerService.currentWatermark() == Long.MAX_VALUE + ? new Instant(Long.MAX_VALUE) + : BoundedWindow.TIMESTAMP_MIN_VALUE; + } else { + return new Instant(getEffectiveInputWatermark()); + } + } + + @Override + public @Nullable Instant currentOutputWatermarkTime() { + return new Instant(currentOutputWatermark); + } + + /** + * Check whether event time timers lower or equal to the given timestamp exist. Caution: This is + * scoped by the current key. + */ + public boolean hasPendingEventTimeTimers(long maxTimestamp) throws Exception { + for (TimerData timer : pendingTimersById.values()) { + if (timer.getDomain() == TimeDomain.EVENT_TIME + && timer.getTimestamp().getMillis() <= maxTimestamp) { + return true; + } + } + return false; + } + + /** Unique contextual id of a timer. Used to look up any existing timers in a context. */ + private String getContextTimerId(String timerId, StateNamespace namespace) { + return timerId + namespace.stringKey(); + } + } + + /** + * In Beam, a timer with timestamp {@code T} is only illegible for firing when the time has moved + * past this time stamp, i.e. {@code T < current_time}. In the case of event time, current_time is + * the watermark, in the case of processing time it is the system time. + * + *

Flink's TimerService has different semantics because it only ensures {@code T <= + * current_time}. + * + *

To make up for this, we need to add one millisecond to Flink's internal timer timestamp. + * Note that we do not modify Beam's timestamp and we are not exposing Flink's timestamp. + * + *

See also https://jira.apache.org/jira/browse/BEAM-3863 + */ + static long adjustTimestampForFlink(long beamTimerTimestamp) { + if (beamTimerTimestamp == Long.MAX_VALUE) { + // We would overflow, do not adjust timestamp + return Long.MAX_VALUE; + } + return beamTimerTimestamp + 1; + } +} diff --git a/runners/flink/2.2/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java b/runners/flink/2.2/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java new file mode 100644 index 000000000000..dd8753efc4cb --- /dev/null +++ b/runners/flink/2.2/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsNull.nullValue; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.HashMap; +import org.apache.beam.repackaged.core.org.apache.commons.lang3.SerializationUtils; +import org.apache.beam.runners.core.construction.SerializablePipelineOptions; +import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFnSchemaInformation; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.WindowedValue; +import org.apache.beam.sdk.values.WindowedValues; +import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +// Flink 2.2: CheckpointingMode moved from org.apache.flink.streaming.api to +// org.apache.flink.core.execution (FLINK-39022 / Flink 2.2 deprecation cleanup). +import org.apache.flink.core.execution.CheckpointingMode; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.joda.time.Instant; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for serialization and deserialization of {@link PipelineOptions} in {@link DoFnOperator}. + * + *

This is the Flink 2.2 override. It uses {@code org.apache.flink.core.execution.CheckpointingMode} + * (the non-deprecated path) instead of the deprecated {@code org.apache.flink.streaming.api.CheckpointingMode}. + */ +public class FlinkPipelineOptionsTest { + + /** Pipeline options. */ + public interface MyOptions extends FlinkPipelineOptions { + @Description("Bla bla bla") + @Default.String("Hello") + String getTestOption(); + + void setTestOption(String value); + } + + private static MyOptions options = + PipelineOptionsFactory.fromArgs("--testOption=nothing").as(MyOptions.class); + + /** Verifies that Flink 2.2.x is on the classpath. */ + @Test + public void testFlinkVersion() { + String version = EnvironmentInformation.getVersion(); + assertTrue( + "Expected Flink 2.2.x but got: " + version, + version.startsWith("2.2")); + } + + /** These defaults should only be changed with a very good reason. */ + @Test + public void testDefaults() { + FlinkPipelineOptions options = FlinkPipelineOptions.defaults(); + assertThat(options.getParallelism(), is(-1)); + assertThat(options.getMaxParallelism(), is(-1)); + assertThat(options.getFlinkMaster(), is("[auto]")); + assertThat(options.getFilesToStage(), is(nullValue())); + assertThat(options.getLatencyTrackingInterval(), is(0L)); + assertThat(options.getShutdownSourcesAfterIdleMs(), is(-1L)); + assertThat(options.getObjectReuse(), is(false)); + // Uses the non-deprecated CheckpointingMode from org.apache.flink.core.execution + assertThat(options.getCheckpointingMode(), is(CheckpointingMode.EXACTLY_ONCE.name())); + assertThat(options.getMinPauseBetweenCheckpoints(), is(-1L)); + assertThat(options.getCheckpointingInterval(), is(-1L)); + assertThat(options.getCheckpointTimeoutMillis(), is(-1L)); + assertThat(options.getNumConcurrentCheckpoints(), is(1)); + assertThat(options.getTolerableCheckpointFailureNumber(), is(0)); + assertThat(options.getFinishBundleBeforeCheckpointing(), is(false)); + assertThat(options.getNumberOfExecutionRetries(), is(-1)); + assertThat(options.getExecutionRetryDelay(), is(-1L)); + assertThat(options.getRetainExternalizedCheckpointsOnCancellation(), is(false)); + assertThat(options.getStateBackendFactory(), is(nullValue())); + assertThat(options.getStateBackend(), is(nullValue())); + assertThat(options.getStateBackendStoragePath(), is(nullValue())); + assertThat(options.getExecutionModeForBatch(), is(FlinkPipelineOptions.PIPELINED)); + assertThat(options.getUseDataStreamForBatch(), is(false)); + assertThat(options.getSavepointPath(), is(nullValue())); + assertThat(options.getAllowNonRestoredState(), is(false)); + assertThat(options.getDisableMetrics(), is(false)); + assertThat(options.getFasterCopy(), is(false)); + + assertThat(options.isStreaming(), is(false)); + assertThat(options.getMaxBundleSize(), is(5000L)); + assertThat(options.getMaxBundleTimeMills(), is(10000L)); + + // In streaming mode bundle size and bundle time are shorter + FlinkPipelineOptions optionsStreaming = FlinkPipelineOptions.defaults(); + optionsStreaming.setStreaming(true); + assertThat(optionsStreaming.getMaxBundleSize(), is(1000L)); + assertThat(optionsStreaming.getMaxBundleTimeMills(), is(1000L)); + } + + @Test(expected = Exception.class) + public void parDoBaseClassPipelineOptionsNullTest() { + TupleTag mainTag = new TupleTag<>("main-output"); + Coder> coder = WindowedValues.getValueOnlyCoder(StringUtf8Coder.of()); + new DoFnOperator<>( + new TestDoFn(), + "stepName", + coder, + Collections.emptyMap(), + mainTag, + Collections.emptyList(), + new DoFnOperator.MultiOutputOutputManagerFactory<>( + mainTag, coder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults())), + WindowingStrategy.globalDefault(), + new HashMap<>(), + Collections.emptyList(), + null, + null, /* key coder */ + null /* key selector */, + DoFnSchemaInformation.create(), + Collections.emptyMap()); + } + + /** Tests that PipelineOptions are present after serialization. */ + @Test + public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception { + + TupleTag mainTag = new TupleTag<>("main-output"); + + Coder> coder = WindowedValues.getValueOnlyCoder(StringUtf8Coder.of()); + DoFnOperator doFnOperator = + new DoFnOperator<>( + new TestDoFn(), + "stepName", + coder, + Collections.emptyMap(), + mainTag, + Collections.emptyList(), + new DoFnOperator.MultiOutputOutputManagerFactory<>( + mainTag, coder, new SerializablePipelineOptions(FlinkPipelineOptions.defaults())), + WindowingStrategy.globalDefault(), + new HashMap<>(), + Collections.emptyList(), + options, + null, /* key coder */ + null /* key selector */, + DoFnSchemaInformation.create(), + Collections.emptyMap()); + + final byte[] serialized = SerializationUtils.serialize(doFnOperator); + + @SuppressWarnings("unchecked") + DoFnOperator deserialized = SerializationUtils.deserialize(serialized); + + TypeInformation> typeInformation = + TypeInformation.of(new TypeHint>() {}); + + OneInputStreamOperatorTestHarness, WindowedValue> testHarness = + new OneInputStreamOperatorTestHarness<>( + deserialized, typeInformation.createSerializer(new SerializerConfigImpl())); + testHarness.open(); + + // execute once to access options + testHarness.processElement( + new StreamRecord<>( + WindowedValues.of( + new Object(), Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING))); + + testHarness.close(); + } + + private static class TestDoFn extends DoFn { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + Assert.assertNotNull(c.getPipelineOptions()); + Assert.assertEquals( + options.getTestOption(), c.getPipelineOptions().as(MyOptions.class).getTestOption()); + } + } +} diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index c813939d53f1..8f07d4cec2c2 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -2106,7 +2106,7 @@ def _add_argparse_args(cls, parser): class FlinkRunnerOptions(PipelineOptions): # These should stay in sync with gradle.properties. - PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20', '2.0'] + PUBLISHED_FLINK_VERSIONS = ['1.17', '1.18', '1.19', '1.20', '2.0', '2.2'] @classmethod def _add_argparse_args(cls, parser): diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts index 8f80b971da2a..1b5ed99caedb 100644 --- a/sdks/typescript/src/apache_beam/runners/flink.ts +++ b/sdks/typescript/src/apache_beam/runners/flink.ts @@ -28,7 +28,7 @@ import { JavaJarService } from "../utils/service"; const MAGIC_HOST_NAMES = ["[local]", "[auto]"]; // These should stay in sync with gradle.properties. -const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20", "2.0"]; +const PUBLISHED_FLINK_VERSIONS = ["1.17", "1.18", "1.19", "1.20", "2.0", "2.2"]; const defaultOptions = { flinkMaster: "[local]", diff --git a/website/www/site/content/en/documentation/runners/flink.md b/website/www/site/content/en/documentation/runners/flink.md index 1aafa41ecb9c..2afd45a6308a 100644 --- a/website/www/site/content/en/documentation/runners/flink.md +++ b/website/www/site/content/en/documentation/runners/flink.md @@ -330,6 +330,11 @@ To find out which version of Flink is compatible with Beam please see the table Artifact Id Supported Beam Versions + + 2.2.x + beam-runners-flink-2.2 + ≥ 2.75.0 + 2.0.x beam-runners-flink-2.0 From 59b52813b76c8685a56ec4f5a70cf036a2b6605a Mon Sep 17 00:00:00 2001 From: Manan Mangal Date: Tue, 16 Jun 2026 01:02:02 -0700 Subject: [PATCH 2/2] Resolve lz4-java Gradle capability conflict in Flink job-server --- build.gradle.kts | 14 ++++++++++++++ .../wrappers/streaming/DoFnOperator.java | 2 +- .../runners/flink/FlinkPipelineOptionsTest.java | 11 ++++------- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 4af8fa3f1ab4..f02971609728 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1055,3 +1055,17 @@ if (project.hasProperty("testJavaVersion")) { } } } + +// Flink 2.2.1 (FLINK-39139) replaced org.lz4:lz4-java with the fork at.yawk.lz4:lz4-java, +// which declares the same 'org.lz4:lz4-java' Gradle capability. Any subproject that depends +// on both Flink 2.2+ and another library using org.lz4 (Kafka, Spark, etc.) hits a capability +// conflict. This resolution applies globally so that individual subprojects do not each need +// to declare it independently. +allprojects { + configurations.all { + resolutionStrategy.capabilitiesResolution.withCapability("org.lz4:lz4-java") { + val yawk = candidates.find { it.id.displayName.startsWith("at.yawk.lz4:") } + if (yawk != null) select(yawk) else selectHighestVersion() + } + } +} diff --git a/runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 020e58a1c663..409797625db4 100644 --- a/runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/2.2/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -103,12 +103,12 @@ import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.state.InternalPriorityQueue; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; -import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl; diff --git a/runners/flink/2.2/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java b/runners/flink/2.2/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java index dd8753efc4cb..6cebadc49d5c 100644 --- a/runners/flink/2.2/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java +++ b/runners/flink/2.2/src/test/java/org/apache/beam/runners/flink/FlinkPipelineOptionsTest.java @@ -44,8 +44,6 @@ import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; -// Flink 2.2: CheckpointingMode moved from org.apache.flink.streaming.api to -// org.apache.flink.core.execution (FLINK-39022 / Flink 2.2 deprecation cleanup). import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -57,8 +55,9 @@ /** * Tests for serialization and deserialization of {@link PipelineOptions} in {@link DoFnOperator}. * - *

This is the Flink 2.2 override. It uses {@code org.apache.flink.core.execution.CheckpointingMode} - * (the non-deprecated path) instead of the deprecated {@code org.apache.flink.streaming.api.CheckpointingMode}. + *

This is the Flink 2.2 override. It uses {@code + * org.apache.flink.core.execution.CheckpointingMode} (the non-deprecated path) instead of the + * deprecated {@code org.apache.flink.streaming.api.CheckpointingMode}. */ public class FlinkPipelineOptionsTest { @@ -78,9 +77,7 @@ public interface MyOptions extends FlinkPipelineOptions { @Test public void testFlinkVersion() { String version = EnvironmentInformation.getVersion(); - assertTrue( - "Expected Flink 2.2.x but got: " + version, - version.startsWith("2.2")); + assertTrue("Expected Flink 2.2.x but got: " + version, version.startsWith("2.2")); } /** These defaults should only be changed with a very good reason. */