From 2710cb7b8dede64204da3f1d20c905b7040f647b Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 11 Jun 2026 11:24:09 -0400 Subject: [PATCH] init --- .../apache/beam/sdk/io/parquet/ParquetIO.java | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java index feaceeeb4432..f869286b98bd 100644 --- a/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java +++ b/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java @@ -1033,6 +1033,8 @@ public abstract static class Sink implements FileIO.Sink { abstract @Nullable ValueProvider getMinRowCountForPageSizeCheck(); + abstract @Nullable ValueProvider getMaxRowCountForPageSizeCheck(); + abstract @Nullable Class getAvroDataModelClass(); abstract Builder toBuilder(); @@ -1056,6 +1058,9 @@ abstract static class Builder { abstract Builder setMinRowCountForPageSizeCheck( ValueProvider minRowCountForPageSizeCheck); + abstract Builder setMaxRowCountForPageSizeCheck( + ValueProvider maxRowCountForPageSizeCheck); + abstract Builder setAvroDataModelClass(Class modelClass); abstract Sink build(); @@ -1130,6 +1135,39 @@ public Sink withMinRowCountForPageSizeCheck( return toBuilder().setMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck).build(); } + /** + * Specify the maximum number of rows to buffer before a page size check is forced. By default + * Parquet estimates the next check from the average row size and may defer it (up to {@code + * 10000} rows); a run of small rows followed by large rows can then let the column page buffer + * overflow {@code Integer.MAX_VALUE} before the deferred check fires. Setting this (e.g. {@code + * 1}) caps the interval so a check -- and flush -- happens at least this often regardless of + * the estimate. Pair it with {@link #withMinRowCountForPageSizeCheck(int)} to bound the buffer + * for tables whose row sizes vary widely. + */ + public Sink withMaxRowCountForPageSizeCheck(int maxRowCountForPageSizeCheck) { + checkArgument( + maxRowCountForPageSizeCheck > 0, "maxRowCountForPageSizeCheck must be positive"); + return toBuilder() + .setMaxRowCountForPageSizeCheck( + ValueProvider.StaticValueProvider.of(maxRowCountForPageSizeCheck)) + .build(); + } + + /** + * Like {@link #withMaxRowCountForPageSizeCheck(int)}, but accepts a {@link ValueProvider} so + * the value can be supplied at runtime (required for classic Dataflow templates). + */ + public Sink withMaxRowCountForPageSizeCheck( + ValueProvider maxRowCountForPageSizeCheck) { + checkNotNull(maxRowCountForPageSizeCheck, "maxRowCountForPageSizeCheck can not be null"); + if (maxRowCountForPageSizeCheck.isAccessible()) { + Integer value = maxRowCountForPageSizeCheck.get(); + checkNotNull(value, "maxRowCountForPageSizeCheck value cannot be null"); + checkArgument(value > 0, "maxRowCountForPageSizeCheck must be positive"); + } + return toBuilder().setMaxRowCountForPageSizeCheck(maxRowCountForPageSizeCheck).build(); + } + /** * Define the Avro data model; see {@link AvroParquetWriter.Builder#withDataModel(GenericData)}. */ @@ -1170,6 +1208,15 @@ public void open(WritableByteChannel channel) throws IOException { } } + ValueProvider maxRowCountProvider = getMaxRowCountForPageSizeCheck(); + if (maxRowCountProvider != null) { + Integer maxRowCount = maxRowCountProvider.get(); + if (maxRowCount != null) { + checkArgument(maxRowCount > 0, "maxRowCountForPageSizeCheck must be positive"); + builder = builder.withMaxRowCountForPageSizeCheck(maxRowCount); + } + } + if (modelClass != null) { try { builder.withDataModel(buildModelObject(modelClass));