diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java index b0a6d082363..6a2bfdc8930 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -39,6 +40,7 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import javax.annotation.concurrent.NotThreadSafe; import lombok.Data; @@ -207,28 +209,51 @@ protected TableMetadata getCurrentDestTableMetadata() throws IOException { protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig, boolean shouldIncludeMetadataPath) throws IOException { IcebergTable icebergTable = this.getSrcIcebergTable(); - /** @return whether `pathStr` is present on `targetFs`, caching results while tunneling checked exceptions outward */ - Function isPresentOnTarget = CheckedExceptionFunction.wrapToTunneled(pathStr -> - // omit considering timestamp (or other markers of freshness), as files should be immutable - // ATTENTION: `CopyContext.getFileStatus()`, to partake in caching - copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent() - ); - // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg - // NOTE: if `shouldIncludeMetadataPath` was false during the prior executions, this condition will be false + // Decide which source files are "already copied" to the 
destination. Two strategies (config-gated): + // (a) DEST-CATALOG DIFF (default): a path is already-copied iff the DESTINATION Iceberg table's committed metadata + // references it. Because the dest table is committed (`IcebergRegisterStep`, a post-publish step) only AFTER a + // fully successful publish, every path the dest catalog references is guaranteed present AND part of a + // self-consistent snapshot -- so the subtree short-circuit below is sound. Crucially, orphan metadata left on + // the dest *filesystem* by a prior partially-failed run is NOT referenced by the dest catalog, so its missing + // data files are still re-copied. This averts the table corruption that pure filesystem-presence checking + // causes: there, a metadata file present on the dest FS would skip its whole subtree forever, leaving the + // committed table permanently referencing data files that never get copied. + // (b) FILESYSTEM PRESENCE (legacy): a path is already-copied iff it physically exists on `targetFs`. 
+ final boolean determineCopyFromDestCatalog = + IcebergDatasetFinder.getConfigShouldDetermineCopyFromDestCatalog(this.properties); + IcebergSnapshotInfo currentSnapshotOverview = icebergTable.getCurrentSnapshotInfoOverviewOnly(); - if (currentSnapshotOverview.getMetadataPath().map(isPresentOnTarget).orElse(false) && - isPresentOnTarget.apply(currentSnapshotOverview.getManifestListPath())) { - log.info("~{}~ skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target", - this.getFileSetId(), currentSnapshotOverview.getSnapshotId(), - currentSnapshotOverview.getManifestListPath(), - currentSnapshotOverview.getMetadataPath().orElse("<>")); - TableMetadata readTimeTableMetadata = currentSnapshotOverview.getTableMetadata().orElseThrow(() -> new RuntimeException( - String.format("~%s~ no table metadata for current snapshot '%s' at '%s' with metadata path '%s'", - this.getFileSetId(), currentSnapshotOverview.getSnapshotId(), - currentSnapshotOverview.getManifestListPath(), - currentSnapshotOverview.getMetadataPath().orElse("<>")))); - return new GetFilePathsToFileStatusResult(Maps.newHashMap(), readTimeTableMetadata); + final Function isAlreadyCopied; + if (determineCopyFromDestCatalog) { + // cheap steady-state short-circuit: if the dest table's current snapshot IS the source's current snapshot (same, + // path-preserved manifest list), the tables are in sync -- skip without reading either side's manifests + Optional destCurrentManifestListPath = getDestCurrentManifestListPath(); + if (destCurrentManifestListPath.map(currentSnapshotOverview.getManifestListPath()::equals).orElse(false)) { + log.info("~{}~ skipping entire iceberg, since dest table is already at source current snapshot '{}' ('{}')", + this.getFileSetId(), currentSnapshotOverview.getSnapshotId(), currentSnapshotOverview.getManifestListPath()); + return new GetFilePathsToFileStatusResult(Maps.newHashMap(), getRequiredCurrentTableMetadata(currentSnapshotOverview)); + } + // 
not in sync: enumerate every path the dest catalog references (across its retained snapshots) and diff against it + Set destReferencedPaths = collectDestTableReferencedPaths(shouldIncludeMetadataPath); + log.info("~{}~ destination catalog references {} paths; diffing source against it to determine files to copy", + this.getFileSetId(), destReferencedPaths.size()); + isAlreadyCopied = destReferencedPaths::contains; + } else { + // omit considering timestamp (or other markers of freshness), as files should be immutable + // ATTENTION: `CopyContext.getFileStatus()`, to partake in caching + isAlreadyCopied = CheckedExceptionFunction.wrapToTunneled(pathStr -> + copyConfig.getCopyContext().getFileStatus(targetFs, new Path(pathStr)).isPresent()); + // check first for case of nothing to replicate, to avoid needless scanning of a potentially massive iceberg + // NOTE: if `shouldIncludeMetadataPath` was false during the prior executions, this condition will be false + if (currentSnapshotOverview.getMetadataPath().map(isAlreadyCopied).orElse(false) && + isAlreadyCopied.apply(currentSnapshotOverview.getManifestListPath())) { + log.info("~{}~ skipping entire iceberg, since snapshot '{}' at '{}' and metadata '{}' both present on target", + this.getFileSetId(), currentSnapshotOverview.getSnapshotId(), + currentSnapshotOverview.getManifestListPath(), + currentSnapshotOverview.getMetadataPath().orElse("<>")); + return new GetFilePathsToFileStatusResult(Maps.newHashMap(), getRequiredCurrentTableMetadata(currentSnapshotOverview)); + } } List readTimeTableMetadataHolder = Lists.newArrayList(); // expecting exactly one elem @@ -241,26 +266,19 @@ protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem tar log.info("~{}~ loaded snapshot '{}' at '{}' from metadata path: '{}'", this.getFileSetId(), snapshotInfo.getSnapshotId(), manListPath, snapshotInfo.getMetadataPath().orElse("<>")); // ALGO: an iceberg's files form a tree of four levels: metadata.json -> 
manifest-list -> manifest -> data; - // most critically, all are presumed immutable and uniquely named, although any may be replaced. we depend - // also on incremental copy being run always atomically: to commit each iceberg only upon its full success. - // thus established, the presence of a file at dest (identified by path/name) guarantees its entire subtree is - // already copied--and, given immutability, completion of a prior copy naturally renders that file up-to-date. - // hence, its entire subtree may be short-circuited. nevertheless, absence of a file at dest cannot imply - // its entire subtree necessarily requires copying, because it is possible, even likely in practice, that some - // metadata files would have been replaced (e.g. during snapshot compaction). in such instances, at least - // some of the children pointed to within could have been copied prior, when they previously appeared as a - // child of the current file's predecessor (which this new meta file now replaces). - if (!isPresentOnTarget.apply(manListPath)) { + // all are presumed immutable and uniquely named, although any may be replaced. judging "already-copied" by + // the destination catalog (default; see above), a referenced manifest-list/manifest guarantees its entire + // subtree was committed--hence already fully copied--so its subtree may be short-circuited. nevertheless, + // absence cannot imply its entire subtree necessarily requires copying, because it is possible, even likely + // in practice, that some metadata files would have been replaced (e.g. during snapshot compaction). in such + // instances, at least some of the children pointed to within could have been copied prior, when they + // previously appeared as a child of the current file's predecessor (which this new meta file now replaces). 
+ if (!isAlreadyCopied.apply(manListPath)) { List missingPaths = snapshotInfo.getSnapshotApexPaths(shouldIncludeMetadataPath); for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshotInfo.getManifestFiles()) { - if (!isPresentOnTarget.apply(mfi.getManifestFilePath())) { + if (!isAlreadyCopied.apply(mfi.getManifestFilePath())) { missingPaths.add(mfi.getManifestFilePath()); // being incremental info, no listed paths would have appeared prior w/ other snapshots, so add all now. - // skip verification despite corner case of a snapshot having reorganized/rebalanced manifest contents - // during a period where replication fell so far behind that no snapshots listed among current metadata - // are yet at dest. since the consequence of unnecessary copy is merely wasted data transfer and - // compute--and overall, potential is small--prefer sidestepping expense of exhaustive checking, since - // file count may run into 100k+ (even beyond!) missingPaths.addAll(mfi.getListedFilePaths()); } } @@ -268,13 +286,13 @@ protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem tar this.getFileSetId(), snapshotInfo.getSnapshotId(), missingPaths.size()); return missingPaths.iterator(); } else { - log.info("~{}~ snapshot '{}' already present on target... skipping (including contents)", + log.info("~{}~ snapshot '{}' already present on destination... 
skipping (including contents)", this.getFileSetId(), snapshotInfo.getSnapshotId()); // IMPORTANT: separately consider metadata path, to handle case of 'metadata-only' snapshot reusing manifest list Optional metadataPath = snapshotInfo.getMetadataPath(); - Optional nonReplicatedMetadataPath = metadataPath.filter(p -> !isPresentOnTarget.apply(p)); + Optional nonReplicatedMetadataPath = metadataPath.filter(p -> !isAlreadyCopied.apply(p)); metadataPath.ifPresent(ignore -> - log.info("~{}~ metadata IS {} present on target", + log.info("~{}~ metadata IS {} present on destination", this.getFileSetId(), !nonReplicatedMetadataPath.isPresent() ? "already" @@ -335,6 +353,47 @@ protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem tar return new GetFilePathsToFileStatusResult(results, readTimeTableMetadataHolder.get(0)); } + /** + * @return the manifest-list path of the destination table's current snapshot, or {@link Optional#empty()} if the + * destination table does not exist or has no snapshot yet (i.e. nothing committed to short-circuit against) + */ + private Optional getDestCurrentManifestListPath() throws IOException { + try { + return Optional.of(this.destIcebergTable.getCurrentSnapshotInfoOverviewOnly().getManifestListPath()); + } catch (IcebergTable.TableNotFoundException | IcebergTable.NoSnapshotFoundException e) { + log.info("~{}~ destination table has no current snapshot to short-circuit against ({})", + this.getFileSetId(), e.getMessage()); + return Optional.empty(); + } + } + + /** + * @return every file path the destination Iceberg table's catalog currently references, across all of its retained + * snapshots. Because the dest table is committed only after a fully successful publish, every such path is guaranteed + * already present and part of a consistent snapshot. Returns an empty set if the destination table does not exist yet + * (first-ever copy), in which case all source files will be copied. 
+ */ + private Set collectDestTableReferencedPaths(boolean shouldIncludeMetadataPath) throws IOException { + Set referencedPaths = Sets.newHashSet(); + try { + Iterator destSnapshotInfos = this.destIcebergTable.getIncrementalSnapshotInfosIterator(); + while (destSnapshotInfos.hasNext()) { + referencedPaths.addAll(destSnapshotInfos.next().getAllPaths(shouldIncludeMetadataPath)); + } + } catch (IcebergTable.TableNotFoundException tnfe) { + log.info("~{}~ destination table not found ('{}'); treating as empty, so all source files will be copied", + this.getFileSetId(), tnfe.getTableId()); + } + return referencedPaths; + } + + /** @return the {@link TableMetadata} carried by {@code currentSnapshotOverview}, or throw if (unexpectedly) absent */ + private TableMetadata getRequiredCurrentTableMetadata(IcebergSnapshotInfo currentSnapshotOverview) { + return currentSnapshotOverview.getTableMetadata().orElseThrow(() -> new RuntimeException( + String.format("~%s~ no table metadata for current snapshot '%s' at '%s'", this.getFileSetId(), + currentSnapshotOverview.getSnapshotId(), currentSnapshotOverview.getManifestListPath()))); + } + /** * Stateful object to consolidate error messages (e.g. for logging), per a {@link Path} consolidation strategy. 
* OVERVIEW: to avoid run-away logging into the 1000s of lines, consolidate to parent (directory) level: diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java index f13a2ed8ebb..6ba39bb2f55 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java @@ -56,6 +56,20 @@ public class IcebergDatasetFinder implements IterableDatasetFinder every source file is copied */ + @Test + public void testCatalogDiffCopiesEverythingWhenDestTableAbsent() throws IOException { + Set expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), true, SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0); + validateCatalogDiffGetFilePaths(Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0), mockAbsentDestTable(), + expectedResultPaths, true); + } + + /** dest catalog is one snapshot behind => only the newer snapshot's files are copied */ + @Test + public void testCatalogDiffCopiesOnlyDeltaWhenDestBehind() throws IOException { + IcebergTable destTable = MockIcebergTable.withSnapshots(destTableId(), Lists.newArrayList(SNAPSHOT_PATHS_1)); + Set expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), true, SNAPSHOT_PATHS_0); + validateCatalogDiffGetFilePaths(Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0), destTable, + expectedResultPaths, true); + } + + /** + * THE CORRUPTION GUARD: even if a manifest + manifest-list were left on the dest *filesystem* by a prior failed run + * (here the dest catalog still references an unrelated snapshot), the source snapshot's files are ALL re-copied, + * because the dest catalog does not reference them. Filesystem-presence short-circuiting would instead skip them. 
+ */ + @Test + public void testCatalogDiffReCopiesFilesOrphanedOnDestFsButNotCommitted() throws IOException { + // dest catalog committed only SNAPSHOT_PATHS_1; SNAPSHOT_PATHS_0 (the source current) is NOT referenced by it + IcebergTable destTable = MockIcebergTable.withSnapshots(destTableId(), Lists.newArrayList(SNAPSHOT_PATHS_1)); + Set expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), true, SNAPSHOT_PATHS_0); + validateCatalogDiffGetFilePaths(Lists.newArrayList(SNAPSHOT_PATHS_0), destTable, expectedResultPaths, true); + } + /** Exception wrapping is used internally--ensure that doesn't lapse into silently swallowing errors */ @Test(expectedExceptions = IOException.class) public void testGetFilePathsDoesNotSwallowDestFileSystemException() throws IOException { @@ -230,7 +280,7 @@ public void testGetFilePathsDoesNotSwallowDestFileSystemException() throws IOExc MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI); FileSystem sourceFs = sourceFsBuilder.build(); boolean shouldIncludeMetadataPathMakesNoDifference = true; - IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, new Properties(), sourceFs, shouldIncludeMetadataPathMakesNoDifference); + IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, legacyFsPresenceProperties(), sourceFs, shouldIncludeMetadataPathMakesNoDifference); MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destFsBuilder.build(); @@ -272,7 +322,7 @@ public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { IcebergTable srcIcebergTbl = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_0)); IcebergTable destIcebergTbl = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1)); boolean shouldIncludeManifestPath = true; - IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTbl, destIcebergTbl, new Properties(), sourceFs, 
shouldIncludeManifestPath); + IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTbl, destIcebergTbl, legacyFsPresenceProperties(), sourceFs, shouldIncludeManifestPath); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -299,7 +349,7 @@ public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOExcept IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1)); boolean shouldIncludeManifestPath = false; - IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, destIcebergTable, new Properties(), sourceFs, shouldIncludeManifestPath); + IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, destIcebergTable, legacyFsPresenceProperties(), sourceFs, shouldIncludeManifestPath); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -331,7 +381,7 @@ public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() throws IOExc IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_0)); IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1)); boolean shouldIncludeManifestPath = true; - IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, destIcebergTable, new Properties(), sourceFs, shouldIncludeManifestPath); + IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, destIcebergTable, legacyFsPresenceProperties(), sourceFs, shouldIncludeManifestPath); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -361,7 +411,7 @@ public void 
testFsOwnershipAndPermissionWithoutPreservationWhenDestEmpty() throw IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_0)); IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1)); boolean shouldIncludeManifestPath = true; - IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, destIcebergTable, new Properties(), sourceFs, shouldIncludeManifestPath); + IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, destIcebergTable, legacyFsPresenceProperties(), sourceFs, shouldIncludeManifestPath); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -385,6 +435,14 @@ protected IcebergTable validateGetFilePathsGivenDestState(List sourceSnapshotPathSets, + List existingDestPaths, Set expectedResultPaths, boolean shouldIncludeMetadataPath, + Properties datasetProperties) throws IOException { + return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, Optional.empty(), existingDestPaths, + expectedResultPaths, shouldIncludeMetadataPath, datasetProperties); + } + /** * exercise {@link IcebergDataset::getFilePaths} and validate the result * @return {@link IcebergTable} (mock!), for behavioral verification @@ -392,12 +450,23 @@ protected IcebergTable validateGetFilePathsGivenDestState(List sourceSnapshotPathSets, Optional> optExistingSourcePaths, List existingDestPaths, Set expectedResultPaths, boolean shouldIncludeMetadataPath) throws IOException { + return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, optExistingSourcePaths, existingDestPaths, + expectedResultPaths, shouldIncludeMetadataPath, legacyFsPresenceProperties()); + } + + /** + * exercise {@link IcebergDataset#getFilePathsToFileStatus} and validate the result + * @return {@link IcebergTable} (mock!), for behavioral verification + */ + protected IcebergTable 
validateGetFilePathsGivenDestState(List sourceSnapshotPathSets, + Optional> optExistingSourcePaths, List existingDestPaths, Set expectedResultPaths, + boolean shouldIncludeMetadataPath, Properties datasetProperties) throws IOException { IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(TableIdentifier.of(testDbName, testTblName), sourceSnapshotPathSets); MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent()); optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths); FileSystem sourceFs = sourceFsBuilder.build(); - IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, new Properties(), sourceFs, shouldIncludeMetadataPath); + IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, datasetProperties, sourceFs, shouldIncludeMetadataPath); MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI); destFsBuilder.addPaths(existingDestPaths); @@ -414,6 +483,41 @@ protected IcebergTable validateGetFilePathsGivenDestState(List sourceSnapshotPathSets, + IcebergTable destIcebergTable, Set expectedResultPaths, boolean shouldIncludeMetadataPath) throws IOException { + IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(TableIdentifier.of(testDbName, testTblName), sourceSnapshotPathSets); + FileSystem sourceFs = new MockFileSystemBuilder(SRC_FS_URI, true).build(); // every source path "exists" + // default properties => dest-catalog diff (ICEBERG_DATASET_DETERMINE_COPY_FROM_DEST_CATALOG defaults to true) + IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, destIcebergTable, new Properties(), sourceFs, shouldIncludeMetadataPath); + + // dest FS is unused by catalog diff, but a CopyConfiguration still requires a target FileSystem + FileSystem destFs = new MockFileSystemBuilder(DEST_FS_URI, true).build(); + CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs); + + 
IcebergDataset.GetFilePathsToFileStatusResult pathsResult = + icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration, shouldIncludeMetadataPath); + Assert.assertEquals(pathsResult.getPathsToFileStatus().keySet(), expectedResultPaths); + } + /** @return `paths` after adding to it all paths of every one of `snapshotDefs` */ protected static Set withAllSnapshotPaths(Set paths, boolean shouldIncludeMetadataPath, MockIcebergTable.SnapshotPaths... snapshotDefs) { Arrays.stream(snapshotDefs).flatMap(snapshotDef ->