diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 9884ac297079..7db04042d6bd 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -73,6 +73,7 @@ enum TransactionType { private final Consumer enqueueDelete = deletedFiles::add; private final TransactionType type; private TableMetadata base; + private final TableMetadata stagedReplacement; private TableMetadata current; private boolean hasLastOpCommitted; private final MetricsReporter reporter; @@ -91,6 +92,7 @@ enum TransactionType { this.tableName = tableName; this.ops = ops; this.transactionTable = new TransactionTable(); + this.stagedReplacement = start; this.current = start; this.transactionOps = new TransactionTableOperations(); this.updates = Lists.newArrayList(); @@ -320,10 +322,16 @@ private void commitReplaceTransaction(boolean orCreate) { } } - // because this is a replace table, it will always completely replace the table - // metadata. even if it was just updated. + // If the table changed while this transaction was open (usually a concurrent + // commit), rebuild the replacement on the latest metadata so that commit's changes + // are not silently dropped. If the latest metadata is null the table no longer + // exists, so there is nothing to rebuild on and the staged replacement is committed + // as a new table. if (base != underlyingOps.current()) { this.base = underlyingOps.current(); // just refreshed + if (base != null) { + rebaseReplaceOnto(base); + } } underlyingOps.commit(base, current); @@ -348,6 +356,31 @@ private void commitReplaceTransaction(boolean orCreate) { } } + /** + * Re-applies this replace transaction on top of the latest table metadata. + * + *

<p>A replace replaces the current schema, spec, and data, but it keeps the table's history + * rather than dropping and recreating the table. If it simply committed the metadata staged when + * the transaction opened, any commit that landed in the meantime would be lost. Instead, the + * staged replacement (schema, spec, sort order, location, and properties) is rebuilt on the + * refreshed metadata, which retains the other commit's snapshots, and the staged updates are + * replayed so their new snapshots get sequence numbers that follow the refreshed metadata. + */ + private void rebaseReplaceOnto(TableMetadata refreshed) { + this.current = + refreshed.buildReplacementPreservingIds( + stagedReplacement.schema(), + stagedReplacement.spec(), + stagedReplacement.sortOrder(), + stagedReplacement.location(), + stagedReplacement.properties()); + // Updates are replayed against the transaction's own ops, which return the rebuilt metadata, so + // they re-apply cleanly without seeing a stale base. NOTE(review): each update was already committed once when it was staged, so this replay assumes commit() is safe to call again on the same PendingUpdate; confirm for every update type. + for (PendingUpdate update : updates) { + update.commit(); + } + } + private void commitSimpleTransaction() { // if there were no changes, don't try to commit if (base == current) { diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java index 07f6d3342727..f3296d2fbe60 100644 --- a/core/src/main/java/org/apache/iceberg/TableMetadata.java +++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java @@ -757,6 +757,57 @@ public TableMetadata buildReplacement( .build(); } + /** + * Like {@link #buildReplacement}, but keeps the field IDs already assigned in {@code + * updatedSchema} rather than reassigning fresh ones by matching column names. + * + *

<p>Use this when the replacement schema was built earlier against a different version of the + * table and may be committed on top of newer metadata. {@link #buildReplacement} would reassign + * field IDs by name against this (newer) schema, which can move a column to a different ID than + * the one its already-written data files use, making that data unreadable. Keeping the schema's + * existing IDs avoids that. + */ + TableMetadata buildReplacementPreservingIds( + Schema updatedSchema, + PartitionSpec updatedPartitionSpec, + SortOrder updatedSortOrder, + String newLocation, + Map updatedProperties) { + ValidationException.check( + formatVersion > 1 || PartitionSpec.hasSequentialIds(updatedPartitionSpec), + "Spec does not use sequential IDs that are required in v1: %s", + updatedPartitionSpec); + + // keep the schema's field IDs as-is; only raise lastColumnId so it covers them. NOTE(review): nothing in this method checks that a preserved ID is unused by the other schemas already in this metadata, so two different columns may share an ID across the schema history; confirm that is acceptable. + int preservedLastColumnId = Math.max(lastColumnId, updatedSchema.highestFieldId()); + + // rebuild the partition spec using the provided schema's ids and reassign partition field ids + // to align with existing partition specs in the table + PartitionSpec freshSpec = + reassignPartitionIds( + freshSpec(INITIAL_SPEC_ID, updatedSchema, updatedPartitionSpec), + new AtomicInteger(lastAssignedPartitionId)::incrementAndGet); + + // rebuild the sort order using the provided schema's ids + SortOrder freshSortOrder = + freshSortOrder(INITIAL_SORT_ORDER_ID, updatedSchema, updatedSortOrder); + + // check if there is format version override + int newFormatVersion = + PropertyUtil.propertyAsInt( + updatedProperties, TableProperties.FORMAT_VERSION, formatVersion); + + return new Builder(this) + .upgradeFormatVersion(newFormatVersion) + .removeRef(SnapshotRef.MAIN_BRANCH) + .setCurrentSchema(updatedSchema, preservedLastColumnId) + .setDefaultPartitionSpec(freshSpec) + .setDefaultSortOrder(freshSortOrder) + .setLocation(newLocation) + .setProperties(persistedProperties(updatedProperties)) + .build(); + } + 
public TableMetadata updateLocation(String newLocation) { return new Builder(this).setLocation(newLocation).build(); } diff --git a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java index 79196c0a7517..c7cd901741cf 100644 --- a/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestReplaceTransaction.java @@ -315,6 +315,187 @@ public void testReplaceTransactionConflict() { assertThat(listManifestFiles()).containsExactlyElementsOf(manifests); } + /** + * Verifies that a replace transaction rebases onto refreshed metadata when a concurrent commit + * lands BEFORE the replace's first commit attempt -- so no {@link CommitFailedException} is + * thrown and the retry loop is never exercised. + * + *

<p>A replace is last-writer-wins on the current schema, spec, and data, but it is not a + * drop-and-recreate: the table keeps its history. When {@code commitReplaceTransaction} observes + * that the base advanced (a concurrent commit), it rebuilds the replacement on the refreshed base + * and replays the staged updates. Rebuilding carries forward the concurrent commit's snapshots so + * history is preserved, and {@link SnapshotProducer#apply()} re-derives the replacement head's + * sequence number from the refreshed base ({@code base.nextSequenceNumber()}). The committed head + * therefore receives a STRICTLY greater sequence number than the concurrent commit and the + * table's {@code lastSequenceNumber} advances, preserving the monotonicity that {@link + * TableMetadata.Builder} otherwise guarantees across a replace. + */ + @TestTemplate + public void testReplaceTransactionRebasesOntoConcurrentCommit() { + // Sequence numbers are only meaningful in format v2+. + assumeThat(formatVersion).isGreaterThanOrEqualTo(2); + + // Use random snapshot ids so the staged replacement snapshot and the concurrent commit's + // snapshot do not collide on a sequential id (which would send the rebased commit down the + // rollback path in TestTableOperations). Production catalogs assign random snapshot ids. + table.updateProperties().set("random-snapshot-ids", "true").commit(); + + // Seed: one append -> seq 1, lastSequenceNumber 1. + table.newAppend().appendFile(FILE_A).commit(); + table.refresh(); + assertThat(table.ops().current().lastSequenceNumber()).isEqualTo(1L); + + // Begin the replace (MV full refresh / CREATE OR REPLACE). Base is captured at lastSeq 1. + Transaction replace = TestTables.beginReplace(tableDir, "test", table.schema(), table.spec()); + // Staged replacement head, assigned seq 2 from the stage-time (seq-1) base. This value will be + // re-derived when the replace rebases onto the refreshed base at commit time. 
+ replace.newAppend().appendFile(FILE_B).commit(); + + // Concurrent writer (e.g. compaction) commits against the SAME seq-1 base -> also gets seq 2. + // This lands BEFORE replace.commitTransaction(), so the replace's first refresh() observes it + // and the commit succeeds on the first attempt with no retry. + table.newAppend().appendFile(FILE_C).commit(); + table.refresh(); + long concurrentSeq = table.ops().current().lastSequenceNumber(); + assertThat(concurrentSeq).isEqualTo(2L); + int versionBeforeReplace = version(); + + // No failCommits injection: refresh() reads the up-to-date (concurrent) version token, base is + // advanced to the concurrent metadata, the replace rebuilds on it, and the CAS matches -> + // first-attempt success, no retry. + replace.commitTransaction(); + table.refresh(); + + // Exactly one additional successful commit (no retry). + assertThat(version()).isEqualTo(versionBeforeReplace + 1); + + TableMetadata after = table.ops().current(); + Snapshot newHead = after.currentSnapshot(); + + // The replacement head receives a STRICTLY greater sequence number than the concurrent commit, + // because it was re-derived from the refreshed (seq-2) base the replace actually committed + // against rather than the stale stage-time (seq-1) base. + assertThat(newHead.sequenceNumber()).isGreaterThan(concurrentSeq); // == 3 + // The table's monotonic sequence counter advances past the concurrent commit. + assertThat(after.lastSequenceNumber()).isGreaterThan(concurrentSeq); // == 3 + + // History is preserved: the concurrent commit's snapshot remains in the table's snapshot list, + // so the replace did not erase it "as though the concurrent commit never happened". 
+ assertThat(after.snapshots()) + .as("Concurrent commit's snapshot must be preserved in history after replace") + .anyMatch(snapshot -> snapshot.sequenceNumber() == concurrentSeq); + } + + /** + * Mirrors the concurrent-schema example from the REPLACE TABLE concurrency discussion: a + * concurrent commit evolves the schema (adding a new schema id and making it current) after a + * replace transaction has been staged. A replace is last-writer-wins on the current schema, so + * the replacement's schema becomes current, but the table keeps its history and must not drop the + * concurrently added schema id "as though the concurrent commit never happened". + */ + @TestTemplate + public void testReplaceTransactionPreservesConcurrentlyAddedSchema() { + table.updateProperties().set("random-snapshot-ids", "true").commit(); + + table.newAppend().appendFile(FILE_A).commit(); + table.refresh(); + int originalSchemaId = table.schema().schemaId(); + + // Begin the replace before the concurrent schema change, so the replace's base is stale. + Transaction replace = TestTables.beginReplace(tableDir, "test", table.schema(), table.spec()); + replace.newAppend().appendFile(FILE_B).commit(); + + // Concurrent writer evolves the schema: adds a column, producing a new current schema id. + table.updateSchema().addColumn("extra", Types.StringType.get()).commit(); + table.refresh(); + int concurrentSchemaId = table.schema().schemaId(); + assertThat(concurrentSchemaId).isNotEqualTo(originalSchemaId); + boolean concurrentSchemaHasExtra = table.schema().findField("extra") != null; + assertThat(concurrentSchemaHasExtra).isTrue(); + + replace.commitTransaction(); + table.refresh(); + + // The concurrently added schema id must remain in the table's schema history after the replace. 
+ assertThat(table.schemas()) + .as("Concurrently added schema id must be preserved in history after replace") + .containsKey(concurrentSchemaId); + } + + /** + * Verifies that rebasing the replacement onto refreshed metadata keeps the replacement schema's + * field IDs stable. The staged data files were written against the stage-time replacement + * schema's field IDs; re-deriving IDs by name against the concurrently-changed schema would shift + * them and leave the committed schema no longer matching the staged data. Preserving the IDs + * keeps the staged data readable. + */ + @TestTemplate + public void testReplaceTransactionPreservesFieldIdsUnderConcurrentSchemaChange() { + table.updateProperties().set("random-snapshot-ids", "true").commit(); + + table.newAppend().appendFile(FILE_A).commit(); + table.refresh(); + + // Replace with a schema that introduces a column NOT present in the original schema. Built + // against the original (seq-1) base, "added_by_replace" receives a fresh field id. + Schema replaceSchema = + new Schema( + required(10, "id", Types.IntegerType.get()), + required(11, "data", Types.StringType.get()), + required(12, "added_by_replace", Types.StringType.get())); + Transaction replace = TestTables.beginReplace(tableDir, "test", replaceSchema, unpartitioned()); + replace.newAppend().appendFile(FILE_B).commit(); + + int stagedReplaceColumnId = + ((BaseTransaction) replace) + .currentMetadata() + .schema() + .findField("added_by_replace") + .fieldId(); + + // Concurrent writer adds its own column to the original schema, consuming the next field id. 
+ table.updateSchema().addColumn("added_by_concurrent", Types.StringType.get()).commit(); + table.refresh(); + + replace.commitTransaction(); + table.refresh(); + + int committedReplaceColumnId = table.schema().findField("added_by_replace").fieldId(); + + // The replacement column keeps the field id the staged data files were written against, even + // though a concurrent schema change consumed that id in the refreshed base's schema. + assertThat(committedReplaceColumnId) + .as("Replacement column field id must be stable across rebase") + .isEqualTo(stagedReplaceColumnId); + } + + /** + * Exercises a replace whose table is concurrently dropped after the transaction is staged. The + * refreshed base is null (TestTables.refresh returns null for a missing table), so there is + * nothing to rebase onto; the replace must commit the staged replacement as a create rather than + * attempting to rebuild on a null base. + */ + @TestTemplate + public void testReplaceTransactionConcurrentlyDroppedTable() { + table.newAppend().appendFile(FILE_A).commit(); + table.refresh(); + assertThat(version()).isEqualTo(1); + + // Begin the replace against the existing table; base is captured non-null. + Transaction replace = TestTables.beginReplace(tableDir, "test", table.schema(), table.spec()); + replace.newAppend().appendFile(FILE_B).commit(); + + // Concurrent drop: the underlying table disappears before the replace commits, so refresh() + // returns a null base. + TestTables.clearTables(); + + replace.commitTransaction(); + + TableMetadata after = TestTables.readMetadata("test"); + assertThat(after).isNotNull(); + validateSnapshot(null, after.currentSnapshot(), FILE_B); + } + @TestTemplate public void testReplaceToCreateAndAppend() throws IOException { // this table doesn't exist. 
diff --git a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java index 6c6949ce42ae..686d9f86a6ae 100644 --- a/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java +++ b/core/src/test/java/org/apache/iceberg/catalog/CatalogTests.java @@ -2933,12 +2933,18 @@ public void testConcurrentReplaceTransactionSchemaConflict() { assertUUIDsMatch(original, afterFirstReplace); assertFiles(afterFirstReplace, FILE_B); - // even though the new schema is identical, the assertion that the last assigned id has not - // changed will fail - assertThatThrownBy(secondReplace::commitTransaction) - .isInstanceOf(CommitFailedException.class) - .hasMessageStartingWith( - "Commit failed: Requirement failed: last assigned field id changed"); + // The second replace rebases onto the first replace's committed metadata. Because the + // replacement schema is identical, the rebuilt replacement reuses the existing schema and adds + // no conflicting field ids, so the replace succeeds as last-writer-wins instead of failing on a + // spurious last-assigned-field-id requirement. 
+ secondReplace.commitTransaction(); + + Table afterSecondReplace = catalog.loadTable(TABLE); + assertThat(afterSecondReplace.schema().asStruct()) + .as("Table schema should match the replace schema") + .isEqualTo(REPLACE_SCHEMA.asStruct()); + assertUUIDsMatch(original, afterSecondReplace); + assertFiles(afterSecondReplace, FILE_C); } @Test @@ -3053,12 +3059,18 @@ public void testConcurrentReplaceTransactionPartitionSpecConflict() { assertUUIDsMatch(original, afterFirstReplace); assertFiles(afterFirstReplace, FILE_B); - // even though the new spec is identical, the assertion that the last assigned id has not - // changed will fail - assertThatThrownBy(secondReplace::commitTransaction) - .isInstanceOf(CommitFailedException.class) - .hasMessageStartingWith( - "Commit failed: Requirement failed: last assigned partition id changed"); + // The second replace rebases onto the first replace's committed metadata. Because the + // replacement spec is identical, the rebuilt replacement reuses the existing spec and adds no + // conflicting partition ids, so the replace succeeds as last-writer-wins instead of failing on + // a spurious last-assigned-partition-id requirement. 
+ secondReplace.commitTransaction(); + + Table afterSecondReplace = catalog.loadTable(TABLE); + assertThat(afterSecondReplace.spec().fields()) + .as("Table spec should match the new spec") + .isEqualTo(TABLE_SPEC.fields()); + assertUUIDsMatch(original, afterSecondReplace); + assertFiles(afterSecondReplace, FILE_C); } @Test diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java index c3f66c0286d0..4d9f0d463ec1 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveCreateReplaceTable.java @@ -235,11 +235,12 @@ public void testReplaceTableTxnTableModifiedConcurrently() { txn.updateProperties().set("prop", "value").commit(); txn.commitTransaction(); - // the replace should still succeed + // the replace rebases onto the concurrent change, so the concurrently-set property is kept + // alongside the replace's own property table = catalog.loadTable(TABLE_IDENTIFIER); assertThat(table.properties()) .as("Table props should be updated") - .doesNotContainKey("another-prop") + .containsEntry("another-prop", "another-value") .containsEntry("prop", "value"); } @@ -329,9 +330,11 @@ public void testCreateOrReplaceTableTxnTableCreatedConcurrently() { txn.commitTransaction(); Table table = catalog.loadTable(TABLE_IDENTIFIER); - assertThat(table.spec()) - .as("Partition spec should match") - .isEqualTo(PartitionSpec.unpartitioned()); + // rebuilding the replacement on the concurrently-created table can reassign the spec id, so the + // spec is unpartitioned by content even though it is not object-equal to the canonical instance + assertThat(table.spec().isUnpartitioned()) + .as("Partition spec should be unpartitioned") + .isTrue(); assertThat(table.properties()).as("Table props should match").containsEntry("prop", "value"); }