Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,12 @@ default RemoveDanglingDeleteFiles removeDanglingDeleteFiles(Table table) {
throw new UnsupportedOperationException(
this.getClass().getName() + " does not implement removeDanglingDeleteFiles");
}

/**
 * Instantiates an action to validate that a rewritten table copy is complete at the destination.
 *
 * <p>This default implementation does not create an action; it always throws, naming the concrete
 * provider class so the caller can see which implementation lacks support.
 *
 * @param sourceTable the source table whose rewritten copy should be validated
 * @return the validation action (this default implementation never returns normally)
 * @throws UnsupportedOperationException always, unless the method is overridden
 */
default ValidateRewriteTablePath validateRewriteTablePath(Table sourceTable) {
  String providerName = this.getClass().getName();
  throw new UnsupportedOperationException(
      providerName + " does not implement validateRewriteTablePath");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.Table;

/**
* An action that validates a rewritten table copy by checking that every metadata, data, and delete
* file the source table references is present at the destination.
*
* <p>The source table is the source of truth: this action walks its metadata to enumerate every
* expected file, applies any configured prefix rewrite, and verifies each path at the destination.
* Files referenced by the source table but missing at the destination are reported regardless of
* whether the destination's own metadata is internally consistent.
*
* <p>Two common uses:
*
* <ul>
* <li><b>Verify a copy before registering it</b> — after producing a destination via {@link
* RewriteTablePath}, run this action to confirm the destination contains every expected file
* before registering the new metadata.json with a catalog. For a first-time copy, leave
* {@link #destinationSnapshotId(long)} unset so the action validates the full source file set
* at the source's latest snapshot; alternatively call {@link #validateFullTable(boolean)}
* with {@code true}.
* <li><b>Audit any source/destination pair for sync</b> — the destination does not have to have
* been produced by {@link RewriteTablePath}. This works against DR replicas, migration
* targets, backups, Distcp output, manual file copies, or any other out-of-band copy. Run
* this action to confirm the destination still references every file the source does. The
* validator only requires loadable source and destination Tables plus {@link
* #rewriteLocationPrefix(String, String)} to map source paths to destination paths. Pass
* {@link #destinationSnapshotId(long)} (the destination's snapshot id from the previous
* successful copy or validation) to validate only the delta accumulated since that point;
* call {@link #validateFullTable(boolean)} with {@code true} to re-validate the full source
* file set regardless of any prior destination state.
* </ul>
*
* <p>The action operates in three modes selected by which parameters are configured: backfill
* (source only), incremental (source plus destination), and forced full validation against an
* existing destination via {@link #validateFullTable(boolean)}. Each mode validates a different
* file set:
*
* <ul>
* <li><b>Backfill</b> validates every file the source references at {@link
* #sourceSnapshotId(long)}, scoped by {@link #validateScope(ValidateScope)}.
* <li><b>Incremental</b> validates only the files source has accumulated between {@link
* #destinationSnapshotId(long)} (exclusive) and {@link #sourceSnapshotId(long)} (inclusive).
* It does not re-check files that already existed at {@link #destinationSnapshotId(long)};
* those are presumed to have been validated when the destination was last at that snapshot.
* <li><b>Forced full</b> validation, enabled with {@link #validateFullTable(boolean)} set to
* {@code true}, validates the full source file set against the destination regardless of any
* configured destination snapshot id.
* </ul>
*
* <p>Use forced full validation when the destination state at {@link #destinationSnapshotId(long)}
* cannot be presumed correct — for example, when no prior validation result is available, when
* destination files may have been altered out-of-band, or when running an audit-from-scratch.
*
* <p>Backfill example — verify a fresh copy before registering it:
*
* <pre>{@code
* Result result = SparkActions.get(spark)
* .validateRewriteTablePath(sourceTable)
* .sourceMetadataVersion(version)
* .sourceSnapshotId(snapshotId)
* .rewriteLocationPrefix(sourcePrefix, destinationPrefix)
* .execute();
* }</pre>
*
* <p>Incremental example — verify two tables stay in sync after an incremental copy. Pass the
* destination's snapshot id as it was <i>before</i> the latest copy ran:
*
* <pre>{@code
* Result result = SparkActions.get(spark)
* .validateRewriteTablePath(sourceTable)
* .sourceMetadataVersion(currentSourceVersion)
* .sourceSnapshotId(currentSourceSnapshotId)
* .destinationTable(destinationTable)
* .destinationMetadataVersion(currentDestinationVersion)
* .destinationSnapshotId(preCopyDestinationSnapshotId)
* .rewriteLocationPrefix(sourcePrefix, destinationPrefix)
* .execute();
* }</pre>
*/
public interface ValidateRewriteTablePath
extends Action<ValidateRewriteTablePath, ValidateRewriteTablePath.Result> {

/**
* Sets the destination table for incremental validation. Must be paired with {@link
* #destinationMetadataVersion}; both must be set to enable incremental mode, or both must be left
* unset to run backfill mode.
*
* @param table the destination table to validate against
* @return the action, so further configuration calls can be chained
*/
ValidateRewriteTablePath destinationTable(Table table);

/**
* Sets the source metadata version (file name or absolute path). Required.
*
* @param version the source metadata version, given either as a file name or as an absolute path
* @return the action, so further configuration calls can be chained
*/
ValidateRewriteTablePath sourceMetadataVersion(String version);

/**
* Sets the source snapshot id. Required unless the source metadata has no current snapshot (empty
* source table); in that case the action validates only the metadata files.
*
* @param snapshotId id of the source snapshot to validate up to (inclusive upper bound in
*     incremental mode)
* @return the action, so further configuration calls can be chained
*/
ValidateRewriteTablePath sourceSnapshotId(long snapshotId);

/**
* Sets the destination metadata version. Must be paired with {@link #destinationTable}; both must
* be set or both must be left unset.
*
* @param version the destination metadata version; NOTE(review): presumably accepts the same
*     forms as {@link #sourceMetadataVersion(String)} (file name or absolute path) — confirm
* @return the action, so further configuration calls can be chained
*/
ValidateRewriteTablePath destinationMetadataVersion(String version);

/**
* Sets the destination snapshot id used as the lower bound of the incremental diff. This is the
* snapshot id the destination held at the time of the previous successful copy or validation.
*
* <p>When set, the action validates only files the source has accumulated between this snapshot
* id (exclusive) and {@link #sourceSnapshotId(long)} (inclusive); files that existed at this
* snapshot id are not re-checked. Callers that require validation of every source file regardless
* of prior destination state should call {@link #validateFullTable(boolean)} with {@code true}.
*
* <p>Leave unset for a first-time copy: the action validates the full source file set at {@link
* #sourceSnapshotId(long)} against the destination, without performing a diff.
*
* <p>After a copy that preserves snapshot identity (such as a {@link RewriteTablePath} copy),
* setting this to the destination's current snapshot id produces an empty incremental diff (zero
* files validated). Pass the previous snapshot id, or call {@link #validateFullTable(boolean)}
* with {@code true}, instead.
*
* <p>If this snapshot id is not present in the source's snapshot history at execution time — for
* example, the source's retention policy has since expired it, or the destination committed
* independently of the source — the action falls back to a full source-vs-destination diff and
* logs a warning rather than throwing. The fallback result is semantically correct (any source
* files missing at the destination are still reported), and the warning makes the condition
* observable to callers that need to act on it.
*
* <p>Requires {@link #destinationTable(Table)} to be set.
*
* @param snapshotId the destination's snapshot id as of the previous successful copy or
*     validation; used as the exclusive lower bound of the incremental diff
* @return the action, so further configuration calls can be chained
*/
ValidateRewriteTablePath destinationSnapshotId(long snapshotId);

/**
* Forces backfill semantics even when destination parameters are set. Use this when the
* destination has diverged from the source and the incremental diff would be incorrect, or when a
* full audit of every expected file is required regardless of destination state.
*
* @param validateFull {@code true} to validate the full source file set regardless of any
*     configured destination snapshot id; NOTE(review): {@code false} presumably leaves mode
*     selection to the other configured parameters — confirm against the implementation
* @return the action, so further configuration calls can be chained
*/
ValidateRewriteTablePath validateFullTable(boolean validateFull);

/**
* Sets the scope of content file validation. Affects backfill mode only — has no effect on
* incremental validation, which always computes the source-vs-destination snapshot diff.
*
* @param scope {@link ValidateScope#ALL} to cover content files from every snapshot in the table
*     history, or {@link ValidateScope#LATEST} to cover only the latest snapshot
* @return the action, so further configuration calls can be chained
*/
ValidateRewriteTablePath validateScope(ValidateScope scope);

/**
* Adds a source-to-destination location prefix rewrite rule. The rule maps source file paths to
* the paths expected at the destination.
*
* @param sourcePrefix the location prefix as it appears in source paths
* @param destinationPrefix the prefix that replaces {@code sourcePrefix} for destination paths
* @return the action, so further configuration calls can be chained
*/
ValidateRewriteTablePath rewriteLocationPrefix(String sourcePrefix, String destinationPrefix);

/**
 * Adds multiple source-to-destination location prefix rewrite rules. Equivalent to calling {@link
 * #rewriteLocationPrefix(String, String)} once per entry; entries accumulate with any
 * previously-added rules and are applied longest-prefix first at lookup time.
 *
 * @param prefixMap rewrite rules keyed by source prefix, with the destination prefix as the value
 * @return this action, so further configuration calls can be chained
 */
default ValidateRewriteTablePath rewriteLocationPrefix(Map<String, String> prefixMap) {
  // Register each entry through the single-rule overload so implementations only need to
  // handle one rule at a time.
  for (Map.Entry<String, String> rule : prefixMap.entrySet()) {
    rewriteLocationPrefix(rule.getKey(), rule.getValue());
  }

  return this;
}

/**
* Sets destination catalog properties used to resolve {@link org.apache.iceberg.io.FileIO} when
* no destination table is configured. Useful for backfill validation against a destination that
* is not yet registered as a loaded Iceberg table; the properties are passed to {@code
* CatalogUtil.loadFileIO} together with any FileIO-specific configuration (region, endpoint,
* credentials, etc.).
*
* @param catalogProperties catalog and FileIO properties for the destination
* @return the action, so further configuration calls can be chained
*/
ValidateRewriteTablePath destinationCatalogProperties(Map<String, String> catalogProperties);

/**
* The action result: per-category totals of validated files and the paths of files that were
* expected at the destination but not found there.
*/
interface Result {

/**
* Returns true when no expected files are missing at the destination.
*
* @return {@code true} if no metadata, data, or delete file is reported missing
*/
boolean isValid();

/**
* Total metadata files validated. In incremental mode, counts only metadata files newly added
* at the source since {@link ValidateRewriteTablePath#destinationSnapshotId}. In backfill mode,
* counts every metadata file (metadata.json entries, manifest lists, manifests, statistics,
* partition statistics) reachable from the configured source snapshot.
*
* @return the number of metadata files that were checked
*/
long totalMetadataFiles();

/**
* Paths of metadata files missing from the destination.
*
* @return the missing metadata file paths; empty when none are missing
*/
List<String> missingMetadataFiles();

/**
* Total data files validated. In incremental mode, counts only data files newly added at the
* source since {@link ValidateRewriteTablePath#destinationSnapshotId}. In backfill mode, counts
* every data file referenced by the source — at the source snapshot when {@link
* ValidateScope#LATEST} is configured, across the entire source history when {@link
* ValidateScope#ALL} is configured.
*
* @return the number of data files that were checked
*/
long totalDataFiles();

/**
* Paths of data files missing from the destination.
*
* @return the missing data file paths; empty when none are missing
*/
List<String> missingDataFiles();

/**
* Total delete files validated. Scoped the same way as {@link #totalDataFiles()}: incremental
* mode counts only newly added delete files; backfill mode counts every delete file referenced
* by the source within the configured scope.
*
* @return the number of delete files that were checked
*/
long totalDeleteFiles();

/**
* Paths of delete files missing from the destination.
*
* @return the missing delete file paths; empty when none are missing
*/
List<String> missingDeleteFiles();

/**
* Combined count of all missing files.
*
* @return the number of missing metadata, data, and delete files taken together
*/
long missingFileCount();

/**
* Human-readable summary of the result.
*
* @return a textual summary intended for logs or console output
*/
String validationSummary();
}

/** Scope of content file validation in backfill mode. Has no effect on incremental validation. */
enum ValidateScope {
  /** Validate content files referenced by every snapshot in the table history. */
  ALL,

  /** Validate content files referenced only by the latest snapshot. */
  LATEST;

  /**
   * Parses a scope from its name, ignoring case (for example {@code "all"} or {@code "Latest"}).
   *
   * @param value the scope name
   * @return the matching scope
   * @throws IllegalArgumentException if {@code value} is {@code null} or does not name a scope
   */
  public static ValidateScope fromString(String value) {
    // Reject null up front so callers get the same descriptive error as for any other invalid
    // value, instead of a bare NullPointerException from toUpperCase.
    if (value == null) {
      throw new IllegalArgumentException(invalidScopeMessage(null));
    }

    try {
      // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
      return ValidateScope.valueOf(value.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      // Keep the original exception as the cause so the failing lookup stays visible.
      throw new IllegalArgumentException(invalidScopeMessage(value), e);
    }
  }

  /** Builds the error message reported for an unrecognized scope name. */
  private static String invalidScopeMessage(String value) {
    return String.format(
        "Invalid validateRewriteTablePath scope '%s'. Must be one of: all, latest", value);
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import java.util.List;
import java.util.Locale;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.immutables.value.Value;

/**
 * Immutables definition for the {@link ValidateRewriteTablePath} result type. The style below
 * asks the annotation processor to generate the implementation inside an enclosing type named
 * {@code ImmutableValidateRewriteTablePath} with public visibility.
 */
@Value.Enclosing
@SuppressWarnings("ImmutablesStyle")
@Value.Style(
    typeImmutableEnclosing = "ImmutableValidateRewriteTablePath",
    visibilityString = "PUBLIC",
    builderVisibilityString = "PUBLIC")
interface BaseValidateRewriteTablePath extends ValidateRewriteTablePath {

  /**
   * Result value type. The counters default to zero and the missing-file lists default to empty;
   * the validity flag, combined missing count, and summary are derived from those attributes.
   */
  @Value.Immutable
  interface Result extends ValidateRewriteTablePath.Result {

    /** Number of metadata files validated; zero unless set. */
    @Override
    @Value.Default
    default long totalMetadataFiles() {
      return 0L;
    }

    /** Metadata file paths missing at the destination; empty unless set. */
    @Override
    @Value.Default
    default List<String> missingMetadataFiles() {
      return ImmutableList.of();
    }

    /** Number of data files validated; zero unless set. */
    @Override
    @Value.Default
    default long totalDataFiles() {
      return 0L;
    }

    /** Data file paths missing at the destination; empty unless set. */
    @Override
    @Value.Default
    default List<String> missingDataFiles() {
      return ImmutableList.of();
    }

    /** Number of delete files validated; zero unless set. */
    @Override
    @Value.Default
    default long totalDeleteFiles() {
      return 0L;
    }

    /** Delete file paths missing at the destination; empty unless set. */
    @Override
    @Value.Default
    default List<String> missingDeleteFiles() {
      return ImmutableList.of();
    }

    /** True only when all three missing-file lists are empty. */
    @Override
    @Value.Derived
    default boolean isValid() {
      if (!missingMetadataFiles().isEmpty()) {
        return false;
      }

      if (!missingDataFiles().isEmpty()) {
        return false;
      }

      return missingDeleteFiles().isEmpty();
    }

    /** Sum of the sizes of the three missing-file lists, accumulated as a long. */
    @Override
    @Value.Derived
    default long missingFileCount() {
      long missing = missingMetadataFiles().size();
      missing += missingDataFiles().size();
      missing += missingDeleteFiles().size();
      return missing;
    }

    /**
     * Multi-line report listing, per file category, the validated total and the number missing,
     * followed by a PASSED or FAILED status taken from {@link #isValid()}.
     */
    @Override
    @Value.Derived
    default String validationSummary() {
      final String template =
          "Validation Summary:%n"
              + " Total Metadata Files Validated: %d%n"
              + " Missing Metadata Files: %d%n"
              + " Total Data Files Validated: %d%n"
              + " Missing Data Files: %d%n"
              + " Total Delete Files Validated: %d%n"
              + " Missing Delete Files: %d%n"
              + " Status: %s";

      long metadataTotal = totalMetadataFiles();
      int metadataMissing = missingMetadataFiles().size();
      long dataTotal = totalDataFiles();
      int dataMissing = missingDataFiles().size();
      long deleteTotal = totalDeleteFiles();
      int deleteMissing = missingDeleteFiles().size();

      String status;
      if (isValid()) {
        status = "PASSED";
      } else {
        status = "FAILED";
      }

      return String.format(
          Locale.ROOT,
          template,
          metadataTotal,
          metadataMissing,
          dataTotal,
          dataMissing,
          deleteTotal,
          deleteMissing,
          status);
    }
  }
}
Loading