From 2f67046e68eef5c219ca1d19451ad30374077bb0 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Sat, 27 Jun 2026 11:55:00 +0200 Subject: [PATCH 1/7] fix(hadoop): use codec-aware metadata filenames --- catalog/hadoop/hadoop.go | 33 +++++-- catalog/hadoop/hadoop_test.go | 164 ++++++++++++++++++++++++++++++++++ 2 files changed, 191 insertions(+), 6 deletions(-) diff --git a/catalog/hadoop/hadoop.go b/catalog/hadoop/hadoop.go index ec99dfc9a..9096fc104 100644 --- a/catalog/hadoop/hadoop.go +++ b/catalog/hadoop/hadoop.go @@ -290,6 +290,20 @@ func (c *Catalog) metadataFilePath(ident table.Identifier, version int) string { return joinPath(c.isLocal, c.metadataDir(ident), fmt.Sprintf("v%d.metadata.json", version)) } +func (c *Catalog) metadataFilePathForCompression(ident table.Identifier, version int, compression string) (string, error) { + var suffix string + switch compression { + case table.MetadataCompressionCodecNone: + suffix = ".metadata.json" + case table.MetadataCompressionCodecGzip: + suffix = ".gz.metadata.json" + default: + return "", fmt.Errorf("unsupported write metadata compression codec: %s", compression) + } + + return joinPath(c.isLocal, c.metadataDir(ident), fmt.Sprintf("v%d%s", version, suffix)), nil +} + func (c *Catalog) versionHintPath(ident table.Identifier) string { return joinPath(c.isLocal, c.metadataDir(ident), "version-hint.text") } @@ -525,9 +539,6 @@ func (c *Catalog) CreateTable(ctx context.Context, ident table.Identifier, sc *i } version := 1 - metaPath := c.metadataFilePath(ident, version) - tempPath := joinPath(c.isLocal, metaDir, uuid.New().String()+".metadata.json") - compression := table.MetadataCompressionDefault if cfg.Properties != nil { if v, ok := cfg.Properties[table.MetadataCompressionKey]; ok { @@ -535,6 +546,13 @@ func (c *Catalog) CreateTable(ctx context.Context, ident table.Identifier, sc *i } } + metaPath, err := c.metadataFilePathForCompression(ident, version, compression) + if err != nil { + return nil, fmt.Errorf("hadoop catalog: failed to determine metadata file path: %w", err) + } + + tempPath := joinPath(c.isLocal, metaDir, uuid.New().String()+".metadata.json") + if err := internal.WriteTableMetadata(metadata, c.filesystem, tempPath, compression); err != nil { _ = c.filesystem.Remove(tempPath) @@ -655,10 +673,13 @@ func (c *Catalog) CommitTable(ctx context.Context, ident table.Identifier, reqs return nil, "", fmt.Errorf("hadoop catalog: failed to create metadata directory: %w", err) } - newMetaPath := c.metadataFilePath(ident, newVersion) - tempPath := joinPath(c.isLocal, metaDir, uuid.New().String()+".metadata.json") - compression := updated.Properties().Get(table.MetadataCompressionKey, table.MetadataCompressionDefault) + newMetaPath, err := c.metadataFilePathForCompression(ident, newVersion, compression) + if err != nil { + return nil, "", fmt.Errorf("hadoop catalog: failed to determine metadata file path: %w", err) + } + + tempPath := joinPath(c.isLocal, metaDir, uuid.New().String()+".metadata.json") if err := internal.WriteTableMetadata(updated, c.filesystem, tempPath, compression); err != nil { _ = c.filesystem.Remove(tempPath) diff --git a/catalog/hadoop/hadoop_test.go b/catalog/hadoop/hadoop_test.go index d884ccab7..049c5332d 100644 --- a/catalog/hadoop/hadoop_test.go +++ b/catalog/hadoop/hadoop_test.go @@ -451,6 +451,26 @@ func (s *HadoopCatalogTestSuite) TestMetadataFilePath() { s.Equal(filepath.Join(s.warehouse, "ns", "tbl", "metadata", "v42.metadata.json"), path) } +func (s *HadoopCatalogTestSuite) TestMetadataFilePathForCompression() { + ident := []string{"ns", "tbl"} + + path, err := s.cat.metadataFilePathForCompression(ident, 1, table.MetadataCompressionCodecNone) + s.Require().NoError(err) + s.Equal(filepath.Join(s.warehouse, "ns", "tbl", "metadata", "v1.metadata.json"), path) + + path, err = s.cat.metadataFilePathForCompression(ident, 1, table.MetadataCompressionCodecGzip) + s.Require().NoError(err) + s.Equal(filepath.Join(s.warehouse, "ns", "tbl", "metadata", "v1.gz.metadata.json"), path) + + path, err = s.cat.metadataFilePathForCompression(ident, 1, table.MetadataCompressionCodecZstd) + s.Require().NoError(err) + s.Equal(filepath.Join(s.warehouse, "ns", "tbl", "metadata", "v1.zstd.metadata.json"), path) + + _, err = s.cat.metadataFilePathForCompression(ident, 1, "brotli") + s.Require().Error(err) + s.Contains(err.Error(), "unsupported write metadata compression codec") +} + func (s *HadoopCatalogTestSuite) TestVersionHintPath() { path := s.cat.versionHintPath([]string{"ns", "tbl"}) s.Equal(filepath.Join(s.warehouse, "ns", "tbl", "metadata", "version-hint.text"), path) @@ -473,6 +493,8 @@ func (s *HadoopCatalogTestSuite) TestVersionPatternMatches() { "v100.metadata.json", "v1.gz.metadata.json", "v42.gz.metadata.json", + "v1.zstd.metadata.json", + "v42.zstd.metadata.json", } for _, name := range names { @@ -564,6 +586,15 @@ func (s *HadoopCatalogTestSuite) TestIsTableDirWithGzipMetadata() { s.True(s.requireIsTableDir(tableDir)) } +func (s *HadoopCatalogTestSuite) TestIsTableDirWithZstdMetadata() { + tableDir := filepath.Join(s.warehouse, "ns", "tbl") + metaDir := filepath.Join(tableDir, "metadata") + s.Require().NoError(os.MkdirAll(metaDir, 0o755)) + s.Require().NoError(os.WriteFile(filepath.Join(metaDir, "v1.zstd.metadata.json"), nil, 0o644)) + + s.True(isTableDir(s.cat.filesystem, tableDir)) +} + func (s *HadoopCatalogTestSuite) TestIsTableDirNonExistentPath() { s.False(s.requireIsTableDir(filepath.Join(s.warehouse, "does", "not", "exist"))) } @@ -778,6 +809,33 @@ func (s *HadoopCatalogTestSuite) TestFindVersionMixedGzipAndPlain() { s.Equal(3, ver) } +func (s *HadoopCatalogTestSuite) TestFindVersionZstdOnlyWithHint() { + ident := []string{"ns", "tbl"} + dir := s.cat.metadataDir(ident) + s.Require().NoError(os.MkdirAll(dir, 0o755)) + s.Require().NoError(os.WriteFile(filepath.Join(dir, "v1.zstd.metadata.json"), nil, 0o644)) + s.Require().NoError(os.WriteFile(filepath.Join(dir, "v2.zstd.metadata.json"), nil, 0o644)) + s.cat.writeVersionHint(ident, 1) + + ver, err := s.cat.findVersion(ident) + s.Require().NoError(err) + s.Equal(2, ver) +} + +func (s *HadoopCatalogTestSuite) TestFindVersionMixedZstdAndPlain() { + ident := []string{"ns", "tbl"} + dir := s.cat.metadataDir(ident) + s.Require().NoError(os.MkdirAll(dir, 0o755)) + s.createMetadataFile(ident, 1) + s.createMetadataFile(ident, 2) + s.Require().NoError(os.WriteFile(filepath.Join(dir, "v3.zstd.metadata.json"), nil, 0o644)) + s.cat.writeVersionHint(ident, 1) + + ver, err := s.cat.findVersion(ident) + s.Require().NoError(err) + s.Equal(3, ver) +} + // CreateNamespace tests func (s *HadoopCatalogTestSuite) TestCreateNamespace() { @@ -1402,6 +1460,50 @@ func (s *HadoopCatalogTestSuite) TestCreateTableAndLoad() { s.Equal(len(created.Schema().Fields()), len(loaded.Schema().Fields())) } +func (s *HadoopCatalogTestSuite) TestCreateTableGzipMetadata() { + ctx := context.Background() + s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755)) + + props := iceberg.Properties{ + table.MetadataCompressionKey: table.MetadataCompressionCodecGzip, + } + + tbl, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"}, s.testSchema(), + catalog.WithProperties(props)) + s.Require().NoError(err) + + metaPath := filepath.Join(s.warehouse, "ns", "tbl", "metadata", "v1.gz.metadata.json") + s.FileExists(metaPath) + s.Equal(metaPath, tbl.MetadataLocation()) + + loaded, err := s.cat.LoadTable(ctx, []string{"ns", "tbl"}) + s.Require().NoError(err) + s.Equal(metaPath, loaded.MetadataLocation()) + s.Equal(tbl.Metadata().TableUUID(), loaded.Metadata().TableUUID()) +} + +func (s *HadoopCatalogTestSuite) TestCreateTableZstdMetadata() { + ctx := context.Background() + s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755)) + + props := iceberg.Properties{ + table.MetadataCompressionKey: table.MetadataCompressionCodecZstd, + } + + tbl, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"}, s.testSchema(), + catalog.WithProperties(props)) + s.Require().NoError(err) + + metaPath := filepath.Join(s.warehouse, "ns", "tbl", "metadata", "v1.zstd.metadata.json") + s.FileExists(metaPath) + s.Equal(metaPath, tbl.MetadataLocation()) + + loaded, err := s.cat.LoadTable(ctx, []string{"ns", "tbl"}) + s.Require().NoError(err) + s.Equal(metaPath, loaded.MetadataLocation()) + s.Equal(tbl.Metadata().TableUUID(), loaded.Metadata().TableUUID()) +} + func (s *HadoopCatalogTestSuite) TestCreateTableCustomLocation() { ctx := context.Background() s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755)) @@ -1966,6 +2068,68 @@ func (s *HadoopCatalogTestSuite) TestCommitTableSingleUpdate() { s.FileExists(metaLoc) } +func (s *HadoopCatalogTestSuite) TestCommitTableGzipMetadata() { + ctx := context.Background() + s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755)) + + props := iceberg.Properties{ + table.MetadataCompressionKey: table.MetadataCompressionCodecGzip, + } + + tbl, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"}, s.testSchema(), + catalog.WithProperties(props)) + s.Require().NoError(err) + + _, metaLoc, err := s.cat.CommitTable( + ctx, []string{"ns", "tbl"}, + []table.Requirement{ + table.AssertTableUUID(tbl.Metadata().TableUUID()), + }, + []table.Update{ + table.NewSetPropertiesUpdate(iceberg.Properties{"test.key": "test.value"}), + }, + ) + s.Require().NoError(err) + s.Equal(filepath.Join(s.warehouse, "ns", "tbl", "metadata", "v2.gz.metadata.json"), metaLoc) + s.FileExists(metaLoc) + + loaded, err := s.cat.LoadTable(ctx, []string{"ns", "tbl"}) + s.Require().NoError(err) + s.Equal(metaLoc, loaded.MetadataLocation()) + s.Equal("test.value", loaded.Properties()["test.key"]) +} + +func (s *HadoopCatalogTestSuite) TestCommitTableZstdMetadata() { + ctx := context.Background() + s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755)) + + props := iceberg.Properties{ + table.MetadataCompressionKey: table.MetadataCompressionCodecZstd, + } + + tbl, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"}, s.testSchema(), + catalog.WithProperties(props)) + s.Require().NoError(err) + + _, metaLoc, err := s.cat.CommitTable( + ctx, []string{"ns", "tbl"}, + []table.Requirement{ + table.AssertTableUUID(tbl.Metadata().TableUUID()), + }, + []table.Update{ + table.NewSetPropertiesUpdate(iceberg.Properties{"test.key": "test.value"}), + }, + ) + s.Require().NoError(err) + s.Equal(filepath.Join(s.warehouse, "ns", "tbl", "metadata", "v2.zstd.metadata.json"), metaLoc) + s.FileExists(metaLoc) + + loaded, err := s.cat.LoadTable(ctx, []string{"ns", "tbl"}) + s.Require().NoError(err) + s.Equal(metaLoc, loaded.MetadataLocation()) + s.Equal("test.value", loaded.Properties()["test.key"]) +} + func (s *HadoopCatalogTestSuite) TestCommitTableMultipleSequential() { ctx := context.Background() tbl := s.createTestTable("ns", "tbl") From 9ef6e8e43f269bb7a46d2c786b717465ca16aa69 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Sat, 27 Jun 2026 18:51:08 +0200 Subject: [PATCH 2/7] test(hadoop): use production metadata path helper --- catalog/hadoop/hadoop.go | 4 - catalog/hadoop/hadoop_test.go | 162 +++++++++++++++++++--------------- 2 files changed, 92 insertions(+), 74 deletions(-) diff --git a/catalog/hadoop/hadoop.go b/catalog/hadoop/hadoop.go index 9096fc104..343afda55 100644 --- a/catalog/hadoop/hadoop.go +++ b/catalog/hadoop/hadoop.go @@ -286,10 +286,6 @@ func (c *Catalog) metadataDir(ident table.Identifier) string { return joinPath(c.isLocal, c.tableToPath(ident), "metadata") } -func (c *Catalog) metadataFilePath(ident table.Identifier, version int) string { - return joinPath(c.isLocal, c.metadataDir(ident), fmt.Sprintf("v%d.metadata.json", version)) -} - func (c *Catalog) metadataFilePathForCompression(ident table.Identifier, version int, compression string) (string, error) { var suffix string switch compression { diff --git a/catalog/hadoop/hadoop_test.go b/catalog/hadoop/hadoop_test.go index 049c5332d..acb302f95 100644 --- a/catalog/hadoop/hadoop_test.go +++ b/catalog/hadoop/hadoop_test.go @@ -79,6 +79,33 @@ func (f *barrierRenameNoReplaceFS) RenameNoReplace(oldpath, newpath string) erro return f.LocalFS.RenameNoReplace(oldpath, newpath) } +type versionConflictOnCreateFS struct { + icebergio.LocalFS + + conflictPath string + + once sync.Once + conflictErr error +} + +func (f *versionConflictOnCreateFS) Create(name string) (icebergio.FileWriter, error) { + w, err := f.LocalFS.Create(name) + if err != nil { + return nil, err + } + + f.once.Do(func() { + f.conflictErr = f.WriteFile(f.conflictPath, nil) + }) + if f.conflictErr != nil { + _ = w.Close() + + return nil, f.conflictErr + } + + return w, nil +} + // a mock hadoop catalog filesystem to ensure that we // can load arbitrary new filesystem implementations // as long as they full the HadoopCatalogFS interface @@ -401,7 +428,9 @@ func (s *HadoopCatalogTestSuite) TestNewCatalogRemoteHappyPathWithUnsafeCommits( s.Equal(scheme+"://bucket/wh/ns", cat.namespaceToPath(ident[:1])) s.Equal(scheme+"://bucket/wh/ns/tbl", cat.tableToPath(ident)) s.Equal(scheme+"://bucket/wh/ns/tbl/metadata", cat.metadataDir(ident)) - s.Equal(scheme+"://bucket/wh/ns/tbl/metadata/v1.metadata.json", cat.metadataFilePath(ident, 1)) + metaPath, err := cat.metadataFilePathForCompression(ident, 1, table.MetadataCompressionCodecNone) + s.Require().NoError(err) + s.Equal(scheme+"://bucket/wh/ns/tbl/metadata/v1.metadata.json", metaPath) s.Equal(scheme+"://bucket/wh/ns/tbl/metadata/version-hint.text", cat.versionHintPath(ident)) s.Equal(scheme+"://bucket/wh/ns/tbl", cat.defaultTableLocation(ident)) } @@ -443,14 +472,6 @@ func (s *HadoopCatalogTestSuite) TestMetadataDir() { s.Equal(filepath.Join(s.warehouse, "ns", "tbl", "metadata"), path) } -func (s *HadoopCatalogTestSuite) TestMetadataFilePath() { - path := s.cat.metadataFilePath([]string{"ns", "tbl"}, 1) - s.Equal(filepath.Join(s.warehouse, "ns", "tbl", "metadata", "v1.metadata.json"), path) - - path = s.cat.metadataFilePath([]string{"ns", "tbl"}, 42) - s.Equal(filepath.Join(s.warehouse, "ns", "tbl", "metadata", "v42.metadata.json"), path) -} - func (s *HadoopCatalogTestSuite) TestMetadataFilePathForCompression() { ident := []string{"ns", "tbl"} @@ -462,13 +483,15 @@ func (s *HadoopCatalogTestSuite) TestMetadataFilePathForCompression() { s.Require().NoError(err) s.Equal(filepath.Join(s.warehouse, "ns", "tbl", "metadata", "v1.gz.metadata.json"), path) - path, err = s.cat.metadataFilePathForCompression(ident, 1, table.MetadataCompressionCodecZstd) + path, err = s.cat.metadataFilePathForCompression(ident, 42, table.MetadataCompressionCodecGzip) s.Require().NoError(err) - s.Equal(filepath.Join(s.warehouse, "ns", "tbl", "metadata", "v1.zstd.metadata.json"), path) + s.Equal(filepath.Join(s.warehouse, "ns", "tbl", "metadata", "v42.gz.metadata.json"), path) - _, err = s.cat.metadataFilePathForCompression(ident, 1, "brotli") - s.Require().Error(err) - s.Contains(err.Error(), "unsupported write metadata compression codec") + for _, codec := range []string{table.MetadataCompressionCodecZstd, "brotli"} { + _, err = s.cat.metadataFilePathForCompression(ident, 1, codec) + s.Require().Error(err) + s.Contains(err.Error(), "unsupported write metadata compression codec") + } } func (s *HadoopCatalogTestSuite) TestVersionHintPath() { @@ -493,8 +516,6 @@ func (s *HadoopCatalogTestSuite) TestVersionPatternMatches() { "v100.metadata.json", "v1.gz.metadata.json", "v42.gz.metadata.json", - "v1.zstd.metadata.json", - "v42.zstd.metadata.json", } for _, name := range names { @@ -510,6 +531,7 @@ func (s *HadoopCatalogTestSuite) TestVersionPatternRejects() { "metadata.json", "v1.metadata.json.bak", "v-1.metadata.json", + "v1.zstd.metadata.json", } for _, name := range tests { @@ -586,13 +608,13 @@ func (s *HadoopCatalogTestSuite) TestIsTableDirWithGzipMetadata() { s.True(s.requireIsTableDir(tableDir)) } -func (s *HadoopCatalogTestSuite) TestIsTableDirWithZstdMetadata() { +func (s *HadoopCatalogTestSuite) TestIsTableDirIgnoresZstdMetadata() { tableDir := filepath.Join(s.warehouse, "ns", "tbl") metaDir := filepath.Join(tableDir, "metadata") s.Require().NoError(os.MkdirAll(metaDir, 0o755)) s.Require().NoError(os.WriteFile(filepath.Join(metaDir, "v1.zstd.metadata.json"), nil, 0o644)) - s.True(isTableDir(s.cat.filesystem, tableDir)) + s.False(isTableDir(s.cat.filesystem, s.cat.isLocal, tableDir)) } func (s *HadoopCatalogTestSuite) TestIsTableDirNonExistentPath() { @@ -600,7 +622,8 @@ func (s *HadoopCatalogTestSuite) TestIsTableDirNonExistentPath() { } func (s *HadoopCatalogTestSuite) createMetadataFile(ident table.Identifier, version int) { - path := s.cat.metadataFilePath(ident, version) + path, err := s.cat.metadataFilePathForCompression(ident, version, table.MetadataCompressionCodecNone) + s.Require().NoError(err) s.Require().NoError(os.MkdirAll(filepath.Dir(path), 0o755)) s.Require().NoError(os.WriteFile(path, nil, 0o644)) } @@ -788,9 +811,10 @@ func (s *HadoopCatalogTestSuite) TestFindVersionGzipOnlyWithHint() { s.Require().NoError(os.WriteFile(filepath.Join(dir, "v2.gz.metadata.json"), nil, 0o644)) s.cat.writeVersionHint(ident, 1) - ver, err := s.cat.findVersion(ident) + path, ver, err := s.cat.findMetadataLocation(ident) s.Require().NoError(err) s.Equal(2, ver) + s.Equal(filepath.Join(dir, "v2.gz.metadata.json"), path) } func (s *HadoopCatalogTestSuite) TestFindVersionMixedGzipAndPlain() { @@ -804,36 +828,25 @@ func (s *HadoopCatalogTestSuite) TestFindVersionMixedGzipAndPlain() { s.Require().NoError(os.WriteFile(filepath.Join(dir, "v3.gz.metadata.json"), nil, 0o644)) s.cat.writeVersionHint(ident, 1) - ver, err := s.cat.findVersion(ident) + path, ver, err := s.cat.findMetadataLocation(ident) s.Require().NoError(err) s.Equal(3, ver) + s.Equal(filepath.Join(dir, "v3.gz.metadata.json"), path) } -func (s *HadoopCatalogTestSuite) TestFindVersionZstdOnlyWithHint() { - ident := []string{"ns", "tbl"} - dir := s.cat.metadataDir(ident) - s.Require().NoError(os.MkdirAll(dir, 0o755)) - s.Require().NoError(os.WriteFile(filepath.Join(dir, "v1.zstd.metadata.json"), nil, 0o644)) - s.Require().NoError(os.WriteFile(filepath.Join(dir, "v2.zstd.metadata.json"), nil, 0o644)) - s.cat.writeVersionHint(ident, 1) - - ver, err := s.cat.findVersion(ident) - s.Require().NoError(err) - s.Equal(2, ver) -} - -func (s *HadoopCatalogTestSuite) TestFindVersionMixedZstdAndPlain() { +func (s *HadoopCatalogTestSuite) TestFindMetadataLocationKeepsDiscoveredPathForDuplicateVersion() { ident := []string{"ns", "tbl"} dir := s.cat.metadataDir(ident) s.Require().NoError(os.MkdirAll(dir, 0o755)) s.createMetadataFile(ident, 1) s.createMetadataFile(ident, 2) - s.Require().NoError(os.WriteFile(filepath.Join(dir, "v3.zstd.metadata.json"), nil, 0o644)) - s.cat.writeVersionHint(ident, 1) + s.createMetadataFile(ident, 3) + s.Require().NoError(os.WriteFile(filepath.Join(dir, "v3.gz.metadata.json"), nil, 0o644)) - ver, err := s.cat.findVersion(ident) + path, ver, err := s.cat.findMetadataLocation(ident) s.Require().NoError(err) s.Equal(3, ver) + s.Equal(filepath.Join(dir, "v3.gz.metadata.json"), path) } // CreateNamespace tests @@ -1482,7 +1495,7 @@ func (s *HadoopCatalogTestSuite) TestCreateTableGzipMetadata() { s.Equal(tbl.Metadata().TableUUID(), loaded.Metadata().TableUUID()) } -func (s *HadoopCatalogTestSuite) TestCreateTableZstdMetadata() { +func (s *HadoopCatalogTestSuite) TestCreateTableRejectsZstdMetadata() { ctx := context.Background() s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755)) @@ -1490,18 +1503,10 @@ func (s *HadoopCatalogTestSuite) TestCreateTableZstdMetadata() { table.MetadataCompressionKey: table.MetadataCompressionCodecZstd, } - tbl, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"}, s.testSchema(), + _, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"}, s.testSchema(), catalog.WithProperties(props)) - s.Require().NoError(err) - - metaPath := filepath.Join(s.warehouse, "ns", "tbl", "metadata", "v1.zstd.metadata.json") - s.FileExists(metaPath) - s.Equal(metaPath, tbl.MetadataLocation()) - - loaded, err := s.cat.LoadTable(ctx, []string{"ns", "tbl"}) - s.Require().NoError(err) - s.Equal(metaPath, loaded.MetadataLocation()) - s.Equal(tbl.Metadata().TableUUID(), loaded.Metadata().TableUUID()) + s.Require().Error(err) + s.Contains(err.Error(), "unsupported write metadata compression codec") } func (s *HadoopCatalogTestSuite) TestCreateTableCustomLocation() { @@ -1590,7 +1595,9 @@ func (s *HadoopCatalogTestSuite) TestCreateTableConcurrentMetadataPublishConflic s.Equal(1, successes) s.Equal(1, conflicts) - s.FileExists(s.cat.metadataFilePath([]string{"ns", "tbl"}, 1)) + metaPath, err := s.cat.metadataFilePathForCompression([]string{"ns", "tbl"}, 1, table.MetadataCompressionCodecNone) + s.Require().NoError(err) + s.FileExists(metaPath) } func (s *HadoopCatalogTestSuite) TestCreateTableWithPartitionSpec() { @@ -2099,35 +2106,46 @@ func (s *HadoopCatalogTestSuite) TestCommitTableGzipMetadata() { s.Equal("test.value", loaded.Properties()["test.key"]) } -func (s *HadoopCatalogTestSuite) TestCommitTableZstdMetadata() { +func (s *HadoopCatalogTestSuite) TestCommitTableDetectsCrossCodecVersionConflict() { ctx := context.Background() - s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755)) + tbl := s.createTestTable("ns", "tbl") + ident := []string{"ns", "tbl"} + conflictPath := filepath.Join(s.cat.metadataDir(ident), "v2.gz.metadata.json") + s.cat.filesystem = &versionConflictOnCreateFS{conflictPath: conflictPath} - props := iceberg.Properties{ - table.MetadataCompressionKey: table.MetadataCompressionCodecZstd, - } + _, _, err := s.cat.CommitTable( + ctx, ident, + []table.Requirement{ + table.AssertTableUUID(tbl.Metadata().TableUUID()), + }, + []table.Update{ + table.NewSetPropertiesUpdate(iceberg.Properties{"test.key": "test.value"}), + }, + ) + s.Require().ErrorIs(err, table.ErrCommitFailed) + s.Contains(err.Error(), conflictPath) +} - tbl, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"}, s.testSchema(), - catalog.WithProperties(props)) +func (s *HadoopCatalogTestSuite) TestCommitTableRejectsZstdMetadata() { + ctx := context.Background() + s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755)) + + tbl, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"}, s.testSchema()) s.Require().NoError(err) - _, metaLoc, err := s.cat.CommitTable( + _, _, err = s.cat.CommitTable( ctx, []string{"ns", "tbl"}, []table.Requirement{ table.AssertTableUUID(tbl.Metadata().TableUUID()), }, []table.Update{ - table.NewSetPropertiesUpdate(iceberg.Properties{"test.key": "test.value"}), + table.NewSetPropertiesUpdate(iceberg.Properties{ + table.MetadataCompressionKey: table.MetadataCompressionCodecZstd, + }), }, ) - s.Require().NoError(err) - s.Equal(filepath.Join(s.warehouse, "ns", "tbl", "metadata", "v2.zstd.metadata.json"), metaLoc) - s.FileExists(metaLoc) - - loaded, err := s.cat.LoadTable(ctx, []string{"ns", "tbl"}) - s.Require().NoError(err) - s.Equal(metaLoc, loaded.MetadataLocation()) - s.Equal("test.value", loaded.Properties()["test.key"]) + s.Require().Error(err) + s.Contains(err.Error(), "unsupported write metadata compression codec") } func (s *HadoopCatalogTestSuite) TestCommitTableMultipleSequential() { @@ -2202,7 +2220,9 @@ func (s *HadoopCatalogTestSuite) TestCommitTableConflictDetection() { ) s.Require().NoError(err) s.Contains(metaLoc, fmt.Sprintf("v%d.metadata.json", i)) - s.FileExists(s.cat.metadataFilePath(ident, i)) + expectedPath, pathErr := s.cat.metadataFilePathForCompression(ident, i, table.MetadataCompressionCodecNone) + s.Require().NoError(pathErr) + s.FileExists(expectedPath) } } @@ -2257,7 +2277,9 @@ func (s *HadoopCatalogTestSuite) TestCommitTableConcurrentMetadataPublishConflic s.Equal(1, successes) s.Equal(1, conflicts) - s.FileExists(s.cat.metadataFilePath(ident, 2)) + metaPath, err := s.cat.metadataFilePathForCompression(ident, 2, table.MetadataCompressionCodecNone) + s.Require().NoError(err) + s.FileExists(metaPath) s.Equal(2, s.cat.readVersionHint(ident)) } From 6042f4652ddccb222090b8ff62977b1f1259444f Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Fri, 3 Jul 2026 18:32:33 +0200 Subject: [PATCH 3/7] fix(hadoop): canonicalize metadata discovery --- catalog/hadoop/hadoop_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/catalog/hadoop/hadoop_test.go b/catalog/hadoop/hadoop_test.go index acb302f95..842c0bb28 100644 --- a/catalog/hadoop/hadoop_test.go +++ b/catalog/hadoop/hadoop_test.go @@ -834,7 +834,7 @@ func (s *HadoopCatalogTestSuite) TestFindVersionMixedGzipAndPlain() { s.Equal(filepath.Join(dir, "v3.gz.metadata.json"), path) } -func (s *HadoopCatalogTestSuite) TestFindMetadataLocationKeepsDiscoveredPathForDuplicateVersion() { +func (s *HadoopCatalogTestSuite) TestFindMetadataLocationUsesCanonicalPathForDuplicateVersion() { ident := []string{"ns", "tbl"} dir := s.cat.metadataDir(ident) s.Require().NoError(os.MkdirAll(dir, 0o755)) @@ -846,7 +846,7 @@ func (s *HadoopCatalogTestSuite) TestFindMetadataLocationKeepsDiscoveredPathForD path, ver, err := s.cat.findMetadataLocation(ident) s.Require().NoError(err) s.Equal(3, ver) - s.Equal(filepath.Join(dir, "v3.gz.metadata.json"), path) + s.Equal(filepath.Join(dir, "v3.metadata.json"), path) } // CreateNamespace tests From 97363f4e30493fb3c3800ce409de43ec1d01c004 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Fri, 3 Jul 2026 23:21:21 +0200 Subject: [PATCH 4/7] fix(hadoop): claim metadata versions before publish --- catalog/hadoop/hadoop.go | 59 ++++++++--- catalog/hadoop/hadoop_test.go | 188 ++++++++++++++++++++++++++++++++-- 2 files changed, 220 insertions(+), 27 deletions(-) diff --git a/catalog/hadoop/hadoop.go b/catalog/hadoop/hadoop.go index 343afda55..f0b274628 100644 --- a/catalog/hadoop/hadoop.go +++ b/catalog/hadoop/hadoop.go @@ -300,6 +300,10 @@ func (c *Catalog) metadataFilePathForCompression(ident table.Identifier, version return joinPath(c.isLocal, c.metadataDir(ident), fmt.Sprintf("v%d%s", version, suffix)), nil } +func (c *Catalog) metadataVersionClaimPath(ident table.Identifier, version int) string { + return joinPath(c.isLocal, c.metadataDir(ident), fmt.Sprintf("v%d.metadata.json.claim", version)) +} + func (c *Catalog) versionHintPath(ident table.Identifier) string { return joinPath(c.isLocal, c.metadataDir(ident), "version-hint.text") } @@ -460,6 +464,16 @@ func (c *Catalog) metadataVersionExists(ident table.Identifier, version int) (bo return found, nil } +func (c *Catalog) metadataVersionLocation(ident table.Identifier, version int) (string, bool) { + files, _, err := c.scanMetadataFiles(ident) + if err != nil { + return "", false + } + + file, ok := files[version] + return file.location, ok +} + func (c *Catalog) findMetadataLocation(ident table.Identifier) (string, int, error) { _, latest, err := c.scanMetadataFiles(ident) if err != nil { @@ -555,7 +569,7 @@ func (c *Catalog) CreateTable(ctx context.Context, ident table.Identifier, sc *i return nil, fmt.Errorf("hadoop catalog: failed to write table metadata: %w", err) } - if err := c.commitMetadataFile(ident, tempPath, metaPath, catalog.ErrTableAlreadyExists); err != nil { + if err := c.commitMetadataFile(ident, version, tempPath, metaPath, catalog.ErrTableAlreadyExists); err != nil { return nil, err } @@ -683,20 +697,7 @@ func (c *Catalog) CommitTable(ctx context.Context, ident table.Identifier, reqs return nil, "", fmt.Errorf("hadoop catalog: failed to write table metadata: %w", err) } - exists, err := c.metadataVersionExists(ident, newVersion) - if err != nil { - _ = c.filesystem.Remove(tempPath) - - return nil, "", fmt.Errorf("hadoop catalog: failed to inspect metadata directory for version %d: %w", - newVersion, err) - } - if exists { - _ = c.filesystem.Remove(tempPath) - - return nil, "", fmt.Errorf("hadoop catalog: version %d already exists for table %s", - newVersion, strings.Join(ident, ".")) - } - if err := c.commitMetadataFile(ident, tempPath, newMetaPath, table.ErrCommitFailed); err != nil { + if err := c.commitMetadataFile(ident, newVersion, tempPath, newMetaPath, table.ErrCommitFailed); err != nil { return nil, "", err } @@ -706,10 +707,32 @@ func (c *Catalog) CommitTable(ctx context.Context, ident table.Identifier, reqs return updated, newMetaPath, nil } -func (c *Catalog) commitMetadataFile(ident table.Identifier, tempPath, metaPath string, conflictErr error) error { - if err := c.filesystem.RenameNoReplace(tempPath, metaPath); err != nil { +func (c *Catalog) commitMetadataFile(ident table.Identifier, version int, tempPath, metaPath string, conflictErr error) error { + claimPath := c.metadataVersionClaimPath(ident, version) + if err := c.filesystem.RenameNoReplace(tempPath, claimPath); err != nil { _ = c.filesystem.Remove(tempPath) + if errors.Is(err, fs.ErrExist) { + return fmt.Errorf("%w: metadata version already claimed for table %s: %s", + conflictErr, strings.Join(ident, "."), claimPath) + } + + return fmt.Errorf("hadoop catalog: failed to claim metadata version: %w", err) + } + + removeClaim := true + defer func() { + if removeClaim { + _ = c.filesystem.Remove(claimPath) + } + }() + + if existingPath, exists := c.metadataVersionLocation(ident, version); exists { + return fmt.Errorf("%w: metadata file already exists for table %s: %s", + conflictErr, strings.Join(ident, "."), existingPath) + } + + if err := c.filesystem.RenameNoReplace(claimPath, metaPath); err != nil { if errors.Is(err, fs.ErrExist) { return fmt.Errorf("%w: metadata file already exists for table %s: %s", conflictErr, strings.Join(ident, "."), metaPath) @@ -718,6 +741,8 @@ func (c *Catalog) commitMetadataFile(ident table.Identifier, tempPath, metaPath return fmt.Errorf("hadoop catalog: failed to commit metadata file: %w", err) } + removeClaim = false + return nil } diff --git a/catalog/hadoop/hadoop_test.go b/catalog/hadoop/hadoop_test.go index 842c0bb28..ed885ec5e 100644 --- a/catalog/hadoop/hadoop_test.go +++ b/catalog/hadoop/hadoop_test.go @@ -43,22 +43,27 @@ import ( type barrierRenameNoReplaceFS struct { icebergio.LocalFS - targetName string - release chan struct{} + targetNames map[string]struct{} + release chan struct{} mu sync.Mutex calls int } -func newBarrierRenameNoReplaceFS(targetName string) *barrierRenameNoReplaceFS { +func newBarrierRenameNoReplaceFS(targetNames ...string) *barrierRenameNoReplaceFS { + targets := make(map[string]struct{}, len(targetNames)) + for _, target := range targetNames { + targets[target] = struct{}{} + } + return &barrierRenameNoReplaceFS{ - targetName: targetName, - release: make(chan struct{}), + targetNames: targets, + release: make(chan struct{}), } } func (f *barrierRenameNoReplaceFS) RenameNoReplace(oldpath, newpath string) error { - if filepath.Base(newpath) != f.targetName { + if _, ok := f.targetNames[filepath.Base(newpath)]; !ok { return f.LocalFS.RenameNoReplace(oldpath, newpath) } @@ -1551,7 +1556,8 @@ func (s *HadoopCatalogTestSuite) TestCreateTableAlreadyExists() { func (s *HadoopCatalogTestSuite) TestCreateTableConcurrentMetadataPublishConflict() { ctx := context.Background() s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755)) - s.cat.filesystem = newBarrierRenameNoReplaceFS("v1.metadata.json") + ident := []string{"ns", "tbl"} + s.cat.filesystem = newBarrierRenameNoReplaceFS(filepath.Base(s.cat.metadataVersionClaimPath(ident, 1))) type createResult struct { metaLoc string @@ -1566,7 +1572,7 @@ func (s *HadoopCatalogTestSuite) TestCreateTableConcurrentMetadataPublishConflic go func() { defer wg.Done() - tbl, err := s.cat.CreateTable(ctx, []string{"ns", "tbl"}, s.testSchema()) + tbl, err := s.cat.CreateTable(ctx, ident, s.testSchema()) result := createResult{err: err} if err == nil { result.metaLoc = tbl.MetadataLocation() @@ -1595,9 +1601,91 @@ func (s *HadoopCatalogTestSuite) TestCreateTableConcurrentMetadataPublishConflic s.Equal(1, successes) s.Equal(1, conflicts) - metaPath, err := s.cat.metadataFilePathForCompression([]string{"ns", "tbl"}, 1, table.MetadataCompressionCodecNone) + metaPath, err := s.cat.metadataFilePathForCompression(ident, 1, table.MetadataCompressionCodecNone) s.Require().NoError(err) s.FileExists(metaPath) + s.NoFileExists(s.cat.metadataVersionClaimPath(ident, 1)) +} + +func (s *HadoopCatalogTestSuite) TestCreateTableConcurrentMixedCodecVersionClaim() { + ctx := context.Background() + s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755)) + ident := []string{"ns", "tbl"} + s.cat.filesystem = newBarrierRenameNoReplaceFS(filepath.Base(s.cat.metadataVersionClaimPath(ident, 1))) + + type createResult struct { + metaLoc string + err error + } + + results := make(chan createResult, 2) + var wg sync.WaitGroup + for _, props := range []iceberg.Properties{ + nil, + {table.MetadataCompressionKey: table.MetadataCompressionCodecGzip}, + } { + props := props + wg.Add(1) + go func() { + defer wg.Done() + + opts := []catalog.CreateTableOpt(nil) + if props != nil { + opts = append(opts, catalog.WithProperties(props)) + } + + tbl, err := s.cat.CreateTable(ctx, ident, s.testSchema(), opts...) + result := createResult{err: err} + if err == nil { + result.metaLoc = tbl.MetadataLocation() + } + + results <- result + }() + } + + wg.Wait() + close(results) + + successes := 0 + conflicts := 0 + var successLoc string + for result := range results { + if result.err == nil { + successes++ + successLoc = result.metaLoc + s.Contains([]string{"v1.metadata.json", "v1.gz.metadata.json"}, filepath.Base(result.metaLoc)) + + continue + } + + conflicts++ + s.ErrorIs(result.err, catalog.ErrTableAlreadyExists) + } + + s.Equal(1, successes) + s.Equal(1, conflicts) + + plainPath, err := s.cat.metadataFilePathForCompression(ident, 1, table.MetadataCompressionCodecNone) + s.Require().NoError(err) + gzipPath, err := s.cat.metadataFilePathForCompression(ident, 1, table.MetadataCompressionCodecGzip) + s.Require().NoError(err) + + existing := 0 + for _, path := range []string{plainPath, gzipPath} { + if _, err := os.Stat(path); err == nil { + existing++ + } else { + s.True(os.IsNotExist(err)) + } + } + + s.Equal(1, existing) + s.NoFileExists(s.cat.metadataVersionClaimPath(ident, 1)) + + loaded, err := s.cat.LoadTable(ctx, ident) + s.Require().NoError(err) + s.Equal(successLoc, loaded.MetadataLocation()) } func (s *HadoopCatalogTestSuite) TestCreateTableWithPartitionSpec() { @@ -2229,8 +2317,8 @@ func (s *HadoopCatalogTestSuite) TestCommitTableConflictDetection() { func (s *HadoopCatalogTestSuite) TestCommitTableConcurrentMetadataPublishConflict() { ctx := context.Background() s.createTestTable("ns", "tbl") - s.cat.filesystem = newBarrierRenameNoReplaceFS("v2.metadata.json") ident := []string{"ns", "tbl"} + s.cat.filesystem = newBarrierRenameNoReplaceFS(filepath.Base(s.cat.metadataVersionClaimPath(ident, 2))) type commitResult struct { metaLoc string @@ -2281,6 +2369,86 @@ func (s *HadoopCatalogTestSuite) TestCommitTableConcurrentMetadataPublishConflic s.Require().NoError(err) s.FileExists(metaPath) s.Equal(2, s.cat.readVersionHint(ident)) + s.NoFileExists(s.cat.metadataVersionClaimPath(ident, 2)) +} + +func (s *HadoopCatalogTestSuite) TestCommitTableConcurrentMixedCodecVersionClaim() { + ctx := context.Background() + s.createTestTable("ns", "tbl") + ident := []string{"ns", "tbl"} + s.cat.filesystem = newBarrierRenameNoReplaceFS(filepath.Base(s.cat.metadataVersionClaimPath(ident, 2))) + + type commitResult struct { + metaLoc string + err error + } + + results := make(chan commitResult, 2) + var wg sync.WaitGroup + for _, props := range []iceberg.Properties{ + {"writer.default": "committed"}, + { + "writer.gzip": "committed", + table.MetadataCompressionKey: table.MetadataCompressionCodecGzip, + }, + } { + props := props + wg.Add(1) + go func() { + defer wg.Done() + + _, metaLoc, err := s.cat.CommitTable( + ctx, ident, + nil, + []table.Update{table.NewSetPropertiesUpdate(props)}, + ) + results <- commitResult{metaLoc: metaLoc, err: err} + }() + } + + wg.Wait() + close(results) + + successes := 0 + conflicts := 0 + var successLoc string + for result := range results { + if result.err == nil { + successes++ + successLoc = result.metaLoc + s.Contains([]string{"v2.metadata.json", "v2.gz.metadata.json"}, filepath.Base(result.metaLoc)) + + continue + } + + conflicts++ + s.ErrorIs(result.err, table.ErrCommitFailed) + } + + s.Equal(1, successes) + s.Equal(1, conflicts) + + plainPath, err := s.cat.metadataFilePathForCompression(ident, 2, table.MetadataCompressionCodecNone) + s.Require().NoError(err) + gzipPath, err := s.cat.metadataFilePathForCompression(ident, 2, table.MetadataCompressionCodecGzip) + s.Require().NoError(err) + + existing := 0 + for _, path := range []string{plainPath, gzipPath} { + if _, err := os.Stat(path); err == nil { + existing++ + } else { + s.True(os.IsNotExist(err)) + } + } + + s.Equal(1, existing) + s.Equal(2, s.cat.readVersionHint(ident)) + s.NoFileExists(s.cat.metadataVersionClaimPath(ident, 2)) + + loaded, err := s.cat.LoadTable(ctx, ident) + s.Require().NoError(err) + s.Equal(successLoc, loaded.MetadataLocation()) } func (s *HadoopCatalogTestSuite) TestCommitTableConflictDetectionRecognizesConcurrentNextVersionMetadata() { From a61039cb2cbe7458f0f8b8b24fc4ef9238de5ae7 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Sat, 4 Jul 2026 12:47:21 +0200 Subject: [PATCH 5/7] test(hadoop): align tests with metadata path helper --- catalog/hadoop/hadoop_test.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/catalog/hadoop/hadoop_test.go b/catalog/hadoop/hadoop_test.go index ed885ec5e..424ca1961 100644 --- a/catalog/hadoop/hadoop_test.go +++ b/catalog/hadoop/hadoop_test.go @@ -634,7 +634,8 @@ func (s *HadoopCatalogTestSuite) createMetadataFile(ident table.Identifier, vers } func (s *HadoopCatalogTestSuite) replaceMetadataWithGzip(ident table.Identifier, version int) string { - plainPath := s.cat.metadataFilePath(ident, version) + plainPath, err := s.cat.metadataFilePathForCompression(ident, version, table.MetadataCompressionCodecNone) + s.Require().NoError(err) data, err := os.ReadFile(plainPath) s.Require().NoError(err) @@ -656,7 +657,8 @@ func (s *HadoopCatalogTestSuite) replaceMetadataWithUUIDName(ident table.Identif } func (s *HadoopCatalogTestSuite) replaceMetadataWithUUIDSequence(ident table.Identifier, version, sequence int) string { - plainPath := s.cat.metadataFilePath(ident, version) + plainPath, err := s.cat.metadataFilePathForCompression(ident, version, table.MetadataCompressionCodecNone) + s.Require().NoError(err) uuidPath := filepath.Join( s.cat.metadataDir(ident), fmt.Sprintf("%05d-a1b2c3d4-e5f6-7890-abcd-ef1234567890.metadata.json", sequence), @@ -744,7 +746,9 @@ func (s *HadoopCatalogTestSuite) TestFindMetadataLocationHintGapKeepsLatest() { location, version, err := s.cat.findMetadataLocation(ident) s.Require().NoError(err) s.Equal(5, version) - s.Equal(s.cat.metadataFilePath(ident, 5), location) + plainPath, err := s.cat.metadataFilePathForCompression(ident, 5, table.MetadataCompressionCodecNone) + s.Require().NoError(err) + s.Equal(plainPath, location) } func (s *HadoopCatalogTestSuite) TestFindVersionNoMetadataDir() { @@ -2500,9 +2504,11 @@ func (s *HadoopCatalogTestSuite) TestCommitTableConflictDetectionRecognizesConcu }, ) s.Require().Error(err) - s.Contains(err.Error(), "version 2 already exists") + s.Contains(err.Error(), "already exists") s.FileExists(conflictPath) - s.NoFileExists(s.cat.metadataFilePath(ident, 2)) + plainPath, err := s.cat.metadataFilePathForCompression(ident, 2, table.MetadataCompressionCodecNone) + s.Require().NoError(err) + s.NoFileExists(plainPath) }) } } From b840e2c2d219f3ed3799381216ae5805098520f4 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Sat, 4 Jul 2026 12:49:49 +0200 Subject: [PATCH 6/7] test(hadoop): satisfy nlreturn lint in metadata lookup --- catalog/hadoop/hadoop.go | 1 + 1 file changed, 1 insertion(+) diff --git a/catalog/hadoop/hadoop.go b/catalog/hadoop/hadoop.go index f0b274628..a6dbc1af0 100644 --- a/catalog/hadoop/hadoop.go +++ b/catalog/hadoop/hadoop.go @@ -471,6 +471,7 @@ func (c *Catalog) metadataVersionLocation(ident table.Identifier, version int) ( } file, ok := files[version] + return file.location, ok } From 6d18001d7e310671859c3837cb790467225c2f74 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Sat, 4 Jul 2026 20:52:57 +0200 Subject: [PATCH 7/7] hadoop: recover stale metadata version claims --- catalog/hadoop/hadoop.go | 55 +++++++++++++++++++++++++++++++---- catalog/hadoop/hadoop_test.go | 53 +++++++++++++++++++++++++++++++++ 2 files changed, 102 insertions(+), 6 deletions(-) diff --git a/catalog/hadoop/hadoop.go b/catalog/hadoop/hadoop.go index a6dbc1af0..c21220c83 100644 --- a/catalog/hadoop/hadoop.go +++ b/catalog/hadoop/hadoop.go @@ -30,6 +30,7 @@ import ( "regexp" "strconv" "strings" + "time" "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/catalog" @@ -63,6 +64,8 @@ var uuidMetadataPattern = regexp.MustCompile( `^([0-9]{5})-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}(?:\.gz)?\.metadata\.json$`, ) +const metadataClaimStaleAfter = time.Minute + type metadataFile struct { location string version int @@ -710,15 +713,45 @@ func (c *Catalog) CommitTable(ctx context.Context, ident table.Identifier, reqs func (c *Catalog) commitMetadataFile(ident table.Identifier, version int, tempPath, metaPath string, conflictErr error) error { claimPath := c.metadataVersionClaimPath(ident, version) - if err := c.filesystem.RenameNoReplace(tempPath, claimPath); err != nil { - _ = c.filesystem.Remove(tempPath) + for { + if err := c.filesystem.RenameNoReplace(tempPath, claimPath); err != nil { + if !errors.Is(err, fs.ErrExist) { + _ = c.filesystem.Remove(tempPath) - if errors.Is(err, fs.ErrExist) { - return fmt.Errorf("%w: metadata version already claimed for table %s: %s", - conflictErr, strings.Join(ident, "."), claimPath) + return fmt.Errorf("hadoop catalog: failed to claim metadata version: %w", err) + } + + existingPath, exists := c.metadataVersionLocation(ident, version) + if exists { + _ = c.filesystem.Remove(tempPath) + + return fmt.Errorf("%w: metadata file already exists for table %s: %s", + conflictErr, strings.Join(ident, "."), existingPath) + } + + claimInfo, err := c.filesystem.Stat(claimPath) + if err != nil { + _ = c.filesystem.Remove(tempPath) + + return fmt.Errorf("hadoop catalog: failed to inspect stale metadata claim %s: %w", claimPath, err) + } + if time.Since(claimInfo.ModTime()) < metadataClaimStaleAfter { + _ = c.filesystem.Remove(tempPath) + + return fmt.Errorf("%w: metadata version already claimed for table %s: %s", + conflictErr, strings.Join(ident, "."), claimPath) + } + + if err := c.filesystem.Remove(claimPath); err != nil && !errors.Is(err, fs.ErrNotExist) { + _ = c.filesystem.Remove(tempPath) + + return fmt.Errorf("hadoop catalog: failed to clear stale metadata claim %s: %w", claimPath, err) + } + + continue } - return fmt.Errorf("hadoop catalog: failed to claim metadata version: %w", err) + break } removeClaim := true @@ -738,6 +771,16 @@ func (c *Catalog) commitMetadataFile(ident table.Identifier, version int, tempPa return fmt.Errorf("%w: metadata file already exists for table %s: %s", conflictErr, strings.Join(ident, "."), metaPath) } + if errors.Is(err, fs.ErrNotExist) { + existingPath, exists := c.metadataVersionLocation(ident, version) + if exists { + return fmt.Errorf("%w: metadata file already exists for table %s: %s", + conflictErr, strings.Join(ident, "."), existingPath) + } + + return fmt.Errorf("%w: metadata claim for table %s and version %d was removed before publish", + conflictErr, strings.Join(ident, "."), version) + } return fmt.Errorf("hadoop catalog: failed to commit metadata file: %w", err) } diff --git a/catalog/hadoop/hadoop_test.go b/catalog/hadoop/hadoop_test.go index 424ca1961..3696eadf0 100644 --- a/catalog/hadoop/hadoop_test.go +++ b/catalog/hadoop/hadoop_test.go @@ -1692,6 +1692,59 @@ func (s *HadoopCatalogTestSuite) TestCreateTableConcurrentMixedCodecVersionClaim s.Equal(successLoc, loaded.MetadataLocation()) } +func (s *HadoopCatalogTestSuite) TestCreateTableRecoversFromStaleVersionClaim() { + ctx := context.Background() + s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755)) + ident := []string{"ns", "tbl"} + + claimPath := s.cat.metadataVersionClaimPath(ident, 1) + s.Require().NoError(os.MkdirAll(filepath.Dir(claimPath), 0o755)) + s.Require().NoError(os.WriteFile(claimPath, []byte("stale claim"), 0o644)) + stale := time.Now().Add(-2 * metadataClaimStaleAfter) + s.Require().NoError(os.Chtimes(claimPath, stale, stale)) + + tbl, err := s.cat.CreateTable(ctx, ident, s.testSchema()) + s.Require().NoError(err) + + metaPath, err := s.cat.metadataFilePathForCompression(ident, 1, table.MetadataCompressionCodecNone) + s.Require().NoError(err) + s.Equal(metaPath, tbl.MetadataLocation()) + s.FileExists(metaPath) + s.NoFileExists(claimPath) +} + +func (s *HadoopCatalogTestSuite) TestCommitTableRecoversFromStaleVersionClaim() { + ctx := context.Background() + tbl := s.createTestTable("ns", "tbl") + ident := []string{"ns", "tbl"} + + claimPath := s.cat.metadataVersionClaimPath(ident, 2) + s.Require().NoError(os.MkdirAll(filepath.Dir(claimPath), 0o755)) + s.Require().NoError(os.WriteFile(claimPath, []byte("stale claim"), 0o644)) + stale := time.Now().Add(-2 * metadataClaimStaleAfter) + s.Require().NoError(os.Chtimes(claimPath, stale, stale)) + + meta, metaLoc, err := s.cat.CommitTable( + ctx, ident, + []table.Requirement{ + table.AssertTableUUID(tbl.Metadata().TableUUID()), + }, + []table.Update{ + table.NewSetPropertiesUpdate(iceberg.Properties{"test.key": "value"}), + }, + ) + s.Require().NoError(err) + s.Equal("value", meta.Properties()["test.key"]) + + s.Contains(metaLoc, "v2.metadata.json") + s.FileExists(metaLoc) + s.NoFileExists(claimPath) + + loaded, err := s.cat.LoadTable(ctx, ident) + s.Require().NoError(err) + s.Equal(metaLoc, loaded.MetadataLocation()) +} + func (s *HadoopCatalogTestSuite) TestCreateTableWithPartitionSpec() { ctx := context.Background() s.Require().NoError(os.Mkdir(filepath.Join(s.warehouse, "ns"), 0o755))