-
Notifications
You must be signed in to change notification settings - Fork 214
fix(hadoop): use codec-aware metadata filenames #1326
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
2f67046
9ef6e8e
6042f46
97363f4
a61039c
b840e2c
6d18001
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,7 @@ import ( | |
| "regexp" | ||
| "strconv" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/apache/iceberg-go" | ||
| "github.com/apache/iceberg-go/catalog" | ||
|
|
@@ -63,6 +64,8 @@ var uuidMetadataPattern = regexp.MustCompile( | |
| `^([0-9]{5})-[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}(?:\.gz)?\.metadata\.json$`, | ||
| ) | ||
|
|
||
| const metadataClaimStaleAfter = time.Minute | ||
|
|
||
| type metadataFile struct { | ||
| location string | ||
| version int | ||
|
|
@@ -286,8 +289,22 @@ func (c *Catalog) metadataDir(ident table.Identifier) string { | |
| return joinPath(c.isLocal, c.tableToPath(ident), "metadata") | ||
| } | ||
|
|
||
| func (c *Catalog) metadataFilePath(ident table.Identifier, version int) string { | ||
| return joinPath(c.isLocal, c.metadataDir(ident), fmt.Sprintf("v%d.metadata.json", version)) | ||
| func (c *Catalog) metadataFilePathForCompression(ident table.Identifier, version int, compression string) (string, error) { | ||
| var suffix string | ||
| switch compression { | ||
| case table.MetadataCompressionCodecNone: | ||
| suffix = ".metadata.json" | ||
| case table.MetadataCompressionCodecGzip: | ||
| suffix = ".gz.metadata.json" | ||
| default: | ||
| return "", fmt.Errorf("unsupported write metadata compression codec: %s", compression) | ||
| } | ||
|
|
||
| return joinPath(c.isLocal, c.metadataDir(ident), fmt.Sprintf("v%d%s", version, suffix)), nil | ||
| } | ||
|
|
||
| func (c *Catalog) metadataVersionClaimPath(ident table.Identifier, version int) string { | ||
| return joinPath(c.isLocal, c.metadataDir(ident), fmt.Sprintf("v%d.metadata.json.claim", version)) | ||
| } | ||
|
|
||
| func (c *Catalog) versionHintPath(ident table.Identifier) string { | ||
|
|
@@ -450,6 +467,17 @@ func (c *Catalog) metadataVersionExists(ident table.Identifier, version int) (bo | |
| return found, nil | ||
| } | ||
|
|
||
| func (c *Catalog) metadataVersionLocation(ident table.Identifier, version int) (string, bool) { | ||
| files, _, err := c.scanMetadataFiles(ident) | ||
| if err != nil { | ||
| return "", false | ||
| } | ||
|
|
||
| file, ok := files[version] | ||
|
|
||
| return file.location, ok | ||
| } | ||
|
|
||
| func (c *Catalog) findMetadataLocation(ident table.Identifier) (string, int, error) { | ||
| _, latest, err := c.scanMetadataFiles(ident) | ||
| if err != nil { | ||
|
|
@@ -525,23 +553,27 @@ func (c *Catalog) CreateTable(ctx context.Context, ident table.Identifier, sc *i | |
| } | ||
|
|
||
| version := 1 | ||
| metaPath := c.metadataFilePath(ident, version) | ||
| tempPath := joinPath(c.isLocal, metaDir, uuid.New().String()+".metadata.json") | ||
|
|
||
| compression := table.MetadataCompressionDefault | ||
| if cfg.Properties != nil { | ||
| if v, ok := cfg.Properties[table.MetadataCompressionKey]; ok { | ||
| compression = v | ||
| } | ||
| } | ||
|
|
||
| metaPath, err := c.metadataFilePathForCompression(ident, version, compression) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("hadoop catalog: failed to determine metadata file path: %w", err) | ||
| } | ||
|
|
||
| tempPath := joinPath(c.isLocal, metaDir, uuid.New().String()+".metadata.json") | ||
|
|
||
| if err := internal.WriteTableMetadata(metadata, c.filesystem, tempPath, compression); err != nil { | ||
| _ = c.filesystem.Remove(tempPath) | ||
|
|
||
| return nil, fmt.Errorf("hadoop catalog: failed to write table metadata: %w", err) | ||
| } | ||
|
|
||
| if err := c.commitMetadataFile(ident, tempPath, metaPath, catalog.ErrTableAlreadyExists); err != nil { | ||
| if err := c.commitMetadataFile(ident, version, tempPath, metaPath, catalog.ErrTableAlreadyExists); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
|
|
@@ -655,31 +687,21 @@ func (c *Catalog) CommitTable(ctx context.Context, ident table.Identifier, reqs | |
| return nil, "", fmt.Errorf("hadoop catalog: failed to create metadata directory: %w", err) | ||
| } | ||
|
|
||
| newMetaPath := c.metadataFilePath(ident, newVersion) | ||
| tempPath := joinPath(c.isLocal, metaDir, uuid.New().String()+".metadata.json") | ||
|
|
||
| compression := updated.Properties().Get(table.MetadataCompressionKey, table.MetadataCompressionDefault) | ||
| newMetaPath, err := c.metadataFilePathForCompression(ident, newVersion, compression) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The conflict guard below only stats the codec-specific path for Java's […] I'd make the guard probe all variants: call […] |
||
| if err != nil { | ||
| return nil, "", fmt.Errorf("hadoop catalog: failed to determine metadata file path: %w", err) | ||
| } | ||
|
|
||
| tempPath := joinPath(c.isLocal, metaDir, uuid.New().String()+".metadata.json") | ||
|
|
||
| if err := internal.WriteTableMetadata(updated, c.filesystem, tempPath, compression); err != nil { | ||
| _ = c.filesystem.Remove(tempPath) | ||
|
|
||
| return nil, "", fmt.Errorf("hadoop catalog: failed to write table metadata: %w", err) | ||
| } | ||
|
|
||
| exists, err := c.metadataVersionExists(ident, newVersion) | ||
| if err != nil { | ||
| _ = c.filesystem.Remove(tempPath) | ||
|
|
||
| return nil, "", fmt.Errorf("hadoop catalog: failed to inspect metadata directory for version %d: %w", | ||
| newVersion, err) | ||
| } | ||
| if exists { | ||
| _ = c.filesystem.Remove(tempPath) | ||
|
|
||
| return nil, "", fmt.Errorf("hadoop catalog: version %d already exists for table %s", | ||
| newVersion, strings.Join(ident, ".")) | ||
| } | ||
| if err := c.commitMetadataFile(ident, tempPath, newMetaPath, table.ErrCommitFailed); err != nil { | ||
| if err := c.commitMetadataFile(ident, newVersion, tempPath, newMetaPath, table.ErrCommitFailed); err != nil { | ||
| return nil, "", err | ||
| } | ||
|
|
||
|
|
@@ -689,18 +711,82 @@ func (c *Catalog) CommitTable(ctx context.Context, ident table.Identifier, reqs | |
| return updated, newMetaPath, nil | ||
| } | ||
|
|
||
| func (c *Catalog) commitMetadataFile(ident table.Identifier, tempPath, metaPath string, conflictErr error) error { | ||
| if err := c.filesystem.RenameNoReplace(tempPath, metaPath); err != nil { | ||
| _ = c.filesystem.Remove(tempPath) | ||
| func (c *Catalog) commitMetadataFile(ident table.Identifier, version int, tempPath, metaPath string, conflictErr error) error { | ||
| claimPath := c.metadataVersionClaimPath(ident, version) | ||
| for { | ||
| if err := c.filesystem.RenameNoReplace(tempPath, claimPath); err != nil { | ||
| if !errors.Is(err, fs.ErrExist) { | ||
| _ = c.filesystem.Remove(tempPath) | ||
|
|
||
| return fmt.Errorf("hadoop catalog: failed to claim metadata version: %w", err) | ||
| } | ||
|
|
||
| existingPath, exists := c.metadataVersionLocation(ident, version) | ||
| if exists { | ||
| _ = c.filesystem.Remove(tempPath) | ||
|
|
||
| return fmt.Errorf("%w: metadata file already exists for table %s: %s", | ||
| conflictErr, strings.Join(ident, "."), existingPath) | ||
| } | ||
|
|
||
| claimInfo, err := c.filesystem.Stat(claimPath) | ||
| if err != nil { | ||
| _ = c.filesystem.Remove(tempPath) | ||
|
|
||
| return fmt.Errorf("hadoop catalog: failed to inspect stale metadata claim %s: %w", claimPath, err) | ||
| } | ||
| if time.Since(claimInfo.ModTime()) < metadataClaimStaleAfter { | ||
| _ = c.filesystem.Remove(tempPath) | ||
|
|
||
| return fmt.Errorf("%w: metadata version already claimed for table %s: %s", | ||
| conflictErr, strings.Join(ident, "."), claimPath) | ||
| } | ||
|
|
||
| if err := c.filesystem.Remove(claimPath); err != nil && !errors.Is(err, fs.ErrNotExist) { | ||
| _ = c.filesystem.Remove(tempPath) | ||
|
|
||
| return fmt.Errorf("hadoop catalog: failed to clear stale metadata claim %s: %w", claimPath, err) | ||
| } | ||
|
|
||
| continue | ||
| } | ||
|
|
||
| break | ||
| } | ||
|
|
||
| removeClaim := true | ||
| defer func() { | ||
| if removeClaim { | ||
| _ = c.filesystem.Remove(claimPath) | ||
| } | ||
| }() | ||
|
|
||
| if existingPath, exists := c.metadataVersionLocation(ident, version); exists { | ||
| return fmt.Errorf("%w: metadata file already exists for table %s: %s", | ||
| conflictErr, strings.Join(ident, "."), existingPath) | ||
| } | ||
|
|
||
| if err := c.filesystem.RenameNoReplace(claimPath, metaPath); err != nil { | ||
| if errors.Is(err, fs.ErrExist) { | ||
| return fmt.Errorf("%w: metadata file already exists for table %s: %s", | ||
| conflictErr, strings.Join(ident, "."), metaPath) | ||
| } | ||
| if errors.Is(err, fs.ErrNotExist) { | ||
| existingPath, exists := c.metadataVersionLocation(ident, version) | ||
| if exists { | ||
| return fmt.Errorf("%w: metadata file already exists for table %s: %s", | ||
| conflictErr, strings.Join(ident, "."), existingPath) | ||
| } | ||
|
|
||
| return fmt.Errorf("%w: metadata claim for table %s and version %d was removed before publish", | ||
| conflictErr, strings.Join(ident, "."), version) | ||
| } | ||
|
|
||
| return fmt.Errorf("hadoop catalog: failed to commit metadata file: %w", err) | ||
| } | ||
|
|
||
| removeClaim = false | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[MAJOR] CreateTable has the same duplicate-version race when concurrent writers use different compression codecs. Two concurrent creators can both pass isTableDir and then each successfully publish its own file, v1.metadata.json and v1.gz.metadata.json. Reuse a codec-independent per-version/table-creation claim before publishing metadata, and add a concurrent create test with default and gzip writers.