From 6f3322bc0ebef8f4b0f837e89860e239b3c83b98 Mon Sep 17 00:00:00 2001 From: Vinish Reddy Date: Mon, 9 Mar 2026 17:38:31 -0700 Subject: [PATCH] Fix batch INSERT_OVERWRITE replacecommits dropping adds in HudiDataFileExtractor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When XTable processes multiple INSERT_OVERWRITE replacecommit instants in a single batch (e.g., A replaces initial, B replaces A on the same partition), getUpdatesToPartitionForReplaceCommit uses getReplacedFileGroupsBeforeOrOn(A.timestamp) which excludes A's file groups (replaced by B, and B > A). Combined with getAllFileGroups also excluding them (globally marked replaced), replacecommit A emits 0 adds — causing downstream integrity failures. Fix: change getReplacedFileGroupsBeforeOrOn -> getAllReplacedFileGroups in the replace commit handler, consistent with how getUpdatesToPartition already handles regular commits. The newFileIds/replacedFileIds sets from the commit's write stats already ensure only the correct files are added or removed. Added insertOverwrite() test helper to TestSparkHudiTable and an integration test testMultipleInsertOverwriteOnSamePartitions that reproduces and validates the fix. 
--- .../xtable/hudi/HudiDataFileExtractor.java | 4 +- .../org/apache/xtable/TestSparkHudiTable.java | 13 +++++ .../xtable/hudi/ITHudiConversionSource.java | 48 +++++++++++++++++++ 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java index 5e17b389f..f3b73ec2f 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java @@ -326,9 +326,7 @@ private AddedAndRemovedFiles getUpdatesToPartitionForReplaceCommit( partitionValuesExtractor.extractPartitionValues(partitioningFields, partitionPath); Stream<HoodieFileGroup> fileGroups = Stream.concat( - fsView.getAllFileGroups(partitionPath), - fsView.getReplacedFileGroupsBeforeOrOn( - instantToConsider.getTimestamp(), partitionPath)); + fsView.getAllFileGroups(partitionPath), fsView.getAllReplacedFileGroups(partitionPath)); fileGroups.forEach( fileGroup -> { List<HoodieBaseFile> baseFiles = fileGroup.getAllBaseFiles().collect(Collectors.toList()); diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java index 1aaf61f96..ab8086822 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java @@ -220,6 +220,19 @@ public void deletePartition(String partition, HoodieTableType tableType) { assertNoWriteErrors(result); } + public List<HoodieRecord<HoodieAvroPayload>> insertOverwrite( + List<HoodieRecord<HoodieAvroPayload>> records) { + String actionType = + CommitUtils.getCommitActionType( + WriteOperationType.INSERT_OVERWRITE, HoodieTableType.COPY_ON_WRITE); + String instant = getStartCommitOfActionType(actionType); + JavaRDD<HoodieRecord<HoodieAvroPayload>> writeRecords = jsc.parallelize(records, 1); + HoodieWriteResult writeResult = writeClient.insertOverwrite(writeRecords, instant); + List<WriteStatus> result = 
writeResult.getWriteStatuses().collect(); + assertNoWriteErrors(result); + return records; + } + public void cluster() { String instant = writeClient.scheduleClustering(Option.empty()).get(); writeClient.cluster(instant, true); diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java index 99d932f7b..ae9ab5dd0 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java @@ -419,6 +419,54 @@ public void testsForDropPartition(HoodieTableType tableType) { } } + @Test + public void testMultipleInsertOverwriteOnSamePartitions() { + String tableName = "test_table_" + UUID.randomUUID(); + try (TestSparkHudiTable table = + TestSparkHudiTable.forStandardSchema( + tableName, tempDir, jsc, "level:SIMPLE", HoodieTableType.COPY_ON_WRITE)) { + List<List<String>> allBaseFilePaths = new ArrayList<>(); + List<TableChange> allTableChanges = new ArrayList<>(); + + // Initial insert into partition "INFO" + String commitInstant1 = table.startCommit(); + List<HoodieRecord<HoodieAvroPayload>> insertsForCommit1 = table.generateRecords(50, "INFO"); + table.insertRecordsWithCommitAlreadyStarted(insertsForCommit1, commitInstant1, true); + allBaseFilePaths.add(table.getAllLatestBaseFilePaths()); + + // INSERT_OVERWRITE on "INFO" partition (replacecommit A — new file groups replace initial) + List<HoodieRecord<HoodieAvroPayload>> overwriteRecords1 = table.generateRecords(30, "INFO"); + table.insertOverwrite(overwriteRecords1); + allBaseFilePaths.add(table.getAllLatestBaseFilePaths()); + + // INSERT_OVERWRITE on "INFO" partition again (replacecommit B — new file groups replace A's) + List<HoodieRecord<HoodieAvroPayload>> overwriteRecords2 = table.generateRecords(20, "INFO"); + table.insertOverwrite(overwriteRecords2); + allBaseFilePaths.add(table.getAllLatestBaseFilePaths()); + + HudiConversionSource hudiClient = + getHudiSourceClient(CONFIGURATION, table.getBasePath(), "level:VALUE"); + // Get 
the current snapshot + InternalSnapshot internalSnapshot = hudiClient.getCurrentSnapshot(); + ValidationTestHelper.validateSnapshot( + internalSnapshot, allBaseFilePaths.get(allBaseFilePaths.size() - 1)); + // Get changes in Incremental format since the initial insert + InstantsForIncrementalSync instantsForIncrementalSync = + InstantsForIncrementalSync.builder() + .lastSyncInstant(HudiInstantUtils.parseFromInstantTime(commitInstant1)) + .build(); + CommitsBacklog<HoodieInstant> instantCommitsBacklog = + hudiClient.getCommitsBacklog(instantsForIncrementalSync); + for (HoodieInstant instant : instantCommitsBacklog.getCommitsToProcess()) { + TableChange tableChange = hudiClient.getTableChangeForCommit(instant); + allTableChanges.add(tableChange); + } + // Without the fix, replacecommit A would have 0 adds because the FileSystemView + // built from the full timeline marks A's file groups as replaced by B. + ValidationTestHelper.validateTableChanges(allBaseFilePaths, allTableChanges); + } + } + @ParameterizedTest @MethodSource("testsForAllTableTypes") public void testsForDeleteAllRecordsInPartition(HoodieTableType tableType) {