From 05aa96df9dee2320fc4d5263b4e283a412ff9627 Mon Sep 17 00:00:00 2001 From: Kurtis Wright Date: Thu, 25 Jun 2026 18:52:41 +0000 Subject: [PATCH 1/2] Core: Rebase replace transaction onto refreshed metadata on concurrent commit When a concurrent commit lands after a replace transaction is staged, commitReplaceTransaction advanced base to the refreshed metadata to satisfy the optimistic lock but committed the stale staged metadata. This dropped the concurrent commit's snapshots from history and reused its sequence number, violating sequence-number monotonicity. A replace is last-writer-wins on the current schema, spec, and data, but it is not a drop-and-recreate: the table must keep its history. Rebuild the replacement on the refreshed base and replay the staged updates so the concurrent commit's history is preserved and the replacement snapshot's sequence number is re-derived from the refreshed base. Add TableMetadata.buildReplacementPreservingIds, which keeps the staged schema's field IDs instead of re-deriving them by name, so a concurrent schema change cannot shift the IDs the staged data files were written against. Add tests covering the rebase, history preservation, and field-id stability. Update the two server-side conflict tests in CatalogTests, which previously asserted a spurious last-assigned-id requirement failure, to assert the replace succeeds as last-writer-wins. 
--- .../org/apache/iceberg/BaseTransaction.java | 37 +++- .../org/apache/iceberg/TableMetadata.java | 51 +++++ .../iceberg/TestReplaceTransaction.java | 181 ++++++++++++++++++ .../apache/iceberg/catalog/CatalogTests.java | 36 ++-- 4 files changed, 291 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 9884ac297079..7db04042d6bd 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -73,6 +73,7 @@ enum TransactionType { private final Consumer enqueueDelete = deletedFiles::add; private final TransactionType type; private TableMetadata base; + private final TableMetadata stagedReplacement; private TableMetadata current; private boolean hasLastOpCommitted; private final MetricsReporter reporter; @@ -91,6 +92,7 @@ enum TransactionType { this.tableName = tableName; this.ops = ops; this.transactionTable = new TransactionTable(); + this.stagedReplacement = start; this.current = start; this.transactionOps = new TransactionTableOperations(); this.updates = Lists.newArrayList(); @@ -320,10 +322,16 @@ private void commitReplaceTransaction(boolean orCreate) { } } - // because this is a replace table, it will always completely replace the table - // metadata. even if it was just updated. + // If the table changed while this transaction was open (usually a concurrent + // commit), rebuild the replacement on the latest metadata so that commit's changes + // are not silently dropped. If the latest metadata is null the table no longer + // exists, so there is nothing to rebuild on and the staged replacement is committed + // as a new table. 
if (base != underlyingOps.current()) { this.base = underlyingOps.current(); // just refreshed + if (base != null) { + rebaseReplaceOnto(base); + } } underlyingOps.commit(base, current); @@ -348,6 +356,31 @@ private void commitReplaceTransaction(boolean orCreate) { } } + /** + * Re-applies this replace transaction on top of the latest table metadata. + * + *
<p>
A replace replaces the current schema, spec, and data, but it keeps the table's history + * rather than dropping and recreating the table. If it simply committed the metadata staged when + * the transaction opened, any commit that landed in the meantime would be lost. Instead, the + * staged replacement (schema, spec, sort order, location, and properties) is rebuilt on the + * refreshed metadata, which retains the other commit's snapshots, and the staged updates are + * replayed so their new snapshots get sequence numbers that follow the refreshed metadata. + */ + private void rebaseReplaceOnto(TableMetadata refreshed) { + this.current = + refreshed.buildReplacementPreservingIds( + stagedReplacement.schema(), + stagedReplacement.spec(), + stagedReplacement.sortOrder(), + stagedReplacement.location(), + stagedReplacement.properties()); + // Updates are replayed against the transaction's own ops, which return the rebuilt metadata, so + // they re-apply cleanly without seeing a stale base. + for (PendingUpdate update : updates) { + update.commit(); + } + } + private void commitSimpleTransaction() { // if there were no changes, don't try to commit if (base == current) { diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index b811d8c42f15..4f971254fdd3 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -757,6 +757,57 @@ public TableMetadata buildReplacement( .build(); } + /** + * Like {@link #buildReplacement}, but keeps the field IDs already assigned in {@code + * updatedSchema} rather than reassigning fresh ones by matching column names. + * + *
<p>
Use this when the replacement schema was built earlier against a different version of the + * table and may be committed on top of newer metadata. {@link #buildReplacement} would reassign + * field IDs by name against this (newer) schema, which can move a column to a different ID than + * the one its already-written data files use, making that data unreadable. Keeping the schema's + * existing IDs avoids that. + */ + TableMetadata buildReplacementPreservingIds( + Schema updatedSchema, + PartitionSpec updatedPartitionSpec, + SortOrder updatedSortOrder, + String newLocation, + Map updatedProperties) { + ValidationException.check( + formatVersion > 1 || PartitionSpec.hasSequentialIds(updatedPartitionSpec), + "Spec does not use sequential IDs that are required in v1: %s", + updatedPartitionSpec); + + // keep the schema's field IDs as-is; only raise lastColumnId so it covers them + int preservedLastColumnId = Math.max(lastColumnId, updatedSchema.highestFieldId()); + + // rebuild the partition spec using the provided schema's ids and reassign partition field ids + // to align with existing partition specs in the table + PartitionSpec freshSpec = + reassignPartitionIds( + freshSpec(INITIAL_SPEC_ID, updatedSchema, updatedPartitionSpec), + new AtomicInteger(lastAssignedPartitionId)::incrementAndGet); + + // rebuild the sort order using the provided schema's ids + SortOrder freshSortOrder = + freshSortOrder(INITIAL_SORT_ORDER_ID, updatedSchema, updatedSortOrder); + + // check if there is format version override + int newFormatVersion = + PropertyUtil.propertyAsInt( + updatedProperties, TableProperties.FORMAT_VERSION, formatVersion); + + return new Builder(this) + .upgradeFormatVersion(newFormatVersion) + .removeRef(SnapshotRef.MAIN_BRANCH) + .setCurrentSchema(updatedSchema, preservedLastColumnId) + .setDefaultPartitionSpec(freshSpec) + .setDefaultSortOrder(freshSortOrder) + .setLocation(newLocation) + .setProperties(persistedProperties(updatedProperties)) + .build(); + } + 
public TableMetadata updateLocation(String newLocation) { return new Builder(this).setLocation(newLocation).build(); } diff --git a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java index 79196c0a7517..c7cd901741cf 100644 --- a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java @@ -315,6 +315,187 @@ public void testReplaceTransactionConflict() { assertThat(listManifestFiles()).containsExactlyElementsOf(manifests); } + /** + * Verifies that a replace transaction rebases onto refreshed metadata when a concurrent commit + * lands BEFORE the replace's first commit attempt -- so no {@link CommitFailedException} is + * thrown and the retry loop is never exercised. + * + *
<p>
A replace is last-writer-wins on the current schema, spec, and data, but it is not a + * drop-and-recreate: the table keeps its history. When {@code commitReplaceTransaction} observes + * that the base advanced (a concurrent commit), it rebuilds the replacement on the refreshed base + * and replays the staged updates. Rebuilding carries forward the concurrent commit's snapshots so + * history is preserved, and {@link SnapshotProducer#apply()} re-derives the replacement head's + * sequence number from the refreshed base ({@code base.nextSequenceNumber()}). The committed head + * therefore receives a STRICTLY greater sequence number than the concurrent commit and the + * table's {@code lastSequenceNumber} advances, preserving the monotonicity that {@link + * TableMetadata.Builder} otherwise guarantees across a replace. + */ + @TestTemplate + public void testReplaceTransactionRebasesOntoConcurrentCommit() { + // Sequence numbers are only meaningful in format v2+. + assumeThat(formatVersion).isGreaterThanOrEqualTo(2); + + // Use random snapshot ids so the staged replacement snapshot and the concurrent commit's + // snapshot do not collide on a sequential id (which would send the rebased commit down the + // rollback path in TestTableOperations). Production catalogs assign random snapshot ids. + table.updateProperties().set("random-snapshot-ids", "true").commit(); + + // Seed: one append -> seq 1, lastSequenceNumber 1. + table.newAppend().appendFile(FILE_A).commit(); + table.refresh(); + assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(1L); + + // Begin the replace (MV full refresh / CREATE OR REPLACE). Base is captured at lastSeq 1. + Transaction replace = TestTables.beginReplace(tableDir, "test", table.schema(), table.spec()); + // Staged replacement head, assigned seq 2 from the stage-time (seq-1) base. This value will be + // re-derived when the replace rebases onto the refreshed base at commit time. 
+ replace.newAppend().appendFile(FILE_B).commit(); + + // Concurrent writer (e.g. compaction) commits against the SAME seq-1 base -> also gets seq 2. + // This lands BEFORE replace.commitTransaction(), so the replace's first refresh() observes it + // and the commit succeeds on the first attempt with no retry. + table.newAppend().appendFile(FILE_C).commit(); + table.refresh(); + long concurrentSeq = table.ops().current().lastSequenceNumber(); + assertThat(concurrentSeq).isEqualTo(2L); + int versionBeforeReplace = version(); + + // No failCommits injection: refresh() reads the up-to-date (concurrent) version token, base is + // advanced to the concurrent metadata, the replace rebuilds on it, and the CAS matches -> + // first-attempt success, no retry. + replace.commitTransaction(); + table.refresh(); + + // Exactly one additional successful commit (no retry). + assertThat(version()).isEqualTo(versionBeforeReplace + 1); + + TableMetadata after = table.ops().current(); + Snapshot newHead = after.currentSnapshot(); + + // The replacement head receives a STRICTLY greater sequence number than the concurrent commit, + // because it was re-derived from the refreshed (seq-2) base the replace actually committed + // against rather than the stale stage-time (seq-1) base. + assertThat(newHead.sequenceNumber()).isGreaterThan(concurrentSeq); // == 3 + // The table's monotonic sequence counter advances past the concurrent commit. + assertThat(after.lastSequenceNumber()).isGreaterThan(concurrentSeq); // == 3 + + // History is preserved: the concurrent commit's snapshot remains in the table's snapshot list, + // so the replace did not erase it "as though the concurrent commit never happened". 
+ assertThat(after.snapshots()) + .as("Concurrent commit's snapshot must be preserved in history after replace") + .anyMatch(snapshot -> snapshot.sequenceNumber() == concurrentSeq); + } + + /** + * Mirrors the concurrent-schema example from the REPLACE TABLE concurrency discussion: a + * concurrent commit evolves the schema (adding a new schema id and making it current) after a + * replace transaction has been staged. A replace is last-writer-wins on the current schema, so + * the replacement's schema becomes current, but the table keeps its history and must not drop the + * concurrently added schema id "as though the concurrent commit never happened". + */ + @TestTemplate + public void testReplaceTransactionPreservesConcurrentlyAddedSchema() { + table.updateProperties().set("random-snapshot-ids", "true").commit(); + + table.newAppend().appendFile(FILE_A).commit(); + table.refresh(); + int originalSchemaId = table.schema().schemaId(); + + // Begin the replace before the concurrent schema change, so the replace's base is stale. + Transaction replace = TestTables.beginReplace(tableDir, "test", table.schema(), table.spec()); + replace.newAppend().appendFile(FILE_B).commit(); + + // Concurrent writer evolves the schema: adds a column, producing a new current schema id. + table.updateSchema().addColumn("extra", Types.StringType.get()).commit(); + table.refresh(); + int concurrentSchemaId = table.schema().schemaId(); + assertThat(concurrentSchemaId).isNotEqualTo(originalSchemaId); + boolean concurrentSchemaHasExtra = table.schema().findField("extra") != null; + assertThat(concurrentSchemaHasExtra).isTrue(); + + replace.commitTransaction(); + table.refresh(); + + // The concurrently added schema id must remain in the table's schema history after the replace. 
+ assertThat(table.schemas()) + .as("Concurrently added schema id must be preserved in history after replace") + .containsKey(concurrentSchemaId); + } + + /** + * Verifies that rebasing the replacement onto refreshed metadata keeps the replacement schema's + * field IDs stable. The staged data files were written against the stage-time replacement + * schema's field IDs; re-deriving IDs by name against the concurrently-changed schema would shift + * them and leave the committed schema no longer matching the staged data. Preserving the IDs + * keeps the staged data readable. + */ + @TestTemplate + public void testReplaceTransactionPreservesFieldIdsUnderConcurrentSchemaChange() { + table.updateProperties().set("random-snapshot-ids", "true").commit(); + + table.newAppend().appendFile(FILE_A).commit(); + table.refresh(); + + // Replace with a schema that introduces a column NOT present in the original schema. Built + // against the original (seq-1) base, "added_by_replace" receives a fresh field id. + Schema replaceSchema = + new Schema( + required(10, "id", Types.IntegerType.get()), + required(11, "data", Types.StringType.get()), + required(12, "added_by_replace", Types.StringType.get())); + Transaction replace = TestTables.beginReplace(tableDir, "test", replaceSchema, unpartitioned()); + replace.newAppend().appendFile(FILE_B).commit(); + + int stagedReplaceColumnId = + ((BaseTransaction) replace) + .currentMetadata() + .schema() + .findField("added_by_replace") + .fieldId(); + + // Concurrent writer adds its own column to the original schema, consuming the next field id. 
+ table.updateSchema().addColumn("added_by_concurrent", Types.StringType.get()).commit(); + table.refresh(); + + replace.commitTransaction(); + table.refresh(); + + int committedReplaceColumnId = table.schema().findField("added_by_replace").fieldId(); + + // The replacement column keeps the field id the staged data files were written against, even + // though a concurrent schema change consumed that id in the refreshed base's schema. + assertThat(committedReplaceColumnId) + .as("Replacement column field id must be stable across rebase") + .isEqualTo(stagedReplaceColumnId); + } + + /** + * Exercises a replace whose table is concurrently dropped after the transaction is staged. The + * refreshed base is null (TestTables.refresh returns null for a missing table), so there is + * nothing to rebase onto; the replace must commit the staged replacement as a create rather than + * attempting to rebuild on a null base. + */ + @TestTemplate + public void testReplaceTransactionConcurrentlyDroppedTable() { + table.newAppend().appendFile(FILE_A).commit(); + table.refresh(); + assertThat(version()).isEqualTo(1); + + // Begin the replace against the existing table; base is captured non-null. + Transaction replace = TestTables.beginReplace(tableDir, "test", table.schema(), table.spec()); + replace.newAppend().appendFile(FILE_B).commit(); + + // Concurrent drop: the underlying table disappears before the replace commits, so refresh() + // returns a null base. + TestTables.clearTables(); + + replace.commitTransaction(); + + TableMetadata after = TestTables.readMetadata("test"); + assertThat(after).isNotNull(); + validateSnapshot(null, after.currentSnapshot(), FILE_B); + } + @TestTemplate public void testReplaceToCreateAndAppend() throws IOException { // this table doesn't exist. 
diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index 6c6949ce42ae..686d9f86a6ae 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -2933,12 +2933,18 @@ public void testConcurrentReplaceTransactionSchemaConflict() { assertUUIDsMatch(original, afterFirstReplace); assertFiles(afterFirstReplace, FILE_B); - // even though the new schema is identical, the assertion that the last assigned id has not - // changed will fail - assertThatThrownBy(secondReplace::commitTransaction) - .isInstanceOf(CommitFailedException.class) - .hasMessageStartingWith( - "Commit failed: Requirement failed: last assigned field id changed"); + // The second replace rebases onto the first replace's committed metadata. Because the + // replacement schema is identical, the rebuilt replacement reuses the existing schema and adds + // no conflicting field ids, so the replace succeeds as last-writer-wins instead of failing on a + // spurious last-assigned-field-id requirement. 
+ secondReplace.commitTransaction(); + + Table afterSecondReplace = catalog.loadTable(TABLE); + assertThat(afterSecondReplace.schema().asStruct()) + .as("Table schema should match the replace schema") + .isEqualTo(REPLACE_SCHEMA.asStruct()); + assertUUIDsMatch(original, afterSecondReplace); + assertFiles(afterSecondReplace, FILE_C); } @Test @@ -3053,12 +3059,18 @@ public void testConcurrentReplaceTransactionPartitionSpecConflict() { assertUUIDsMatch(original, afterFirstReplace); assertFiles(afterFirstReplace, FILE_B); - // even though the new spec is identical, the assertion that the last assigned id has not - // changed will fail - assertThatThrownBy(secondReplace::commitTransaction) - .isInstanceOf(CommitFailedException.class) - .hasMessageStartingWith( - "Commit failed: Requirement failed: last assigned partition id changed"); + // The second replace rebases onto the first replace's committed metadata. Because the + // replacement spec is identical, the rebuilt replacement reuses the existing spec and adds no + // conflicting partition ids, so the replace succeeds as last-writer-wins instead of failing on + // a spurious last-assigned-partition-id requirement. 
+ secondReplace.commitTransaction(); + + Table afterSecondReplace = catalog.loadTable(TABLE); + assertThat(afterSecondReplace.spec().fields()) + .as("Table spec should match the new spec") + .isEqualTo(TABLE_SPEC.fields()); + assertUUIDsMatch(original, afterSecondReplace); + assertFiles(afterSecondReplace, FILE_C); } @Test From f19a18cef2a69f650c73ad1e6f0c242c8f36b29c Mon Sep 17 00:00:00 2001 From: Kurtis Wright Date: Fri, 26 Jun 2026 19:00:59 +0000 Subject: [PATCH 2/2] Fix Hive CI Tests Co-authored-by: Sreesh Maheshwar --- .../iceberg/hive/TestHiveCreateReplaceTable.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java index c3f66c0286d0..4d9f0d463ec1 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java @@ -235,11 +235,12 @@ public void testReplaceTableTxnTableModifiedConcurrently() { txn.updateProperties().set("prop", "value").commit(); txn.commitTransaction(); - // the replace should still succeed + // the replace rebases onto the concurrent change, so the concurrently-set property is kept + // alongside the replace's own property table = catalog.loadTable(TABLE_IDENTIFIER); assertThat(table.properties()) .as("Table props should be updated") - .doesNotContainKey("another-prop") + .containsEntry("another-prop", "another-value") .containsEntry("prop", "value"); } @@ -329,9 +330,11 @@ public void testCreateOrReplaceTableTxnTableCreatedConcurrently() { txn.commitTransaction(); Table table = catalog.loadTable(TABLE_IDENTIFIER); - assertThat(table.spec()) - .as("Partition spec should match") - .isEqualTo(PartitionSpec.unpartitioned()); + // rebuilding the replacement on the concurrently-created table can reassign the spec id, so the + 
// spec is unpartitioned by content even though it is not object-equal to the canonical instance + assertThat(table.spec().isUnpartitioned()) + .as("Partition spec should be unpartitioned") + .isTrue(); assertThat(table.properties()).as("Table props should match").containsEntry("prop", "value"); }