From b12c59dae84ca880446186a52e92c8740379f097 Mon Sep 17 00:00:00 2001 From: Zehua Zou Date: Mon, 8 Jun 2026 17:55:35 +0800 Subject: [PATCH 1/2] fix: merging snapshot update should not contain deleted file during retry process --- .../test/merging_snapshot_update_test.cc | 32 +++++++++++++++++++ src/iceberg/update/merging_snapshot_update.cc | 18 ++++++++--- src/iceberg/update/merging_snapshot_update.h | 3 +- 3 files changed, 48 insertions(+), 5 deletions(-) diff --git a/src/iceberg/test/merging_snapshot_update_test.cc b/src/iceberg/test/merging_snapshot_update_test.cc index 69ee10b91..f1e674a71 100644 --- a/src/iceberg/test/merging_snapshot_update_test.cc +++ b/src/iceberg/test/merging_snapshot_update_test.cc @@ -882,6 +882,38 @@ TEST_F(MergingSnapshotUpdateTest, AddManifestCopiesManifestWithAssignedSnapshotI EXPECT_NE(data_manifests[0].manifest_path, path); } +TEST_F(MergingSnapshotUpdateTest, AddManifestRetryCopiesManifestAgain) { + auto path = table_location_ + "/metadata/retry-input.avro"; + ICEBERG_UNWRAP_OR_FAIL(auto manifest, WriteManifest(path, {file_a_})); + manifest.added_snapshot_id = 12345; + + ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend()); + EXPECT_THAT(op->AppendManifest(manifest), IsOk()); + + ICEBERG_UNWRAP_OR_FAIL(auto first_apply, static_cast(*op).Apply()); + SnapshotCache first_snapshot_cache(first_apply.snapshot.get()); + ICEBERG_UNWRAP_OR_FAIL(auto first_manifests, + first_snapshot_cache.DataManifests(file_io_)); + ASSERT_EQ(first_manifests.size(), 1U); + EXPECT_NE(first_manifests[0].manifest_path, path); + + ICEBERG_UNWRAP_OR_FAIL(auto second_apply, static_cast(*op).Apply()); + SnapshotCache second_snapshot_cache(second_apply.snapshot.get()); + ICEBERG_UNWRAP_OR_FAIL(auto second_manifests, + second_snapshot_cache.DataManifests(file_io_)); + ASSERT_EQ(second_manifests.size(), 1U); + EXPECT_NE(second_manifests[0].manifest_path, path); + EXPECT_NE(second_manifests[0].manifest_path, first_manifests[0].manifest_path); + + std::vector second_manifest_vector(second_manifests.begin(), + second_manifests.end()); + ICEBERG_UNWRAP_OR_FAIL( + auto entries, ReadAllEntries(second_manifest_vector, *table_->metadata())); + ASSERT_EQ(entries.size(), 1U); + ASSERT_NE(entries[0].data_file, nullptr); + EXPECT_EQ(entries[0].data_file->file_path, file_a_->file_path); +} + TEST_F(MergingSnapshotUpdateTest, AddManifestRejectsManifestWithFirstRowId) { auto path = table_location_ + "/metadata/rowid.avro"; ICEBERG_UNWRAP_OR_FAIL(auto manifest, WriteManifest(path, {file_a_})); diff --git a/src/iceberg/update/merging_snapshot_update.cc b/src/iceberg/update/merging_snapshot_update.cc index a0d882d14..66413eef2 100644 --- a/src/iceberg/update/merging_snapshot_update.cc +++ b/src/iceberg/update/merging_snapshot_update.cc @@ -649,20 +649,23 @@ Status MergingSnapshotUpdate::AddManifest(ManifestFile manifest) { appended_manifests_summary_.AddedManifest(manifest); append_manifests_.push_back(std::move(manifest)); } else { - ICEBERG_ASSIGN_OR_RAISE(auto copied, CopyManifest(manifest)); + ICEBERG_ASSIGN_OR_RAISE(auto copied, CopyManifest(manifest, /*update_summary=*/true)); + append_manifests_to_copy_.push_back(std::move(manifest)); rewritten_append_manifests_.push_back(std::move(copied)); } return {}; } -Result MergingSnapshotUpdate::CopyManifest(const ManifestFile& manifest) { +Result MergingSnapshotUpdate::CopyManifest(const ManifestFile& manifest, + bool update_summary) { const TableMetadata& current = base(); ICEBERG_ASSIGN_OR_RAISE(auto schema, SnapshotUtil::SchemaFor(current, target_branch())); ICEBERG_ASSIGN_OR_RAISE(auto spec, current.PartitionSpecById(manifest.partition_spec_id)); std::string path = ManifestPath(); return CopyAppendManifest(manifest, ctx_->table->io(), schema, spec, SnapshotId(), path, - current.format_version, &appended_manifests_summary_); + current.format_version, + update_summary ? &appended_manifests_summary_ : nullptr); } // ------------------------------------------------------------------------- @@ -890,6 +893,13 @@ Result> MergingSnapshotUpdate::Apply( // Step 4: Write (or retrieve cached) new data manifests. ICEBERG_ASSIGN_OR_RAISE(auto written_data_manifests, WriteNewDataManifests()); + if (rewritten_append_manifests_.empty() && !append_manifests_to_copy_.empty()) { + for (const auto& manifest : append_manifests_to_copy_) { + ICEBERG_ASSIGN_OR_RAISE(auto copied, CopyManifest(manifest, + /*update_summary=*/false)); + rewritten_append_manifests_.push_back(std::move(copied)); + } + } // Incorporate append manifests (from AddManifest), stamping each with the // current snapshot ID. append_manifests_ are used directly (inherit path); @@ -987,7 +997,7 @@ Status MergingSnapshotUpdate::CleanUncommittedAppends( DeleteUncommitted(cached_new_delete_manifests_, committed, /*clear=*/true)); // rewritten_append_manifests_ are always owned by the table. ICEBERG_RETURN_UNEXPECTED( - DeleteUncommitted(rewritten_append_manifests_, committed, /*clear=*/false)); + DeleteUncommitted(rewritten_append_manifests_, committed, /*clear=*/true)); // append_manifests_ are only owned by the table if the commit succeeded. if (!committed.empty()) { diff --git a/src/iceberg/update/merging_snapshot_update.h b/src/iceberg/update/merging_snapshot_update.h index 5d4e128e9..a8b008b54 100644 --- a/src/iceberg/update/merging_snapshot_update.h +++ b/src/iceberg/update/merging_snapshot_update.h @@ -318,7 +318,7 @@ class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate { /// \brief Copy a manifest with the current snapshot ID, for use when snapshot /// ID inheritance is not possible. - Result CopyManifest(const ManifestFile& manifest); + Result CopyManifest(const ManifestFile& manifest, bool update_summary); Status AddDeleteFile(std::shared_ptr file, std::optional data_sequence_number); @@ -371,6 +371,7 @@ class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate { // Manifests passed via AddManifest(): inherit path (no copy needed) and // rewrite path (must be copied with the current snapshot ID). std::vector append_manifests_; + std::vector append_manifests_to_copy_; std::vector rewritten_append_manifests_; // Set to true when new files are staged after the cache was populated, so the From 513dfc4e1594c93e8067d189e859f149c96e00b4 Mon Sep 17 00:00:00 2001 From: Zehua Zou Date: Tue, 9 Jun 2026 10:53:44 +0800 Subject: [PATCH 2/2] fix format --- src/iceberg/test/merging_snapshot_update_test.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/iceberg/test/merging_snapshot_update_test.cc b/src/iceberg/test/merging_snapshot_update_test.cc index f1e674a71..1b50a124e 100644 --- a/src/iceberg/test/merging_snapshot_update_test.cc +++ b/src/iceberg/test/merging_snapshot_update_test.cc @@ -893,22 +893,22 @@ TEST_F(MergingSnapshotUpdateTest, AddManifestRetryCopiesManifestAgain) { ICEBERG_UNWRAP_OR_FAIL(auto first_apply, static_cast(*op).Apply()); SnapshotCache first_snapshot_cache(first_apply.snapshot.get()); ICEBERG_UNWRAP_OR_FAIL(auto first_manifests, - first_snapshot_cache.DataManifests(file_io_)); + first_snapshot_cache.DataManifests(file_io_)); ASSERT_EQ(first_manifests.size(), 1U); EXPECT_NE(first_manifests[0].manifest_path, path); ICEBERG_UNWRAP_OR_FAIL(auto second_apply, static_cast(*op).Apply()); SnapshotCache second_snapshot_cache(second_apply.snapshot.get()); ICEBERG_UNWRAP_OR_FAIL(auto second_manifests, - second_snapshot_cache.DataManifests(file_io_)); + second_snapshot_cache.DataManifests(file_io_)); ASSERT_EQ(second_manifests.size(), 1U); EXPECT_NE(second_manifests[0].manifest_path, path); EXPECT_NE(second_manifests[0].manifest_path, first_manifests[0].manifest_path); std::vector second_manifest_vector(second_manifests.begin(), - second_manifests.end()); - ICEBERG_UNWRAP_OR_FAIL( - auto entries, ReadAllEntries(second_manifest_vector, *table_->metadata())); + second_manifests.end()); + ICEBERG_UNWRAP_OR_FAIL(auto entries, + ReadAllEntries(second_manifest_vector, *table_->metadata())); ASSERT_EQ(entries.size(), 1U); ASSERT_NE(entries[0].data_file, nullptr); EXPECT_EQ(entries[0].data_file->file_path, file_a_->file_path);