From cbb35b12b5448d932ced0c5ce3548e7d78427a86 Mon Sep 17 00:00:00 2001 From: arjit3251 Date: Tue, 16 Jun 2026 12:24:36 +0530 Subject: [PATCH 1/2] Determine Iceberg copy set from dest catalog; order data before metadata on publish Two related fixes for inter-cluster Iceberg full-table copy: 1. Determine which files to copy by diffing the source table against what the DESTINATION Iceberg catalog references, rather than probing each file's presence on the destination filesystem (config-gated, default on: iceberg.dataset.determine.copy.from.dest.catalog). Because the dest table is committed only after a fully successful publish, every path it references is guaranteed present and consistent, while orphan metadata left on the dest filesystem by a prior partially-failed run is NOT referenced -- so its missing data files are still re-copied. This prevents the table corruption that filesystem-presence short-circuiting can cause (a metadata file present on the dest FS skips its whole subtree forever, leaving the committed table referencing data files that never get copied). 2. On publish, rename data files before metadata files (HadoopUtils.renameRecursivelyOrdered) so Iceberg metadata is never committed ahead of the data it references. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../copy/iceberg/IcebergDataset.java | 137 +++++++++++++----- .../copy/iceberg/IcebergDatasetFinder.java | 19 +++ .../copy/publisher/CopyDataPublisher.java | 4 +- .../copy/iceberg/IcebergDatasetTest.java | 116 ++++++++++++++- .../org/apache/gobblin/util/HadoopUtils.java | 94 ++++++++++++ 5 files changed, 324 insertions(+), 46 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java index b0a6d082363..6a2bfdc8930 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -39,6 +40,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import javax.annotation.concurrent.NotThreadSafe; import lombok.Data; @@ -207,28 +209,51 @@ protected TableMetadata getCurrentDestTableMetadata() throws IOException { protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig, boolean shouldIncludeMetadataPath) throws IOException { IcebergTable icebergTable = this.getSrcIcebergTable(); - /** @return whether `pathStr` is present on `targetFs`, caching results while tunneling checked exceptions outward */ - Function isPresentOnTarget = CheckedExceptionFunction.wrapToTunneled(pathStr -> - // omit considering timestamp (or other markers of freshness), as files should be immutable - // ATTENTION: `CopyContext.getFileStatus()`, to partake in caching - copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent() - ); - // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg - // NOTE: if `shouldIncludeMetadataPath` was false during the prior executions, this condition will be false + // Decide which source files are "already copied" to the destination. Two strategies (config-gated): + // (a) DEST-CATALOG DIFF (default): a path is already-copied iff the DESTINATION Iceberg table's committed metadata + // references it. Because the dest table is committed (`IcebergRegisterStep`, a post-publish step) only AFTER a + // fully successful publish, every path the dest catalog references is guaranteed present AND part of a + // self-consistent snapshot -- so the subtree short-circuit below is sound. Crucially, orphan metadata left on + // the dest *filesystem* by a prior partially-failed run is NOT referenced by the dest catalog, so its missing + // data files are still re-copied. This averts the table corruption that pure filesystem-presence checking + // causes: there, a metadata file present on the dest FS would skip its whole subtree forever, leaving the + // committed table permanently referencing data files that never get copied. + // (b) FILESYSTEM PRESENCE (legacy): a path is already-copied iff it physically exists on `targetFs`. + final boolean determineCopyFromDestCatalog = + IcebergDatasetFinder.getConfigShouldDetermineCopyFromDestCatalog(this.properties); + IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly(); - if (currentSnapshotOverview.getMetadataPath().map(isPresentOnTarget).orElse(false) && - isPresentOnTarget.apply(currentSnapshotOverview.getManifestListPath())) { - log.info("~{}~ skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target", - this.getFileSetId(), currentSnapshotOverview.getSnapshotId(), - currentSnapshotOverview.getManifestListPath(), - currentSnapshotOverview.getMetadataPath().orElse("<>")); - TableMetadata readTimeTableMetadata = currentSnapshotOverview.getTableMetadata().orElseThrow(() -> new RuntimeException( - String.format("~%s~ no table metadata for current snapshot '%s' at '%s' with metadata path '%s'", - this.getFileSetId(), currentSnapshotOverview.getSnapshotId(), - currentSnapshotOverview.getManifestListPath(), - currentSnapshotOverview.getMetadataPath().orElse("<>")))); - return new GetFilePathsToFileStatusResult(Maps.newHashMap(), readTimeTableMetadata); + final Function isAlreadyCopied; + if (determineCopyFromDestCatalog) { + // cheap steady-state short-circuit: if the dest table's current snapshot IS the source's current snapshot (same, + // path-preserved manifest list), the tables are in sync -- skip without reading either side's manifests + Optional destCurrentManifestListPath = getDestCurrentManifestListPath(); + if (destCurrentManifestListPath.map(currentSnapshotOverview.getManifestListPath()::equals).orElse(false)) { + log.info("~{}~ skipping entire iceberg, since dest table is already at source current snapshot '{}' ('{}')", + this.getFileSetId(), currentSnapshotOverview.getSnapshotId(), currentSnapshotOverview.getManifestListPath()); + return new GetFilePathsToFileStatusResult(Maps.newHashMap(), getRequiredCurrentTableMetadata(currentSnapshotOverview)); + } + // not in sync: enumerate every path the dest catalog references (across its retained snapshots) and diff against it + Set destReferencedPaths = collectDestTableReferencedPaths(shouldIncludeMetadataPath); + log.info("~{}~ destination catalog references {} paths; diffing source against it to determine files to copy", + this.getFileSetId(), destReferencedPaths.size()); + isAlreadyCopied = destReferencedPaths::contains; + } else { + // omit considering timestamp (or other markers of freshness), as files should be immutable + // ATTENTION: `CopyContext.getFileStatus()`, to partake in caching + isAlreadyCopied = CheckedExceptionFunction.wrapToTunneled(pathStr -> + copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent()); + // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg + // NOTE: if `shouldIncludeMetadataPath` was false during the prior executions, this condition will be false + if (currentSnapshotOverview.getMetadataPath().map(isAlreadyCopied).orElse(false) && + isAlreadyCopied.apply(currentSnapshotOverview.getManifestListPath())) { + log.info("~{}~ skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target", + this.getFileSetId(), currentSnapshotOverview.getSnapshotId(), + currentSnapshotOverview.getManifestListPath(), + currentSnapshotOverview.getMetadataPath().orElse("<>")); + return new GetFilePathsToFileStatusResult(Maps.newHashMap(), getRequiredCurrentTableMetadata(currentSnapshotOverview)); + } } List readTimeTableMetadataHolder = Lists.newArrayList(); // expecting exactly one elem @@ -241,26 +266,19 @@ protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem tar log.info("~{}~ loaded snapshot '{}' at '{}' from metadata path: '{}'", this.getFileSetId(), snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<>")); // ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data; - // most critically, all are presumed immutable and uniquely named, although any may be replaced. we depend - // also on incremental copy being run always atomically: to commit each iceberg only upon its full success. - // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is - // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date. - // hence, its entire subtree may be short-circuited. nevertheless, absence of a file at dest cannot imply - // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some - // metadata files would have been replaced (e.g. during snapshot compaction). in such instances, at least - // some of the children pointed to within could have been copied prior, when they previously appeared as a - // child of the current file's predecessor (which this new meta file now replaces). - if (!isPresentOnTarget.apply(manListPath)) { + // all are presumed immutable and uniquely named, although any may be replaced. judging "already-copied" by + // the destination catalog (default; see above), a referenced manifest-list/manifest guarantees its entire + // subtree was committed--hence already fully copied--so its subtree may be short-circuited. nevertheless, + // absence cannot imply its entire subtree necessarily requires copying, because it is possible, even likely + // in practice, that some metadata files would have been replaced (e.g. during snapshot compaction). in such + // instances, at least some of the children pointed to within could have been copied prior, when they + // previously appeared as a child of the current file's predecessor (which this new meta file now replaces). + if (!isAlreadyCopied.apply(manListPath)) { List missingPaths = snapshotInfo.getSnapshotApexPaths(shouldIncludeMetadataPath); for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) { - if (!isPresentOnTarget.apply(mfi.getManifestFilePath())) { + if (!isAlreadyCopied.apply(mfi.getManifestFilePath())) { missingPaths.add(mfi.getManifestFilePath()); // being incremental info, no listed paths would have appeared prior w/ other snapshots, so add all now. - // skip verification despite corner case of a snapshot having reorganized/rebalanced manifest contents - // during a period where replication fell so far behind that no snapshots listed among current metadata - // are yet at dest. since the consequence of unnecessary copy is merely wasted data transfer and - // compute--and overall, potential is small--prefer sidestepping expense of exhaustive checking, since - // file count may run into 100k+ (even beyond!) missingPaths.addAll(mfi.getListedFilePaths()); } } @@ -268,13 +286,13 @@ protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem tar this.getFileSetId(), snapshotInfo.getSnapshotId(), missingPaths.size()); return missingPaths.iterator(); } else { - log.info("~{}~ snapshot '{}' already present on target... skipping (including contents)", + log.info("~{}~ snapshot '{}' already present on destination... skipping (including contents)", this.getFileSetId(), snapshotInfo.getSnapshotId()); // IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing manifest list Optional metadataPath = snapshotInfo.getMetadataPath(); - Optional nonReplicatedMetadataPath = metadataPath.filter(p -> !isPresentOnTarget.apply(p)); + Optional nonReplicatedMetadataPath = metadataPath.filter(p -> !isAlreadyCopied.apply(p)); metadataPath.ifPresent(ignore -> - log.info("~{}~ metadata IS {} present on target", + log.info("~{}~ metadata IS {} present on destination", this.getFileSetId(), !nonReplicatedMetadataPath.isPresent() ? "already" @@ -335,6 +353,47 @@ protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem tar return new GetFilePathsToFileStatusResult(results, readTimeTableMetadataHolder.get(0)); } + /** + * @return the manifest-list path of the destination table's current snapshot, or {@link Optional#empty()} if the + * destination table does not exist or has no snapshot yet (i.e. nothing committed to short-circuit against) + */ + private Optional getDestCurrentManifestListPath() throws IOException { + try { + return Optional.of(this.destIcebergTable.getCurrentSnapshotInfoOverviewOnly().getManifestListPath()); + } catch (IcebergTable.TableNotFoundException | IcebergTable.NoSnapshotFoundException e) { + log.info("~{}~ destination table has no current snapshot to short-circuit against ({})", + this.getFileSetId(), e.getMessage()); + return Optional.empty(); + } + } + + /** + * @return every file path the destination Iceberg table's catalog currently references, across all of its retained + * snapshots. Because the dest table is committed only after a fully successful publish, every such path is guaranteed + * already present and part of a consistent snapshot. Returns an empty set if the destination table does not exist yet + * (first-ever copy), in which case all source files will be copied. + */ + private Set collectDestTableReferencedPaths(boolean shouldIncludeMetadataPath) throws IOException { + Set referencedPaths = Sets.newHashSet(); + try { + Iterator destSnapshotInfos = this.destIcebergTable.getIncrementalSnapshotInfosIterator(); + while (destSnapshotInfos.hasNext()) { + referencedPaths.addAll(destSnapshotInfos.next().getAllPaths(shouldIncludeMetadataPath)); + } + } catch (IcebergTable.TableNotFoundException tnfe) { + log.info("~{}~ destination table not found ('{}'); treating as empty, so all source files will be copied", + this.getFileSetId(), tnfe.getTableId()); + } + return referencedPaths; + } + + /** @return the {@link TableMetadata} carried by {@code currentSnapshotOverview}, or throw if (unexpectedly) absent */ + private TableMetadata getRequiredCurrentTableMetadata(IcebergSnapshotInfo currentSnapshotOverview) { + return currentSnapshotOverview.getTableMetadata().orElseThrow(() -> new RuntimeException( + String.format("~%s~ no table metadata for current snapshot '%s' at '%s'", this.getFileSetId(), + currentSnapshotOverview.getSnapshotId(), currentSnapshotOverview.getManifestListPath()))); + } + /** * Stateful object to consolidate error messages (e.g. for logging), per a {@link Path} consolidation strategy. * OVERVIEW: to avoid run-away logging into the 1000s of lines, consolidate to parent (directory) level: diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java index f13a2ed8ebb..6ba39bb2f55 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java @@ -56,6 +56,20 @@ public class IcebergDatasetFinder implements IterableDatasetFinder every source file is copied */ + @Test + public void testCatalogDiffCopiesEverythingWhenDestTableAbsent() throws IOException { + Set expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), true, SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0); + validateCatalogDiffGetFilePaths(Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0), mockAbsentDestTable(), + expectedResultPaths, true); + } + + /** dest catalog is one snapshot behind => only the newer snapshot's files are copied */ + @Test + public void testCatalogDiffCopiesOnlyDeltaWhenDestBehind() throws IOException { + IcebergTable destTable = MockIcebergTable.withSnapshots(destTableId(), Lists.newArrayList(SNAPSHOT_PATHS_1)); + Set expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), true, SNAPSHOT_PATHS_0); + validateCatalogDiffGetFilePaths(Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0), destTable, + expectedResultPaths, true); + } + + /** + * THE CORRUPTION GUARD: even if a manifest + manifest-list were left on the dest *filesystem* by a prior failed run + * (here the dest catalog still references an unrelated snapshot), the source snapshot's files are ALL re-copied, + * because the dest catalog does not reference them. Filesystem-presence short-circuiting would instead skip them. + */ + @Test + public void testCatalogDiffReCopiesFilesOrphanedOnDestFsButNotCommitted() throws IOException { + // dest catalog committed only SNAPSHOT_PATHS_1; SNAPSHOT_PATHS_0 (the source current) is NOT referenced by it + IcebergTable destTable = MockIcebergTable.withSnapshots(destTableId(), Lists.newArrayList(SNAPSHOT_PATHS_1)); + Set expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), true, SNAPSHOT_PATHS_0); + validateCatalogDiffGetFilePaths(Lists.newArrayList(SNAPSHOT_PATHS_0), destTable, expectedResultPaths, true); + } + /** Exception wrapping is used internally--ensure that doesn't lapse into silently swallowing errors */ @Test(expectedExceptions = IOException.class) public void testGetFilePathsDoesNotSwallowDestFileSystemException() throws IOException { @@ -230,7 +280,7 @@ public void testGetFilePathsDoesNotSwallowDestFileSystemException() throws IOExc MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI); FileSystem sourceFs = sourceFsBuilder.build(); boolean shouldIncludeMetadataPathMakesNoDifference = true; - IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, new Properties(), sourceFs, shouldIncludeMetadataPathMakesNoDifference); + IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, legacyFsPresenceProperties(), sourceFs, shouldIncludeMetadataPathMakesNoDifference); MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destFsBuilder.build(); @@ -272,7 +322,7 @@ public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { IcebergTable srcIcebergTbl = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_0)); IcebergTable destIcebergTbl = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1)); boolean shouldIncludeManifestPath = true; - IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTbl, destIcebergTbl, new Properties(), sourceFs, shouldIncludeManifestPath); + IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTbl, destIcebergTbl, legacyFsPresenceProperties(), sourceFs, shouldIncludeManifestPath); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -299,7 +349,7 @@ public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOExcept IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1)); boolean shouldIncludeManifestPath = false; - IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, destIcebergTable, new Properties(), sourceFs, shouldIncludeManifestPath); + IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, destIcebergTable, legacyFsPresenceProperties(), sourceFs, shouldIncludeManifestPath); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -331,7 +381,7 @@ public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() throws IOExc IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_0)); IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1)); boolean shouldIncludeManifestPath = true; - IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, destIcebergTable, new Properties(), sourceFs, shouldIncludeManifestPath); + IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, destIcebergTable, legacyFsPresenceProperties(), sourceFs, shouldIncludeManifestPath); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -361,7 +411,7 @@ public void testFsOwnershipAndPermissionWithoutPreservationWhenDestEmpty() throw IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_0)); IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1)); boolean shouldIncludeManifestPath = true; - IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, destIcebergTable, new Properties(), sourceFs, shouldIncludeManifestPath); + IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, destIcebergTable, legacyFsPresenceProperties(), sourceFs, shouldIncludeManifestPath); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -385,6 +435,14 @@ protected IcebergTable validateGetFilePathsGivenDestState(List sourceSnapshotPathSets, + List existingDestPaths, Set expectedResultPaths, boolean shouldIncludeMetadataPath, + Properties datasetProperties) throws IOException { + return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(), existingDestPaths, + expectedResultPaths, shouldIncludeMetadataPath, datasetProperties); + } + /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return {@link IcebergTable} (mock!), for behavioral verification @@ -392,12 +450,23 @@ protected IcebergTable validateGetFilePathsGivenDestState(List sourceSnapshotPathSets, Optional> optExistingSourcePaths, List existingDestPaths, Set expectedResultPaths, boolean shouldIncludeMetadataPath) throws IOException { + return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, optExistingSourcePaths, existingDestPaths, + expectedResultPaths, shouldIncludeMetadataPath, legacyFsPresenceProperties()); + } + + /** + * exercise {@link IcebergDataset::getFilePaths} and validate the result + * @return {@link IcebergTable} (mock!), for behavioral verification + */ + protected IcebergTable validateGetFilePathsGivenDestState(List sourceSnapshotPathSets, + Optional> optExistingSourcePaths, List existingDestPaths, Set expectedResultPaths, + boolean shouldIncludeMetadataPath, Properties datasetProperties) throws IOException { IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(TableIdentifier.of(testDbName, testTblName), sourceSnapshotPathSets); MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent()); optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths); FileSystem sourceFs = sourceFsBuilder.build(); - IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, new Properties(), sourceFs, shouldIncludeMetadataPath); + IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, datasetProperties, sourceFs, shouldIncludeMetadataPath); MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI); destFsBuilder.addPaths(existingDestPaths); @@ -414,6 +483,41 @@ protected IcebergTable validateGetFilePathsGivenDestState(List sourceSnapshotPathSets, + IcebergTable destIcebergTable, Set expectedResultPaths, boolean shouldIncludeMetadataPath) throws IOException { + IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(TableIdentifier.of(testDbName, testTblName), sourceSnapshotPathSets); + FileSystem sourceFs = new MockFileSystemBuilder(SRC_FS_URI, true).build(); // every source path "exists" + // default properties => dest-catalog diff (ICEBERG_DATASET_DETERMINE_COPY_FROM_DEST_CATALOG defaults to true) + IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, destIcebergTable, new Properties(), sourceFs, shouldIncludeMetadataPath); + + // dest FS is unused by catalog diff, but a CopyConfiguration still requires a target FileSystem + FileSystem destFs = new MockFileSystemBuilder(DEST_FS_URI, true).build(); + CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs); + + IcebergDataset.GetFilePathsToFileStatusResult pathsResult = + icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration, shouldIncludeMetadataPath); + Assert.assertEquals(pathsResult.getPathsToFileStatus().keySet(), expectedResultPaths); + } + /** @return `paths` after adding to it all paths of every one of `snapshotDefs` */ protected static Set withAllSnapshotPaths(Set paths, boolean shouldIncludeMetadataPath, MockIcebergTable.SnapshotPaths... snapshotDefs) { Arrays.stream(snapshotDefs).flatMap(snapshotDef -> diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java index f7070144c90..e78f4771d36 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java @@ -31,13 +31,16 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Queue; +import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -570,6 +573,97 @@ public static void renameRecursively(FileSystem fileSystem, Path from, Path to) } } + /** + * Same as {@link #renameRecursively(FileSystem, Path, Path)} but renames the immediate children of {@code from} + * in priority order: all children in a lower-priority group are renamed (recursively, to completion) before any + * child in the next group begins. Within a single group the renames still run concurrently. + * + *

This is used by Iceberg copy publishing to guarantee that {@code data/} files are committed before + * {@code metadata/} files, since Iceberg metadata references data files by path and must never be visible + * before the data it points to.

+ * + *

NOTE: ordering is applied at the level of the immediate children of {@code from}. This only separates + * {@code data} from {@code metadata} if those are the immediate children of {@code from} (i.e. the rename root sits + * at the table root). If they are nested deeper, {@code priorityFn} sees only the wrapper directory and the ordering + * is lost.

+ * + * @param fileSystem on which the data needs to be moved + * @param from path of the data to be moved + * @param to path of the data to be moved + * @param priorityFn maps an immediate child path of {@code from} to a priority; groups are processed in ascending + * priority order + */ + public static void renameRecursivelyOrdered(FileSystem fileSystem, Path from, Path to, + Function priorityFn) throws IOException { + + log.info(String.format("Recursively renaming (ordered) %s in %s to %s.", from, fileSystem.getUri(), to)); + + FileSystem throttledFS = getOptionallyThrottledFileSystem(fileSystem, 10000); + + if (!fileSystem.exists(from)) { + throw new IOException("Trying to rename a path that does not exist! " + from); + } + + FileStatus fromStatus = fileSystem.getFileStatus(from); + + // Fast path: if the whole tree can move atomically (target absent) or it's a single file, ordering is + // irrelevant -- defer to the standard concurrent implementation. + if (!fromStatus.isDirectory() || !fileSystem.exists(to)) { + renameRecursively(fileSystem, from, to); + return; + } + + // Target exists, so we descend to file level. Group immediate children by priority (TreeMap keeps ascending + // order) and fully complete each group before starting the next. + Map> grouped = new TreeMap<>(); + for (FileStatus child : fileSystem.listStatus(from)) { + grouped.computeIfAbsent(priorityFn.apply(child.getPath()), k -> Lists.newArrayList()).add(child); + } + + ExecutorService executorService = ScalingThreadPoolExecutor.newScalingThreadPool(1, 100, 100, + ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("rename-thread-%d"))); + try { + for (Entry> group : grouped.entrySet()) { + Queue> futures = Queues.newConcurrentLinkedQueue(); + for (FileStatus child : group.getValue()) { + Path relativeFilePath = new Path(StringUtils.substringAfter(child.getPath().toString(), + from.toString() + Path.SEPARATOR)); + Path toFilePath = new Path(to, relativeFilePath); + futures.add(executorService.submit( + new RenameRecursively(throttledFS, child, toFilePath, executorService, futures))); + } + // Barrier: drain this priority group completely before moving to the next. + while (!futures.isEmpty()) { + try { + futures.poll().get(); + } catch (ExecutionException | InterruptedException ee) { + throw new IOException(ee.getCause()); + } + } + log.info(String.format("Completed ordered rename group priority=%d (%d children) under %s.", + group.getKey(), group.getValue().size(), from)); + } + } finally { + ExecutorsUtils.shutdownExecutorService(executorService, Optional.of(log), 1, TimeUnit.SECONDS); + } + } + + /** + * Default priority for Iceberg distcp publishing: {@code data/} first, {@code metadata/} last, anything else in + * between. Intended for use as the {@code priorityFn} of + * {@link #renameRecursivelyOrdered(FileSystem, Path, Path, Function)}. + */ + public static int icebergRenamePriority(Path immediateChild) { + String name = immediateChild.getName(); + if ("data".equals(name)) { + return 0; + } + if ("metadata".equals(name)) { + return 2; + } + return 1; + } + /** * Calls {@link #getOptionallyThrottledFileSystem(FileSystem, int)} parsing the qps from the input {@link State} * at key {@link #MAX_FILESYSTEM_QPS}. From 2b83227736d1322b748377a9de8567c5c0146814 Mon Sep 17 00:00:00 2001 From: arjit3251 Date: Tue, 16 Jun 2026 12:30:25 +0530 Subject: [PATCH 2/2] Propagate job properties to Work units --- .../copy/publisher/CopyDataPublisher.java | 4 +- .../org/apache/gobblin/util/HadoopUtils.java | 94 ------------------- 2 files changed, 1 insertion(+), 97 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java index b29b4bf495c..5c32dcf440a 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java @@ -342,9 +342,7 @@ private void publishFileSet(CopyEntity.DatasetAndPartition datasetAndPartition, if (statesHelper.hasAnyCopyableFile()) { // Targets are always absolute, so we start moving from root (will skip any existing directories). - // Rename data files before metadata files so Iceberg metadata is never published ahead of the data it references. - HadoopUtils.renameRecursivelyOrdered(this.fs, datasetWriterOutputPath, new Path("/"), - HadoopUtils::icebergRenamePriority); + HadoopUtils.renameRecursively(this.fs, datasetWriterOutputPath, new Path("/")); } else { log.info("[{}] No copyable files in dataset. Proceeding to post-publish steps.", datasetAndPartition.identifier()); } diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java index e78f4771d36..f7070144c90 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java @@ -31,16 +31,13 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Map.Entry; import java.util.Properties; import java.util.Queue; -import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -573,97 +570,6 @@ public static void renameRecursively(FileSystem fileSystem, Path from, Path to) } } - /** - * Same as {@link #renameRecursively(FileSystem, Path, Path)} but renames the immediate children of {@code from} - * in priority order: all children in a lower-priority group are renamed (recursively, to completion) before any - * child in the next group begins. Within a single group the renames still run concurrently. - * - *

This is used by Iceberg copy publishing to guarantee that {@code data/} files are committed before - * {@code metadata/} files, since Iceberg metadata references data files by path and must never be visible - * before the data it points to.

- * - *

NOTE: ordering is applied at the level of the immediate children of {@code from}. This only separates - * {@code data} from {@code metadata} if those are the immediate children of {@code from} (i.e. the rename root sits - * at the table root). If they are nested deeper, {@code priorityFn} sees only the wrapper directory and the ordering - * is lost.

- * - * @param fileSystem on which the data needs to be moved - * @param from path of the data to be moved - * @param to path of the data to be moved - * @param priorityFn maps an immediate child path of {@code from} to a priority; groups are processed in ascending - * priority order - */ - public static void renameRecursivelyOrdered(FileSystem fileSystem, Path from, Path to, - Function priorityFn) throws IOException { - - log.info(String.format("Recursively renaming (ordered) %s in %s to %s.", from, fileSystem.getUri(), to)); - - FileSystem throttledFS = getOptionallyThrottledFileSystem(fileSystem, 10000); - - if (!fileSystem.exists(from)) { - throw new IOException("Trying to rename a path that does not exist! " + from); - } - - FileStatus fromStatus = fileSystem.getFileStatus(from); - - // Fast path: if the whole tree can move atomically (target absent) or it's a single file, ordering is - // irrelevant -- defer to the standard concurrent implementation. - if (!fromStatus.isDirectory() || !fileSystem.exists(to)) { - renameRecursively(fileSystem, from, to); - return; - } - - // Target exists, so we descend to file level. Group immediate children by priority (TreeMap keeps ascending - // order) and fully complete each group before starting the next. - Map> grouped = new TreeMap<>(); - for (FileStatus child : fileSystem.listStatus(from)) { - grouped.computeIfAbsent(priorityFn.apply(child.getPath()), k -> Lists.newArrayList()).add(child); - } - - ExecutorService executorService = ScalingThreadPoolExecutor.newScalingThreadPool(1, 100, 100, - ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("rename-thread-%d"))); - try { - for (Entry> group : grouped.entrySet()) { - Queue> futures = Queues.newConcurrentLinkedQueue(); - for (FileStatus child : group.getValue()) { - Path relativeFilePath = new Path(StringUtils.substringAfter(child.getPath().toString(), - from.toString() + Path.SEPARATOR)); - Path toFilePath = new Path(to, relativeFilePath); - futures.add(executorService.submit( - new RenameRecursively(throttledFS, child, toFilePath, executorService, futures))); - } - // Barrier: drain this priority group completely before moving to the next. - while (!futures.isEmpty()) { - try { - futures.poll().get(); - } catch (ExecutionException | InterruptedException ee) { - throw new IOException(ee.getCause()); - } - } - log.info(String.format("Completed ordered rename group priority=%d (%d children) under %s.", - group.getKey(), group.getValue().size(), from)); - } - } finally { - ExecutorsUtils.shutdownExecutorService(executorService, Optional.of(log), 1, TimeUnit.SECONDS); - } - } - - /** - * Default priority for Iceberg distcp publishing: {@code data/} first, {@code metadata/} last, anything else in - * between. Intended for use as the {@code priorityFn} of - * {@link #renameRecursivelyOrdered(FileSystem, Path, Path, Function)}. - */ - public static int icebergRenamePriority(Path immediateChild) { - String name = immediateChild.getName(); - if ("data".equals(name)) { - return 0; - } - if ("metadata".equals(name)) { - return 2; - } - return 1; - } - /** * Calls {@link #getOptionallyThrottledFileSystem(FileSystem, int)} parsing the qps from the input {@link State} * at key {@link #MAX_FILESYSTEM_QPS}.