Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/iceberg/test/merging_snapshot_update_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,38 @@ TEST_F(MergingSnapshotUpdateTest, AddManifestCopiesManifestWithAssignedSnapshotI
EXPECT_NE(data_manifests[0].manifest_path, path);
}

// Applies the same merge-append twice and checks that each Apply() reports a
// data manifest whose path differs from the input manifest, that the second
// path also differs from the first, and that the manifest from the second
// Apply() still contains exactly the one original data file.
TEST_F(MergingSnapshotUpdateTest, AddManifestRetryCopiesManifestAgain) {
  auto input_path = table_location_ + "/metadata/retry-input.avro";
  ICEBERG_UNWRAP_OR_FAIL(auto input_manifest, WriteManifest(input_path, {file_a_}));
  // NOTE(review): presumably a pre-assigned snapshot ID is what routes this
  // manifest through the copy path instead of being inherited -- confirm
  // against AddManifest().
  input_manifest.added_snapshot_id = 12345;

  ICEBERG_UNWRAP_OR_FAIL(auto append, NewMergeAppend());
  EXPECT_THAT(append->AppendManifest(input_manifest), IsOk());

  // Apply() is invoked through the SnapshotUpdate base interface.
  SnapshotUpdate& update = *append;

  // First attempt.
  ICEBERG_UNWRAP_OR_FAIL(auto initial_result, update.Apply());
  SnapshotCache initial_cache(initial_result.snapshot.get());
  ICEBERG_UNWRAP_OR_FAIL(auto initial_manifests, initial_cache.DataManifests(file_io_));
  ASSERT_EQ(initial_manifests.size(), 1U);
  EXPECT_NE(initial_manifests[0].manifest_path, input_path);

  // Second attempt on the same operation.
  ICEBERG_UNWRAP_OR_FAIL(auto retry_result, update.Apply());
  SnapshotCache retry_cache(retry_result.snapshot.get());
  ICEBERG_UNWRAP_OR_FAIL(auto retry_manifests, retry_cache.DataManifests(file_io_));
  ASSERT_EQ(retry_manifests.size(), 1U);
  EXPECT_NE(retry_manifests[0].manifest_path, input_path);
  EXPECT_NE(retry_manifests[0].manifest_path, initial_manifests[0].manifest_path);

  // Read the entries of the manifest reported by the second attempt.
  std::vector<ManifestFile> retry_manifest_list(retry_manifests.begin(),
                                                retry_manifests.end());
  ICEBERG_UNWRAP_OR_FAIL(auto retry_entries,
                         ReadAllEntries(retry_manifest_list, *table_->metadata()));
  ASSERT_EQ(retry_entries.size(), 1U);
  ASSERT_NE(retry_entries[0].data_file, nullptr);
  EXPECT_EQ(retry_entries[0].data_file->file_path, file_a_->file_path);
}

TEST_F(MergingSnapshotUpdateTest, AddManifestRejectsManifestWithFirstRowId) {
auto path = table_location_ + "/metadata/rowid.avro";
ICEBERG_UNWRAP_OR_FAIL(auto manifest, WriteManifest(path, {file_a_}));
Expand Down
18 changes: 14 additions & 4 deletions src/iceberg/update/merging_snapshot_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -649,20 +649,23 @@ Status MergingSnapshotUpdate::AddManifest(ManifestFile manifest) {
appended_manifests_summary_.AddedManifest(manifest);
append_manifests_.push_back(std::move(manifest));
} else {
ICEBERG_ASSIGN_OR_RAISE(auto copied, CopyManifest(manifest));
ICEBERG_ASSIGN_OR_RAISE(auto copied, CopyManifest(manifest, /*update_summary=*/true));
append_manifests_to_copy_.push_back(std::move(manifest));
rewritten_append_manifests_.push_back(std::move(copied));
}
return {};
}

Result<ManifestFile> MergingSnapshotUpdate::CopyManifest(const ManifestFile& manifest) {
Result<ManifestFile> MergingSnapshotUpdate::CopyManifest(const ManifestFile& manifest,
bool update_summary) {
const TableMetadata& current = base();
ICEBERG_ASSIGN_OR_RAISE(auto schema, SnapshotUtil::SchemaFor(current, target_branch()));
ICEBERG_ASSIGN_OR_RAISE(auto spec,
current.PartitionSpecById(manifest.partition_spec_id));
std::string path = ManifestPath();
return CopyAppendManifest(manifest, ctx_->table->io(), schema, spec, SnapshotId(), path,
current.format_version, &appended_manifests_summary_);
current.format_version,
update_summary ? &appended_manifests_summary_ : nullptr);
}

// -------------------------------------------------------------------------
Expand Down Expand Up @@ -890,6 +893,13 @@ Result<std::vector<ManifestFile>> MergingSnapshotUpdate::Apply(

// Step 4: Write (or retrieve cached) new data manifests.
ICEBERG_ASSIGN_OR_RAISE(auto written_data_manifests, WriteNewDataManifests());
if (rewritten_append_manifests_.empty() && !append_manifests_to_copy_.empty()) {
for (const auto& manifest : append_manifests_to_copy_) {
ICEBERG_ASSIGN_OR_RAISE(auto copied, CopyManifest(manifest,
/*update_summary=*/false));
rewritten_append_manifests_.push_back(std::move(copied));
}
}

// Incorporate append manifests (from AddManifest), stamping each with the
// current snapshot ID. append_manifests_ are used directly (inherit path);
Expand Down Expand Up @@ -987,7 +997,7 @@ Status MergingSnapshotUpdate::CleanUncommittedAppends(
DeleteUncommitted(cached_new_delete_manifests_, committed, /*clear=*/true));
// rewritten_append_manifests_ are always owned by the table.
ICEBERG_RETURN_UNEXPECTED(
DeleteUncommitted(rewritten_append_manifests_, committed, /*clear=*/false));
DeleteUncommitted(rewritten_append_manifests_, committed, /*clear=*/true));

// append_manifests_ are only owned by the table if the commit succeeded.
if (!committed.empty()) {
Expand Down
3 changes: 2 additions & 1 deletion src/iceberg/update/merging_snapshot_update.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate {

/// \brief Copy a manifest with the current snapshot ID, for use when snapshot
/// ID inheritance is not possible.
Result<ManifestFile> CopyManifest(const ManifestFile& manifest);
Result<ManifestFile> CopyManifest(const ManifestFile& manifest, bool update_summary);

Status AddDeleteFile(std::shared_ptr<DataFile> file,
std::optional<int64_t> data_sequence_number);
Expand Down Expand Up @@ -371,6 +371,7 @@ class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate {
// Manifests passed via AddManifest(): inherit path (no copy needed) and
// rewrite path (must be copied with the current snapshot ID).
std::vector<ManifestFile> append_manifests_;
std::vector<ManifestFile> append_manifests_to_copy_;
std::vector<ManifestFile> rewritten_append_manifests_;

// Set to true when new files are staged after the cache was populated, so the
Expand Down
Loading