From de03f405458a1ff799130a8a4337446bd30175a4 Mon Sep 17 00:00:00 2001 From: Xinyu Liu Date: Wed, 11 Jun 2025 10:32:12 -0700 Subject: [PATCH 1/4] Add filtering of large record before combine (sorting) --- .../FlinkBatchPortablePipelineTranslator.java | 12 ++- .../utils/LargeRecordFilterFunction.java | 39 +++++++++ .../examples/flink/flink_streaming_impulse.py | 8 +- .../examples/flink/flink_word_count.py | 80 +++++++++++++++++++ 4 files changed, 131 insertions(+), 8 deletions(-) create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/LargeRecordFilterFunction.java create mode 100644 sdks/python/apache_beam/examples/flink/flink_word_count.py diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java index 4d9a5a516c75..ddb11fad218a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java @@ -46,6 +46,7 @@ import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.types.KvKeySelector; +import org.apache.beam.runners.flink.translation.utils.LargeRecordFilterFunction; import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat; import org.apache.beam.runners.fnexecution.provisioning.JobInfo; import org.apache.beam.runners.fnexecution.wire.WireCoders; @@ -92,6 +93,8 @@ import org.apache.flink.api.java.operators.SingleInputUdfOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A translator that translates bounded portable pipelines into executable Flink pipelines. @@ -119,6 +122,8 @@ public class FlinkBatchPortablePipelineTranslator implements FlinkPortablePipelineTranslator< FlinkBatchPortablePipelineTranslator.BatchTranslationContext> { + private static final Logger LOG = + LoggerFactory.getLogger(FlinkBatchPortablePipelineTranslator.class); /** * Creates a batch translation context. The resulting Flink execution dag will live in a new @@ -206,6 +211,7 @@ public FlinkPipelineOptions getPipelineOptions() { @Override public JobExecutionResult execute(String jobName) throws Exception { + LOG.info("Executing Flink batch job with name: {}", jobName); return getExecutionEnvironment().execute(jobName); } @@ -515,8 +521,12 @@ private static void translateGroupByKey( TypeInformation>>> partialReduceTypeInfo = new CoderTypeInformation<>(outputCoder, context.getPipelineOptions()); + LOG.info("Add step to filter large records in GroupByKey"); + DataSet>> filteredDataSet = + inputDataSet.filter(new LargeRecordFilterFunction<>()); + Grouping>> inputGrouping = - inputDataSet.groupBy(new KvKeySelector<>(inputElementCoder.getKeyCoder())); + filteredDataSet.groupBy(new KvKeySelector<>(inputElementCoder.getKeyCoder())); FlinkPartialReduceFunction, ?> partialReduceFunction = new FlinkPartialReduceFunction<>( diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/LargeRecordFilterFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/LargeRecordFilterFunction.java new file mode 100644 index 000000000000..a280c22e9a9c --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/LargeRecordFilterFunction.java @@ -0,0 +1,39 @@ +package org.apache.beam.runners.flink.translation.utils; + +import java.util.List; +import org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; +import org.apache.flink.api.common.functions.FilterFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LargeRecordFilterFunction implements + FilterFunction>> { + private static final Logger LOG = + LoggerFactory.getLogger(LargeRecordFilterFunction.class); + private static final long MAX_RECORD_SIZE = 1000000; // 1 MB + + @Override + public boolean filter(WindowedValue> windowedValue) throws Exception { + KV kv = windowedValue.getValue(); + long size = getObjectSize(kv.getKey()) + getObjectSize(kv.getValue()); + if (size >= MAX_RECORD_SIZE) { + LOG.warn("Dropping large record with size: {}", size); + return false; + } + return true; + } + + private static long getObjectSize(T o) { + // This method should return the size of the object in bytes. + // Implement this based on your requirements or use a library that can calculate object size. + if (o instanceof byte[]) { + return ((byte[]) o).length; + } else if(o instanceof List) { + return ((List) o).stream().mapToLong(LargeRecordFilterFunction::getObjectSize).sum(); + } else { + return 0; // Assume an average size for non-byte array or list objects. + } + } +} diff --git a/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py b/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py index fdd209a8efd9..3582fbc993c6 100644 --- a/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py +++ b/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py @@ -52,7 +52,7 @@ def apply_timestamp(element): def run(argv=None): """Build and run the pipeline.""" args = [ - "--runner=PortableRunner", "--job_endpoint=localhost:8099", "--streaming" + "--runner=PortableRunner", "--job_endpoint=localhost:8099", "--streaming", "--environment_type=LOOPBACK" ] if argv: args.extend(argv) @@ -83,12 +83,6 @@ def run(argv=None): _ = ( messages | 'decode' >> beam.Map(lambda x: ('', 1)) - | 'window' >> beam.WindowInto( - window.GlobalWindows(), - trigger=Repeatedly(AfterProcessingTime(5 * 1000)), - accumulation_mode=AccumulationMode.DISCARDING) - | 'group' >> beam.GroupByKey() - | 'count' >> beam.Map(count) | 'log' >> beam.Map(lambda x: logging.info("%d" % x[1]))) diff --git a/sdks/python/apache_beam/examples/flink/flink_word_count.py b/sdks/python/apache_beam/examples/flink/flink_word_count.py new file mode 100644 index 000000000000..9edca60b7552 --- /dev/null +++ b/sdks/python/apache_beam/examples/flink/flink_word_count.py @@ -0,0 +1,80 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A streaming workflow that uses a synthetic streaming source. + +This can only be used with the Flink portable runner. +""" + +# pytype: skip-file + +import argparse +import logging +import sys +import re + +import apache_beam as beam +from apache_beam.options.pipeline_options import PipelineOptions + + + +def run(argv=None): + """Build and run the pipeline.""" + args = [ + "--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=LOOPBACK", "--max_bundle_size=1", + ] + if argv: + args.extend(argv) + + parser = argparse.ArgumentParser() + known_args, pipeline_args = parser.parse_known_args(args) + pipeline_options = PipelineOptions(pipeline_args) + + with beam.Pipeline(options=pipeline_options) as p: + + # Read the text file[pattern] into a PCollection. + lines = p | beam.Create([ + "Hello, world!", + "Hello, beam!", + "Hello, flink!", + "Hello, python!", + "Hello, java!", + ]) + + # Count the occurrences of each word. + counts = ( + lines + | 'Split' >> ( + beam.FlatMap( + lambda x: re.findall(r'[A-Za-z\']+', x)).with_output_types(str)) + | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) + | 'GroupAndSum' >> beam.CombinePerKey(sum)) + + # Format the counts into a PCollection of strings. + def format_result(word_count): + (word, count) = word_count + return '%s: %s' % (word, count) + + output = counts | 'Format' >> beam.Map(format_result) + + # Write the output using a "Write" transform that has side effects. + # pylint: disable=expression-not-assigned + output | beam.Map(print) + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + run(sys.argv[1:]) From 8f0b2a69619dfafea1bff0f6936f65853f8a149b Mon Sep 17 00:00:00 2001 From: Xinyu Liu Date: Thu, 12 Jun 2025 10:02:32 -0700 Subject: [PATCH 2/4] Update the cap to be 5MB --- .../utils/LargeRecordFilterFunction.java | 14 +++- .../examples/flink/flink_word_count.py | 80 ------------------- 2 files changed, 10 insertions(+), 84 deletions(-) delete mode 100644 sdks/python/apache_beam/examples/flink/flink_word_count.py diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/LargeRecordFilterFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/LargeRecordFilterFunction.java index a280c22e9a9c..01fd2f831bbd 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/LargeRecordFilterFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/LargeRecordFilterFunction.java @@ -8,11 +8,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * [Glean] + * FilterFunction that filters out large records based on a size threshold. + */ public class LargeRecordFilterFunction implements FilterFunction>> { private static final Logger LOG = LoggerFactory.getLogger(LargeRecordFilterFunction.class); - private static final long MAX_RECORD_SIZE = 1000000; // 1 MB + private static final long MAX_RECORD_SIZE = 5000000; // 5 MB @Override public boolean filter(WindowedValue> windowedValue) throws Exception { @@ -25,15 +29,17 @@ public boolean filter(WindowedValue> windowedValue) throws Exception { return true; } + /** + * Calculate the size of an object in bytes. + * This is a simplified version for objects used in portability. + */ private static long getObjectSize(T o) { - // This method should return the size of the object in bytes. - // Implement this based on your requirements or use a library that can calculate object size. if (o instanceof byte[]) { return ((byte[]) o).length; } else if(o instanceof List) { return ((List) o).stream().mapToLong(LargeRecordFilterFunction::getObjectSize).sum(); } else { - return 0; // Assume an average size for non-byte array or list objects. + return 0; // for other types, we don't calculate size } } } diff --git a/sdks/python/apache_beam/examples/flink/flink_word_count.py b/sdks/python/apache_beam/examples/flink/flink_word_count.py deleted file mode 100644 index 9edca60b7552..000000000000 --- a/sdks/python/apache_beam/examples/flink/flink_word_count.py +++ /dev/null @@ -1,80 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""A streaming workflow that uses a synthetic streaming source. - -This can only be used with the Flink portable runner. -""" - -# pytype: skip-file - -import argparse -import logging -import sys -import re - -import apache_beam as beam -from apache_beam.options.pipeline_options import PipelineOptions - - - -def run(argv=None): - """Build and run the pipeline.""" - args = [ - "--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=LOOPBACK", "--max_bundle_size=1", - ] - if argv: - args.extend(argv) - - parser = argparse.ArgumentParser() - known_args, pipeline_args = parser.parse_known_args(args) - pipeline_options = PipelineOptions(pipeline_args) - - with beam.Pipeline(options=pipeline_options) as p: - - # Read the text file[pattern] into a PCollection. - lines = p | beam.Create([ - "Hello, world!", - "Hello, beam!", - "Hello, flink!", - "Hello, python!", - "Hello, java!", - ]) - - # Count the occurrences of each word. - counts = ( - lines - | 'Split' >> ( - beam.FlatMap( - lambda x: re.findall(r'[A-Za-z\']+', x)).with_output_types(str)) - | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) - | 'GroupAndSum' >> beam.CombinePerKey(sum)) - - # Format the counts into a PCollection of strings. - def format_result(word_count): - (word, count) = word_count - return '%s: %s' % (word, count) - - output = counts | 'Format' >> beam.Map(format_result) - - # Write the output using a "Write" transform that has side effects. - # pylint: disable=expression-not-assigned - output | beam.Map(print) - -if __name__ == '__main__': - logging.getLogger().setLevel(logging.INFO) - run(sys.argv[1:]) From e3550a7bc5c5971a16a216a6021d1a513ff1fcae Mon Sep 17 00:00:00 2001 From: Xinyu Liu Date: Thu, 12 Jun 2025 10:05:49 -0700 Subject: [PATCH 3/4] Unwanted changes --- .../apache_beam/examples/flink/flink_streaming_impulse.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py b/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py index 3582fbc993c6..fdd209a8efd9 100644 --- a/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py +++ b/sdks/python/apache_beam/examples/flink/flink_streaming_impulse.py @@ -52,7 +52,7 @@ def apply_timestamp(element): def run(argv=None): """Build and run the pipeline.""" args = [ - "--runner=PortableRunner", "--job_endpoint=localhost:8099", "--streaming", "--environment_type=LOOPBACK" + "--runner=PortableRunner", "--job_endpoint=localhost:8099", "--streaming" ] if argv: args.extend(argv) @@ -83,6 +83,12 @@ def run(argv=None): _ = ( messages | 'decode' >> beam.Map(lambda x: ('', 1)) + | 'window' >> beam.WindowInto( + window.GlobalWindows(), + trigger=Repeatedly(AfterProcessingTime(5 * 1000)), + accumulation_mode=AccumulationMode.DISCARDING) + | 'group' >> beam.GroupByKey() + | 'count' >> beam.Map(count) | 'log' >> beam.Map(lambda x: logging.info("%d" % x[1]))) From 611ef2fd79e706ab316d9c16fdf889417cfe76d3 Mon Sep 17 00:00:00 2001 From: Xinyu Liu Date: Thu, 12 Jun 2025 16:29:05 -0700 Subject: [PATCH 4/4] Add more glean specific comments --- .../runners/flink/FlinkBatchPortablePipelineTranslator.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java index ddb11fad218a..19c943b67747 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java @@ -521,12 +521,14 @@ private static void translateGroupByKey( TypeInformation>>> partialReduceTypeInfo = new CoderTypeInformation<>(outputCoder, context.getPipelineOptions()); - LOG.info("Add step to filter large records in GroupByKey"); + ///////////////////////// BEGIN GLEAN MODIFICATION /////////////////////////////// + LOG.info("Add step to filter large records before GroupBy"); DataSet>> filteredDataSet = inputDataSet.filter(new LargeRecordFilterFunction<>()); Grouping>> inputGrouping = filteredDataSet.groupBy(new KvKeySelector<>(inputElementCoder.getKeyCoder())); + ///////////////////////// END GLEAN MODIFICATION ///////////////////////////////// FlinkPartialReduceFunction, ?> partialReduceFunction = new FlinkPartialReduceFunction<>(