diff --git a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
index 240e34113721..d5aa7dde2daf 100644
--- a/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
+++ b/api/src/main/java/org/apache/iceberg/actions/ActionsProvider.java
@@ -94,4 +94,12 @@ default RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement removeDanglingDeleteFiles");
}
+
+  /**
+   * Creates an action that verifies a rewritten copy of a table is complete at the destination.
+   */
+  default ValidateRewriteTablePath validateRewriteTablePath(Table sourceTable) {
+    String message = getClass().getName() + " does not implement validateRewriteTablePath";
+    throw new UnsupportedOperationException(message);
+  }
}
diff --git a/api/src/main/java/org/apache/iceberg/actions/ValidateRewriteTablePath.java b/api/src/main/java/org/apache/iceberg/actions/ValidateRewriteTablePath.java
new file mode 100644
index 000000000000..51539a734552
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/actions/ValidateRewriteTablePath.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.iceberg.Table;
+
+/**
+ * An action that validates a rewritten table copy by checking that every metadata, data, and delete
+ * file the source table references is present at the destination.
+ *
+ * <p>The source table is the source of truth: this action walks its metadata to enumerate every
+ * expected file, applies any configured prefix rewrite, and verifies each path at the destination.
+ * Files referenced by the source table but missing at the destination are reported regardless of
+ * whether the destination's own metadata is internally consistent.
+ *
+ * <p>Two common uses:
+ *
+ * <ul>
+ *   <li><b>Verify a copy before registering it</b> — after producing a destination via {@link
+ *       RewriteTablePath}, run this action to confirm the destination contains every expected file
+ *       before registering the new metadata.json with a catalog. For a first-time copy, leave
+ *       {@link #destinationSnapshotId(long)} unset so the action validates the full source file set
+ *       at the source's latest snapshot; alternatively call {@link #validateFullTable(boolean)}
+ *       with {@code true}.
+ *   <li><b>Audit any source/destination pair for sync</b> — the destination does not have to have
+ *       been produced by {@link RewriteTablePath}. This works against DR replicas, migration
+ *       targets, backups, Distcp output, manual file copies, or any other out-of-band copy. Run
+ *       this action to confirm the destination still references every file the source does. The
+ *       validator only requires loadable source and destination Tables plus {@link
+ *       #rewriteLocationPrefix(String, String)} to map source paths to destination paths. Pass
+ *       {@link #destinationSnapshotId(long)} (the destination's snapshot id from the previous
+ *       successful copy or validation) to validate only the delta accumulated since that point;
+ *       call {@link #validateFullTable(boolean)} with {@code true} to re-validate the full source
+ *       file set regardless of any prior destination state.
+ * </ul>
+ *
+ * The action operates in three modes selected by which parameters are configured: backfill
+ * (source only), incremental (source plus destination), and forced full validation against an
+ * existing destination via {@link #validateFullTable(boolean)}. Each mode validates a different
+ * file set:
+ *
+ * <ul>
+ *   <li><b>Backfill</b> validates every file the source references at {@link
+ *       #sourceSnapshotId(long)}, scoped by {@link #validateScope(ValidateScope)}.
+ *   <li><b>Incremental</b> validates only the files the source has accumulated between {@link
+ *       #destinationSnapshotId(long)} (exclusive) and {@link #sourceSnapshotId(long)} (inclusive).
+ *       It does not re-check files that already existed at {@link #destinationSnapshotId(long)};
+ *       those are presumed to have been validated when the destination was last at that snapshot.
+ *   <li><b>Forced full validation</b>, enabled with {@link #validateFullTable(boolean)} set to
+ *       {@code true}, validates the full source file set against the destination regardless of any
+ *       configured destination snapshot id.
+ * </ul>
+ *
+ * Use forced full validation when the destination state at {@link #destinationSnapshotId(long)}
+ * cannot be presumed correct — for example, when no prior validation result is available, when
+ * destination files may have been altered out-of-band, or when running an audit-from-scratch.
+ *
+ * <p>Backfill example — verify a fresh copy before registering it:
+ *
+ * <pre>{@code
+ * Result result = SparkActions.get(spark)
+ *     .validateRewriteTablePath(sourceTable)
+ *     .sourceMetadataVersion(version)
+ *     .sourceSnapshotId(snapshotId)
+ *     .rewriteLocationPrefix(sourcePrefix, destinationPrefix)
+ *     .execute();
+ * }</pre>
+ *
+ * Incremental example — verify two tables stay in sync after an incremental copy. Pass the
+ * destination's snapshot id as it was before the latest copy ran:
+ *
+ * <pre>{@code
+ * Result result = SparkActions.get(spark)
+ *     .validateRewriteTablePath(sourceTable)
+ *     .sourceMetadataVersion(currentSourceVersion)
+ *     .sourceSnapshotId(currentSourceSnapshotId)
+ *     .destinationTable(destinationTable)
+ *     .destinationMetadataVersion(currentDestinationVersion)
+ *     .destinationSnapshotId(preCopyDestinationSnapshotId)
+ *     .rewriteLocationPrefix(sourcePrefix, destinationPrefix)
+ *     .execute();
+ * }</pre>
+ */
+public interface ValidateRewriteTablePath
+    extends Action<ValidateRewriteTablePath, ValidateRewriteTablePath.Result> {
+
+  /**
+   * Sets the destination table for incremental validation. Must be paired with {@link
+   * #destinationMetadataVersion}; both must be set to enable incremental mode, or both must be left
+   * unset to run backfill mode.
+   */
+  ValidateRewriteTablePath destinationTable(Table table);
+
+  /** Sets the source metadata version (file name or absolute path). Required. */
+  ValidateRewriteTablePath sourceMetadataVersion(String version);
+
+  /**
+   * Sets the source snapshot id. Required unless the source metadata has no current snapshot (empty
+   * source table); in that case the action validates only the metadata files.
+   */
+  ValidateRewriteTablePath sourceSnapshotId(long snapshotId);
+
+  /**
+   * Sets the destination metadata version. Must be paired with {@link #destinationTable}; both must
+   * be set or both must be left unset.
+   */
+  ValidateRewriteTablePath destinationMetadataVersion(String version);
+
+  /**
+   * Sets the destination snapshot id used as the lower bound of the incremental diff. This is the
+   * snapshot id the destination held at the time of the previous successful copy or validation.
+   *
+   * <p>When set, the action validates only files the source has accumulated between this snapshot
+   * id (exclusive) and {@link #sourceSnapshotId(long)} (inclusive); files that existed at this
+   * snapshot id are not re-checked. Callers that require validation of every source file regardless
+   * of prior destination state should call {@link #validateFullTable(boolean)} with {@code true}.
+   *
+   * <p>Leave unset for a first-time copy: the action validates the full source file set at {@link
+   * #sourceSnapshotId(long)} against the destination, without performing a diff.
+   *
+   * <p>After a copy that preserves snapshot identity (such as a {@link RewriteTablePath} copy),
+   * setting this to the destination's current snapshot id produces an empty incremental diff (zero
+   * files validated). Pass the previous snapshot id, or call {@link #validateFullTable(boolean)}
+   * with {@code true}, instead.
+   *
+   * <p>If this snapshot id is not present in the source's snapshot history at execution time — for
+   * example, the source's retention policy has since expired it, or the destination committed
+   * independently of the source — the action falls back to a full source-vs-destination diff and
+   * logs a warning rather than throwing. The fallback result is semantically correct (any source
+   * files missing at the destination are still reported), and the warning makes the condition
+   * observable to callers that need to act on it.
+   *
+   * <p>Requires {@link #destinationTable(Table)} to be set.
+   */
+  ValidateRewriteTablePath destinationSnapshotId(long snapshotId);
+
+  /**
+   * Forces backfill semantics even when destination parameters are set. Use this when the
+   * destination has diverged from the source and the incremental diff would be incorrect, or when a
+   * full audit of every expected file is required regardless of destination state.
+   */
+  ValidateRewriteTablePath validateFullTable(boolean validateFull);
+
+  /**
+   * Sets the scope of content file validation. Affects backfill mode only — has no effect on
+   * incremental validation, which always computes the source-vs-destination snapshot diff.
+   */
+  ValidateRewriteTablePath validateScope(ValidateScope scope);
+
+  /** Adds a source-to-destination location prefix rewrite rule. */
+  ValidateRewriteTablePath rewriteLocationPrefix(String sourcePrefix, String destinationPrefix);
+
+  /**
+   * Adds multiple source-to-destination location prefix rewrite rules. Equivalent to calling {@link
+   * #rewriteLocationPrefix(String, String)} once per entry; entries accumulate with any
+   * previously-added rules and are applied longest-prefix first at lookup time.
+   *
+   * @param prefixMap source prefix to destination prefix mappings; must not be null
+   * @return this action
+   */
+  default ValidateRewriteTablePath rewriteLocationPrefix(Map<String, String> prefixMap) {
+    // Delegates each entry to the two-argument overload so implementations only override one method.
+    prefixMap.forEach(this::rewriteLocationPrefix);
+    return this;
+  }
+
+  /**
+   * Sets destination catalog properties used to resolve {@link org.apache.iceberg.io.FileIO} when
+   * no destination table is configured. Useful for backfill validation against a destination that
+   * is not yet registered as a loaded Iceberg table; the properties are passed to {@code
+   * CatalogUtil.loadFileIO} together with any FileIO-specific configuration (region, endpoint,
+   * credentials, etc.).
+   */
+  ValidateRewriteTablePath destinationCatalogProperties(Map<String, String> catalogProperties);
+
+  /** The action result. */
+  interface Result {
+
+    /** Returns true when no expected files are missing at the destination. */
+    boolean isValid();
+
+    /**
+     * Total metadata files validated. In incremental mode, counts only metadata files newly added
+     * at the source since {@link ValidateRewriteTablePath#destinationSnapshotId}. In backfill mode,
+     * counts every metadata file (metadata.json entries, manifest lists, manifests, statistics,
+     * partition statistics) reachable from the configured source snapshot.
+     */
+    long totalMetadataFiles();
+
+    /** Paths of metadata files missing from the destination. */
+    List<String> missingMetadataFiles();
+
+    /**
+     * Total data files validated. In incremental mode, counts only data files newly added at the
+     * source since {@link ValidateRewriteTablePath#destinationSnapshotId}. In backfill mode, counts
+     * every data file referenced by the source — at the source snapshot when {@link
+     * ValidateScope#LATEST} is configured, across the entire source history when {@link
+     * ValidateScope#ALL} is configured.
+     */
+    long totalDataFiles();
+
+    /** Paths of data files missing from the destination. */
+    List<String> missingDataFiles();
+
+    /**
+     * Total delete files validated. Scoped the same way as {@link #totalDataFiles()}: incremental
+     * mode counts only newly added delete files; backfill mode counts every delete file referenced
+     * by the source within the configured scope.
+     */
+    long totalDeleteFiles();
+
+    /** Paths of delete files missing from the destination. */
+    List<String> missingDeleteFiles();
+
+    /** Combined count of all missing files. */
+    long missingFileCount();
+
+    /** Human-readable summary of the result. */
+    String validationSummary();
+  }
+
+  /** Scope of content file validation in backfill mode. Has no effect on incremental validation. */
+  enum ValidateScope {
+    /** Validate content files referenced by every snapshot in the table history. */
+    ALL,
+
+    /** Validate content files referenced only by the latest snapshot. */
+    LATEST;
+
+    /**
+     * Parses a scope name case-insensitively.
+     *
+     * @param value the scope name, {@code all} or {@code latest} in any letter case
+     * @return the matching scope
+     * @throws IllegalArgumentException if {@code value} is null or does not name a scope
+     */
+    public static ValidateScope fromString(String value) {
+      // Reject null explicitly so callers see the same exception type as for an unknown name
+      // rather than a NullPointerException from toUpperCase.
+      if (value == null) {
+        throw new IllegalArgumentException(
+            "Invalid validateRewriteTablePath scope 'null'. Must be one of: all, latest");
+      }
+
+      try {
+        return ValidateScope.valueOf(value.toUpperCase(Locale.ROOT));
+      } catch (IllegalArgumentException e) {
+        // Keep the original failure as the cause while reporting the user-supplied value.
+        throw new IllegalArgumentException(
+            String.format(
+                "Invalid validateRewriteTablePath scope '%s'. Must be one of: all, latest", value),
+            e);
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/actions/BaseValidateRewriteTablePath.java b/core/src/main/java/org/apache/iceberg/actions/BaseValidateRewriteTablePath.java
new file mode 100644
index 000000000000..33628cc670e5
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/BaseValidateRewriteTablePath.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import java.util.Locale;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.immutables.value.Value;
+
+/**
+ * Immutables definition for {@link ValidateRewriteTablePath.Result}. The style below names the
+ * generated enclosing type {@code ImmutableValidateRewriteTablePath} and makes it and its builder
+ * public.
+ */
+@Value.Enclosing
+@SuppressWarnings("ImmutablesStyle")
+@Value.Style(
+    typeImmutableEnclosing = "ImmutableValidateRewriteTablePath",
+    visibilityString = "PUBLIC",
+    builderVisibilityString = "PUBLIC")
+interface BaseValidateRewriteTablePath extends ValidateRewriteTablePath {
+
+  /**
+   * Result value whose counters default to zero and whose missing-file lists default to empty; the
+   * validity flag, missing-file count, and summary are derived from those attributes.
+   */
+  @Value.Immutable
+  interface Result extends ValidateRewriteTablePath.Result {
+
+    /** Defaults to 0 when not set on the builder. */
+    @Override
+    @Value.Default
+    default long totalMetadataFiles() {
+      return 0L;
+    }
+
+    /** Defaults to an empty list when not set on the builder. */
+    @Override
+    @Value.Default
+    default List<String> missingMetadataFiles() {
+      return ImmutableList.of();
+    }
+
+    /** Defaults to 0 when not set on the builder. */
+    @Override
+    @Value.Default
+    default long totalDataFiles() {
+      return 0L;
+    }
+
+    /** Defaults to an empty list when not set on the builder. */
+    @Override
+    @Value.Default
+    default List<String> missingDataFiles() {
+      return ImmutableList.of();
+    }
+
+    /** Defaults to 0 when not set on the builder. */
+    @Override
+    @Value.Default
+    default long totalDeleteFiles() {
+      return 0L;
+    }
+
+    /** Defaults to an empty list when not set on the builder. */
+    @Override
+    @Value.Default
+    default List<String> missingDeleteFiles() {
+      return ImmutableList.of();
+    }
+
+    /** True only when all three missing-file lists are empty. */
+    @Override
+    @Value.Derived
+    default boolean isValid() {
+      return missingMetadataFiles().isEmpty()
+          && missingDataFiles().isEmpty()
+          && missingDeleteFiles().isEmpty();
+    }
+
+    /** Sum of the sizes of the three missing-file lists. */
+    @Override
+    @Value.Derived
+    default long missingFileCount() {
+      // The cast widens the first operand so the whole sum is computed as a long.
+      return (long) missingMetadataFiles().size()
+          + missingDataFiles().size()
+          + missingDeleteFiles().size();
+    }
+
+    /** Multi-line report of totals and missing counts, ending with a PASSED or FAILED status. */
+    @Override
+    @Value.Derived
+    default String validationSummary() {
+      return String.format(
+          Locale.ROOT,
+          "Validation Summary:%n"
+              + " Total Metadata Files Validated: %d%n"
+              + " Missing Metadata Files: %d%n"
+              + " Total Data Files Validated: %d%n"
+              + " Missing Data Files: %d%n"
+              + " Total Delete Files Validated: %d%n"
+              + " Missing Delete Files: %d%n"
+              + " Status: %s",
+          totalMetadataFiles(),
+          missingMetadataFiles().size(),
+          totalDataFiles(),
+          missingDataFiles().size(),
+          totalDeleteFiles(),
+          missingDeleteFiles().size(),
+          isValid() ? "PASSED" : "FAILED");
+    }
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/actions/TestValidateRewriteTablePathResult.java b/core/src/test/java/org/apache/iceberg/actions/TestValidateRewriteTablePathResult.java
new file mode 100644
index 000000000000..b48201b63e2d
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/actions/TestValidateRewriteTablePathResult.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iceberg.actions.ValidateRewriteTablePath.ValidateScope;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for the Immutables-generated {@link ValidateRewriteTablePath.Result} and {@link
+ * ValidateScope#fromString}.
+ */
+public class TestValidateRewriteTablePathResult {
+
+  /** A result built with no attributes set is valid and reports zero for every counter. */
+  @Test
+  public void emptyResultIsValid() {
+    ValidateRewriteTablePath.Result result =
+        ImmutableValidateRewriteTablePath.Result.builder().build();
+    assertThat(result.isValid()).isTrue();
+    assertThat(result.missingFileCount()).isEqualTo(0);
+    assertThat(result.totalMetadataFiles()).isEqualTo(0);
+    assertThat(result.totalDataFiles()).isEqualTo(0);
+    assertThat(result.totalDeleteFiles()).isEqualTo(0);
+  }
+
+  @Test
+  public void missingMetadataFilesAreReported() {
+    ValidateRewriteTablePath.Result result =
+        ImmutableValidateRewriteTablePath.Result.builder()
+            .totalMetadataFiles(10)
+            .missingMetadataFiles(
+                Arrays.asList(
+                    "s3://bucket/metadata/manifest1.avro", "s3://bucket/metadata/manifest2.avro"))
+            .build();
+    assertThat(result.isValid()).isFalse();
+    assertThat(result.missingFileCount()).isEqualTo(2);
+  }
+
+  @Test
+  public void missingDataFilesAreReported() {
+    ValidateRewriteTablePath.Result result =
+        ImmutableValidateRewriteTablePath.Result.builder()
+            .totalDataFiles(100)
+            .missingDataFiles(
+                Arrays.asList(
+                    "s3://bucket/data/file1.parquet",
+                    "s3://bucket/data/file2.parquet",
+                    "s3://bucket/data/file3.parquet"))
+            .build();
+    assertThat(result.isValid()).isFalse();
+    assertThat(result.missingFileCount()).isEqualTo(3);
+  }
+
+  @Test
+  public void missingDeleteFilesAreReported() {
+    ValidateRewriteTablePath.Result result =
+        ImmutableValidateRewriteTablePath.Result.builder()
+            .totalDeleteFiles(5)
+            .missingDeleteFiles(Collections.singletonList("s3://bucket/data/delete-file1.parquet"))
+            .build();
+    assertThat(result.isValid()).isFalse();
+    assertThat(result.missingFileCount()).isEqualTo(1);
+  }
+
+  /** The missing-file count adds up entries across the metadata, data, and delete lists. */
+  @Test
+  public void mixedMissingFilesSumCorrectly() {
+    ValidateRewriteTablePath.Result result =
+        ImmutableValidateRewriteTablePath.Result.builder()
+            .missingMetadataFiles(Collections.singletonList("s3://bucket/metadata/manifest1.avro"))
+            .missingDataFiles(
+                Arrays.asList("s3://bucket/data/file1.parquet", "s3://bucket/data/file2.parquet"))
+            .missingDeleteFiles(Collections.singletonList("s3://bucket/data/delete-file1.parquet"))
+            .build();
+    assertThat(result.isValid()).isFalse();
+    assertThat(result.missingFileCount()).isEqualTo(4);
+  }
+
+  @Test
+  public void validationSummaryRendersForValidResult() {
+    ValidateRewriteTablePath.Result result =
+        ImmutableValidateRewriteTablePath.Result.builder()
+            .totalMetadataFiles(10)
+            .totalDataFiles(100)
+            .totalDeleteFiles(5)
+            .build();
+    assertThat(result.validationSummary())
+        .contains("Total Metadata Files Validated: 10")
+        .contains("Missing Metadata Files: 0")
+        .contains("Total Data Files Validated: 100")
+        .contains("Missing Data Files: 0")
+        .contains("Total Delete Files Validated: 5")
+        .contains("Missing Delete Files: 0")
+        .contains("Status: PASSED");
+  }
+
+  @Test
+  public void validationSummaryRendersForInvalidResult() {
+    ValidateRewriteTablePath.Result result =
+        ImmutableValidateRewriteTablePath.Result.builder()
+            .totalMetadataFiles(10)
+            .missingMetadataFiles(Arrays.asList("manifest1.avro", "manifest2.avro"))
+            .totalDataFiles(100)
+            .missingDataFiles(Collections.singletonList("file1.parquet"))
+            .totalDeleteFiles(5)
+            .missingDeleteFiles(Collections.singletonList("delete1.parquet"))
+            .build();
+    assertThat(result.validationSummary())
+        .contains("Missing Metadata Files: 2")
+        .contains("Missing Data Files: 1")
+        .contains("Missing Delete Files: 1")
+        .contains("Status: FAILED");
+  }
+
+  /** The lists handed to the builder come back equal from the corresponding accessors. */
+  @Test
+  public void missingFilePathsArePreserved() {
+    List<String> metadataFiles = Collections.singletonList("s3://bucket/metadata/v1.metadata.json");
+    List<String> dataFiles =
+        Arrays.asList("s3://bucket/data/part-00001.parquet", "s3://bucket/data/part-00002.parquet");
+    List<String> deleteFiles = Collections.singletonList("s3://bucket/data/delete-00001.parquet");
+
+    ValidateRewriteTablePath.Result result =
+        ImmutableValidateRewriteTablePath.Result.builder()
+            .missingMetadataFiles(metadataFiles)
+            .missingDataFiles(dataFiles)
+            .missingDeleteFiles(deleteFiles)
+            .build();
+
+    assertThat(result.missingMetadataFiles()).isEqualTo(metadataFiles);
+    assertThat(result.missingDataFiles()).isEqualTo(dataFiles);
+    assertThat(result.missingDeleteFiles()).isEqualTo(deleteFiles);
+    assertThat(result.missingFileCount()).isEqualTo(4);
+  }
+
+  @Test
+  public void handlesLargeMissingFileLists() {
+    List<String> largeList = Lists.newArrayListWithExpectedSize(1000);
+    for (int i = 0; i < 1000; i++) {
+      largeList.add("file.parquet");
+    }
+    ValidateRewriteTablePath.Result result =
+        ImmutableValidateRewriteTablePath.Result.builder()
+            .totalDataFiles(10000)
+            .missingDataFiles(largeList)
+            .build();
+    assertThat(result.isValid()).isFalse();
+    assertThat(result.missingFileCount()).isEqualTo(1000);
+    assertThat(result.missingDataFiles()).hasSize(1000);
+  }
+
+  @Test
+  public void validateScopeParsesCaseInsensitively() {
+    assertThat(ValidateScope.fromString("all")).isEqualTo(ValidateScope.ALL);
+    assertThat(ValidateScope.fromString("latest")).isEqualTo(ValidateScope.LATEST);
+    assertThat(ValidateScope.fromString("ALL")).isEqualTo(ValidateScope.ALL);
+    assertThat(ValidateScope.fromString("Latest")).isEqualTo(ValidateScope.LATEST);
+  }
+
+  @Test
+  public void validateScopeThrowsOnUnknownValue() {
+    assertThatThrownBy(() -> ValidateScope.fromString("invalid"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Must be one of: all, latest");
+  }
+
+  /** "none" is not a declared scope, so it is rejected like any other unknown name. */
+  @Test
+  public void validateScopeThrowsOnNoneValue() {
+    assertThatThrownBy(() -> ValidateScope.fromString("none"))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Invalid validateRewriteTablePath scope");
+  }
+}