diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java index 5e17b389f..f3b73ec2f 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiDataFileExtractor.java @@ -326,9 +326,7 @@ private AddedAndRemovedFiles getUpdatesToPartitionForReplaceCommit( partitionValuesExtractor.extractPartitionValues(partitioningFields, partitionPath); Stream fileGroups = Stream.concat( - fsView.getAllFileGroups(partitionPath), - fsView.getReplacedFileGroupsBeforeOrOn( - instantToConsider.getTimestamp(), partitionPath)); + fsView.getAllFileGroups(partitionPath), fsView.getAllReplacedFileGroups(partitionPath)); fileGroups.forEach( fileGroup -> { List baseFiles = fileGroup.getAllBaseFiles().collect(Collectors.toList()); diff --git a/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java index 1aaf61f96..ab8086822 100644 --- a/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java +++ b/xtable-core/src/test/java/org/apache/xtable/TestSparkHudiTable.java @@ -220,6 +220,19 @@ public void deletePartition(String partition, HoodieTableType tableType) { assertNoWriteErrors(result); } + public List> insertOverwrite( + List> records) { + String actionType = + CommitUtils.getCommitActionType( + WriteOperationType.INSERT_OVERWRITE, HoodieTableType.COPY_ON_WRITE); + String instant = getStartCommitOfActionType(actionType); + JavaRDD> writeRecords = jsc.parallelize(records, 1); + HoodieWriteResult writeResult = writeClient.insertOverwrite(writeRecords, instant); + List result = writeResult.getWriteStatuses().collect(); + assertNoWriteErrors(result); + return records; + } + public void cluster() { String instant = writeClient.scheduleClustering(Option.empty()).get(); writeClient.cluster(instant, true); diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java index 99d932f7b..ae9ab5dd0 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSource.java @@ -419,6 +419,54 @@ public void testsForDropPartition(HoodieTableType tableType) { } } + @Test + public void testMultipleInsertOverwriteOnSamePartitions() { + String tableName = "test_table_" + UUID.randomUUID(); + try (TestSparkHudiTable table = + TestSparkHudiTable.forStandardSchema( + tableName, tempDir, jsc, "level:SIMPLE", HoodieTableType.COPY_ON_WRITE)) { + List> allBaseFilePaths = new ArrayList<>(); + List allTableChanges = new ArrayList<>(); + + // Initial insert into partition "INFO" + String commitInstant1 = table.startCommit(); + List> insertsForCommit1 = table.generateRecords(50, "INFO"); + table.insertRecordsWithCommitAlreadyStarted(insertsForCommit1, commitInstant1, true); + allBaseFilePaths.add(table.getAllLatestBaseFilePaths()); + + // INSERT_OVERWRITE on "INFO" partition (replacecommit A — new file groups replace initial) + List> overwriteRecords1 = table.generateRecords(30, "INFO"); + table.insertOverwrite(overwriteRecords1); + allBaseFilePaths.add(table.getAllLatestBaseFilePaths()); + + // INSERT_OVERWRITE on "INFO" partition again (replacecommit B — new file groups replace A's) + List> overwriteRecords2 = table.generateRecords(20, "INFO"); + table.insertOverwrite(overwriteRecords2); + allBaseFilePaths.add(table.getAllLatestBaseFilePaths()); + + HudiConversionSource hudiClient = + getHudiSourceClient(CONFIGURATION, table.getBasePath(), "level:VALUE"); + // Get the current snapshot + InternalSnapshot internalSnapshot = hudiClient.getCurrentSnapshot(); + ValidationTestHelper.validateSnapshot( + internalSnapshot, allBaseFilePaths.get(allBaseFilePaths.size() - 1)); + // Get changes in Incremental format since the initial insert + InstantsForIncrementalSync instantsForIncrementalSync = + InstantsForIncrementalSync.builder() + .lastSyncInstant(HudiInstantUtils.parseFromInstantTime(commitInstant1)) + .build(); + CommitsBacklog instantCommitsBacklog = + hudiClient.getCommitsBacklog(instantsForIncrementalSync); + for (HoodieInstant instant : instantCommitsBacklog.getCommitsToProcess()) { + TableChange tableChange = hudiClient.getTableChangeForCommit(instant); + allTableChanges.add(tableChange); + } + // Without the fix, replacecommit A would have 0 adds because the FileSystemView + // built from the full timeline marks A's file groups as replaced by B. + ValidationTestHelper.validateTableChanges(allBaseFilePaths, allTableChanges); + } + } + @ParameterizedTest @MethodSource("testsForAllTableTypes") public void testsForDeleteAllRecordsInPartition(HoodieTableType tableType) {