Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ enum TransactionType {
private final Consumer<String> enqueueDelete = deletedFiles::add;
private final TransactionType type;
private TableMetadata base;
private final TableMetadata stagedReplacement;
private TableMetadata current;
private boolean hasLastOpCommitted;
private final MetricsReporter reporter;
Expand All @@ -91,6 +92,7 @@ enum TransactionType {
this.tableName = tableName;
this.ops = ops;
this.transactionTable = new TransactionTable();
this.stagedReplacement = start;
this.current = start;
this.transactionOps = new TransactionTableOperations();
this.updates = Lists.newArrayList();
Expand Down Expand Up @@ -320,10 +322,16 @@ private void commitReplaceTransaction(boolean orCreate) {
}
}

// because this is a replace table, it will always completely replace the table
// metadata. even if it was just updated.
// If the table changed while this transaction was open (usually a concurrent
// commit), rebuild the replacement on the latest metadata so that commit's changes
// are not silently dropped. If the latest metadata is null the table no longer
// exists, so there is nothing to rebuild on and the staged replacement is committed
// as a new table.
if (base != underlyingOps.current()) {
this.base = underlyingOps.current(); // just refreshed
if (base != null) {
rebaseReplaceOnto(base);
}
}

Comment on lines +332 to 336
underlyingOps.commit(base, current);
Expand All @@ -348,6 +356,31 @@ private void commitReplaceTransaction(boolean orCreate) {
}
}

/**
* Re-applies this replace transaction on top of the latest table metadata.
*
* <p>A replace replaces the current schema, spec, and data, but it keeps the table's history
* rather than dropping and recreating the table. If it simply committed the metadata staged when
* the transaction opened, any commit that landed in the meantime would be lost. Instead, the
* staged replacement (schema, spec, sort order, location, and properties) is rebuilt on the
* refreshed metadata, which retains the other commit's snapshots, and the staged updates are
* replayed so their new snapshots get sequence numbers that follow the refreshed metadata.
*/
private void rebaseReplaceOnto(TableMetadata refreshed) {
this.current =
refreshed.buildReplacementPreservingIds(
stagedReplacement.schema(),
stagedReplacement.spec(),
stagedReplacement.sortOrder(),
stagedReplacement.location(),
stagedReplacement.properties());
// Updates are replayed against the transaction's own ops, which return the rebuilt metadata, so
// they re-apply cleanly without seeing a stale base.
for (PendingUpdate update : updates) {
update.commit();
}
}

private void commitSimpleTransaction() {
// if there were no changes, don't try to commit
if (base == current) {
Expand Down
51 changes: 51 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -757,6 +757,57 @@ public TableMetadata buildReplacement(
.build();
}

/**
* Like {@link #buildReplacement}, but keeps the field IDs already assigned in {@code
* updatedSchema} rather than reassigning fresh ones by matching column names.
*
* <p>Use this when the replacement schema was built earlier against a different version of the
* table and may be committed on top of newer metadata. {@link #buildReplacement} would reassign
* field IDs by name against this (newer) schema, which can move a column to a different ID than
* the one its already-written data files use, making that data unreadable. Keeping the schema's
* existing IDs avoids that.
*/
TableMetadata buildReplacementPreservingIds(
Schema updatedSchema,
PartitionSpec updatedPartitionSpec,
SortOrder updatedSortOrder,
String newLocation,
Map<String, String> updatedProperties) {
ValidationException.check(
formatVersion > 1 || PartitionSpec.hasSequentialIds(updatedPartitionSpec),
"Spec does not use sequential IDs that are required in v1: %s",
updatedPartitionSpec);

// keep the schema's field IDs as-is; only raise lastColumnId so it covers them
int preservedLastColumnId = Math.max(lastColumnId, updatedSchema.highestFieldId());

// rebuild the partition spec using the provided schema's ids and reassign partition field ids
// to align with existing partition specs in the table
PartitionSpec freshSpec =
reassignPartitionIds(
freshSpec(INITIAL_SPEC_ID, updatedSchema, updatedPartitionSpec),
new AtomicInteger(lastAssignedPartitionId)::incrementAndGet);

// rebuild the sort order using the provided schema's ids
SortOrder freshSortOrder =
freshSortOrder(INITIAL_SORT_ORDER_ID, updatedSchema, updatedSortOrder);

// check if there is format version override
int newFormatVersion =
PropertyUtil.propertyAsInt(
updatedProperties, TableProperties.FORMAT_VERSION, formatVersion);

return new Builder(this)
.upgradeFormatVersion(newFormatVersion)
.removeRef(SnapshotRef.MAIN_BRANCH)
.setCurrentSchema(updatedSchema, preservedLastColumnId)
.setDefaultPartitionSpec(freshSpec)
.setDefaultSortOrder(freshSortOrder)
.setLocation(newLocation)
.setProperties(persistedProperties(updatedProperties))
.build();
}

public TableMetadata updateLocation(String newLocation) {
return new Builder(this).setLocation(newLocation).build();
}
Expand Down
181 changes: 181 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,187 @@ public void testReplaceTransactionConflict() {
assertThat(listManifestFiles()).containsExactlyElementsOf(manifests);
}

/**
* Verifies that a replace transaction rebases onto refreshed metadata when a concurrent commit
* lands BEFORE the replace's first commit attempt -- so no {@link CommitFailedException} is
* thrown and the retry loop is never exercised.
*
* <p>A replace is last-writer-wins on the current schema, spec, and data, but it is not a
* drop-and-recreate: the table keeps its history. When {@code commitReplaceTransaction} observes
* that the base advanced (a concurrent commit), it rebuilds the replacement on the refreshed base
* and replays the staged updates. Rebuilding carries forward the concurrent commit's snapshots so
* history is preserved, and {@link SnapshotProducer#apply()} re-derives the replacement head's
* sequence number from the refreshed base ({@code base.nextSequenceNumber()}). The committed head
* therefore receives a STRICTLY greater sequence number than the concurrent commit and the
* table's {@code lastSequenceNumber} advances, preserving the monotonicity that {@link
* TableMetadata.Builder} otherwise guarantees across a replace.
*/
@TestTemplate
public void testReplaceTransactionRebasesOntoConcurrentCommit() {
// Sequence numbers are only meaningful in format v2+.
assumeThat(formatVersion).isGreaterThanOrEqualTo(2);

// Use random snapshot ids so the staged replacement snapshot and the concurrent commit's
// snapshot do not collide on a sequential id (which would send the rebased commit down the
// rollback path in TestTableOperations). Production catalogs assign random snapshot ids.
table.updateProperties().set("random-snapshot-ids", "true").commit();

// Seed: one append -> seq 1, lastSequenceNumber 1.
table.newAppend().appendFile(FILE_A).commit();
table.refresh();
assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(1L);

// Begin the replace (MV full refresh / CREATE OR REPLACE). Base is captured at lastSeq 1.
Transaction replace = TestTables.beginReplace(tableDir, "test", table.schema(), table.spec());
// Staged replacement head, assigned seq 2 from the stage-time (seq-1) base. This value will be
// re-derived when the replace rebases onto the refreshed base at commit time.
replace.newAppend().appendFile(FILE_B).commit();

// Concurrent writer (e.g. compaction) commits against the SAME seq-1 base -> also gets seq 2.
// This lands BEFORE replace.commitTransaction(), so the replace's first refresh() observes it
// and the commit succeeds on the first attempt with no retry.
table.newAppend().appendFile(FILE_C).commit();
table.refresh();
long concurrentSeq = table.ops().current().lastSequenceNumber();
assertThat(concurrentSeq).isEqualTo(2L);
int versionBeforeReplace = version();

// No failCommits injection: refresh() reads the up-to-date (concurrent) version token, base is
// advanced to the concurrent metadata, the replace rebuilds on it, and the CAS matches ->
// first-attempt success, no retry.
replace.commitTransaction();
table.refresh();

// Exactly one additional successful commit (no retry).
assertThat(version()).isEqualTo(versionBeforeReplace + 1);

TableMetadata after = table.ops().current();
Snapshot newHead = after.currentSnapshot();

// The replacement head receives a STRICTLY greater sequence number than the concurrent commit,
// because it was re-derived from the refreshed (seq-2) base the replace actually committed
// against rather than the stale stage-time (seq-1) base.
assertThat(newHead.sequenceNumber()).isGreaterThan(concurrentSeq); // == 3
// The table's monotonic sequence counter advances past the concurrent commit.
assertThat(after.lastSequenceNumber()).isGreaterThan(concurrentSeq); // == 3

// History is preserved: the concurrent commit's snapshot remains in the table's snapshot list,
// so the replace did not erase it "as though the concurrent commit never happened".
assertThat(after.snapshots())
.as("Concurrent commit's snapshot must be preserved in history after replace")
.anyMatch(snapshot -> snapshot.sequenceNumber() == concurrentSeq);
}

/**
* Mirrors the concurrent-schema example from the REPLACE TABLE concurrency discussion: a
* concurrent commit evolves the schema (adding a new schema id and making it current) after a
* replace transaction has been staged. A replace is last-writer-wins on the current schema, so
* the replacement's schema becomes current, but the table keeps its history and must not drop the
* concurrently added schema id "as though the concurrent commit never happened".
*/
@TestTemplate
public void testReplaceTransactionPreservesConcurrentlyAddedSchema() {
table.updateProperties().set("random-snapshot-ids", "true").commit();

table.newAppend().appendFile(FILE_A).commit();
table.refresh();
int originalSchemaId = table.schema().schemaId();

// Begin the replace before the concurrent schema change, so the replace's base is stale.
Transaction replace = TestTables.beginReplace(tableDir, "test", table.schema(), table.spec());
replace.newAppend().appendFile(FILE_B).commit();

// Concurrent writer evolves the schema: adds a column, producing a new current schema id.
table.updateSchema().addColumn("extra", Types.StringType.get()).commit();
table.refresh();
int concurrentSchemaId = table.schema().schemaId();
assertThat(concurrentSchemaId).isNotEqualTo(originalSchemaId);
boolean concurrentSchemaHasExtra = table.schema().findField("extra") != null;
assertThat(concurrentSchemaHasExtra).isTrue();

replace.commitTransaction();
table.refresh();

// The concurrently added schema id must remain in the table's schema history after the replace.
assertThat(table.schemas())
.as("Concurrently added schema id must be preserved in history after replace")
.containsKey(concurrentSchemaId);
}

/**
* Verifies that rebasing the replacement onto refreshed metadata keeps the replacement schema's
* field IDs stable. The staged data files were written against the stage-time replacement
* schema's field IDs; re-deriving IDs by name against the concurrently-changed schema would shift
* them and leave the committed schema no longer matching the staged data. Preserving the IDs
* keeps the staged data readable.
*/
@TestTemplate
public void testReplaceTransactionPreservesFieldIdsUnderConcurrentSchemaChange() {
table.updateProperties().set("random-snapshot-ids", "true").commit();

table.newAppend().appendFile(FILE_A).commit();
table.refresh();

// Replace with a schema that introduces a column NOT present in the original schema. Built
// against the original (seq-1) base, "added_by_replace" receives a fresh field id.
Schema replaceSchema =
new Schema(
required(10, "id", Types.IntegerType.get()),
required(11, "data", Types.StringType.get()),
required(12, "added_by_replace", Types.StringType.get()));
Transaction replace = TestTables.beginReplace(tableDir, "test", replaceSchema, unpartitioned());
replace.newAppend().appendFile(FILE_B).commit();

int stagedReplaceColumnId =
((BaseTransaction) replace)
.currentMetadata()
.schema()
.findField("added_by_replace")
.fieldId();

// Concurrent writer adds its own column to the original schema, consuming the next field id.
table.updateSchema().addColumn("added_by_concurrent", Types.StringType.get()).commit();
table.refresh();

replace.commitTransaction();
table.refresh();

int committedReplaceColumnId = table.schema().findField("added_by_replace").fieldId();

// The replacement column keeps the field id the staged data files were written against, even
// though a concurrent schema change consumed that id in the refreshed base's schema.
assertThat(committedReplaceColumnId)
.as("Replacement column field id must be stable across rebase")
.isEqualTo(stagedReplaceColumnId);
}

/**
* Exercises a replace whose table is concurrently dropped after the transaction is staged. The
* refreshed base is null (TestTables.refresh returns null for a missing table), so there is
* nothing to rebase onto; the replace must commit the staged replacement as a create rather than
* attempting to rebuild on a null base.
*/
@TestTemplate
public void testReplaceTransactionConcurrentlyDroppedTable() {
table.newAppend().appendFile(FILE_A).commit();
table.refresh();
assertThat(version()).isEqualTo(1);

// Begin the replace against the existing table; base is captured non-null.
Transaction replace = TestTables.beginReplace(tableDir, "test", table.schema(), table.spec());
replace.newAppend().appendFile(FILE_B).commit();

// Concurrent drop: the underlying table disappears before the replace commits, so refresh()
// returns a null base.
TestTables.clearTables();

replace.commitTransaction();

TableMetadata after = TestTables.readMetadata("test");
assertThat(after).isNotNull();
validateSnapshot(null, after.currentSnapshot(), FILE_B);
}

@TestTemplate
public void testReplaceToCreateAndAppend() throws IOException {
// this table doesn't exist.
Expand Down
36 changes: 24 additions & 12 deletions core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -2933,12 +2933,18 @@ public void testConcurrentReplaceTransactionSchemaConflict() {
assertUUIDsMatch(original, afterFirstReplace);
assertFiles(afterFirstReplace, FILE_B);

// even though the new schema is identical, the assertion that the last assigned id has not
// changed will fail
assertThatThrownBy(secondReplace::commitTransaction)
.isInstanceOf(CommitFailedException.class)
.hasMessageStartingWith(
"Commit failed: Requirement failed: last assigned field id changed");
// The second replace rebases onto the first replace's committed metadata. Because the
// replacement schema is identical, the rebuilt replacement reuses the existing schema and adds
// no conflicting field ids, so the replace succeeds as last-writer-wins instead of failing on a
// spurious last-assigned-field-id requirement.
secondReplace.commitTransaction();

Table afterSecondReplace = catalog.loadTable(TABLE);
assertThat(afterSecondReplace.schema().asStruct())
.as("Table schema should match the replace schema")
.isEqualTo(REPLACE_SCHEMA.asStruct());
assertUUIDsMatch(original, afterSecondReplace);
assertFiles(afterSecondReplace, FILE_C);
}

@Test
Expand Down Expand Up @@ -3053,12 +3059,18 @@ public void testConcurrentReplaceTransactionPartitionSpecConflict() {
assertUUIDsMatch(original, afterFirstReplace);
assertFiles(afterFirstReplace, FILE_B);

// even though the new spec is identical, the assertion that the last assigned id has not
// changed will fail
assertThatThrownBy(secondReplace::commitTransaction)
.isInstanceOf(CommitFailedException.class)
.hasMessageStartingWith(
"Commit failed: Requirement failed: last assigned partition id changed");
// The second replace rebases onto the first replace's committed metadata. Because the
// replacement spec is identical, the rebuilt replacement reuses the existing spec and adds no
// conflicting partition ids, so the replace succeeds as last-writer-wins instead of failing on
// a spurious last-assigned-partition-id requirement.
secondReplace.commitTransaction();

Table afterSecondReplace = catalog.loadTable(TABLE);
assertThat(afterSecondReplace.spec().fields())
.as("Table spec should match the new spec")
.isEqualTo(TABLE_SPEC.fields());
assertUUIDsMatch(original, afterSecondReplace);
assertFiles(afterSecondReplace, FILE_C);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,12 @@ public void testReplaceTableTxnTableModifiedConcurrently() {
txn.updateProperties().set("prop", "value").commit();
txn.commitTransaction();

// the replace should still succeed
// the replace rebases onto the concurrent change, so the concurrently-set property is kept
// alongside the replace's own property
table = catalog.loadTable(TABLE_IDENTIFIER);
assertThat(table.properties())
.as("Table props should be updated")
.doesNotContainKey("another-prop")
.containsEntry("another-prop", "another-value")
.containsEntry("prop", "value");
}

Expand Down Expand Up @@ -329,9 +330,11 @@ public void testCreateOrReplaceTableTxnTableCreatedConcurrently() {
txn.commitTransaction();

Table table = catalog.loadTable(TABLE_IDENTIFIER);
assertThat(table.spec())
.as("Partition spec should match")
.isEqualTo(PartitionSpec.unpartitioned());
// rebuilding the replacement on the concurrently-created table can reassign the spec id, so the
// spec is unpartitioned by content even though it is not object-equal to the canonical instance
assertThat(table.spec().isUnpartitioned())
.as("Partition spec should be unpartitioned")
.isTrue();
assertThat(table.properties()).as("Table props should match").containsEntry("prop", "value");
}

Expand Down