Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 6 additions & 64 deletions table/orphan_cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"net/url"
"os"
"path/filepath"
"reflect"
"runtime"
"slices"
"strings"
Expand Down Expand Up @@ -404,69 +403,12 @@ func (t Table) getReferencedFiles(ctx context.Context, fs iceio.IO, maxConcurren
}

func walkDirectory(fsys iceio.IO, root string, fn func(path string, info stdfs.FileInfo) error) error {
// Prefer ListableIO when available.
if listable, ok := fsys.(iceio.ListableIO); ok {
return listable.WalkDir(root, func(path string, d stdfs.DirEntry, err error) error {
if err != nil {
return err
}

if d.IsDir() {
return nil
}

info, err := d.Info()
if err != nil {
return err
}

return fn(path, info)
})
}

// Fallback to original implementation for IO types that don't
// implement ListableIO yet.
switch v := fsys.(type) {
case iceio.LocalFS:
cleanRoot := strings.TrimPrefix(root, "file://")
if cleanRoot == "" {
cleanRoot = "."
}

return filepath.WalkDir(cleanRoot, makeFileWalkFunc(fn, func(path string) string {
return path
}))

default:
// For blob storage: direct field access since we know the structure.
bucket := getBucketName(v)

parsed, err := url.Parse(root)
if err != nil {
return fmt.Errorf("invalid URL %s: %w", root, err)
}

walkPath := strings.TrimPrefix(parsed.Path, "/")
if walkPath == "" {
walkPath = "."
}

return stdfs.WalkDir(bucket, walkPath, makeFileWalkFunc(fn, func(path string) string {
return parsed.Scheme + "://" + parsed.Host + "/" + path
}))
listable, ok := fsys.(iceio.ListableIO)
if !ok {
return fmt.Errorf("filesystem %T does not implement ListableIO", fsys)
}
}

// getBucketName pulls the field named "Bucket" out of a blob-storage IO value
// using reflection and returns it as an stdfs.FS.
//
// There is no error path: the reflection chain panics if fsys is not a
// non-nil pointer (or interface) to a struct, if that struct has no
// accessible "Bucket" field, or if the field's value does not satisfy
// stdfs.FS.
func getBucketName(fsys iceio.IO) stdfs.FS {
	bucketField := reflect.ValueOf(fsys).Elem().FieldByName("Bucket")

	return bucketField.Interface().(stdfs.FS)
}

// makeFileWalkFunc creates a WalkDirFunc that processes only files with path transformation.
func makeFileWalkFunc(fn func(path string, info stdfs.FileInfo) error, pathTransform func(string) string) stdfs.WalkDirFunc {
return func(path string, d stdfs.DirEntry, err error) error {
return listable.WalkDir(root, func(path string, d stdfs.DirEntry, err error) error {
if err != nil {
return err
}
Expand All @@ -480,8 +422,8 @@ func makeFileWalkFunc(fn func(path string, info stdfs.FileInfo) error, pathTrans
return err
}

return fn(pathTransform(path), info)
}
return fn(path, info)
})
}

func isFileOrphan(file string, referencedFiles map[string]bool, normalizedReferencedFiles map[string]string, cfg *orphanCleanupConfig) (bool, error) {
Expand Down
22 changes: 22 additions & 0 deletions table/orphan_cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
stdfs "io/fs"
"strings"
"testing"
"time"
Expand All @@ -34,6 +35,16 @@ import (
"github.com/stretchr/testify/require"
)

// nonListableIO is a minimal test stub that provides only Open and Remove.
// It deliberately has no WalkDir method, so it can be used to exercise code
// paths that require a listable filesystem.
type nonListableIO struct{}

// Open ignores its argument and always fails with a "not implemented" error.
func (nonListableIO) Open(string) (io.File, error) {
return nil, errors.New("not implemented")
}

// Remove ignores its argument and always reports success.
func (nonListableIO) Remove(string) error {
return nil
}

func TestPrefixMismatchMode_String(t *testing.T) {
tests := []struct {
mode PrefixMismatchMode
Expand All @@ -52,6 +63,17 @@ func TestPrefixMismatchMode_String(t *testing.T) {
}
}

func TestWalkDirectoryRequiresListableIO(t *testing.T) {
err := walkDirectory(nonListableIO{}, "s3://bucket/data", func(string, stdfs.FileInfo) error {
require.Fail(t, "walkDirectory should not invoke callback for non-listable IO")

return nil
})

require.Error(t, err)
assert.Contains(t, err.Error(), "does not implement ListableIO")
}

func TestOrphanCleanupOptions(t *testing.T) {
cfg := &orphanCleanupConfig{}

Expand Down
Loading