From 6cda60e89c0063049b11f16f292f9ddad529b179 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Mon, 23 Feb 2026 13:56:57 +0100 Subject: [PATCH 01/10] feat: disable stats --- .../xtable/hudi/HudiConversionSource.java | 10 ++- .../hudi/HudiConversionSourceProvider.java | 4 +- .../xtable/hudi/HudiDataFileExtractor.java | 25 +++++- .../xtable/hudi/HudiFileStatsExtractor.java | 65 ++++++++++++++ .../apache/xtable/hudi/HudiSourceConfig.java | 23 +++++ .../hudi/TestHudiFileStatsExtractor.java | 85 +++++++++++++++++++ 6 files changed, 206 insertions(+), 6 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java index 00faa97d3..62be1cd8c 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java @@ -60,6 +60,13 @@ public class HudiConversionSource implements ConversionSource { public HudiConversionSource( HoodieTableMetaClient metaClient, PathBasedPartitionSpecExtractor sourcePartitionSpecExtractor) { + this(metaClient, sourcePartitionSpecExtractor, false); + } + + public HudiConversionSource( + HoodieTableMetaClient metaClient, + PathBasedPartitionSpecExtractor sourcePartitionSpecExtractor, + boolean skipStats) { this.metaClient = metaClient; this.tableExtractor = new HudiTableExtractor(new HudiSchemaExtractor(), sourcePartitionSpecExtractor); @@ -68,7 +75,8 @@ public HudiConversionSource( metaClient, new PathBasedPartitionValuesExtractor( sourcePartitionSpecExtractor.getPathToPartitionFieldFormat()), - new HudiFileStatsExtractor(metaClient)); + new HudiFileStatsExtractor(metaClient), + skipStats); } @Override diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java index aad7e0a16..dbb39a343 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java @@ -46,7 +46,9 @@ public HudiConversionSource getConversionSourceInstance(SourceTable sourceTable) final PathBasedPartitionSpecExtractor sourcePartitionSpecExtractor = HudiSourceConfig.fromProperties(sourceTable.getAdditionalProperties()) .loadSourcePartitionSpecExtractor(); + boolean skipStats = + HudiSourceConfig.getSkipStats(sourceTable.getAdditionalProperties(), hadoopConf); - return new HudiConversionSource(metaClient, sourcePartitionSpecExtractor); + return new HudiConversionSource(metaClient, sourcePartitionSpecExtractor, skipStats); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java index 5e17b389f..6d66f855f 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java @@ -77,6 +77,7 @@ public class HudiDataFileExtractor implements AutoCloseable { private final HoodieEngineContext engineContext; private final PathBasedPartitionValuesExtractor partitionValuesExtractor; private final HudiFileStatsExtractor fileStatsExtractor; + private final boolean skipStats; private final HoodieMetadataConfig metadataConfig; private final FileSystemViewManager fileSystemViewManager; private final Path basePath; @@ -85,6 +86,14 @@ public HudiDataFileExtractor( HoodieTableMetaClient metaClient, PathBasedPartitionValuesExtractor hudiPartitionValuesExtractor, HudiFileStatsExtractor hudiFileStatsExtractor) { + this(metaClient, hudiPartitionValuesExtractor, hudiFileStatsExtractor, false); + } + + public HudiDataFileExtractor( + HoodieTableMetaClient metaClient, + HudiPartitionValuesExtractor hudiPartitionValuesExtractor, + HudiFileStatsExtractor hudiFileStatsExtractor, + boolean skipStats) { this.engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); metadataConfig = HoodieMetadataConfig.newBuilder() @@ -107,6 +116,7 @@ public HudiDataFileExtractor( this.metaClient = metaClient; this.partitionValuesExtractor = hudiPartitionValuesExtractor; this.fileStatsExtractor = hudiFileStatsExtractor; + this.skipStats = skipStats; } public List getFilesCurrentState(InternalTable table) { @@ -132,11 +142,14 @@ public InternalFilesDiff getDiffForCommit( getAddedAndRemovedPartitionInfo( visibleTimeline, instant, fsView, hoodieInstantForDiff, table.getPartitioningFields()); - Stream filesAddedWithoutStats = allInfo.getAdded().stream(); List filesAdded = - fileStatsExtractor - .addStatsToFiles(tableMetadata, filesAddedWithoutStats, table.getReadSchema()) - .collect(Collectors.toList()); + skipStats + ? fileStatsExtractor + .addRecordCountToFiles(tableMetadata, allInfo.getAdded().stream(), table.getReadSchema()) + .collect(Collectors.toList()) + : fileStatsExtractor + .addStatsToFiles(tableMetadata, allInfo.getAdded().stream(), table.getReadSchema()) + .collect(Collectors.toList()); List filesRemoved = allInfo.getRemoved(); return InternalFilesDiff.builder().filesAdded(filesAdded).filesRemoved(filesRemoved).build(); @@ -359,6 +372,10 @@ private List getInternalDataFilesForPartitions( .getLatestBaseFiles(partitionPath) .map(baseFile -> buildFileWithoutStats(partitionValues, baseFile)); }); + if (skipStats) { + return PartitionFileGroup.fromFiles( + fileStatsExtractor.addRecordCountToFiles(tableMetadata, filesWithoutStats, table.getReadSchema())); + } Stream files = fileStatsExtractor.addStatsToFiles(tableMetadata, filesWithoutStats, table.getReadSchema()); return PartitionFileGroup.fromFiles(files); diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java index 5a0b70cb9..9dd47bf07 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiFileStatsExtractor.java @@ -102,6 +102,30 @@ public Stream addStatsToFiles( : computeColumnStatsFromParquetFooters(files, nameFieldMap); } + /** + * Adds record count information only. + * + *

This avoids materializing full min/max/null column stats in memory while still writing + * correct file row counts (for example Delta `numRecords`). + */ + public Stream addRecordCountToFiles( + HoodieTableMetadata metadataTable, Stream files, InternalSchema schema) { + boolean useMetadataTableColStats = + metadataTable != null + && metaClient + .getTableConfig() + .isMetadataPartitionAvailable(MetadataPartitionType.COLUMN_STATS); + final Map nameFieldMap = + schema.getAllFields().stream() + .collect( + Collectors.toMap( + field -> getFieldNameForStats(field, useMetadataTableColStats), + Function.identity())); + return useMetadataTableColStats + ? computeRecordCountFromMetadataTable(metadataTable, files, nameFieldMap) + : computeRecordCountFromParquetFooters(files); + } + private Stream computeColumnStatsFromParquetFooters( Stream files, Map nameFieldMap) { return files.map( @@ -115,6 +139,16 @@ private Stream computeColumnStatsFromParquetFooters( }); } + private Stream computeRecordCountFromParquetFooters( + Stream files) { + return files.map( + file -> + file.toBuilder() + .recordCount( + UTILS.getRowCount(metaClient.getHadoopConf(), new Path(file.getPhysicalPath()))) + .build()); + } + private Pair getPartitionAndFileName(String path) { Path filePath = new CachingPath(path); String partitionPath = HudiPathUtils.getPartitionPath(metaClient.getBasePathV2(), filePath); @@ -167,6 +201,37 @@ private Stream computeColumnStatsFromMetadataTable( }); } + private Stream computeRecordCountFromMetadataTable( + HoodieTableMetadata metadataTable, + Stream files, + Map nameFieldMap) { + if (nameFieldMap.isEmpty()) { + return files.map(file -> file.toBuilder().recordCount(0L).build()); + } + Map, InternalDataFile> filePathsToDataFile = + files.collect( + Collectors.toMap( + file -> getPartitionAndFileName(file.getPhysicalPath()), Function.identity())); + if (filePathsToDataFile.isEmpty()) { + return Stream.empty(); + } + // Query a single column to fetch per-file valueCount, which is sufficient for row count. + String anyField = nameFieldMap.keySet().iterator().next(); + Map, HoodieMetadataColumnStats> statsByFile = + metadataTable.getColumnStats(new ArrayList<>(filePathsToDataFile.keySet()), anyField); + return filePathsToDataFile.entrySet().stream() + .map( + entry -> { + HoodieMetadataColumnStats stats = statsByFile.get(entry.getKey()); + long recordCount = + stats != null + ? stats.getValueCount() + : UTILS.getRowCount( + metaClient.getHadoopConf(), new Path(entry.getValue().getPhysicalPath())); + return entry.getValue().toBuilder().recordCount(recordCount).build(); + }); + } + private Optional getMaxFromColumnStats(List columnStats) { return columnStats.stream() .filter(entry -> entry.getField().getParentPath() == null) diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java index 7732c3820..b3cb3eaa7 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java @@ -28,6 +28,8 @@ import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; + import org.apache.xtable.model.schema.PartitionFieldSpec; import org.apache.xtable.model.schema.PartitionTransformType; import org.apache.xtable.reflection.ReflectionUtils; @@ -39,6 +41,7 @@ public class HudiSourceConfig { "xtable.hudi.source.partition_spec_extractor_class"; public static final String PARTITION_FIELD_SPEC_CONFIG = "xtable.hudi.source.partition_field_spec_config"; + public static final String SKIP_STATS_CONFIG = "xtable.hudi.source.skip_stats"; String partitionSpecExtractorClass; List partitionFieldSpecs; @@ -84,4 +87,24 @@ public PathBasedPartitionSpecExtractor loadSourcePartitionSpecExtractor() { return ReflectionUtils.createInstanceOfClass( partitionSpecExtractorClass, this.getPartitionFieldSpecs()); } + + public static boolean getSkipStats(Properties properties, Configuration configuration) { + String propertyValue = getPropertyOrNull(properties, SKIP_STATS_CONFIG); + if (propertyValue != null) { + return Boolean.parseBoolean(propertyValue); + } + if (configuration == null) { + return false; + } + String configValue = configuration.get(SKIP_STATS_CONFIG); + return configValue != null && Boolean.parseBoolean(configValue); + } + + private static String getPropertyOrNull( + Properties properties, String key) { + if (properties == null) { + return null; + } + return properties.getProperty(key); + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java index a18bb743d..fe30beb9b 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiFileStatsExtractor.java @@ -162,6 +162,51 @@ void columnStatsWithMetadataTable(@TempDir Path tempDir) throws Exception { validateOutput(output); } + @Test + void recordCountOnlyWithMetadataTable(@TempDir Path tempDir) throws Exception { + String tableName = GenericTable.getTableName(); + String basePath; + try (TestJavaHudiTable table = + TestJavaHudiTable.withSchema( + tableName, tempDir, "long_field:SIMPLE", HoodieTableType.COPY_ON_WRITE, AVRO_SCHEMA)) { + List> records = + getRecords().stream().map(this::buildRecord).collect(Collectors.toList()); + table.insertRecords(true, records); + basePath = table.getBasePath(); + } + HoodieTableMetadata tableMetadata = + HoodieTableMetadata.create( + new HoodieJavaEngineContext(configuration), + HoodieMetadataConfig.newBuilder().enable(true).build(), + basePath, + true); + Path parquetFile = + Files.list(Paths.get(new URI(basePath))) + .filter(path -> path.toString().endsWith(".parquet")) + .findFirst() + .orElseThrow(() -> new RuntimeException("No files found")); + InternalDataFile inputFile = + InternalDataFile.builder() + .physicalPath(parquetFile.toString()) + .columnStats(Collections.emptyList()) + .fileFormat(FileFormat.APACHE_PARQUET) + .lastModified(1234L) + .fileSizeBytes(4321L) + .recordCount(0) + .build(); + HoodieTableMetaClient metaClient = + HoodieTableMetaClient.builder().setBasePath(basePath).setConf(configuration).build(); + HudiFileStatsExtractor fileStatsExtractor = new HudiFileStatsExtractor(metaClient); + List output = + fileStatsExtractor + .addRecordCountToFiles(tableMetadata, Stream.of(inputFile), schema) + .collect(Collectors.toList()); + + assertEquals(1, output.size()); + assertEquals(2, output.get(0).getRecordCount()); + assertEquals(0, output.get(0).getColumnStats().size()); + } + @Test void columnStatsWithoutMetadataTable(@TempDir Path tempDir) throws IOException { Path file = tempDir.resolve("tmp.parquet"); @@ -199,6 +244,46 @@ void columnStatsWithoutMetadataTable(@TempDir Path tempDir) throws IOException { validateOutput(output); } + @Test + void recordCountOnlyWithoutMetadataTable(@TempDir Path tempDir) throws IOException { + Path file = tempDir.resolve("tmp.parquet"); + GenericData genericData = GenericData.get(); + genericData.addLogicalTypeConversion(new Conversions.DecimalConversion()); + try (ParquetWriter writer = + AvroParquetWriter.builder( + HadoopOutputFile.fromPath( + new org.apache.hadoop.fs.Path(file.toUri()), configuration)) + .withSchema(AVRO_SCHEMA) + .withDataModel(genericData) + .build()) { + for (GenericRecord record : getRecords()) { + writer.write(record); + } + } + + InternalDataFile inputFile = + InternalDataFile.builder() + .physicalPath(file.toString()) + .columnStats(Collections.emptyList()) + .fileFormat(FileFormat.APACHE_PARQUET) + .lastModified(1234L) + .fileSizeBytes(4321L) + .recordCount(0) + .build(); + + HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class); + when(mockMetaClient.getHadoopConf()).thenReturn(configuration); + HudiFileStatsExtractor fileStatsExtractor = new HudiFileStatsExtractor(mockMetaClient); + List output = + fileStatsExtractor + .addRecordCountToFiles(null, Stream.of(inputFile), schema) + .collect(Collectors.toList()); + + assertEquals(1, output.size()); + assertEquals(2, output.get(0).getRecordCount()); + assertEquals(0, output.get(0).getColumnStats().size()); + } + private void validateOutput(List output) { assertEquals(1, output.size()); InternalDataFile fileWithStats = output.get(0); From 3873c617b358ad9c74334640e725cc520779712c Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Mon, 23 Feb 2026 14:14:22 +0100 Subject: [PATCH 02/10] disable row count --- .../xtable/delta/DeltaDataFileUpdatesExtractor.java | 5 +++++ .../org/apache/xtable/hudi/HudiDataFileExtractor.java | 7 ++----- .../apache/xtable/iceberg/IcebergDataFileUpdatesSync.java | 8 +++++--- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java index 71cebf9ae..f6bf413eb 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java @@ -130,6 +130,11 @@ private Stream createAddFileAction( private String getColumnStats( InternalSchema schema, long recordCount, List columnStats) { + // In skip-stats mode source files may not have row count/column stats. + // Return null so Delta doesn't persist incorrect numRecords=0 metadata. + if (recordCount <= 0 && (columnStats == null || columnStats.isEmpty())) { + return null; + } try { return deltaStatsExtractor.convertStatsToDeltaFormat(schema, recordCount, columnStats); } catch (JsonProcessingException e) { diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java index 6d66f855f..870aeb2a2 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java @@ -144,9 +144,7 @@ public InternalFilesDiff getDiffForCommit( List filesAdded = skipStats - ? fileStatsExtractor - .addRecordCountToFiles(tableMetadata, allInfo.getAdded().stream(), table.getReadSchema()) - .collect(Collectors.toList()) + ? allInfo.getAdded() : fileStatsExtractor .addStatsToFiles(tableMetadata, allInfo.getAdded().stream(), table.getReadSchema()) .collect(Collectors.toList()); @@ -373,8 +371,7 @@ private List getInternalDataFilesForPartitions( .map(baseFile -> buildFileWithoutStats(partitionValues, baseFile)); }); if (skipStats) { - return PartitionFileGroup.fromFiles( - fileStatsExtractor.addRecordCountToFiles(tableMetadata, filesWithoutStats, table.getReadSchema())); + return PartitionFileGroup.fromFiles(filesWithoutStats); } Stream files = fileStatsExtractor.addStatsToFiles(tableMetadata, filesWithoutStats, table.getReadSchema()); diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java index 56948cd1b..2ffcb3967 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java @@ -122,10 +122,12 @@ private DataFile getDataFile( DataFiles.builder(partitionSpec) .withPath(dataFile.getPhysicalPath()) .withFileSizeInBytes(dataFile.getFileSizeBytes()) - .withMetrics( - columnStatsConverter.toIceberg( - schema, dataFile.getRecordCount(), dataFile.getColumnStats())) .withFormat(convertFileFormat(dataFile.getFileFormat())); + if (dataFile.getRecordCount() > 0 || !dataFile.getColumnStats().isEmpty()) { + builder.withMetrics( + columnStatsConverter.toIceberg( + schema, dataFile.getRecordCount(), dataFile.getColumnStats())); + } if (partitionSpec.isPartitioned()) { builder.withPartition( partitionValueConverter.toIceberg(partitionSpec, schema, dataFile.getPartitionValues())); From a43bf31fa6a159ca0dd4a9ea12496accceb7c91d Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Mon, 23 Feb 2026 16:32:29 +0100 Subject: [PATCH 03/10] compute mandatory count --- .../org/apache/xtable/hudi/HudiDataFileExtractor.java | 10 ++++++++-- .../org/apache/xtable/utilities/RunCatalogSync.java | 6 ++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java index 870aeb2a2..6087f6782 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java @@ -144,7 +144,10 @@ public InternalFilesDiff getDiffForCommit( List filesAdded = skipStats - ? allInfo.getAdded() + ? fileStatsExtractor + .addRecordCountToFiles( + tableMetadata, allInfo.getAdded().stream(), table.getReadSchema()) + .collect(Collectors.toList()) : fileStatsExtractor .addStatsToFiles(tableMetadata, allInfo.getAdded().stream(), table.getReadSchema()) .collect(Collectors.toList()); @@ -371,7 +374,10 @@ private List getInternalDataFilesForPartitions( .map(baseFile -> buildFileWithoutStats(partitionValues, baseFile)); }); if (skipStats) { - return PartitionFileGroup.fromFiles(filesWithoutStats); + Stream filesWithRecordCount = + fileStatsExtractor.addRecordCountToFiles( + tableMetadata, filesWithoutStats, table.getReadSchema()); + return PartitionFileGroup.fromFiles(filesWithRecordCount); } Stream files = fileStatsExtractor.addStatsToFiles(tableMetadata, filesWithoutStats, table.getReadSchema()); diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java index e04d69850..cef519755 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java @@ -51,6 +51,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.config.Configurator; import org.apache.xtable.catalog.CatalogConversionFactory; import org.apache.xtable.conversion.ConversionConfig; @@ -111,6 +113,10 @@ public class RunCatalogSync { .addOption(HELP_OPTION, "help", false, "Displays help information to run this utility"); public static void main(String[] args) throws Exception { + // Reduce noisy per-partition FS-view logs while keeping useful metadata index INFO logs. + Configurator.setLevel( + "org.apache.hudi.common.table.view.AbstractTableFileSystemView", Level.WARN); + CommandLineParser parser = new DefaultParser(); CommandLine cmd; try { From 33d73ada93add04561c2160dabe02c92eb605dae Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Mon, 23 Feb 2026 17:07:46 +0100 Subject: [PATCH 04/10] fmt --- .../main/java/org/apache/xtable/hudi/HudiSourceConfig.java | 3 +-- .../main/java/org/apache/xtable/utilities/RunCatalogSync.java | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java index b3cb3eaa7..ddf885c68 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java @@ -100,8 +100,7 @@ public static boolean getSkipStats(Properties properties, Configuration configur return configValue != null && Boolean.parseBoolean(configValue); } - private static String getPropertyOrNull( - Properties properties, String key) { + private static String getPropertyOrNull(Properties properties, String key) { if (properties == null) { return null; } diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java index cef519755..e45ecd08c 100644 --- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java +++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java @@ -48,11 +48,11 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.hadoop.conf.Configuration; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.core.config.Configurator; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.core.config.Configurator; import org.apache.xtable.catalog.CatalogConversionFactory; import org.apache.xtable.conversion.ConversionConfig; From 371bba25b29d71bd7e0ed42a86397fa22e5d4ca7 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Mon, 23 Feb 2026 20:27:37 +0100 Subject: [PATCH 05/10] delta support 0 row parquet --- .../delta/DeltaDataFileUpdatesExtractor.java | 5 +- .../TestDeltaDataFileUpdatesExtractor.java | 91 +++++++++++++++++++ 2 files changed, 94 insertions(+), 2 deletions(-) create mode 100644 xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaDataFileUpdatesExtractor.java diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java index f6bf413eb..65a3f0f65 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java @@ -131,8 +131,9 @@ private Stream createAddFileAction( private String getColumnStats( InternalSchema schema, long recordCount, List columnStats) { // In skip-stats mode source files may not have row count/column stats. - // Return null so Delta doesn't persist incorrect numRecords=0 metadata. - if (recordCount <= 0 && (columnStats == null || columnStats.isEmpty())) { + // Return null only when row count is unknown (negative sentinel). + // Explicit rowCount=0 should be persisted as numRecords=0. + if (recordCount < 0 && (columnStats == null || columnStats.isEmpty())) { return null; } try { diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaDataFileUpdatesExtractor.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaDataFileUpdatesExtractor.java new file mode 100644 index 000000000..8accbfcbb --- /dev/null +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaDataFileUpdatesExtractor.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.delta; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Collections; + +import org.junit.jupiter.api.Test; + +import org.apache.spark.sql.delta.actions.Action; +import org.apache.spark.sql.delta.actions.AddFile; + +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import org.apache.xtable.model.schema.InternalField; +import org.apache.xtable.model.schema.InternalSchema; +import org.apache.xtable.model.schema.InternalType; +import org.apache.xtable.model.storage.FileFormat; +import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; + +class TestDeltaDataFileUpdatesExtractor { + + @Test + void shouldPersistNumRecordsZeroWhenRecordCountIsZeroAndColumnStatsAreEmpty() { + InternalSchema schema = + InternalSchema.builder() + .name("root") + .dataType(InternalType.RECORD) + .fields( + Collections.singletonList( + InternalField.builder() + .name("id") + .schema( + InternalSchema.builder() + .name("id") + .dataType(InternalType.INT) + .isNullable(true) + .build()) + .build())) + .isNullable(false) + .build(); + + InternalDataFile dataFile = + InternalDataFile.builder() + .physicalPath("s3://bucket/table/path/file-1.parquet") + .fileFormat(FileFormat.APACHE_PARQUET) + .fileSizeBytes(128) + .recordCount(0) + .columnStats(Collections.emptyList()) + .lastModified(1234L) + .build(); + + InternalFilesDiff diff = + InternalFilesDiff.builder() + .fileAdded(dataFile) + .filesRemoved(Collections.emptySet()) + .build(); + + DeltaDataFileUpdatesExtractor extractor = DeltaDataFileUpdatesExtractor.builder().build(); + Seq actions = extractor.applyDiff(diff, schema, "s3://bucket/table/path"); + AddFile addFile = + JavaConverters.seqAsJavaList(actions).stream() + .filter(AddFile.class::isInstance) + .map(AddFile.class::cast) + .findFirst() + .orElseThrow(() -> new AssertionError("Expected AddFile action")); + + assertNotNull(addFile.stats()); + assertTrue(addFile.stats().contains("\"numRecords\":0")); + } +} From 14517699f2a941f76594e41ce66f8402cb68ab81 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Mon, 23 Feb 2026 20:27:46 +0100 Subject: [PATCH 06/10] iceberg support 0 row parquet --- .../apache/xtable/hudi/HudiSourceConfig.java | 4 ++-- .../iceberg/IcebergConversionSource.java | 4 +++- .../iceberg/IcebergDataFileUpdatesSync.java | 3 ++- .../xtable/iceberg/TestIcebergSync.java | 23 +++++++++++++++++++ 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java index ddf885c68..930b7ebb6 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java @@ -26,10 +26,10 @@ import lombok.Value; -import com.google.common.base.Preconditions; - import org.apache.hadoop.conf.Configuration; +import com.google.common.base.Preconditions; + import org.apache.xtable.model.schema.PartitionFieldSpec; import org.apache.xtable.model.schema.PartitionTransformType; import org.apache.xtable.reflection.ReflectionUtils; diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java index 325b50e9b..985a670c0 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java @@ -147,6 +147,9 @@ public InternalTable getTable(Snapshot snapshot) { public InternalTable getCurrentTable() { Table iceTable = getSourceTable(); Snapshot currentSnapshot = iceTable.currentSnapshot(); + if (currentSnapshot == null) { + throw new ReadException("Unable to read latest snapshot from Iceberg source table"); + } return getTable(currentSnapshot); } @@ -166,7 +169,6 @@ public InternalSnapshot getCurrentSnapshot() { .sourceIdentifier("0") .build(); } - InternalTable irTable = getTable(currentSnapshot); TableScan scan = diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java index 2ffcb3967..fdbd78a88 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileUpdatesSync.java @@ -123,7 +123,8 @@ private DataFile getDataFile( .withPath(dataFile.getPhysicalPath()) .withFileSizeInBytes(dataFile.getFileSizeBytes()) .withFormat(convertFileFormat(dataFile.getFileFormat())); - if (dataFile.getRecordCount() > 0 || !dataFile.getColumnStats().isEmpty()) { + // Iceberg data files always require a record count. Persist explicit zero counts as metrics. + if (dataFile.getRecordCount() >= 0 || !dataFile.getColumnStats().isEmpty()) { builder.withMetrics( columnStatsConverter.toIceberg( schema, dataFile.getRecordCount(), dataFile.getColumnStats())); diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java index cc70d4c0b..33e4863ef 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java @@ -299,6 +299,29 @@ public void testCreateSnapshotControlFlow() throws Exception { transactionArgumentCaptor.getAllValues().get(2)); } + @Test + public void testSnapshotWithZeroRecordCountFileAndNoColumnStats() throws Exception { + InternalTable table = + getInternalTable(tableName, basePath, internalSchema, null, LAST_COMMIT_TIME); + + InternalDataFile dataFileWithZeroCount = + getDataFile(1, Collections.emptyList()).toBuilder().recordCount(0).build(); + InternalSnapshot snapshot = buildSnapshot(table, "0", dataFileWithZeroCount); + + when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); + when(mockPartitionSpecExtractor.toIceberg(eq(null), any())) + .thenReturn(PartitionSpec.unpartitioned()); + mockColStatsForFile(dataFileWithZeroCount, 1); + + TableFormatSync.getInstance() + .syncSnapshot(Collections.singletonList(conversionTarget), snapshot); + + validateIcebergTable( + tableName, table, Sets.newHashSet(dataFileWithZeroCount), Expressions.alwaysTrue()); + verify(mockColumnStatsConverter, times(1)) + .toIceberg(any(Schema.class), eq(0L), eq(Collections.emptyList())); + } + @Test public void testIncompleteWriteRollback() throws Exception { List fields2 = new ArrayList<>(internalSchema.getFields()); From 8852b84fa84001a212b636fdaf7c08c7c5bbe712 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Thu, 26 Feb 2026 14:10:14 +0100 Subject: [PATCH 07/10] rename skip stats to skip column stats --- .../xtable/delta/DeltaDataFileUpdatesExtractor.java | 2 +- .../org/apache/xtable/hudi/HudiConversionSource.java | 4 ++-- .../xtable/hudi/HudiConversionSourceProvider.java | 6 +++--- .../apache/xtable/hudi/HudiDataFileExtractor.java | 12 ++++++------ .../org/apache/xtable/hudi/HudiSourceConfig.java | 8 ++++---- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java index 65a3f0f65..177bc43a2 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaDataFileUpdatesExtractor.java @@ -130,7 +130,7 @@ private Stream createAddFileAction( private String getColumnStats( InternalSchema schema, long recordCount, List columnStats) { - // In skip-stats mode source files may not have row count/column stats. + // In skip-column-stats mode source files may not have row count/column stats. // Return null only when row count is unknown (negative sentinel). // Explicit rowCount=0 should be persisted as numRecords=0. if (recordCount < 0 && (columnStats == null || columnStats.isEmpty())) { diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java index 62be1cd8c..763ac0767 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSource.java @@ -66,7 +66,7 @@ public HudiConversionSource( public HudiConversionSource( HoodieTableMetaClient metaClient, PathBasedPartitionSpecExtractor sourcePartitionSpecExtractor, - boolean skipStats) { + boolean skipColumnStats) { this.metaClient = metaClient; this.tableExtractor = new HudiTableExtractor(new HudiSchemaExtractor(), sourcePartitionSpecExtractor); @@ -76,7 +76,7 @@ public HudiConversionSource( new PathBasedPartitionValuesExtractor( sourcePartitionSpecExtractor.getPathToPartitionFieldFormat()), new HudiFileStatsExtractor(metaClient), - skipStats); + skipColumnStats); } @Override diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java index dbb39a343..3eabb9b7b 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionSourceProvider.java @@ -46,9 +46,9 @@ public HudiConversionSource getConversionSourceInstance(SourceTable sourceTable) final PathBasedPartitionSpecExtractor sourcePartitionSpecExtractor = HudiSourceConfig.fromProperties(sourceTable.getAdditionalProperties()) .loadSourcePartitionSpecExtractor(); - boolean skipStats = - HudiSourceConfig.getSkipStats(sourceTable.getAdditionalProperties(), hadoopConf); + boolean skipColumnStats = + HudiSourceConfig.getSkipColumnStats(sourceTable.getAdditionalProperties(), hadoopConf); - return new HudiConversionSource(metaClient, sourcePartitionSpecExtractor, skipStats); + return new HudiConversionSource(metaClient, sourcePartitionSpecExtractor, skipColumnStats); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java index 6087f6782..1951ec407 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java @@ -77,7 +77,7 @@ public class HudiDataFileExtractor implements AutoCloseable { private final HoodieEngineContext engineContext; private final PathBasedPartitionValuesExtractor partitionValuesExtractor; private final HudiFileStatsExtractor fileStatsExtractor; - private final boolean skipStats; + private final boolean skipColumnStats; private final HoodieMetadataConfig metadataConfig; private final FileSystemViewManager fileSystemViewManager; private final Path basePath; @@ -91,9 +91,9 @@ public HudiDataFileExtractor( public HudiDataFileExtractor( HoodieTableMetaClient metaClient, - HudiPartitionValuesExtractor hudiPartitionValuesExtractor, + PathBasedPartitionValuesExtractor hudiPartitionValuesExtractor, HudiFileStatsExtractor hudiFileStatsExtractor, - boolean skipStats) { + boolean skipColumnStats) { this.engineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); metadataConfig = HoodieMetadataConfig.newBuilder() @@ -116,7 +116,7 @@ public HudiDataFileExtractor( this.metaClient = metaClient; this.partitionValuesExtractor = hudiPartitionValuesExtractor; this.fileStatsExtractor = hudiFileStatsExtractor; - this.skipStats = skipStats; + this.skipColumnStats = skipColumnStats; } public List getFilesCurrentState(InternalTable table) { @@ -143,7 +143,7 @@ public InternalFilesDiff getDiffForCommit( visibleTimeline, instant, fsView, hoodieInstantForDiff, table.getPartitioningFields()); List filesAdded = - skipStats + skipColumnStats ? fileStatsExtractor .addRecordCountToFiles( tableMetadata, allInfo.getAdded().stream(), table.getReadSchema()) @@ -373,7 +373,7 @@ private List getInternalDataFilesForPartitions( .getLatestBaseFiles(partitionPath) .map(baseFile -> buildFileWithoutStats(partitionValues, baseFile)); }); - if (skipStats) { + if (skipColumnStats) { Stream filesWithRecordCount = fileStatsExtractor.addRecordCountToFiles( tableMetadata, filesWithoutStats, table.getReadSchema()); diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java index 930b7ebb6..d2e8c9d0d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java @@ -41,7 +41,7 @@ public class HudiSourceConfig { "xtable.hudi.source.partition_spec_extractor_class"; public static final String PARTITION_FIELD_SPEC_CONFIG = "xtable.hudi.source.partition_field_spec_config"; - public static final String SKIP_STATS_CONFIG = "xtable.hudi.source.skip_stats"; + public static final String SKIP_COLUMN_STATS_CONFIG = "xtable.hudi.source.skip_column_stats"; String partitionSpecExtractorClass; List partitionFieldSpecs; @@ -88,15 +88,15 @@ public PathBasedPartitionSpecExtractor loadSourcePartitionSpecExtractor() { partitionSpecExtractorClass, this.getPartitionFieldSpecs()); } - public static boolean getSkipStats(Properties properties, Configuration configuration) { - String propertyValue = getPropertyOrNull(properties, SKIP_STATS_CONFIG); + public static boolean getSkipColumnStats(Properties properties, Configuration configuration) { + String propertyValue = getPropertyOrNull(properties, SKIP_COLUMN_STATS_CONFIG); if (propertyValue != null) { return Boolean.parseBoolean(propertyValue); } if (configuration == null) { return false; } - String configValue = configuration.get(SKIP_STATS_CONFIG); + String configValue = configuration.get(SKIP_COLUMN_STATS_CONFIG); return configValue != null && Boolean.parseBoolean(configValue); } From e0d001d504fe12080783ee92370aa4695f1d29f8 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Thu, 26 Feb 2026 14:15:03 +0100 Subject: [PATCH 08/10] delta to skip column stats --- .../xtable/delta/DeltaConversionSource.java | 8 +++- .../delta/DeltaConversionSourceProvider.java | 3 ++ .../xtable/delta/DeltaDataFileExtractor.java | 5 +++ .../xtable/delta/DeltaSourceConfig.java | 44 +++++++++++++++++++ .../apache/xtable/hudi/HudiSourceConfig.java | 2 +- 5 files changed, 59 insertions(+), 3 deletions(-) create mode 100644 xtable-core/src/main/java/org/apache/xtable/delta/DeltaSourceConfig.java diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java index 7fc8ea7b7..90ee9950b 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java @@ -70,6 +70,9 @@ public class DeltaConversionSource implements ConversionSource { @Builder.Default private final DeltaTableExtractor tableExtractor = DeltaTableExtractor.builder().build(); + @Builder.Default + private final boolean skipColumnStats = false; + private Optional deltaIncrementalChangesState = Optional.empty(); private final SparkSession sparkSession; @@ -123,7 +126,7 @@ public TableChange getTableChangeForCommit(Long versionNumber) { fileFormat, tableAtVersion.getPartitioningFields(), tableAtVersion.getReadSchema().getAllFields(), - true, + !skipColumnStats, DeltaPartitionExtractor.getInstance(), DeltaStatsExtractor.getInstance()); addedFiles.put(dataFile.getPhysicalPath(), dataFile); @@ -223,7 +226,8 @@ private void resetState(long versionToStartFrom) { } private List getInternalDataFiles(Snapshot snapshot, InternalSchema schema) { - try (DataFileIterator fileIterator = dataFileExtractor.iterator(snapshot, schema)) { + try (DataFileIterator fileIterator = + dataFileExtractor.iterator(snapshot, schema, !skipColumnStats)) { List dataFiles = new ArrayList<>(); fileIterator.forEachRemaining(dataFiles::add); return PartitionFileGroup.fromFiles(dataFiles); diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java index 045e2b724..926dd8617 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSourceProvider.java @@ -31,12 +31,15 @@ public class DeltaConversionSourceProvider extends ConversionSourceProvider fields; diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSourceConfig.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSourceConfig.java new file mode 100644 index 000000000..eed9daa1d --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSourceConfig.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.delta; + +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; + +/** Configuration keys for Delta source format. */ +public final class DeltaSourceConfig { + public static final String SKIP_COLUMN_STATS_CONFIG = "xtable.source.skip_column_stats"; + + private DeltaSourceConfig() {} + + public static boolean getSkipColumnStats(Properties properties, Configuration configuration) { + if (properties != null) { + String propertyValue = properties.getProperty(SKIP_COLUMN_STATS_CONFIG); + if (propertyValue != null) { + return Boolean.parseBoolean(propertyValue); + } + } + if (configuration == null) { + return false; + } + String configValue = configuration.get(SKIP_COLUMN_STATS_CONFIG); + return configValue != null && Boolean.parseBoolean(configValue); + } +} diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java index d2e8c9d0d..6345c273f 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiSourceConfig.java @@ -41,7 +41,7 @@ public class HudiSourceConfig { "xtable.hudi.source.partition_spec_extractor_class"; public static final String PARTITION_FIELD_SPEC_CONFIG = "xtable.hudi.source.partition_field_spec_config"; - public static final String SKIP_COLUMN_STATS_CONFIG = "xtable.hudi.source.skip_column_stats"; + public static final String SKIP_COLUMN_STATS_CONFIG = "xtable.source.skip_column_stats"; String partitionSpecExtractorClass; List partitionFieldSpecs; From 9a18b79830577bf1a1463ac2c3d63cafe82df5d4 Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Thu, 26 Feb 2026 14:19:14 +0100 Subject: [PATCH 09/10] implem for iceberg --- .../iceberg/IcebergConversionSource.java | 12 +++-- .../IcebergConversionSourceProvider.java | 4 ++ .../iceberg/IcebergDataFileExtractor.java | 2 +- .../xtable/iceberg/IcebergSourceConfig.java | 45 +++++++++++++++++++ 4 files changed, 59 insertions(+), 4 deletions(-) create mode 100644 xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSourceConfig.java diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java index 985a670c0..a747add8d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java @@ -87,6 +87,9 @@ public class IcebergConversionSource implements ConversionSource { private final IcebergDataFileExtractor dataFileExtractor = IcebergDataFileExtractor.builder().build(); + @Builder.Default + private final boolean skipColumnStats = false; + private Table initSourceTable() { IcebergTableManager tableManager = IcebergTableManager.of(hadoopConf); String[] namespace = sourceTableConfig.getNamespace(); @@ -171,8 +174,10 @@ public InternalSnapshot getCurrentSnapshot() { } InternalTable irTable = getTable(currentSnapshot); - TableScan scan = - iceTable.newScan().useSnapshot(currentSnapshot.snapshotId()).includeColumnStats(); + TableScan scan = iceTable.newScan().useSnapshot(currentSnapshot.snapshotId()); + if (!skipColumnStats) { + scan = scan.includeColumnStats(); + } PartitionSpec partitionSpec = iceTable.spec(); List partitionedDataFiles; try (CloseableIterable files = scan.planFiles()) { @@ -199,7 +204,8 @@ private InternalDataFile fromIceberg( DataFile file, PartitionSpec partitionSpec, InternalTable internalTable) { List partitionValues = partitionConverter.toXTable(internalTable, file.partition(), partitionSpec); - return dataFileExtractor.fromIceberg(file, partitionValues, internalTable.getReadSchema()); + return dataFileExtractor.fromIceberg( + file, partitionValues, internalTable.getReadSchema(), !skipColumnStats); } @Override diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java index 449ebe5d3..6e52a187b 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSourceProvider.java @@ -27,9 +27,13 @@ public class IcebergConversionSourceProvider extends ConversionSourceProvider { @Override public IcebergConversionSource getConversionSourceInstance(SourceTable sourceTableConfig) { + boolean skipColumnStats = + IcebergSourceConfig.getSkipColumnStats( + sourceTableConfig.getAdditionalProperties(), hadoopConf); return IcebergConversionSource.builder() .sourceTableConfig(sourceTableConfig) .hadoopConf(hadoopConf) + .skipColumnStats(skipColumnStats) .build(); } } diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileExtractor.java index 1b9d70f39..869cc27a9 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergDataFileExtractor.java @@ -53,7 +53,7 @@ InternalDataFile fromIceberg( return fromIceberg(dataFile, partitionValues, schema, true); } - private InternalDataFile fromIceberg( + InternalDataFile fromIceberg( DataFile dataFile, List partitionValues, InternalSchema schema, diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSourceConfig.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSourceConfig.java new file mode 100644 index 000000000..b02db68d9 --- /dev/null +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSourceConfig.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.xtable.iceberg; + +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; + +/** Configuration keys for Iceberg source format. */ +public final class IcebergSourceConfig { + public static final String SKIP_COLUMN_STATS_CONFIG = "xtable.source.skip_column_stats"; + + private IcebergSourceConfig() {} + + public static boolean getSkipColumnStats(Properties properties, Configuration configuration) { + if (properties != null) { + String propertyValue = properties.getProperty(SKIP_COLUMN_STATS_CONFIG); + if (propertyValue != null) { + return Boolean.parseBoolean(propertyValue); + } + } + if (configuration == null) { + return false; + } + String configValue = configuration.get(SKIP_COLUMN_STATS_CONFIG); + return configValue != null && Boolean.parseBoolean(configValue); + } +} + From e9f994e5458747c02589e1085f5fba17d840f41f Mon Sep 17 00:00:00 2001 From: nicolas-paris Date: Thu, 26 Feb 2026 14:39:57 +0100 Subject: [PATCH 10/10] add tests --- .../xtable/delta/DeltaConversionSource.java | 3 +- .../xtable/delta/DeltaSourceConfig.java | 2 +- .../iceberg/IcebergConversionSource.java | 3 +- .../xtable/iceberg/IcebergSourceConfig.java | 3 +- .../apache/xtable/ITConversionController.java | 100 ++++++++++++++++++ .../xtable/iceberg/TestIcebergSync.java | 6 +- 6 files changed, 109 insertions(+), 8 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java index 90ee9950b..323c2cd5e 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionSource.java @@ -70,8 +70,7 @@ public class DeltaConversionSource implements ConversionSource { @Builder.Default private final DeltaTableExtractor tableExtractor = DeltaTableExtractor.builder().build(); - @Builder.Default - private final boolean skipColumnStats = false; + @Builder.Default private final boolean skipColumnStats = false; private Optional deltaIncrementalChangesState = Optional.empty(); diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSourceConfig.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSourceConfig.java index eed9daa1d..5a247503d 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSourceConfig.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSourceConfig.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - + package org.apache.xtable.delta; import java.util.Properties; diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java index a747add8d..73ea161a8 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java @@ -87,8 +87,7 @@ public class IcebergConversionSource implements ConversionSource { private final IcebergDataFileExtractor dataFileExtractor = IcebergDataFileExtractor.builder().build(); - @Builder.Default - private final boolean skipColumnStats = false; + @Builder.Default private final boolean skipColumnStats = false; private Table initSourceTable() { IcebergTableManager tableManager = IcebergTableManager.of(hadoopConf); diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSourceConfig.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSourceConfig.java index b02db68d9..da1f47c3b 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSourceConfig.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSourceConfig.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - + package org.apache.xtable.iceberg; import java.util.Properties; @@ -42,4 +42,3 @@ public static boolean getSkipColumnStats(Properties properties, Configuration co return configValue != null && Boolean.parseBoolean(configValue); } } - diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index 2da3078b6..af4e61d12 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -28,6 +28,7 @@ import static org.apache.xtable.model.storage.TableFormat.PARQUET; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.net.URI; import java.nio.ByteBuffer; @@ -103,9 +104,11 @@ import org.apache.xtable.hudi.HudiTestUtil; import org.apache.xtable.iceberg.IcebergConversionSourceProvider; import org.apache.xtable.iceberg.TestIcebergDataHelper; +import org.apache.xtable.model.storage.InternalDataFile; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.model.sync.SyncMode; import org.apache.xtable.paimon.PaimonConversionSourceProvider; +import org.apache.xtable.spi.extractor.ConversionSource; public class ITConversionController { @TempDir public static Path tempDir; @@ -174,6 +177,18 @@ private static Stream testCasesWithSyncModes() { return Stream.of(Arguments.of(SyncMode.INCREMENTAL), Arguments.of(SyncMode.FULL)); } + private static Stream sourceSkipColumnStatsAndSyncModes() { + List arguments = new ArrayList<>(); + for (String sourceFormat : Arrays.asList(HUDI, DELTA, ICEBERG)) { + for (SyncMode syncMode : SyncMode.values()) { + for (boolean skipColumnStats : new boolean[] {true, false}) { + arguments.add(Arguments.of(sourceFormat, syncMode, skipColumnStats)); + } + } + } + return arguments.stream(); + } + private ConversionSourceProvider getConversionSourceProvider(String sourceTableFormat) { switch (sourceTableFormat.toUpperCase()) { case HUDI: @@ -503,6 +518,60 @@ public void testTimeTravelQueries(String sourceTableFormat) throws Exception { } } + @ParameterizedTest + @MethodSource("sourceSkipColumnStatsAndSyncModes") + public void testSkipColumnStatsAcrossSources( + String sourceTableFormat, SyncMode syncMode, boolean skipColumnStats) throws Exception { + String tableName = getTableName(); + ConversionSourceProvider conversionSourceProvider = + getConversionSourceProvider(sourceTableFormat); + List targetTableFormats = getOtherFormats(sourceTableFormat); + try (GenericTable table = + GenericTable.getInstance(tableName, tempDir, sparkSession, jsc, sourceTableFormat, false)) { + table.insertRows(20); + try (ConversionSource conversionSource = + conversionSourceProvider.getConversionSourceInstance( + SourceTable.builder() + .name(tableName) + .formatName(sourceTableFormat) + .basePath(table.getBasePath()) + .dataPath(table.getDataPath()) + .additionalProperties( + getSourcePropertiesForSkipColumnStats(skipColumnStats, null)) + .build())) { + List dataFiles = + conversionSource.getCurrentSnapshot().getPartitionedDataFiles().stream() + .flatMap(group -> group.getDataFiles().stream()) + .collect(Collectors.toList()); + assertFalse(dataFiles.isEmpty(), "Expected at least one data file in source snapshot"); + boolean hasAnyColumnStats = + dataFiles.stream().anyMatch(file -> !file.getColumnStats().isEmpty()); + if (skipColumnStats) { + assertFalse( + hasAnyColumnStats, "Column stats should be skipped when skip_column_stats=true"); + } else { + assertTrue( + hasAnyColumnStats, "Column stats should be present when skip_column_stats=false"); + } + + ConversionConfig conversionConfig = + getTableSyncConfig( + sourceTableFormat, + syncMode, + tableName, + table, + targetTableFormats, + null, + null, + getSourcePropertiesForSkipColumnStats(skipColumnStats, null)); + ConversionController conversionController = + new ConversionController(jsc.hadoopConfiguration()); + conversionController.sync(conversionConfig, conversionSourceProvider); + checkDatasetEquivalence(sourceTableFormat, table, targetTableFormats, 20); + } + } + } + private static List getOtherFormats(String sourceTableFormat) { return Arrays.stream(TableFormat.values()) .filter(fmt -> !fmt.equals(sourceTableFormat)) @@ -1115,7 +1184,28 @@ private static ConversionConfig getTableSyncConfig( List targetTableFormats, String partitionConfig, Duration metadataRetention) { + return getTableSyncConfig( + sourceTableFormat, + syncMode, + tableName, + table, + targetTableFormats, + partitionConfig, + metadataRetention, + new Properties()); + } + + private static ConversionConfig getTableSyncConfig( + String sourceTableFormat, + SyncMode syncMode, + String tableName, + GenericTable table, + List targetTableFormats, + String partitionConfig, + Duration metadataRetention, + Properties additionalSourceProperties) { Properties sourceProperties = new Properties(); + sourceProperties.putAll(additionalSourceProperties); if (partitionConfig != null) { sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig); } @@ -1147,4 +1237,14 @@ private static ConversionConfig getTableSyncConfig( .syncMode(syncMode) .build(); } + + private static Properties getSourcePropertiesForSkipColumnStats( + boolean skipColumnStats, String partitionConfig) { + Properties sourceProperties = new Properties(); + sourceProperties.put("xtable.source.skip_column_stats", String.valueOf(skipColumnStats)); + if (partitionConfig != null) { + sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionConfig); + } + return sourceProperties; + } } diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java index 33e4863ef..199a2c790 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java @@ -317,7 +317,11 @@ public void testSnapshotWithZeroRecordCountFileAndNoColumnStats() throws Excepti .syncSnapshot(Collections.singletonList(conversionTarget), snapshot); validateIcebergTable( - tableName, table, Sets.newHashSet(dataFileWithZeroCount), Expressions.alwaysTrue()); + tableName, + table, + Sets.newHashSet(dataFileWithZeroCount), + Expressions.alwaysTrue(), + icebergSchema); verify(mockColumnStatsConverter, times(1)) .toIceberg(any(Schema.class), eq(0L), eq(Collections.emptyList())); }