Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -39,6 +40,7 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import javax.annotation.concurrent.NotThreadSafe;
import lombok.Data;
Expand Down Expand Up @@ -207,28 +209,51 @@ protected TableMetadata getCurrentDestTableMetadata() throws IOException {
protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig, boolean shouldIncludeMetadataPath)
throws IOException {
IcebergTable icebergTable = this.getSrcIcebergTable();
/** @return whether `pathStr` is present on `targetFs`, caching results while tunneling checked exceptions outward */
Function<String, Boolean> isPresentOnTarget = CheckedExceptionFunction.wrapToTunneled(pathStr ->
// omit considering timestamp (or other markers of freshness), as files should be immutable
// ATTENTION: `CopyContext.getFileStatus()`, to partake in caching
copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent()
);

// check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
// NOTE: if `shouldIncludeMetadataPath` was false during the prior executions, this condition will be false
// Decide which source files are "already copied" to the destination. Two strategies (config-gated):
// (a) DEST-CATALOG DIFF (default): a path is already-copied iff the DESTINATION Iceberg table's committed metadata
// references it. Because the dest table is committed (`IcebergRegisterStep`, a post-publish step) only AFTER a
// fully successful publish, every path the dest catalog references is guaranteed present AND part of a
// self-consistent snapshot -- so the subtree short-circuit below is sound. Crucially, orphan metadata left on
// the dest *filesystem* by a prior partially-failed run is NOT referenced by the dest catalog, so its missing
// data files are still re-copied. This averts the table corruption that pure filesystem-presence checking
// causes: there, a metadata file present on the dest FS would skip its whole subtree forever, leaving the
// committed table permanently referencing data files that never get copied.
// (b) FILESYSTEM PRESENCE (legacy): a path is already-copied iff it physically exists on `targetFs`.
final boolean determineCopyFromDestCatalog =
IcebergDatasetFinder.getConfigShouldDetermineCopyFromDestCatalog(this.properties);

IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly();
if (currentSnapshotOverview.getMetadataPath().map(isPresentOnTarget).orElse(false) &&
isPresentOnTarget.apply(currentSnapshotOverview.getManifestListPath())) {
log.info("~{}~ skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
this.getFileSetId(), currentSnapshotOverview.getSnapshotId(),
currentSnapshotOverview.getManifestListPath(),
currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
TableMetadata readTimeTableMetadata = currentSnapshotOverview.getTableMetadata().orElseThrow(() -> new RuntimeException(
String.format("~%s~ no table metadata for current snapshot '%s' at '%s' with metadata path '%s'",
this.getFileSetId(), currentSnapshotOverview.getSnapshotId(),
currentSnapshotOverview.getManifestListPath(),
currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"))));
return new GetFilePathsToFileStatusResult(Maps.newHashMap(), readTimeTableMetadata);
final Function<String, Boolean> isAlreadyCopied;
if (determineCopyFromDestCatalog) {
// cheap steady-state short-circuit: if the dest table's current snapshot IS the source's current snapshot (same,
// path-preserved manifest list), the tables are in sync -- skip without reading either side's manifests
Optional<String> destCurrentManifestListPath = getDestCurrentManifestListPath();
if (destCurrentManifestListPath.map(currentSnapshotOverview.getManifestListPath()::equals).orElse(false)) {
log.info("~{}~ skipping entire iceberg, since dest table is already at source current snapshot '{}' ('{}')",
this.getFileSetId(), currentSnapshotOverview.getSnapshotId(), currentSnapshotOverview.getManifestListPath());
return new GetFilePathsToFileStatusResult(Maps.newHashMap(), getRequiredCurrentTableMetadata(currentSnapshotOverview));
}
// not in sync: enumerate every path the dest catalog references (across its retained snapshots) and diff against it
Set<String> destReferencedPaths = collectDestTableReferencedPaths(shouldIncludeMetadataPath);
log.info("~{}~ destination catalog references {} paths; diffing source against it to determine files to copy",
this.getFileSetId(), destReferencedPaths.size());
isAlreadyCopied = destReferencedPaths::contains;
} else {
// omit considering timestamp (or other markers of freshness), as files should be immutable
// ATTENTION: `CopyContext.getFileStatus()`, to partake in caching
isAlreadyCopied = CheckedExceptionFunction.wrapToTunneled(pathStr ->
copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent());
// check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg
// NOTE: if `shouldIncludeMetadataPath` was false during the prior executions, this condition will be false
if (currentSnapshotOverview.getMetadataPath().map(isAlreadyCopied).orElse(false) &&
isAlreadyCopied.apply(currentSnapshotOverview.getManifestListPath())) {
log.info("~{}~ skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target",
this.getFileSetId(), currentSnapshotOverview.getSnapshotId(),
currentSnapshotOverview.getManifestListPath(),
currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: MISSING!>>"));
return new GetFilePathsToFileStatusResult(Maps.newHashMap(), getRequiredCurrentTableMetadata(currentSnapshotOverview));
}
}

List<TableMetadata> readTimeTableMetadataHolder = Lists.newArrayList(); // expecting exactly one elem
Expand All @@ -241,40 +266,33 @@ protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem tar
log.info("~{}~ loaded snapshot '{}' at '{}' from metadata path: '{}'", this.getFileSetId(),
snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
// ALGO: an iceberg's files form a tree of four levels: metadata.json -> manifest-list -> manifest -> data;
// most critically, all are presumed immutable and uniquely named, although any may be replaced. we depend
// also on incremental copy being run always atomically: to commit each iceberg only upon its full success.
// thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is
// already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date.
// hence, its entire subtree may be short-circuited. nevertheless, absence of a file at dest cannot imply
// its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some
// metadata files would have been replaced (e.g. during snapshot compaction). in such instances, at least
// some of the children pointed to within could have been copied prior, when they previously appeared as a
// child of the current file's predecessor (which this new meta file now replaces).
if (!isPresentOnTarget.apply(manListPath)) {
// all are presumed immutable and uniquely named, although any may be replaced. judging "already-copied" by
// the destination catalog (default; see above), a referenced manifest-list/manifest guarantees its entire
// subtree was committed--hence already fully copied--so its subtree may be short-circuited. nevertheless,
// absence cannot imply its entire subtree necessarily requires copying, because it is possible, even likely
// in practice, that some metadata files would have been replaced (e.g. during snapshot compaction). in such
// instances, at least some of the children pointed to within could have been copied prior, when they
// previously appeared as a child of the current file's predecessor (which this new meta file now replaces).
if (!isAlreadyCopied.apply(manListPath)) {
List<String> missingPaths = snapshotInfo.getSnapshotApexPaths(shouldIncludeMetadataPath);
for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) {
if (!isPresentOnTarget.apply(mfi.getManifestFilePath())) {
if (!isAlreadyCopied.apply(mfi.getManifestFilePath())) {
missingPaths.add(mfi.getManifestFilePath());
// being incremental info, no listed paths would have appeared prior w/ other snapshots, so add all now.
// skip verification despite corner case of a snapshot having reorganized/rebalanced manifest contents
// during a period where replication fell so far behind that no snapshots listed among current metadata
// are yet at dest. since the consequence of unnecessary copy is merely wasted data transfer and
// compute--and overall, potential is small--prefer sidestepping expense of exhaustive checking, since
// file count may run into 100k+ (even beyond!)
missingPaths.addAll(mfi.getListedFilePaths());
}
}
log.info("~{}~ snapshot '{}': collected {} additional source paths",
this.getFileSetId(), snapshotInfo.getSnapshotId(), missingPaths.size());
return missingPaths.iterator();
} else {
log.info("~{}~ snapshot '{}' already present on target... skipping (including contents)",
log.info("~{}~ snapshot '{}' already present on destination... skipping (including contents)",
this.getFileSetId(), snapshotInfo.getSnapshotId());
// IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing manifest list
Optional<String> metadataPath = snapshotInfo.getMetadataPath();
Optional<String> nonReplicatedMetadataPath = metadataPath.filter(p -> !isPresentOnTarget.apply(p));
Optional<String> nonReplicatedMetadataPath = metadataPath.filter(p -> !isAlreadyCopied.apply(p));
metadataPath.ifPresent(ignore ->
log.info("~{}~ metadata IS {} present on target",
log.info("~{}~ metadata IS {} present on destination",
this.getFileSetId(),
!nonReplicatedMetadataPath.isPresent()
? "already"
Expand Down Expand Up @@ -335,6 +353,47 @@ protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem tar
return new GetFilePathsToFileStatusResult(results, readTimeTableMetadataHolder.get(0));
}

/**
* @return the manifest-list path of the destination table's current snapshot, or {@link Optional#empty()} if the
* destination table does not exist or has no snapshot yet (i.e. nothing committed to short-circuit against)
*/
private Optional<String> getDestCurrentManifestListPath() throws IOException {
try {
return Optional.of(this.destIcebergTable.getCurrentSnapshotInfoOverviewOnly().getManifestListPath());
} catch (IcebergTable.TableNotFoundException | IcebergTable.NoSnapshotFoundException e) {
log.info("~{}~ destination table has no current snapshot to short-circuit against ({})",
this.getFileSetId(), e.getMessage());
return Optional.empty();
}
}

/**
* @return every file path the destination Iceberg table's catalog currently references, across all of its retained
* snapshots. Because the dest table is committed only after a fully successful publish, every such path is guaranteed
* already present and part of a consistent snapshot. Returns an empty set if the destination table does not exist yet
* (first-ever copy), in which case all source files will be copied.
*/
private Set<String> collectDestTableReferencedPaths(boolean shouldIncludeMetadataPath) throws IOException {
Set<String> referencedPaths = Sets.newHashSet();
try {
Iterator<IcebergSnapshotInfo> destSnapshotInfos = this.destIcebergTable.getIncrementalSnapshotInfosIterator();
while (destSnapshotInfos.hasNext()) {
referencedPaths.addAll(destSnapshotInfos.next().getAllPaths(shouldIncludeMetadataPath));
}
} catch (IcebergTable.TableNotFoundException tnfe) {
log.info("~{}~ destination table not found ('{}'); treating as empty, so all source files will be copied",
this.getFileSetId(), tnfe.getTableId());
}
return referencedPaths;
}

/** @return the {@link TableMetadata} carried by {@code currentSnapshotOverview}, or throw if (unexpectedly) absent */
private TableMetadata getRequiredCurrentTableMetadata(IcebergSnapshotInfo currentSnapshotOverview) {
return currentSnapshotOverview.getTableMetadata().orElseThrow(() -> new RuntimeException(
String.format("~%s~ no table metadata for current snapshot '%s' at '%s'", this.getFileSetId(),
currentSnapshotOverview.getSnapshotId(), currentSnapshotOverview.getManifestListPath())));
}

/**
* Stateful object to consolidate error messages (e.g. for logging), per a {@link Path} consolidation strategy.
* OVERVIEW: to avoid run-away logging into the 1000s of lines, consolidate to parent (directory) level:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,20 @@ public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDatase
public static final String ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH = ICEBERG_DATASET_PREFIX + ".should.copy.metadata.path";
public static final String DEFAULT_ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH = "false";

/**
* When {@code true} (default), the set of files to copy is determined by diffing the source table against what the
* DESTINATION Iceberg table's committed catalog references, rather than by probing each file's presence on the
* destination filesystem. Because the dest table is committed only after a fully successful publish, a path it
* references is guaranteed present and consistent -- while orphan metadata left on the dest filesystem by a prior
* partially-failed run is NOT referenced, so its missing data files are still re-copied. This averts the table
* corruption that filesystem-presence short-circuiting causes (a metadata file present on the dest FS would skip its
* whole subtree forever, leaving the committed table referencing data files that never get copied). Setting this to
* {@code false} restores the legacy filesystem-presence behavior.
*/
public static final String ICEBERG_DATASET_DETERMINE_COPY_FROM_DEST_CATALOG =
ICEBERG_DATASET_PREFIX + ".determine.copy.from.dest.catalog";
public static final String DEFAULT_ICEBERG_DATASET_DETERMINE_COPY_FROM_DEST_CATALOG = "true";

public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
public static final String ICEBERG_CATALOG_KEY = "catalog";
/**
Expand Down Expand Up @@ -174,6 +188,11 @@ protected static boolean getConfigShouldCopyMetadataPath(Properties properties)
return Boolean.valueOf(properties.getProperty(ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH, DEFAULT_ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH));
}

public static boolean getConfigShouldDetermineCopyFromDestCatalog(Properties properties) {
return Boolean.valueOf(properties.getProperty(ICEBERG_DATASET_DETERMINE_COPY_FROM_DEST_CATALOG,
DEFAULT_ICEBERG_DATASET_DETERMINE_COPY_FROM_DEST_CATALOG));
}

/** @return property value or `null` */
protected static String getLocationQualifiedProperty(Properties properties, CatalogLocation location, String relativePropName) {
return properties.getProperty(calcLocationQualifiedPropName(location, relativePropName));
Expand Down
Loading
Loading