Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 112 additions & 26 deletions catalog/hadoop/hadoop.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"regexp"
"strconv"
"strings"
"time"

"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/catalog"
Expand Down Expand Up @@ -63,6 +64,8 @@ var uuidMetadataPattern = regexp.MustCompile(
`^([0-9]{5})-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}(?:\.gz)?\.metadata\.json$`,
)

const metadataClaimStaleAfter = time.Minute

type metadataFile struct {
location string
version int
Expand Down Expand Up @@ -286,8 +289,22 @@ func (c *Catalog) metadataDir(ident table.Identifier) string {
return joinPath(c.isLocal, c.tableToPath(ident), "metadata")
}

func (c *Catalog) metadataFilePath(ident table.Identifier, version int) string {
return joinPath(c.isLocal, c.metadataDir(ident), fmt.Sprintf("v%d.metadata.json", version))
func (c *Catalog) metadataFilePathForCompression(ident table.Identifier, version int, compression string) (string, error) {
var suffix string
switch compression {
case table.MetadataCompressionCodecNone:
suffix = ".metadata.json"
case table.MetadataCompressionCodecGzip:
suffix = ".gz.metadata.json"
default:
return "", fmt.Errorf("unsupported write metadata compression codec: %s", compression)
}

return joinPath(c.isLocal, c.metadataDir(ident), fmt.Sprintf("v%d%s", version, suffix)), nil
}

func (c *Catalog) metadataVersionClaimPath(ident table.Identifier, version int) string {
return joinPath(c.isLocal, c.metadataDir(ident), fmt.Sprintf("v%d.metadata.json.claim", version))
}

func (c *Catalog) versionHintPath(ident table.Identifier) string {
Expand Down Expand Up @@ -450,6 +467,17 @@ func (c *Catalog) metadataVersionExists(ident table.Identifier, version int) (bo
return found, nil
}

func (c *Catalog) metadataVersionLocation(ident table.Identifier, version int) (string, bool) {
files, _, err := c.scanMetadataFiles(ident)
if err != nil {
return "", false
}

file, ok := files[version]

return file.location, ok
}

func (c *Catalog) findMetadataLocation(ident table.Identifier) (string, int, error) {
_, latest, err := c.scanMetadataFiles(ident)
if err != nil {
Expand Down Expand Up @@ -525,23 +553,27 @@ func (c *Catalog) CreateTable(ctx context.Context, ident table.Identifier, sc *i
}

version := 1
metaPath := c.metadataFilePath(ident, version)
tempPath := joinPath(c.isLocal, metaDir, uuid.New().String()+".metadata.json")

compression := table.MetadataCompressionDefault
if cfg.Properties != nil {
if v, ok := cfg.Properties[table.MetadataCompressionKey]; ok {
compression = v
}
}

metaPath, err := c.metadataFilePathForCompression(ident, version, compression)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MAJOR] CreateTable has the same duplicate-version race with different compression. Two concurrent creators can both pass isTableDir, then publish v1.metadata.json and v1.gz.metadata.json successfully. Reuse a codec-independent per-version/table-creation claim before publishing metadata, and add a concurrent create test with default and gzip writers.

if err != nil {
return nil, fmt.Errorf("hadoop catalog: failed to determine metadata file path: %w", err)
}

tempPath := joinPath(c.isLocal, metaDir, uuid.New().String()+".metadata.json")

if err := internal.WriteTableMetadata(metadata, c.filesystem, tempPath, compression); err != nil {
_ = c.filesystem.Remove(tempPath)

return nil, fmt.Errorf("hadoop catalog: failed to write table metadata: %w", err)
}

if err := c.commitMetadataFile(ident, tempPath, metaPath, catalog.ErrTableAlreadyExists); err != nil {
if err := c.commitMetadataFile(ident, version, tempPath, metaPath, catalog.ErrTableAlreadyExists); err != nil {
return nil, err
}

Expand Down Expand Up @@ -655,31 +687,21 @@ func (c *Catalog) CommitTable(ctx context.Context, ident table.Identifier, reqs
return nil, "", fmt.Errorf("hadoop catalog: failed to create metadata directory: %w", err)
}

newMetaPath := c.metadataFilePath(ident, newVersion)
tempPath := joinPath(c.isLocal, metaDir, uuid.New().String()+".metadata.json")

compression := updated.Properties().Get(table.MetadataCompressionKey, table.MetadataCompressionDefault)
newMetaPath, err := c.metadataFilePathForCompression(ident, newVersion, compression)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The conflict guard below only stats the codec-specific path for newVersion, so cross-codec races slip through. If writer A plans v2.gz.metadata.json and writer B plans v2.metadata.json concurrently, neither Stat sees the other and both commit — two different files both claiming version 2, breaking the monotonic chain.

Java's getMetadataFile iterates every codec variant, so its conflict detection is codec-agnostic by construction.

I'd make the guard probe all variants: call metadataVersionLocation(ident, newVersion) and fail the commit if it finds any file for that version, regardless of suffix. That also covers the migration case where someone switches codecs between commits.

if err != nil {
return nil, "", fmt.Errorf("hadoop catalog: failed to determine metadata file path: %w", err)
}

tempPath := joinPath(c.isLocal, metaDir, uuid.New().String()+".metadata.json")

if err := internal.WriteTableMetadata(updated, c.filesystem, tempPath, compression); err != nil {
_ = c.filesystem.Remove(tempPath)

return nil, "", fmt.Errorf("hadoop catalog: failed to write table metadata: %w", err)
}

exists, err := c.metadataVersionExists(ident, newVersion)
if err != nil {
_ = c.filesystem.Remove(tempPath)

return nil, "", fmt.Errorf("hadoop catalog: failed to inspect metadata directory for version %d: %w",
newVersion, err)
}
if exists {
_ = c.filesystem.Remove(tempPath)

return nil, "", fmt.Errorf("hadoop catalog: version %d already exists for table %s",
newVersion, strings.Join(ident, "."))
}
if err := c.commitMetadataFile(ident, tempPath, newMetaPath, table.ErrCommitFailed); err != nil {
if err := c.commitMetadataFile(ident, newVersion, tempPath, newMetaPath, table.ErrCommitFailed); err != nil {
return nil, "", err
}

Expand All @@ -689,18 +711,82 @@ func (c *Catalog) CommitTable(ctx context.Context, ident table.Identifier, reqs
return updated, newMetaPath, nil
}

func (c *Catalog) commitMetadataFile(ident table.Identifier, tempPath, metaPath string, conflictErr error) error {
if err := c.filesystem.RenameNoReplace(tempPath, metaPath); err != nil {
_ = c.filesystem.Remove(tempPath)
func (c *Catalog) commitMetadataFile(ident table.Identifier, version int, tempPath, metaPath string, conflictErr error) error {
claimPath := c.metadataVersionClaimPath(ident, version)
for {
if err := c.filesystem.RenameNoReplace(tempPath, claimPath); err != nil {
if !errors.Is(err, fs.ErrExist) {
_ = c.filesystem.Remove(tempPath)

return fmt.Errorf("hadoop catalog: failed to claim metadata version: %w", err)
}

existingPath, exists := c.metadataVersionLocation(ident, version)
if exists {
_ = c.filesystem.Remove(tempPath)

return fmt.Errorf("%w: metadata file already exists for table %s: %s",
conflictErr, strings.Join(ident, "."), existingPath)
}

claimInfo, err := c.filesystem.Stat(claimPath)
if err != nil {
_ = c.filesystem.Remove(tempPath)

return fmt.Errorf("hadoop catalog: failed to inspect stale metadata claim %s: %w", claimPath, err)
}
if time.Since(claimInfo.ModTime()) < metadataClaimStaleAfter {
_ = c.filesystem.Remove(tempPath)

return fmt.Errorf("%w: metadata version already claimed for table %s: %s",
conflictErr, strings.Join(ident, "."), claimPath)
}

if err := c.filesystem.Remove(claimPath); err != nil && !errors.Is(err, fs.ErrNotExist) {
_ = c.filesystem.Remove(tempPath)

return fmt.Errorf("hadoop catalog: failed to clear stale metadata claim %s: %w", claimPath, err)
}

continue
}

break
}

removeClaim := true
defer func() {
if removeClaim {
_ = c.filesystem.Remove(claimPath)
}
}()

if existingPath, exists := c.metadataVersionLocation(ident, version); exists {
return fmt.Errorf("%w: metadata file already exists for table %s: %s",
conflictErr, strings.Join(ident, "."), existingPath)
}

if err := c.filesystem.RenameNoReplace(claimPath, metaPath); err != nil {
if errors.Is(err, fs.ErrExist) {
return fmt.Errorf("%w: metadata file already exists for table %s: %s",
conflictErr, strings.Join(ident, "."), metaPath)
}
if errors.Is(err, fs.ErrNotExist) {
existingPath, exists := c.metadataVersionLocation(ident, version)
if exists {
return fmt.Errorf("%w: metadata file already exists for table %s: %s",
conflictErr, strings.Join(ident, "."), existingPath)
}

return fmt.Errorf("%w: metadata claim for table %s and version %d was removed before publish",
conflictErr, strings.Join(ident, "."), version)
}

return fmt.Errorf("hadoop catalog: failed to commit metadata file: %w", err)
}

removeClaim = false

return nil
}

Expand Down
Loading
Loading