Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,8 @@ public abstract static class Sink implements FileIO.Sink<GenericRecord> {

abstract @Nullable ValueProvider<Integer> getMinRowCountForPageSizeCheck();

abstract @Nullable ValueProvider<Integer> getMaxRowCountForPageSizeCheck();

abstract @Nullable Class<? extends GenericData> getAvroDataModelClass();

abstract Builder toBuilder();
Expand All @@ -1056,6 +1058,9 @@ abstract static class Builder {
abstract Builder setMinRowCountForPageSizeCheck(
ValueProvider<Integer> minRowCountForPageSizeCheck);

abstract Builder setMaxRowCountForPageSizeCheck(
ValueProvider<Integer> maxRowCountForPageSizeCheck);

abstract Builder setAvroDataModelClass(Class<? extends GenericData> modelClass);

abstract Sink build();
Expand Down Expand Up @@ -1130,6 +1135,39 @@ public Sink withMinRowCountForPageSizeCheck(
return toBuilder().setMinRowCountForPageSizeCheck(minRowCountForPageSizeCheck).build();
}

/**
* Specify the maximum number of rows to buffer before a page size check is forced. By default
* Parquet estimates the next check from the average row size and may defer it (up to {@code
* 10000} rows); a run of small rows followed by large rows can then let the column page buffer
* overflow {@code Integer.MAX_VALUE} before the deferred check fires. Setting this (e.g. {@code
* 1}) caps the interval so a check -- and flush -- happens at least this often regardless of
* the estimate. Pair it with {@link #withMinRowCountForPageSizeCheck(int)} to bound the buffer
* for tables whose row sizes vary widely.
*/
public Sink withMaxRowCountForPageSizeCheck(int maxRowCountForPageSizeCheck) {
checkArgument(
maxRowCountForPageSizeCheck > 0, "maxRowCountForPageSizeCheck must be positive");
return toBuilder()
.setMaxRowCountForPageSizeCheck(
ValueProvider.StaticValueProvider.of(maxRowCountForPageSizeCheck))
.build();
}

/**
* Like {@link #withMaxRowCountForPageSizeCheck(int)}, but accepts a {@link ValueProvider} so
* the value can be supplied at runtime (required for classic Dataflow templates).
*/
public Sink withMaxRowCountForPageSizeCheck(
ValueProvider<Integer> maxRowCountForPageSizeCheck) {
checkNotNull(maxRowCountForPageSizeCheck, "maxRowCountForPageSizeCheck can not be null");
if (maxRowCountForPageSizeCheck.isAccessible()) {
Integer value = maxRowCountForPageSizeCheck.get();
checkNotNull(value, "maxRowCountForPageSizeCheck value cannot be null");
checkArgument(value > 0, "maxRowCountForPageSizeCheck must be positive");
}
return toBuilder().setMaxRowCountForPageSizeCheck(maxRowCountForPageSizeCheck).build();
}

/**
* Define the Avro data model; see {@link AvroParquetWriter.Builder#withDataModel(GenericData)}.
*/
Expand Down Expand Up @@ -1170,6 +1208,15 @@ public void open(WritableByteChannel channel) throws IOException {
}
}

ValueProvider<Integer> maxRowCountProvider = getMaxRowCountForPageSizeCheck();
if (maxRowCountProvider != null) {
Integer maxRowCount = maxRowCountProvider.get();
if (maxRowCount != null) {
checkArgument(maxRowCount > 0, "maxRowCountForPageSizeCheck must be positive");
builder = builder.withMaxRowCountForPageSizeCheck(maxRowCount);
}
}

if (modelClass != null) {
try {
builder.withDataModel(buildModelObject(modelClass));
Expand Down
Loading