Skip to content

Commit 994f4e4

Browse files
committed
test: make DeleteWith callbacks thread-safe
The cleanup tests collected deleted paths via deleted_files.push_back inside the DeleteWith callback. With the new parallel DeleteFiles, that callback can now run concurrently from multiple worker threads, racing on a non-thread-safe std::vector. The race surfaced as a flaky failure in CI's ASAN/UBSAN job for ExpireSnapshotsCleanupTest.MetadataOnlySkipsDataDeletion: the test expected three deleted entries, but one was lost to the race. Local serial-run timing happened to mask it. Wrap each test's collector with a per-test std::mutex so the push_back is serialized. Functionally equivalent for sequential deletes; correct under parallel deletes. 20-iteration loop is now green locally.
1 parent dc63f45 commit 994f4e4

1 file changed

Lines changed: 61 additions & 24 deletions

File tree

src/iceberg/test/expire_snapshots_test.cc

Lines changed: 61 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "iceberg/update/expire_snapshots.h"
2121

22+
#include <mutex>
2223
#include <optional>
2324
#include <string>
2425
#include <vector>
@@ -244,10 +245,13 @@ TEST_F(ExpireSnapshotsTest, ExpireOlderThan) {
244245
}
245246

246247
TEST_F(ExpireSnapshotsTest, FinalizeRequiresCommittedMetadata) {
248+
std::mutex deleted_files_mu;
247249
std::vector<std::string> deleted_files;
248250
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
249-
update->DeleteWith(
250-
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
251+
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
252+
std::lock_guard<std::mutex> lock(deleted_files_mu);
253+
deleted_files.push_back(path);
254+
});
251255

252256
// Apply first so apply_result_ is cached
253257
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
@@ -262,11 +266,14 @@ TEST_F(ExpireSnapshotsTest, FinalizeRequiresCommittedMetadata) {
262266
}
263267

264268
TEST_F(ExpireSnapshotsTest, CleanupNoneSkipsDeletion) {
269+
std::mutex deleted_files_mu;
265270
std::vector<std::string> deleted_files;
266271
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
267272
update->CleanupLevel(CleanupLevel::kNone);
268-
update->DeleteWith(
269-
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
273+
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
274+
std::lock_guard<std::mutex> lock(deleted_files_mu);
275+
deleted_files.push_back(path);
276+
});
270277

271278
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
272279
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
@@ -278,10 +285,13 @@ TEST_F(ExpireSnapshotsTest, CleanupNoneSkipsDeletion) {
278285
}
279286

280287
TEST_F(ExpireSnapshotsTest, FinalizeSkippedOnCommitError) {
288+
std::mutex deleted_files_mu;
281289
std::vector<std::string> deleted_files;
282290
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
283-
update->DeleteWith(
284-
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
291+
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
292+
std::lock_guard<std::mutex> lock(deleted_files_mu);
293+
deleted_files.push_back(path);
294+
});
285295

286296
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
287297
EXPECT_EQ(result.snapshot_ids_to_remove.size(), 1);
@@ -294,11 +304,14 @@ TEST_F(ExpireSnapshotsTest, FinalizeSkippedOnCommitError) {
294304
}
295305

296306
TEST_F(ExpireSnapshotsTest, FinalizeSkipsWhenNothingExpired) {
307+
std::mutex deleted_files_mu;
297308
std::vector<std::string> deleted_files;
298309
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
299310
update->RetainLast(2);
300-
update->DeleteWith(
301-
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
311+
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
312+
std::lock_guard<std::mutex> lock(deleted_files_mu);
313+
deleted_files.push_back(path);
314+
});
302315

303316
ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
304317
EXPECT_TRUE(result.snapshot_ids_to_remove.empty());
@@ -348,10 +361,13 @@ TEST_F(ExpireSnapshotsCleanupTest, IgnoresExpiredDeleteManifestReadFailures) {
348361
kCurrentSequenceNumber, {});
349362
RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
350363

364+
std::mutex deleted_files_mu;
351365
std::vector<std::string> deleted_files;
352366
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
353-
update->DeleteWith(
354-
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
367+
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
368+
std::lock_guard<std::mutex> lock(deleted_files_mu);
369+
deleted_files.push_back(path);
370+
});
355371

356372
EXPECT_THAT(update->Commit(), IsOk());
357373
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path,
@@ -386,10 +402,13 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredFiles) {
386402
kCurrentSequenceNumber, {});
387403
RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
388404

405+
std::mutex deleted_files_mu;
389406
std::vector<std::string> deleted_files;
390407
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
391-
update->DeleteWith(
392-
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
408+
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
409+
std::lock_guard<std::mutex> lock(deleted_files_mu);
410+
deleted_files.push_back(path);
411+
});
393412

394413
EXPECT_THAT(update->Commit(), IsOk());
395414
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_file_path,
@@ -424,11 +443,14 @@ TEST_F(ExpireSnapshotsCleanupTest, MetadataOnlySkipsDataDeletion) {
424443
kCurrentSequenceNumber, {});
425444
RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
426445

446+
std::mutex deleted_files_mu;
427447
std::vector<std::string> deleted_files;
428448
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
429449
update->CleanupLevel(CleanupLevel::kMetadataOnly);
430-
update->DeleteWith(
431-
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
450+
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
451+
std::lock_guard<std::mutex> lock(deleted_files_mu);
452+
deleted_files.push_back(path);
453+
});
432454

433455
EXPECT_THAT(update->Commit(), IsOk());
434456
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path,
@@ -462,10 +484,13 @@ TEST_F(ExpireSnapshotsCleanupTest, RetainedDeleteManifestSkipsDataDeletion) {
462484
kCurrentSequenceNumber, {current_delete_manifest});
463485
RewriteTableWithManifestLists(expired_manifest_list_path, current_manifest_list_path);
464486

487+
std::mutex deleted_files_mu;
465488
std::vector<std::string> deleted_files;
466489
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
467-
update->DeleteWith(
468-
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
490+
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
491+
std::lock_guard<std::mutex> lock(deleted_files_mu);
492+
deleted_files.push_back(path);
493+
});
469494

470495
EXPECT_THAT(update->Commit(), IsOk());
471496
EXPECT_THAT(deleted_files, testing::UnorderedElementsAre(expired_data_manifest_path,
@@ -487,10 +512,13 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredStats) {
487512
expired_manifest_list_path, current_manifest_list_path,
488513
{MakeStatisticsFile(kExpiredSnapshotId, expired_statistics_path)}, {});
489514

515+
std::mutex deleted_files_mu;
490516
std::vector<std::string> deleted_files;
491517
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
492-
update->DeleteWith(
493-
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
518+
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
519+
std::lock_guard<std::mutex> lock(deleted_files_mu);
520+
deleted_files.push_back(path);
521+
});
494522

495523
EXPECT_THAT(update->Commit(), IsOk());
496524
EXPECT_THAT(deleted_files, testing::Contains(expired_statistics_path));
@@ -513,10 +541,13 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedStats) {
513541
MakeStatisticsFile(kCurrentSnapshotId, reused_statistics_path)},
514542
{});
515543

544+
std::mutex deleted_files_mu;
516545
std::vector<std::string> deleted_files;
517546
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
518-
update->DeleteWith(
519-
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
547+
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
548+
std::lock_guard<std::mutex> lock(deleted_files_mu);
549+
deleted_files.push_back(path);
550+
});
520551

521552
EXPECT_THAT(update->Commit(), IsOk());
522553
EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path)));
@@ -538,10 +569,13 @@ TEST_F(ExpireSnapshotsCleanupTest, DeletesExpiredPartitionStats) {
538569
expired_manifest_list_path, current_manifest_list_path, {},
539570
{MakePartitionStatisticsFile(kExpiredSnapshotId, expired_statistics_path)});
540571

572+
std::mutex deleted_files_mu;
541573
std::vector<std::string> deleted_files;
542574
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
543-
update->DeleteWith(
544-
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
575+
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
576+
std::lock_guard<std::mutex> lock(deleted_files_mu);
577+
deleted_files.push_back(path);
578+
});
545579

546580
EXPECT_THAT(update->Commit(), IsOk());
547581
EXPECT_THAT(deleted_files, testing::Contains(expired_statistics_path));
@@ -564,10 +598,13 @@ TEST_F(ExpireSnapshotsCleanupTest, KeepsReusedPartitionStats) {
564598
{MakePartitionStatisticsFile(kExpiredSnapshotId, reused_statistics_path),
565599
MakePartitionStatisticsFile(kCurrentSnapshotId, reused_statistics_path)});
566600

601+
std::mutex deleted_files_mu;
567602
std::vector<std::string> deleted_files;
568603
ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewExpireSnapshots());
569-
update->DeleteWith(
570-
[&deleted_files](const std::string& path) { deleted_files.push_back(path); });
604+
update->DeleteWith([&deleted_files, &deleted_files_mu](const std::string& path) {
605+
std::lock_guard<std::mutex> lock(deleted_files_mu);
606+
deleted_files.push_back(path);
607+
});
571608

572609
EXPECT_THAT(update->Commit(), IsOk());
573610
EXPECT_THAT(deleted_files, testing::Not(testing::Contains(reused_statistics_path)));

0 commit comments

Comments
 (0)