From 475b3a1e6b3c9bd3502c4524da3bd30f02aff1e9 Mon Sep 17 00:00:00 2001 From: Alessandro Nori Date: Mon, 2 Feb 2026 11:10:52 +0100 Subject: [PATCH 01/10] implement register pattern for io --- catalog/rest/rest_integration_test.go | 1 + catalog/sql/sql_integration_test.go | 1 + catalog/sql/sql_test.go | 1 + cmd/iceberg/main.go | 1 + io/{ => gocloud}/azure.go | 2 +- io/{ => gocloud}/azure_integration_test.go | 0 io/{ => gocloud}/azure_test.go | 2 +- io/{ => gocloud}/blob.go | 9 +- io/{ => gocloud}/blob_test.go | 2 +- io/{ => gocloud}/gcs.go | 2 +- io/{ => gocloud}/gcs_integration_test.go | 0 io/gocloud/register.go | 68 ++++++++++ io/{ => gocloud}/s3.go | 2 +- io/{ => gocloud}/s3_integration_test.go | 0 io/{ => gocloud}/utils.go | 2 +- io/io.go | 56 ++------ io/registry.go | 106 ++++++++++++++++ io/registry_test.go | 141 +++++++++++++++++++++ table/orphan_cleanup_integration_test.go | 1 + 19 files changed, 343 insertions(+), 54 deletions(-) rename io/{ => gocloud}/azure.go (99%) rename io/{ => gocloud}/azure_integration_test.go (100%) rename io/{ => gocloud}/azure_test.go (99%) rename io/{ => gocloud}/blob.go (96%) rename io/{ => gocloud}/blob_test.go (99%) rename io/{ => gocloud}/gcs.go (99%) rename io/{ => gocloud}/gcs_integration_test.go (100%) create mode 100644 io/gocloud/register.go rename io/{ => gocloud}/s3.go (99%) rename io/{ => gocloud}/s3_integration_test.go (100%) rename io/{ => gocloud}/utils.go (98%) create mode 100644 io/registry.go create mode 100644 io/registry_test.go diff --git a/catalog/rest/rest_integration_test.go b/catalog/rest/rest_integration_test.go index 17fe20263..ca156f620 100644 --- a/catalog/rest/rest_integration_test.go +++ b/catalog/rest/rest_integration_test.go @@ -32,6 +32,7 @@ import ( "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/rest" "github.com/apache/iceberg-go/io" + _ "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" "github.com/apache/iceberg-go/view" "github.com/stretchr/testify/require" diff --git a/catalog/sql/sql_integration_test.go b/catalog/sql/sql_integration_test.go index 5e879e9dc..82f83bbac 100644 --- a/catalog/sql/sql_integration_test.go +++ b/catalog/sql/sql_integration_test.go @@ -34,6 +34,7 @@ import ( "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/sql" "github.com/apache/iceberg-go/io" + _ "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" "github.com/stretchr/testify/suite" ) diff --git a/catalog/sql/sql_test.go b/catalog/sql/sql_test.go index 6ac0fdb86..039189bcc 100644 --- a/catalog/sql/sql_test.go +++ b/catalog/sql/sql_test.go @@ -36,6 +36,7 @@ import ( "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/internal" sqlcat "github.com/apache/iceberg-go/catalog/sql" + _ "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go index 128385914..a42e92bbf 100644 --- a/cmd/iceberg/main.go +++ b/cmd/iceberg/main.go @@ -31,6 +31,7 @@ import ( "github.com/apache/iceberg-go/catalog/hive" "github.com/apache/iceberg-go/catalog/rest" "github.com/apache/iceberg-go/config" + _ "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" awsconfig "github.com/aws/aws-sdk-go-v2/config" diff --git a/io/azure.go b/io/gocloud/azure.go similarity index 99% rename from io/azure.go rename to io/gocloud/azure.go index a9e71b56b..b12a064a8 100644 --- a/io/azure.go +++ b/io/gocloud/azure.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package io +package gocloud import ( "context" diff --git a/io/azure_integration_test.go b/io/gocloud/azure_integration_test.go similarity index 100% rename from io/azure_integration_test.go rename to io/gocloud/azure_integration_test.go diff --git a/io/azure_test.go b/io/gocloud/azure_test.go similarity index 99% rename from io/azure_test.go rename to io/gocloud/azure_test.go index 27596dbdf..6a650d2ff 100644 --- a/io/azure_test.go +++ b/io/gocloud/azure_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package io +package gocloud import ( "context" diff --git a/io/blob.go b/io/gocloud/blob.go similarity index 96% rename from io/blob.go rename to io/gocloud/blob.go index 14faf33e5..4c417f5ab 100644 --- a/io/blob.go +++ b/io/gocloud/blob.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package io +package gocloud import ( "context" @@ -26,6 +26,7 @@ import ( "path/filepath" "strings" + icebergio "github.com/apache/iceberg-go/io" "gocloud.dev/blob" ) @@ -103,7 +104,7 @@ func (bfs *blobFileIO) preprocess(path string) (string, error) { return bfs.keyExtractor(path) } -func (bfs *blobFileIO) Open(path string) (File, error) { +func (bfs *blobFileIO) Open(path string) (icebergio.File, error) { var err error path, err = bfs.preprocess(path) if err != nil { @@ -133,7 +134,7 @@ func (bfs *blobFileIO) Remove(name string) error { return bfs.Delete(bfs.ctx, name) } -func (bfs *blobFileIO) Create(name string) (FileWriter, error) { +func (bfs *blobFileIO) Create(name string) (icebergio.FileWriter, error) { return bfs.NewWriter(bfs.ctx, name, true, nil) } @@ -185,7 +186,7 @@ func (bfs *blobFileIO) NewWriter(ctx context.Context, path string, overwrite boo nil } -func createBlobFS(ctx context.Context, bucket *blob.Bucket, keyExtractor KeyExtractor) IO { +func createBlobFS(ctx context.Context, bucket *blob.Bucket, keyExtractor KeyExtractor) icebergio.IO { return &blobFileIO{Bucket: bucket, keyExtractor: keyExtractor, ctx: ctx} } diff --git a/io/blob_test.go b/io/gocloud/blob_test.go similarity index 99% rename from io/blob_test.go rename to io/gocloud/blob_test.go index c6029809e..6854be7e4 100644 --- a/io/blob_test.go +++ b/io/gocloud/blob_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package io +package gocloud import ( "context" diff --git a/io/gcs.go b/io/gocloud/gcs.go similarity index 99% rename from io/gcs.go rename to io/gocloud/gcs.go index fa0de9ef6..9e5d1ccc3 100644 --- a/io/gcs.go +++ b/io/gocloud/gcs.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package io +package gocloud import ( "context" diff --git a/io/gcs_integration_test.go b/io/gocloud/gcs_integration_test.go similarity index 100% rename from io/gcs_integration_test.go rename to io/gocloud/gcs_integration_test.go diff --git a/io/gocloud/register.go b/io/gocloud/register.go new file mode 100644 index 000000000..081cbaab6 --- /dev/null +++ b/io/gocloud/register.go @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package gocloud + +import ( + "context" + "net/url" + + icebergio "github.com/apache/iceberg-go/io" + "gocloud.dev/blob/memblob" +) + +func init() { + // Register S3 schemes + s3Factory := func(ctx context.Context, parsed *url.URL, props map[string]string) (icebergio.IO, error) { + bucket, err := createS3Bucket(ctx, parsed, props) + if err != nil { + return nil, err + } + return createBlobFS(ctx, bucket, defaultKeyExtractor(parsed.Host)), nil + } + icebergio.Register("s3", s3Factory) + icebergio.Register("s3a", s3Factory) + icebergio.Register("s3n", s3Factory) + + // Register GCS scheme + icebergio.Register("gs", func(ctx context.Context, parsed *url.URL, props map[string]string) (icebergio.IO, error) { + bucket, err := createGCSBucket(ctx, parsed, props) + if err != nil { + return nil, err + } + return createBlobFS(ctx, bucket, defaultKeyExtractor(parsed.Host)), nil + }) + + // Register memory blob scheme + icebergio.Register("mem", func(ctx context.Context, parsed *url.URL, props map[string]string) (icebergio.IO, error) { + bucket := memblob.OpenBucket(nil) + return createBlobFS(ctx, bucket, defaultKeyExtractor(parsed.Host)), nil + }) + + // Register Azure schemes + azureFactory := func(ctx context.Context, parsed *url.URL, props map[string]string) (icebergio.IO, error) { + bucket, err := createAzureBucket(ctx, parsed, props) + if err != nil { + return nil, err + } + return createBlobFS(ctx, bucket, adlsKeyExtractor()), nil + } + icebergio.Register("abfs", azureFactory) + icebergio.Register("abfss", azureFactory) + icebergio.Register("wasb", azureFactory) + icebergio.Register("wasbs", azureFactory) +} diff --git a/io/s3.go b/io/gocloud/s3.go similarity index 99% rename from io/s3.go rename to io/gocloud/s3.go index 98ad4c634..6aabc3c75 100644 --- a/io/s3.go +++ b/io/gocloud/s3.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package io +package gocloud import ( "context" diff --git a/io/s3_integration_test.go b/io/gocloud/s3_integration_test.go similarity index 100% rename from io/s3_integration_test.go rename to io/gocloud/s3_integration_test.go diff --git a/io/utils.go b/io/gocloud/utils.go similarity index 98% rename from io/utils.go rename to io/gocloud/utils.go index 5c7f32142..ba4ed9c6a 100644 --- a/io/utils.go +++ b/io/gocloud/utils.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package io +package gocloud import "strings" diff --git a/io/io.go b/io/io.go index 447b515b9..ccf017edd 100644 --- a/io/io.go +++ b/io/io.go @@ -15,6 +15,18 @@ // specific language governing permissions and limitations // under the License. +// Package io provides an interface for IO implementations along with +// a registry for registering IO implementations for different URI schemes. +// +// Subpackages of this package provide implementations for cloud storage providers +// which will register themselves if imported. For instance, adding the following +// import: +// +// import _ "github.com/apache/iceberg-go/io/gocloud" +// +// Will register cloud storage implementations for S3, GCS, Azure, and in-memory +// blob storage. The local filesystem (file:// and empty scheme) is registered +// by default. package io import ( @@ -23,11 +35,7 @@ import ( "fmt" "io" "io/fs" - "net/url" "strings" - - "gocloud.dev/blob" - "gocloud.dev/blob/memblob" ) // IO is an interface to a hierarchical file system. @@ -235,46 +243,6 @@ func (f ioFile) ReadDir(count int) ([]fs.DirEntry, error) { return d.ReadDir(count) } -func inferFileIOFromSchema(ctx context.Context, path string, props map[string]string) (IO, error) { - parsed, err := url.Parse(path) - if err != nil { - return nil, err - } - var bucket *blob.Bucket - var keyExtractor KeyExtractor - - switch parsed.Scheme { - case "s3", "s3a", "s3n": - bucket, err = createS3Bucket(ctx, parsed, props) - if err != nil { - return nil, err - } - keyExtractor = defaultKeyExtractor(parsed.Host) - case "gs": - bucket, err = createGCSBucket(ctx, parsed, props) - if err != nil { - return nil, err - } - keyExtractor = defaultKeyExtractor(parsed.Host) - case "mem": - // memblob doesn't use the URL host or path - bucket = memblob.OpenBucket(nil) - keyExtractor = defaultKeyExtractor(parsed.Host) - case "file", "": - return LocalFS{}, nil - case "abfs", "abfss", "wasb", "wasbs": - bucket, err = createAzureBucket(ctx, parsed, props) - if err != nil { - return nil, err - } - keyExtractor = adlsKeyExtractor() - default: - return nil, fmt.Errorf("IO for file '%s' not implemented", path) - } - - return createBlobFS(ctx, bucket, keyExtractor), nil -} - // LoadFS takes a map of properties and an optional URI location // and attempts to infer an IO object from it. // diff --git a/io/registry.go b/io/registry.go new file mode 100644 index 000000000..40b741b0e --- /dev/null +++ b/io/registry.go @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package io + +import ( + "context" + "fmt" + "maps" + "net/url" + "slices" + "sync" +) + +type registry map[string]SchemeFactory + +func (r registry) getKeys() []string { + regMutex.Lock() + defer regMutex.Unlock() + + return slices.Collect(maps.Keys(r)) +} + +func (r registry) set(scheme string, factory SchemeFactory) { + regMutex.Lock() + defer regMutex.Unlock() + r[scheme] = factory +} + +func (r registry) get(scheme string) (SchemeFactory, bool) { + regMutex.Lock() + defer regMutex.Unlock() + factory, ok := r[scheme] + + return factory, ok +} + +func (r registry) remove(scheme string) { + regMutex.Lock() + defer regMutex.Unlock() + delete(r, scheme) +} + +var ( + regMutex sync.Mutex + defaultRegistry = registry{} +) + +// SchemeFactory is a function that creates an IO implementation for a given URI and properties. +type SchemeFactory func(ctx context.Context, parsed *url.URL, props map[string]string) (IO, error) + +// Register adds a new scheme factory to the registry. If the scheme is already registered, it will be replaced. +func Register(scheme string, factory SchemeFactory) { + if factory == nil { + panic("io: Register factory is nil") + } + defaultRegistry.set(scheme, factory) +} + +// Unregister removes the requested scheme factory from the registry. +func Unregister(scheme string) { + defaultRegistry.remove(scheme) +} + +// GetRegisteredSchemes returns the list of registered scheme names. +func GetRegisteredSchemes() []string { + return defaultRegistry.getKeys() +} + +func init() { + // Register local filesystem schemes + localFSFactory := func(ctx context.Context, parsed *url.URL, props map[string]string) (IO, error) { + return LocalFS{}, nil + } + Register("file", localFSFactory) + Register("", localFSFactory) +} + +func inferFileIOFromSchema(ctx context.Context, path string, props map[string]string) (IO, error) { + parsed, err := url.Parse(path) + if err != nil { + return nil, err + } + + // Look up the scheme in the registry + factory, ok := defaultRegistry.get(parsed.Scheme) + if !ok { + return nil, fmt.Errorf("IO for scheme '%s' not implemented", parsed.Scheme) + } + + return factory(ctx, parsed, props) +} diff --git a/io/registry_test.go b/io/registry_test.go new file mode 100644 index 000000000..e96e80818 --- /dev/null +++ b/io/registry_test.go @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package io_test + +import ( + "context" + "net/url" + "testing" + + "github.com/apache/iceberg-go/io" + _ "github.com/apache/iceberg-go/io/gocloud" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestIORegistry(t *testing.T) { + ctx := context.Background() + + // Check that default schemes are registered + schemes := io.GetRegisteredSchemes() + assert.Contains(t, schemes, "file") + assert.Contains(t, schemes, "") + assert.Contains(t, schemes, "s3") + assert.Contains(t, schemes, "gs") + assert.Contains(t, schemes, "mem") + assert.Contains(t, schemes, "abfs") + + // Register a custom scheme + customFactoryCalled := false + io.Register("custom", func(ctx context.Context, parsed *url.URL, props map[string]string) (io.IO, error) { + customFactoryCalled = true + assert.Equal(t, "custom", parsed.Scheme) + assert.Equal(t, "bucket", parsed.Host) + return io.LocalFS{}, nil + }) + + schemes = io.GetRegisteredSchemes() + assert.Contains(t, schemes, "custom") + + // Test loading with custom scheme + customIO, err := io.LoadFS(ctx, map[string]string{}, "custom://bucket/path") + assert.NoError(t, err) + assert.NotNil(t, customIO) + assert.True(t, customFactoryCalled) + + // Unregister custom scheme + io.Unregister("custom") + schemes = io.GetRegisteredSchemes() + assert.NotContains(t, schemes, "custom") + + // Verify custom scheme no longer works + _, err = io.LoadFS(ctx, map[string]string{}, "custom://bucket/path") + assert.Error(t, err) + assert.Contains(t, err.Error(), "IO for scheme 'custom' not implemented") +} + +func TestRegistryPanic(t *testing.T) { + assert.PanicsWithValue(t, "io: Register factory is nil", func() { + io.Register("invalid", nil) + }) +} + +func TestLoadFS(t *testing.T) { + ctx := context.Background() + + tests := []struct { + name string + location string + expectError bool + ioType string + }{ + { + name: "file scheme", + location: "file:///tmp/test", + expectError: false, + ioType: "io.LocalFS", + }, + { + name: "empty scheme", + location: "/tmp/test", + expectError: false, + ioType: "io.LocalFS", + }, + { + name: "mem scheme", + location: "mem://bucket/path", + expectError: false, + ioType: "*gocloud.blobFileIO", + }, + { + name: "s3 scheme", + location: "s3://bucket/path", + expectError: false, + ioType: "*gocloud.blobFileIO", + }, + { + name: "unsupported scheme", + location: "unsupported://bucket/path", + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + iofs, err := io.LoadFS(ctx, map[string]string{}, tt.location) + if tt.expectError { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.NotNil(t, iofs) + } + }) + } +} + +func TestLoadFSWithWarehouse(t *testing.T) { + ctx := context.Background() + + // Test with warehouse property + iofs, err := io.LoadFS(ctx, map[string]string{ + "warehouse": "file:///tmp/warehouse", + }, "") + require.NoError(t, err) + assert.NotNil(t, iofs) + assert.IsType(t, io.LocalFS{}, iofs) +} diff --git a/table/orphan_cleanup_integration_test.go b/table/orphan_cleanup_integration_test.go index bc4d3883e..cce3fd0bb 100644 --- a/table/orphan_cleanup_integration_test.go +++ b/table/orphan_cleanup_integration_test.go @@ -34,6 +34,7 @@ import ( "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/rest" "github.com/apache/iceberg-go/io" + _ "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" "github.com/stretchr/testify/suite" ) From 00c3d1394e46527a74b5d25cd9b2694f825491b5 Mon Sep 17 00:00:00 2001 From: Alessandro Nori Date: Mon, 2 Feb 2026 11:19:47 +0100 Subject: [PATCH 02/10] update LoadFS doc --- io/io.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/io/io.go b/io/io.go index ccf017edd..eabebc7e7 100644 --- a/io/io.go +++ b/io/io.go @@ -244,13 +244,17 @@ func (f ioFile) ReadDir(count int) ([]fs.DirEntry, error) { } // LoadFS takes a map of properties and an optional URI location -// and attempts to infer an IO object from it. +// and attempts to infer an IO object from it using the registered +// scheme factories. // -// A schema of "file://" or an empty string will result in a LocalFS -// implementation. Otherwise this will return an error if the schema -// does not yet have an implementation here. +// The scheme is extracted from the location URI and used to look up +// the appropriate factory from the registry. The local filesystem +// (file:// or empty scheme) is registered by default. // -// Currently local, S3, GCS, and In-Memory FSs are implemented. +// Additional schemes can be registered by importing subpackages. +// For S3, GCS, Azure and in-memory support, import: +// +// import _ "github.com/apache/iceberg-go/io/gocloud" func LoadFS(ctx context.Context, props map[string]string, location string) (IO, error) { if location == "" { location = props["warehouse"] From f79c361a9be96438e2593297f0ba7c923ab4ed21 Mon Sep 17 00:00:00 2001 From: Alessandro Nori Date: Mon, 2 Feb 2026 11:43:41 +0100 Subject: [PATCH 03/10] improvements to tests --- catalog/glue/glue_test.go | 1 + catalog/rest/rest_integration_test.go | 8 +-- catalog/sql/sql_integration_test.go | 8 +-- io/gocloud/azure_integration_test.go | 14 ++--- io/gocloud/gcs_integration_test.go | 8 +-- io/gocloud/register.go | 25 +++++++-- io/gocloud/s3_integration_test.go | 16 +++--- io/io.go | 4 +- io/registry.go | 4 +- io/registry_test.go | 67 ++++++++++++++++-------- table/orphan_cleanup_integration_test.go | 8 +-- table/scanner_test.go | 8 +-- table/transaction_test.go | 9 ++-- 13 files changed, 111 insertions(+), 69 deletions(-) diff --git a/catalog/glue/glue_test.go b/catalog/glue/glue_test.go index c05757f5b..ddadcdf58 100644 --- a/catalog/glue/glue_test.go +++ b/catalog/glue/glue_test.go @@ -31,6 +31,7 @@ import ( "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/catalog" + _ "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" diff --git a/catalog/rest/rest_integration_test.go b/catalog/rest/rest_integration_test.go index ca156f620..bcb27bf84 100644 --- a/catalog/rest/rest_integration_test.go +++ b/catalog/rest/rest_integration_test.go @@ -32,7 +32,7 @@ import ( "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/rest" "github.com/apache/iceberg-go/io" - _ "github.com/apache/iceberg-go/io/gocloud" + "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" "github.com/apache/iceberg-go/view" "github.com/stretchr/testify/require" @@ -52,9 +52,9 @@ func (s *RestIntegrationSuite) loadCatalog(ctx context.Context) *rest.Catalog { cat, err := catalog.Load(ctx, "local", iceberg.Properties{ "type": "rest", "uri": "http://localhost:8181", - io.S3Region: "us-east-1", - io.S3AccessKeyID: "admin", - io.S3SecretAccessKey: "password", + gocloud.S3Region: "us-east-1", + gocloud.S3AccessKeyID: "admin", + gocloud.S3SecretAccessKey: "password", }) s.Require().NoError(err) s.Require().IsType(&rest.Catalog{}, cat) diff --git a/catalog/sql/sql_integration_test.go b/catalog/sql/sql_integration_test.go index 82f83bbac..252ed6faa 100644 --- a/catalog/sql/sql_integration_test.go +++ b/catalog/sql/sql_integration_test.go @@ -34,7 +34,7 @@ import ( "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/sql" "github.com/apache/iceberg-go/io" - _ "github.com/apache/iceberg-go/io/gocloud" + "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" "github.com/stretchr/testify/suite" ) @@ -71,9 +71,9 @@ func (s *SQLIntegrationSuite) loadCatalog(ctx context.Context) *sql.Catalog { "uri": "file:" + dbPath, "sql.dialect": "sqlite", "sql.driver": "sqlite", - io.S3Region: "us-east-1", - io.S3AccessKeyID: "admin", - io.S3SecretAccessKey: "password", + gocloud.S3Region: "us-east-1", + gocloud.S3AccessKeyID: "admin", + gocloud.S3SecretAccessKey: "password", "warehouse": location, }) s.Require().NoError(err) diff --git a/io/gocloud/azure_integration_test.go b/io/gocloud/azure_integration_test.go index cfe5ecb6b..44ab58d93 100644 --- a/io/gocloud/azure_integration_test.go +++ b/io/gocloud/azure_integration_test.go @@ -17,7 +17,7 @@ //go:build integration -package io_test +package gocloud_test import ( "context" @@ -29,7 +29,7 @@ import ( "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/catalog" sqlcat "github.com/apache/iceberg-go/catalog/sql" - "github.com/apache/iceberg-go/io" + "github.com/apache/iceberg-go/io/gocloud" "github.com/stretchr/testify/suite" "github.com/uptrace/bun/driver/sqliteshim" "gocloud.dev/blob/azureblob" @@ -64,10 +64,10 @@ func (s *AzureBlobIOTestSuite) TestAzureBlobWarehouseKey() { sqlcat.DriverKey: sqliteshim.ShimName, sqlcat.DialectKey: string(sqlcat.SQLite), "type": "sql", - io.AdlsSharedKeyAccountName: accountName, - io.AdlsSharedKeyAccountKey: accountKey, - io.AdlsEndpoint: endpoint, - io.AdlsProtocol: protocol, + gocloud.AdlsSharedKeyAccountName: accountName, + gocloud.AdlsSharedKeyAccountKey: accountKey, + gocloud.AdlsEndpoint: endpoint, + gocloud.AdlsProtocol: protocol, } cat, err := catalog.Load(context.Background(), "default", properties) @@ -99,7 +99,7 @@ func (s *AzureBlobIOTestSuite) TestAzuriteWarehouseConnectionString() { sqlcat.DriverKey: sqliteshim.ShimName, sqlcat.DialectKey: string(sqlcat.SQLite), "type": "sql", - io.AdlsConnectionStringPrefix + accountName: connectionString, + gocloud.AdlsConnectionStringPrefix + accountName: connectionString, } cat, err := catalog.Load(context.Background(), "default", properties) diff --git a/io/gocloud/gcs_integration_test.go b/io/gocloud/gcs_integration_test.go index f2c0556c3..4cf1e38e8 100644 --- a/io/gocloud/gcs_integration_test.go +++ b/io/gocloud/gcs_integration_test.go @@ -17,7 +17,7 @@ //go:build integration -package io_test +package gocloud_test import ( "bytes" @@ -31,7 +31,7 @@ import ( "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/catalog" sqlcat "github.com/apache/iceberg-go/catalog/sql" - "github.com/apache/iceberg-go/io" + "github.com/apache/iceberg-go/io/gocloud" "github.com/stretchr/testify/suite" "github.com/uptrace/bun/driver/sqliteshim" ) @@ -113,8 +113,8 @@ func (s *GCSIOTestSuite) TestGCSWarehouse() { sqlcat.DialectKey: string(sqlcat.SQLite), "type": "sql", "warehouse": fmt.Sprintf("gs://%s/iceberg/", gcsBucketName), - io.GCSEndpoint: fmt.Sprintf("http://%s/", gcsEndpoint), - io.GCSUseJsonAPI: "true", + gocloud.GCSEndpoint: fmt.Sprintf("http://%s/", gcsEndpoint), + gocloud.GCSUseJsonAPI: "true", } cat, err := catalog.Load(context.Background(), "default", properties) diff --git a/io/gocloud/register.go b/io/gocloud/register.go index 081cbaab6..7c2b48580 100644 --- a/io/gocloud/register.go +++ b/io/gocloud/register.go @@ -26,39 +26,56 @@ import ( ) func init() { - // Register S3 schemes + registerS3Schemes() + registerGCSScheme() + registerMemScheme() + registerAzureSchemes() +} + +// registerS3Schemes registers S3-compatible storage schemes (s3, s3a, s3n). +func registerS3Schemes() { s3Factory := func(ctx context.Context, parsed *url.URL, props map[string]string) (icebergio.IO, error) { bucket, err := createS3Bucket(ctx, parsed, props) if err != nil { return nil, err } + return createBlobFS(ctx, bucket, defaultKeyExtractor(parsed.Host)), nil } icebergio.Register("s3", s3Factory) icebergio.Register("s3a", s3Factory) icebergio.Register("s3n", s3Factory) +} - // Register GCS scheme +// registerGCSScheme registers the Google Cloud Storage scheme (gs). +func registerGCSScheme() { icebergio.Register("gs", func(ctx context.Context, parsed *url.URL, props map[string]string) (icebergio.IO, error) { bucket, err := createGCSBucket(ctx, parsed, props) if err != nil { return nil, err } + return createBlobFS(ctx, bucket, defaultKeyExtractor(parsed.Host)), nil }) +} - // Register memory blob scheme +// registerMemScheme registers the in-memory blob storage scheme (mem). +func registerMemScheme() { icebergio.Register("mem", func(ctx context.Context, parsed *url.URL, props map[string]string) (icebergio.IO, error) { bucket := memblob.OpenBucket(nil) + return createBlobFS(ctx, bucket, defaultKeyExtractor(parsed.Host)), nil }) +} - // Register Azure schemes +// registerAzureSchemes registers Azure Data Lake Storage schemes (abfs, abfss, wasb, wasbs). +func registerAzureSchemes() { azureFactory := func(ctx context.Context, parsed *url.URL, props map[string]string) (icebergio.IO, error) { bucket, err := createAzureBucket(ctx, parsed, props) if err != nil { return nil, err } + return createBlobFS(ctx, bucket, adlsKeyExtractor()), nil } icebergio.Register("abfs", azureFactory) diff --git a/io/gocloud/s3_integration_test.go b/io/gocloud/s3_integration_test.go index 0e51ad029..7ab9d6df0 100644 --- a/io/gocloud/s3_integration_test.go +++ b/io/gocloud/s3_integration_test.go @@ -17,7 +17,7 @@ //go:build integration -package io_test +package gocloud_test import ( "context" @@ -28,7 +28,7 @@ import ( "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/catalog" sqlcat "github.com/apache/iceberg-go/catalog/sql" - "github.com/apache/iceberg-go/io" + "github.com/apache/iceberg-go/io/gocloud" "github.com/stretchr/testify/require" "github.com/uptrace/bun/driver/sqliteshim" ) @@ -43,9 +43,9 @@ func TestMinioWarehouse(t *testing.T) { sqlcat.DialectKey: string(sqlcat.SQLite), "type": "sql", "warehouse": "s3a://warehouse/iceberg/", - io.S3Region: "local", - io.S3AccessKeyID: "admin", - io.S3SecretAccessKey: "password", + gocloud.S3Region: "local", + gocloud.S3AccessKeyID: "admin", + gocloud.S3SecretAccessKey: "password", // endpoint is passed via AWS_S3_ENDPOINT env var }) require.NoError(t, err) @@ -75,9 +75,9 @@ func TestMinioWarehouseNoLocation(t *testing.T) { sqlcat.DialectKey: string(sqlcat.SQLite), "type": "sql", "warehouse": "s3a://warehouse/iceberg/", - io.S3Region: "local", - io.S3AccessKeyID: "admin", - io.S3SecretAccessKey: "password", + gocloud.S3Region: "local", + gocloud.S3AccessKeyID: "admin", + gocloud.S3SecretAccessKey: "password", // endpoint is passed via AWS_S3_ENDPOINT env var }) require.NoError(t, err) diff --git a/io/io.go b/io/io.go index eabebc7e7..2a10c4459 100644 --- a/io/io.go +++ b/io/io.go @@ -202,6 +202,8 @@ func (f ioFS) Remove(name string) error { } var ( + ErrIONotFound = errors.New("io scheme not registered") + errMissingReadDir = errors.New("fs.File directory missing ReadDir method") errMissingSeek = errors.New("fs.File missing Seek method") errMissingReadAt = errors.New("fs.File missing ReadAt") @@ -260,7 +262,7 @@ func LoadFS(ctx context.Context, props map[string]string, location string) (IO, location = props["warehouse"] } - iofs, err := inferFileIOFromSchema(ctx, location, props) + iofs, err := inferFileIOFromScheme(ctx, location, props) if err != nil { return nil, err } diff --git a/io/registry.go b/io/registry.go index 40b741b0e..214d17599 100644 --- a/io/registry.go +++ b/io/registry.go @@ -90,7 +90,7 @@ func init() { Register("", localFSFactory) } -func inferFileIOFromSchema(ctx context.Context, path string, props map[string]string) (IO, error) { +func inferFileIOFromScheme(ctx context.Context, path string, props map[string]string) (IO, error) { parsed, err := url.Parse(path) if err != nil { return nil, err @@ -99,7 +99,7 @@ func inferFileIOFromSchema(ctx context.Context, path string, props map[string]st // Look up the scheme in the registry factory, ok := defaultRegistry.get(parsed.Scheme) if !ok { - return nil, fmt.Errorf("IO for scheme '%s' not implemented", parsed.Scheme) + return nil, fmt.Errorf("%w: %s", ErrIONotFound, parsed.Scheme) } return factory(ctx, parsed, props) diff --git a/io/registry_test.go b/io/registry_test.go index e96e80818..113939a0a 100644 --- a/io/registry_test.go +++ b/io/registry_test.go @@ -31,42 +31,67 @@ import ( func TestIORegistry(t *testing.T) { ctx := context.Background() - // Check that default schemes are registered - schemes := io.GetRegisteredSchemes() - assert.Contains(t, schemes, "file") - assert.Contains(t, schemes, "") - assert.Contains(t, schemes, "s3") - assert.Contains(t, schemes, "gs") - assert.Contains(t, schemes, "mem") - assert.Contains(t, schemes, "abfs") - - // Register a custom scheme + assert.ElementsMatch(t, []string{ + "file", + "", + "s3", + "s3a", + "s3n", + "gs", + "mem", + "abfs", + "abfss", + "wasb", + "wasbs", + }, io.GetRegisteredSchemes()) + customFactoryCalled := false io.Register("custom", func(ctx context.Context, parsed *url.URL, props map[string]string) (io.IO, error) { customFactoryCalled = true assert.Equal(t, "custom", parsed.Scheme) assert.Equal(t, "bucket", parsed.Host) + return io.LocalFS{}, nil }) - schemes = io.GetRegisteredSchemes() - assert.Contains(t, schemes, "custom") + assert.ElementsMatch(t, []string{ + "file", + "", + "s3", + "s3a", + "s3n", + "gs", + "mem", + "abfs", + "abfss", + "wasb", + "wasbs", + "custom", + }, io.GetRegisteredSchemes()) - // Test loading with custom scheme customIO, err := io.LoadFS(ctx, map[string]string{}, "custom://bucket/path") assert.NoError(t, err) assert.NotNil(t, customIO) assert.True(t, customFactoryCalled) - // Unregister custom scheme io.Unregister("custom") - schemes = io.GetRegisteredSchemes() - assert.NotContains(t, schemes, "custom") + assert.ElementsMatch(t, []string{ + "file", + "", + "s3", + "s3a", + "s3n", + "gs", + "mem", + "abfs", + "abfss", + "wasb", + "wasbs", + }, io.GetRegisteredSchemes()) - // Verify custom scheme no longer works _, err = io.LoadFS(ctx, map[string]string{}, "custom://bucket/path") assert.Error(t, err) - assert.Contains(t, err.Error(), "IO for scheme 'custom' not implemented") + assert.ErrorIs(t, err, io.ErrIONotFound) } func TestRegistryPanic(t *testing.T) { @@ -82,31 +107,26 @@ func TestLoadFS(t *testing.T) { name string location string expectError bool - ioType string }{ { name: "file scheme", location: "file:///tmp/test", expectError: false, - ioType: "io.LocalFS", }, { name: "empty scheme", location: "/tmp/test", expectError: false, - ioType: "io.LocalFS", }, { name: "mem scheme", location: "mem://bucket/path", expectError: false, - ioType: "*gocloud.blobFileIO", }, { name: "s3 scheme", location: "s3://bucket/path", expectError: false, - ioType: "*gocloud.blobFileIO", }, { name: "unsupported scheme", @@ -120,6 +140,7 @@ func TestLoadFS(t *testing.T) { iofs, err := io.LoadFS(ctx, map[string]string{}, tt.location) if tt.expectError { assert.Error(t, err) + assert.ErrorIs(t, err, io.ErrIONotFound) } else { require.NoError(t, err) assert.NotNil(t, iofs) diff --git a/table/orphan_cleanup_integration_test.go b/table/orphan_cleanup_integration_test.go index cce3fd0bb..9e1da3ca3 100644 --- a/table/orphan_cleanup_integration_test.go +++ b/table/orphan_cleanup_integration_test.go @@ -34,7 +34,7 @@ import ( "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/rest" "github.com/apache/iceberg-go/io" - _ "github.com/apache/iceberg-go/io/gocloud" + "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" "github.com/stretchr/testify/suite" ) @@ -71,9 +71,9 @@ func (s *OrphanCleanupIntegrationSuite) loadCatalog() *rest.Catalog { cat, err := catalog.Load(s.ctx, "local", iceberg.Properties{ "type": "rest", "uri": "http://localhost:8181", - io.S3Region: "us-east-1", - io.S3AccessKeyID: "admin", - io.S3SecretAccessKey: "password", + gocloud.S3Region: "us-east-1", + gocloud.S3AccessKeyID: "admin", + gocloud.S3SecretAccessKey: "password", }) s.Require().NoError(err) s.Require().IsType(&rest.Catalog{}, cat) diff --git a/table/scanner_test.go b/table/scanner_test.go index f3dadb06a..961b4e6f0 100644 --- a/table/scanner_test.go +++ b/table/scanner_test.go @@ -37,7 +37,7 @@ import ( "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/rest" "github.com/apache/iceberg-go/internal/recipe" - "github.com/apache/iceberg-go/io" + "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -60,9 +60,9 @@ func (s *ScannerSuite) SetupTest() { cat, err := rest.NewCatalog(s.ctx, "rest", "http://localhost:8181", rest.WithAdditionalProps( iceberg.Properties{ - io.S3Region: "us-east-1", - io.S3AccessKeyID: "admin", - io.S3SecretAccessKey: "password", + gocloud.S3Region: "us-east-1", + gocloud.S3AccessKeyID: "admin", + gocloud.S3SecretAccessKey: "password", }, )) s.Require().NoError(err) diff --git a/table/transaction_test.go b/table/transaction_test.go index 1f64adf1c..92330dffe 100644 --- a/table/transaction_test.go +++ b/table/transaction_test.go @@ -35,6 +35,7 @@ import ( "github.com/apache/iceberg-go/catalog/rest" "github.com/apache/iceberg-go/internal/recipe" iceio "github.com/apache/iceberg-go/io" + "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" "github.com/stretchr/testify/suite" "github.com/testcontainers/testcontainers-go/modules/compose" @@ -58,10 +59,10 @@ func (s *SparkIntegrationTestSuite) SetupSuite() { func (s *SparkIntegrationTestSuite) SetupTest() { s.ctx = context.Background() s.props = iceberg.Properties{ - iceio.S3Region: "us-east-1", - iceio.S3EndpointURL: "http://localhost:9000", - iceio.S3AccessKeyID: "admin", - iceio.S3SecretAccessKey: "password", + gocloud.S3Region: "us-east-1", + gocloud.S3EndpointURL: "http://localhost:9000", + gocloud.S3AccessKeyID: "admin", + gocloud.S3SecretAccessKey: "password", } cat, err := rest.NewCatalog(s.ctx, "rest", "http://localhost:8181", rest.WithAdditionalProps(s.props)) From 41161261680da7d4b76c0ec84fcc7401226e47a8 Mon Sep 17 00:00:00 2001 From: Alessandro Nori Date: Fri, 6 Feb 2026 09:57:58 +0100 Subject: [PATCH 04/10] panic if attempting to register the same scheme twice --- io/registry.go | 3 +++ io/registry_test.go | 13 +++++++++++++ 2 files changed, 16 insertions(+) diff --git a/io/registry.go b/io/registry.go index 214d17599..6c8a7818c 100644 --- a/io/registry.go +++ b/io/registry.go @@ -68,6 +68,9 @@ func Register(scheme string, factory SchemeFactory) { if factory == nil { panic("io: Register factory is nil") } + if _, dup := defaultRegistry.get(scheme); dup { + panic("io: Register called twice for scheme " + scheme) + } defaultRegistry.set(scheme, factory) } diff --git a/io/registry_test.go b/io/registry_test.go index 113939a0a..f7d618063 100644 --- a/io/registry_test.go +++ b/io/registry_test.go @@ -100,6 +100,19 @@ func TestRegistryPanic(t *testing.T) { }) } +func TestRegisterDuplicatePanic(t *testing.T) { + dummyFactory := func(ctx context.Context, parsed *url.URL, props map[string]string) (io.IO, error) { + return io.LocalFS{}, nil + } + + io.Register("test-duplicate", dummyFactory) + defer io.Unregister("test-duplicate") + + assert.PanicsWithValue(t, "io: Register called twice for scheme test-duplicate", func() { + io.Register("test-duplicate", dummyFactory) + }, "Attempting to register the same scheme twice should panic") +} + func TestLoadFS(t *testing.T) { ctx := context.Background() From aa64b9c6b73fa2a27c403bc671cf0aa53c72369a Mon Sep 17 00:00:00 2001 From: Alessandro Nori Date: Fri, 6 Feb 2026 10:23:31 +0100 Subject: [PATCH 05/10] move io config keys to `io` package --- catalog/rest/rest_integration_test.go | 8 ++-- catalog/sql/sql_integration_test.go | 8 ++-- io/config.go | 54 ++++++++++++++++++++++++ io/gocloud/azure.go | 29 ++++--------- io/gocloud/azure_integration_test.go | 13 +++--- io/gocloud/gcs.go | 20 +++------ io/gocloud/gcs_integration_test.go | 7 +-- io/gocloud/s3.go | 32 +++++--------- io/gocloud/s3_integration_test.go | 15 ++++--- table/orphan_cleanup_integration_test.go | 8 ++-- table/scanner_test.go | 9 ++-- table/transaction_test.go | 10 ++--- 12 files changed, 119 insertions(+), 94 deletions(-) create mode 100644 io/config.go diff --git a/catalog/rest/rest_integration_test.go b/catalog/rest/rest_integration_test.go index bcb27bf84..ca156f620 100644 --- a/catalog/rest/rest_integration_test.go +++ b/catalog/rest/rest_integration_test.go @@ -32,7 +32,7 @@ import ( "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/rest" "github.com/apache/iceberg-go/io" - "github.com/apache/iceberg-go/io/gocloud" + _ "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" "github.com/apache/iceberg-go/view" "github.com/stretchr/testify/require" @@ -52,9 +52,9 @@ func (s *RestIntegrationSuite) loadCatalog(ctx context.Context) *rest.Catalog { cat, err := catalog.Load(ctx, "local", iceberg.Properties{ "type": "rest", "uri": "http://localhost:8181", - gocloud.S3Region: "us-east-1", - gocloud.S3AccessKeyID: "admin", - gocloud.S3SecretAccessKey: "password", + io.S3Region: "us-east-1", + io.S3AccessKeyID: "admin", + io.S3SecretAccessKey: "password", }) s.Require().NoError(err) s.Require().IsType(&rest.Catalog{}, cat) diff --git a/catalog/sql/sql_integration_test.go b/catalog/sql/sql_integration_test.go index 252ed6faa..82f83bbac 100644 --- a/catalog/sql/sql_integration_test.go +++ b/catalog/sql/sql_integration_test.go @@ -34,7 +34,7 @@ import ( "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/sql" "github.com/apache/iceberg-go/io" - "github.com/apache/iceberg-go/io/gocloud" + _ "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" "github.com/stretchr/testify/suite" ) @@ -71,9 +71,9 @@ func (s *SQLIntegrationSuite) loadCatalog(ctx context.Context) *sql.Catalog { "uri": "file:" + dbPath, "sql.dialect": "sqlite", "sql.driver": "sqlite", - gocloud.S3Region: "us-east-1", - gocloud.S3AccessKeyID: "admin", - gocloud.S3SecretAccessKey: "password", + io.S3Region: "us-east-1", + io.S3AccessKeyID: "admin", + io.S3SecretAccessKey: "password", "warehouse": location, }) s.Require().NoError(err) diff --git a/io/config.go b/io/config.go new file mode 100644 index 000000000..6b9a952ec --- /dev/null +++ b/io/config.go @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package io + +// Constants for S3 configuration options +const ( + S3Region = "s3.region" + S3SessionToken = "s3.session-token" + S3SecretAccessKey = "s3.secret-access-key" + S3AccessKeyID = "s3.access-key-id" + S3EndpointURL = "s3.endpoint" + S3ProxyURI = "s3.proxy-uri" + S3ConnectTimeout = "s3.connect-timeout" + S3SignerUri = "s3.signer.uri" + S3ForceVirtualAddressing = "s3.force-virtual-addressing" +) + +// Constants for GCS configuration options +const ( + GCSEndpoint = "gcs.endpoint" + GCSKeyPath = "gcs.keypath" + GCSJSONKey = "gcs.jsonkey" + GCSCredType = "gcs.credtype" + GCSUseJsonAPI = "gcs.usejsonapi" // set to anything to enable +) + +// Constants for Azure configuration options +const ( + AdlsSasTokenPrefix = "adls.sas-token." + AdlsConnectionStringPrefix = "adls.connection-string." + AdlsSharedKeyAccountName = "adls.auth.shared-key.account.name" + AdlsSharedKeyAccountKey = "adls.auth.shared-key.account.key" + AdlsEndpoint = "adls.endpoint" + AdlsProtocol = "adls.protocol" + + // Not in use yet + // AdlsReadBlockSize = "adls.read.block-size-bytes" + // AdlsWriteBlockSize = "adls.write.block-size-bytes" +) diff --git a/io/gocloud/azure.go b/io/gocloud/azure.go index b12a064a8..faf81e62f 100644 --- a/io/gocloud/azure.go +++ b/io/gocloud/azure.go @@ -28,6 +28,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" + "github.com/apache/iceberg-go/io" "gocloud.dev/blob" "gocloud.dev/blob/azureblob" ) @@ -36,20 +37,6 @@ import ( // https://github.com/apache/iceberg/blob/2114bf631e49af532d66e2ce148ee49dd1dd1f1f/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSLocation.java#L47 var adlsURIPattern = regexp.MustCompile(`^(abfss?|wasbs?)://([^/?#]+)(.*)?$`) -// Constants for Azure configuration options -const ( - AdlsSasTokenPrefix = "adls.sas-token." - AdlsConnectionStringPrefix = "adls.connection-string." - AdlsSharedKeyAccountName = "adls.auth.shared-key.account.name" - AdlsSharedKeyAccountKey = "adls.auth.shared-key.account.key" - AdlsEndpoint = "adls.endpoint" - AdlsProtocol = "adls.protocol" - - // Not in use yet - // AdlsReadBlockSize = "adls.read.block-size-bytes" - // AdlsWriteBlockSize = "adls.write.block-size-bytes" -) - // adlsLocation represents the parsed components of an Azure Data Lake Storage URI type adlsLocation struct { accountName string // Azure storage account name @@ -119,8 +106,8 @@ func newAdlsLocation(adlsURI *url.URL) (*adlsLocation, error) { // Construct a Azure bucket from a URL func createAzureBucket(ctx context.Context, parsed *url.URL, props map[string]string) (*blob.Bucket, error) { - adlsSasTokens := propertiesWithPrefix(props, AdlsSasTokenPrefix) - adlsConnectionStrings := propertiesWithPrefix(props, AdlsConnectionStringPrefix) + adlsSasTokens := propertiesWithPrefix(props, io.AdlsSasTokenPrefix) + adlsConnectionStrings := propertiesWithPrefix(props, io.AdlsConnectionStringPrefix) // Construct the client location, err := newAdlsLocation(parsed) @@ -128,16 +115,16 @@ func createAzureBucket(ctx context.Context, parsed *url.URL, props map[string]st return nil, err } - sharedKeyAccountName := props[AdlsSharedKeyAccountName] - endpoint := props[AdlsEndpoint] - protocol := props[AdlsProtocol] + sharedKeyAccountName := props[io.AdlsSharedKeyAccountName] + endpoint := props[io.AdlsEndpoint] + protocol := props[io.AdlsProtocol] var client *container.Client if sharedKeyAccountName != "" { - sharedKeyAccountKey, ok := props[AdlsSharedKeyAccountKey] + sharedKeyAccountKey, ok := props[io.AdlsSharedKeyAccountKey] if !ok || sharedKeyAccountKey == "" { - return nil, fmt.Errorf("azure authentication: shared-key requires both %s and %s", AdlsSharedKeyAccountName, AdlsSharedKeyAccountKey) + return nil, fmt.Errorf("azure authentication: shared-key requires both %s and %s", io.AdlsSharedKeyAccountName, io.AdlsSharedKeyAccountKey) } containerURL, err := createContainerURL(location.accountName, protocol, endpoint, "", location.containerName) diff --git a/io/gocloud/azure_integration_test.go b/io/gocloud/azure_integration_test.go index 44ab58d93..a4c51a41a 100644 --- a/io/gocloud/azure_integration_test.go +++ b/io/gocloud/azure_integration_test.go @@ -29,7 +29,8 @@ import ( "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/catalog" sqlcat "github.com/apache/iceberg-go/catalog/sql" - "github.com/apache/iceberg-go/io/gocloud" + "github.com/apache/iceberg-go/io" + _ "github.com/apache/iceberg-go/io/gocloud" "github.com/stretchr/testify/suite" "github.com/uptrace/bun/driver/sqliteshim" "gocloud.dev/blob/azureblob" @@ -64,10 +65,10 @@ func (s *AzureBlobIOTestSuite) TestAzureBlobWarehouseKey() { sqlcat.DriverKey: sqliteshim.ShimName, sqlcat.DialectKey: string(sqlcat.SQLite), "type": "sql", - gocloud.AdlsSharedKeyAccountName: accountName, - gocloud.AdlsSharedKeyAccountKey: accountKey, - gocloud.AdlsEndpoint: endpoint, - gocloud.AdlsProtocol: protocol, + io.AdlsSharedKeyAccountName: accountName, + io.AdlsSharedKeyAccountKey: accountKey, + io.AdlsEndpoint: endpoint, + io.AdlsProtocol: protocol, } cat, err := catalog.Load(context.Background(), "default", properties) @@ -99,7 +100,7 @@ func (s *AzureBlobIOTestSuite) TestAzuriteWarehouseConnectionString() { sqlcat.DriverKey: sqliteshim.ShimName, sqlcat.DialectKey: string(sqlcat.SQLite), "type": "sql", - gocloud.AdlsConnectionStringPrefix + accountName: connectionString, + io.AdlsConnectionStringPrefix + accountName: connectionString, } cat, err := catalog.Load(context.Background(), "default", properties) diff --git a/io/gocloud/gcs.go b/io/gocloud/gcs.go index 9e5d1ccc3..ad7b1bdf6 100644 --- a/io/gocloud/gcs.go +++ b/io/gocloud/gcs.go @@ -23,21 +23,13 @@ import ( "cloud.google.com/go/storage" + "github.com/apache/iceberg-go/io" "gocloud.dev/blob" "gocloud.dev/blob/gcsblob" "gocloud.dev/gcp" "google.golang.org/api/option" ) -// Constants for GCS configuration options -const ( - GCSEndpoint = "gcs.endpoint" - GCSKeyPath = "gcs.keypath" - GCSJSONKey = "gcs.jsonkey" - GCSCredType = "gcs.credtype" - GCSUseJsonAPI = "gcs.usejsonapi" // set to anything to enable -) - var allowedGCSCredTypes = map[string]option.CredentialsType{ "service_account": option.ServiceAccount, "authorized_user": option.AuthorizedUser, @@ -48,22 +40,22 @@ var allowedGCSCredTypes = map[string]option.CredentialsType{ // ParseGCSConfig parses GCS properties and returns a configuration. func ParseGCSConfig(props map[string]string) *gcsblob.Options { var o []option.ClientOption - if url := props[GCSEndpoint]; url != "" { + if url := props[io.GCSEndpoint]; url != "" { o = append(o, option.WithEndpoint(url)) } var credType option.CredentialsType - if key := props[GCSCredType]; key != "" { + if key := props[io.GCSCredType]; key != "" { if ct, ok := allowedGCSCredTypes[key]; ok { credType = ct } } - if key := props[GCSJSONKey]; key != "" { + if key := props[io.GCSJSONKey]; key != "" { o = append(o, option.WithAuthCredentialsJSON(credType, []byte(key))) } - if path := props[GCSKeyPath]; path != "" { + if path := props[io.GCSKeyPath]; path != "" { o = append(o, option.WithAuthCredentialsFile(credType, path)) } - if _, ok := props[GCSUseJsonAPI]; ok { + if _, ok := props[io.GCSUseJsonAPI]; ok { o = append(o, storage.WithJSONReads()) } diff --git a/io/gocloud/gcs_integration_test.go b/io/gocloud/gcs_integration_test.go index 4cf1e38e8..b20e04e41 100644 --- a/io/gocloud/gcs_integration_test.go +++ b/io/gocloud/gcs_integration_test.go @@ -31,7 +31,8 @@ import ( "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/catalog" sqlcat "github.com/apache/iceberg-go/catalog/sql" - "github.com/apache/iceberg-go/io/gocloud" + "github.com/apache/iceberg-go/io" + _ "github.com/apache/iceberg-go/io/gocloud" "github.com/stretchr/testify/suite" "github.com/uptrace/bun/driver/sqliteshim" ) @@ -113,8 +114,8 @@ func (s *GCSIOTestSuite) TestGCSWarehouse() { sqlcat.DialectKey: string(sqlcat.SQLite), "type": "sql", "warehouse": fmt.Sprintf("gs://%s/iceberg/", gcsBucketName), - gocloud.GCSEndpoint: fmt.Sprintf("http://%s/", gcsEndpoint), - gocloud.GCSUseJsonAPI: "true", + io.GCSEndpoint: fmt.Sprintf("http://%s/", gcsEndpoint), + io.GCSUseJsonAPI: "true", } cat, err := catalog.Load(context.Background(), "default", properties) diff --git a/io/gocloud/s3.go b/io/gocloud/s3.go index 6aabc3c75..4adb3ee40 100644 --- a/io/gocloud/s3.go +++ b/io/gocloud/s3.go @@ -26,6 +26,7 @@ import ( "slices" "strconv" + "github.com/apache/iceberg-go/io" "github.com/apache/iceberg-go/utils" "github.com/aws/aws-sdk-go-v2/aws" awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http" @@ -37,22 +38,9 @@ import ( "gocloud.dev/blob/s3blob" ) -// Constants for S3 configuration options -const ( - S3Region = "s3.region" - S3SessionToken = "s3.session-token" - S3SecretAccessKey = "s3.secret-access-key" - S3AccessKeyID = "s3.access-key-id" - S3EndpointURL = "s3.endpoint" - S3ProxyURI = "s3.proxy-uri" - S3ConnectTimeout = "s3.connect-timeout" - S3SignerUri = "s3.signer.uri" - S3ForceVirtualAddressing = "s3.force-virtual-addressing" -) - var unsupportedS3Props = []string{ - S3ConnectTimeout, - S3SignerUri, + io.S3ConnectTimeout, + io.S3SignerUri, } // ParseAWSConfig parses S3 properties and returns a configuration. @@ -71,20 +59,20 @@ func ParseAWSConfig(ctx context.Context, props map[string]string) (*aws.Config, &bearer.StaticTokenProvider{Token: bearer.Token{Value: tok}})) } - if region, ok := props[S3Region]; ok { + if region, ok := props[io.S3Region]; ok { opts = append(opts, config.WithRegion(region)) } else if region, ok := props["client.region"]; ok { opts = append(opts, config.WithRegion(region)) } - accessKey, secretAccessKey := props[S3AccessKeyID], props[S3SecretAccessKey] - token := props[S3SessionToken] + accessKey, secretAccessKey := props[io.S3AccessKeyID], props[io.S3SecretAccessKey] + token := props[io.S3SessionToken] if accessKey != "" || secretAccessKey != "" || token != "" { opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider( - props[S3AccessKeyID], props[S3SecretAccessKey], props[S3SessionToken]))) + props[io.S3AccessKeyID], props[io.S3SecretAccessKey], props[io.S3SessionToken]))) } - if proxy, ok := props[S3ProxyURI]; ok { + if proxy, ok := props[io.S3ProxyURI]; ok { proxyURL, err := url.Parse(proxy) if err != nil { return nil, fmt.Errorf("invalid s3 proxy url '%s'", proxy) @@ -121,13 +109,13 @@ func createS3Bucket(ctx context.Context, parsed *url.URL, props map[string]strin } } - endpoint, ok := props[S3EndpointURL] + endpoint, ok := props[io.S3EndpointURL] if !ok { endpoint = os.Getenv("AWS_S3_ENDPOINT") } usePathStyle := true - if forceVirtual, ok := props[S3ForceVirtualAddressing]; ok { + if forceVirtual, ok := props[io.S3ForceVirtualAddressing]; ok { if cfgForceVirtual, err := strconv.ParseBool(forceVirtual); err == nil { usePathStyle = !cfgForceVirtual } diff --git a/io/gocloud/s3_integration_test.go b/io/gocloud/s3_integration_test.go index 7ab9d6df0..c3197edda 100644 --- a/io/gocloud/s3_integration_test.go +++ b/io/gocloud/s3_integration_test.go @@ -28,7 +28,8 @@ import ( "github.com/apache/iceberg-go" "github.com/apache/iceberg-go/catalog" sqlcat "github.com/apache/iceberg-go/catalog/sql" - "github.com/apache/iceberg-go/io/gocloud" + "github.com/apache/iceberg-go/io" + _ "github.com/apache/iceberg-go/io/gocloud" "github.com/stretchr/testify/require" "github.com/uptrace/bun/driver/sqliteshim" ) @@ -43,9 +44,9 @@ func TestMinioWarehouse(t *testing.T) { sqlcat.DialectKey: string(sqlcat.SQLite), "type": "sql", "warehouse": "s3a://warehouse/iceberg/", - gocloud.S3Region: "local", - gocloud.S3AccessKeyID: "admin", - gocloud.S3SecretAccessKey: "password", + io.S3Region: "local", + io.S3AccessKeyID: "admin", + io.S3SecretAccessKey: "password", // endpoint is passed via AWS_S3_ENDPOINT env var }) require.NoError(t, err) @@ -75,9 +76,9 @@ func TestMinioWarehouseNoLocation(t *testing.T) { sqlcat.DialectKey: string(sqlcat.SQLite), "type": "sql", "warehouse": "s3a://warehouse/iceberg/", - gocloud.S3Region: "local", - gocloud.S3AccessKeyID: "admin", - gocloud.S3SecretAccessKey: "password", + io.S3Region: "local", + io.S3AccessKeyID: "admin", + io.S3SecretAccessKey: "password", // endpoint is passed via AWS_S3_ENDPOINT env var }) require.NoError(t, err) diff --git a/table/orphan_cleanup_integration_test.go b/table/orphan_cleanup_integration_test.go index 9e1da3ca3..cce3fd0bb 100644 --- a/table/orphan_cleanup_integration_test.go +++ b/table/orphan_cleanup_integration_test.go @@ -34,7 +34,7 @@ import ( "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/rest" "github.com/apache/iceberg-go/io" - "github.com/apache/iceberg-go/io/gocloud" + _ "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" "github.com/stretchr/testify/suite" ) @@ -71,9 +71,9 @@ func (s *OrphanCleanupIntegrationSuite) loadCatalog() *rest.Catalog { cat, err := catalog.Load(s.ctx, "local", iceberg.Properties{ "type": "rest", "uri": "http://localhost:8181", - gocloud.S3Region: "us-east-1", - gocloud.S3AccessKeyID: "admin", - gocloud.S3SecretAccessKey: "password", + io.S3Region: "us-east-1", + io.S3AccessKeyID: "admin", + io.S3SecretAccessKey: "password", }) s.Require().NoError(err) s.Require().IsType(&rest.Catalog{}, cat) diff --git a/table/scanner_test.go b/table/scanner_test.go index 961b4e6f0..6554c1343 100644 --- a/table/scanner_test.go +++ b/table/scanner_test.go @@ -37,7 +37,8 @@ import ( "github.com/apache/iceberg-go/catalog" "github.com/apache/iceberg-go/catalog/rest" "github.com/apache/iceberg-go/internal/recipe" - "github.com/apache/iceberg-go/io/gocloud" + "github.com/apache/iceberg-go/io" + _ "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -60,9 +61,9 @@ func (s *ScannerSuite) SetupTest() { cat, err := rest.NewCatalog(s.ctx, "rest", "http://localhost:8181", rest.WithAdditionalProps( iceberg.Properties{ - gocloud.S3Region: "us-east-1", - gocloud.S3AccessKeyID: "admin", - gocloud.S3SecretAccessKey: "password", + io.S3Region: "us-east-1", + io.S3AccessKeyID: "admin", + io.S3SecretAccessKey: "password", }, )) s.Require().NoError(err) diff --git a/table/transaction_test.go b/table/transaction_test.go index 92330dffe..db06d64d1 100644 --- a/table/transaction_test.go +++ b/table/transaction_test.go @@ -35,7 +35,7 @@ import ( "github.com/apache/iceberg-go/catalog/rest" "github.com/apache/iceberg-go/internal/recipe" iceio "github.com/apache/iceberg-go/io" - "github.com/apache/iceberg-go/io/gocloud" + _ "github.com/apache/iceberg-go/io/gocloud" "github.com/apache/iceberg-go/table" "github.com/stretchr/testify/suite" "github.com/testcontainers/testcontainers-go/modules/compose" @@ -59,10 +59,10 @@ func (s *SparkIntegrationTestSuite) SetupSuite() { func (s *SparkIntegrationTestSuite) SetupTest() { s.ctx = context.Background() s.props = iceberg.Properties{ - gocloud.S3Region: "us-east-1", - gocloud.S3EndpointURL: "http://localhost:9000", - gocloud.S3AccessKeyID: "admin", - gocloud.S3SecretAccessKey: "password", + iceio.S3Region: "us-east-1", + iceio.S3EndpointURL: "http://localhost:9000", + iceio.S3AccessKeyID: "admin", + iceio.S3SecretAccessKey: "password", } cat, err := rest.NewCatalog(s.ctx, "rest", "http://localhost:8181", rest.WithAdditionalProps(s.props)) From cb8613776638cc27bbf35c502d3e51854814b169 Mon Sep 17 00:00:00 2001 From: Alessandro Nori Date: Fri, 6 Feb 2026 11:31:15 +0100 Subject: [PATCH 06/10] add some tests for mem io --- io/gocloud/mem_test.go | 133 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 io/gocloud/mem_test.go diff --git a/io/gocloud/mem_test.go b/io/gocloud/mem_test.go new file mode 100644 index 000000000..a7d536ba2 --- /dev/null +++ b/io/gocloud/mem_test.go @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package gocloud_test + +import ( + "context" + "io" + "testing" + + icebergio "github.com/apache/iceberg-go/io" + _ "github.com/apache/iceberg-go/io/gocloud" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMemIO_BasicOperations(t *testing.T) { + ctx := context.Background() + + memIO, err := icebergio.LoadFS(ctx, map[string]string{}, "mem://bucket/") + require.NoError(t, err) + require.NotNil(t, memIO) + + writeIO, ok := memIO.(icebergio.WriteFileIO) + require.True(t, ok, "mem IO should implement WriteFileIO") + + testData := []byte("Hello, Iceberg!") + err = writeIO.WriteFile("test-file.txt", testData) + require.NoError(t, err) + + file, err := memIO.Open("test-file.txt") + require.NoError(t, err) + defer file.Close() + + content, err := io.ReadAll(file) + require.NoError(t, err) + assert.Equal(t, testData, content) + + err = memIO.Remove("test-file.txt") + require.NoError(t, err) + + _, err = memIO.Open("test-file.txt") + assert.Error(t, err) +} + +func TestMemIO_Create(t *testing.T) { + ctx := context.Background() + + memIO, err := icebergio.LoadFS(ctx, map[string]string{}, "mem://bucket/") + require.NoError(t, err) + + writeIO := memIO.(icebergio.WriteFileIO) + + writer, err := writeIO.Create("created-file.txt") + require.NoError(t, err) + require.NotNil(t, writer) + + testData := []byte("Data written via Create") + n, err := writer.Write(testData) + require.NoError(t, err) + assert.Equal(t, len(testData), n) + + err = writer.Close() + require.NoError(t, err) + + file, err := memIO.Open("created-file.txt") + require.NoError(t, err) + defer file.Close() + + content, err := io.ReadAll(file) + require.NoError(t, err) + assert.Equal(t, testData, content) +} + +func TestMemIO_MultipleFiles(t *testing.T) { + ctx := context.Background() + + memIO, err := icebergio.LoadFS(ctx, map[string]string{}, "mem://bucket/") + require.NoError(t, err) + + writeIO := memIO.(icebergio.WriteFileIO) + + files := map[string][]byte{ + "file1.txt": []byte("Content of file 1"), + "file2.txt": []byte("Content of file 2"), + "file3.txt": []byte("Content of file 3"), + } + + for name, content := range files { + err := writeIO.WriteFile(name, content) + require.NoError(t, err) + } + + for name, expectedContent := range files { + file, err := memIO.Open(name) + require.NoError(t, err) + + content, err := io.ReadAll(file) + require.NoError(t, err) + assert.Equal(t, expectedContent, content) + + err = file.Close() + require.NoError(t, err) + } + + err = memIO.Remove("file2.txt") + require.NoError(t, err) + + _, err = memIO.Open("file2.txt") + assert.Error(t, err) + + file1, err := memIO.Open("file1.txt") + require.NoError(t, err) + file1.Close() + + file3, err := memIO.Open("file3.txt") + require.NoError(t, err) + file3.Close() +} From 7bac60c0836a2adfbfca2f495989342c11950199 Mon Sep 17 00:00:00 2001 From: Alessandro Nori Date: Tue, 10 Feb 2026 12:05:00 +0100 Subject: [PATCH 07/10] run integration tests on all `io` directory --- .github/workflows/go-integration.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/go-integration.yml b/.github/workflows/go-integration.yml index 76941e45f..30e39c6e3 100644 --- a/.github/workflows/go-integration.yml +++ b/.github/workflows/go-integration.yml @@ -65,9 +65,9 @@ jobs: env: AWS_S3_ENDPOINT: "${{ env.AWS_S3_ENDPOINT }}" AWS_REGION: "us-east-1" - run: | + run: | go test -tags integration -v -run="^TestScanner" ./table - go test -tags integration -v ./io + go test -tags integration -v ./io/... go test -tags integration -v -run="^TestRestIntegration$" ./catalog/rest go test -tags=integration -v ./catalog/hive/... - name: Run spark integration tests From 96e4f26a8c08551436c0050453eec0aefca9b430 Mon Sep 17 00:00:00 2001 From: Alessandro Nori Date: Wed, 11 Feb 2026 11:26:30 +0100 Subject: [PATCH 08/10] use mutex to check for duplicated scheme in registry --- io/registry.go | 52 +++++++++++++++++--------------------------------- 1 file changed, 18 insertions(+), 34 deletions(-) diff --git a/io/registry.go b/io/registry.go index 6c8a7818c..16ef5ab27 100644 --- a/io/registry.go +++ b/io/registry.go @@ -28,33 +28,6 @@ import ( type registry map[string]SchemeFactory -func (r registry) getKeys() []string { - regMutex.Lock() - defer regMutex.Unlock() - - return slices.Collect(maps.Keys(r)) -} - -func (r registry) set(scheme string, factory SchemeFactory) { - regMutex.Lock() - defer regMutex.Unlock() - r[scheme] = factory -} - -func (r registry) get(scheme string) (SchemeFactory, bool) { - regMutex.Lock() - defer regMutex.Unlock() - factory, ok := r[scheme] - - return factory, ok -} - -func (r registry) remove(scheme string) { - regMutex.Lock() - defer regMutex.Unlock() - delete(r, scheme) -} - var ( regMutex sync.Mutex defaultRegistry = registry{} @@ -63,25 +36,34 @@ var ( // SchemeFactory is a function that creates an IO implementation for a given URI and properties. type SchemeFactory func(ctx context.Context, parsed *url.URL, props map[string]string) (IO, error) -// Register adds a new scheme factory to the registry. If the scheme is already registered, it will be replaced. +// Register adds a new scheme factory to the registry. If the scheme is already registered, it will panic. func Register(scheme string, factory SchemeFactory) { if factory == nil { panic("io: Register factory is nil") } - if _, dup := defaultRegistry.get(scheme); dup { + + regMutex.Lock() + defer regMutex.Unlock() + + if _, dup := defaultRegistry[scheme]; dup { panic("io: Register called twice for scheme " + scheme) } - defaultRegistry.set(scheme, factory) + defaultRegistry[scheme] = factory } // Unregister removes the requested scheme factory from the registry. func Unregister(scheme string) { - defaultRegistry.remove(scheme) + regMutex.Lock() + defer regMutex.Unlock() + delete(defaultRegistry, scheme) } // GetRegisteredSchemes returns the list of registered scheme names. func GetRegisteredSchemes() []string { - return defaultRegistry.getKeys() + regMutex.Lock() + defer regMutex.Unlock() + + return slices.Collect(maps.Keys(defaultRegistry)) } func init() { @@ -99,8 +81,10 @@ func inferFileIOFromScheme(ctx context.Context, path string, props map[string]st return nil, err } - // Look up the scheme in the registry - factory, ok := defaultRegistry.get(parsed.Scheme) + regMutex.Lock() + factory, ok := defaultRegistry[parsed.Scheme] + regMutex.Unlock() + if !ok { return nil, fmt.Errorf("%w: %s", ErrIONotFound, parsed.Scheme) } From 9271d2c1f6d54c2931b6d2451349dba177058472 Mon Sep 17 00:00:00 2001 From: Alessandro Nori Date: Fri, 13 Feb 2026 11:21:06 +0100 Subject: [PATCH 09/10] address review comments --- io/io.go | 2 +- io/registry.go | 12 ++++++------ io/registry_test.go | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/io/io.go b/io/io.go index 2a10c4459..1eac931f0 100644 --- a/io/io.go +++ b/io/io.go @@ -202,7 +202,7 @@ func (f ioFS) Remove(name string) error { } var ( - ErrIONotFound = errors.New("io scheme not registered") + ErrIOSchemeNotFound = errors.New("io scheme not registered") errMissingReadDir = errors.New("fs.File directory missing ReadDir method") errMissingSeek = errors.New("fs.File missing Seek method") diff --git a/io/registry.go b/io/registry.go index 16ef5ab27..f18385fab 100644 --- a/io/registry.go +++ b/io/registry.go @@ -29,7 +29,7 @@ import ( type registry map[string]SchemeFactory var ( - regMutex sync.Mutex + regMutex sync.RWMutex defaultRegistry = registry{} ) @@ -60,8 +60,8 @@ func Unregister(scheme string) { // GetRegisteredSchemes returns the list of registered scheme names. func GetRegisteredSchemes() []string { - regMutex.Lock() - defer regMutex.Unlock() + regMutex.RLock() + defer regMutex.RUnlock() return slices.Collect(maps.Keys(defaultRegistry)) } @@ -81,12 +81,12 @@ func inferFileIOFromScheme(ctx context.Context, path string, props map[string]st return nil, err } - regMutex.Lock() + regMutex.RLock() factory, ok := defaultRegistry[parsed.Scheme] - regMutex.Unlock() + regMutex.RUnlock() if !ok { - return nil, fmt.Errorf("%w: %s", ErrIONotFound, parsed.Scheme) + return nil, fmt.Errorf("%w for path %q (scheme: %s)", ErrIOSchemeNotFound, path, parsed.Scheme) } return factory(ctx, parsed, props) diff --git a/io/registry_test.go b/io/registry_test.go index f7d618063..bb257daa2 100644 --- a/io/registry_test.go +++ b/io/registry_test.go @@ -91,7 +91,7 @@ func TestIORegistry(t *testing.T) { _, err = io.LoadFS(ctx, map[string]string{}, "custom://bucket/path") assert.Error(t, err) - assert.ErrorIs(t, err, io.ErrIONotFound) + assert.ErrorIs(t, err, io.ErrIOSchemeNotFound) } func TestRegistryPanic(t *testing.T) { @@ -153,7 +153,7 @@ func TestLoadFS(t *testing.T) { iofs, err := io.LoadFS(ctx, map[string]string{}, tt.location) if tt.expectError { assert.Error(t, err) - assert.ErrorIs(t, err, io.ErrIONotFound) + assert.ErrorIs(t, err, io.ErrIOSchemeNotFound) } else { require.NoError(t, err) assert.NotNil(t, iofs) From 368102085ccbfc5a7907d124ddff871f9c773be4 Mon Sep 17 00:00:00 2001 From: Alessandro Nori Date: Mon, 16 Feb 2026 10:41:09 +0100 Subject: [PATCH 10/10] refactor config key names --- io/config.go | 20 ++++++++++---------- io/gocloud/azure.go | 14 +++++++------- io/gocloud/azure_integration_test.go | 10 +++++----- io/gocloud/gcs.go | 2 +- io/gocloud/gcs_integration_test.go | 2 +- io/gocloud/s3.go | 2 +- 6 files changed, 25 insertions(+), 25 deletions(-) diff --git a/io/config.go b/io/config.go index 6b9a952ec..33feeed2f 100644 --- a/io/config.go +++ b/io/config.go @@ -26,7 +26,7 @@ const ( S3EndpointURL = "s3.endpoint" S3ProxyURI = "s3.proxy-uri" S3ConnectTimeout = "s3.connect-timeout" - S3SignerUri = "s3.signer.uri" + S3SignerURI = "s3.signer.uri" S3ForceVirtualAddressing = "s3.force-virtual-addressing" ) @@ -36,19 +36,19 @@ const ( GCSKeyPath = "gcs.keypath" GCSJSONKey = "gcs.jsonkey" GCSCredType = "gcs.credtype" - GCSUseJsonAPI = "gcs.usejsonapi" // set to anything to enable + GCSUseJSONAPI = "gcs.usejsonapi" // set to anything to enable ) // Constants for Azure configuration options const ( - AdlsSasTokenPrefix = "adls.sas-token." - AdlsConnectionStringPrefix = "adls.connection-string." - AdlsSharedKeyAccountName = "adls.auth.shared-key.account.name" - AdlsSharedKeyAccountKey = "adls.auth.shared-key.account.key" - AdlsEndpoint = "adls.endpoint" - AdlsProtocol = "adls.protocol" + ADLSSasTokenPrefix = "adls.sas-token." + ADLSConnectionStringPrefix = "adls.connection-string." + ADLSSharedKeyAccountName = "adls.auth.shared-key.account.name" + ADLSSharedKeyAccountKey = "adls.auth.shared-key.account.key" + ADLSEndpoint = "adls.endpoint" + ADLSProtocol = "adls.protocol" // Not in use yet - // AdlsReadBlockSize = "adls.read.block-size-bytes" - // AdlsWriteBlockSize = "adls.write.block-size-bytes" + // ADLSReadBlockSize = "adls.read.block-size-bytes" + // ADLSWriteBlockSize = "adls.write.block-size-bytes" ) diff --git a/io/gocloud/azure.go b/io/gocloud/azure.go index faf81e62f..796b8d38e 100644 --- a/io/gocloud/azure.go +++ b/io/gocloud/azure.go @@ -106,8 +106,8 @@ func newAdlsLocation(adlsURI *url.URL) (*adlsLocation, error) { // Construct a Azure bucket from a URL func createAzureBucket(ctx context.Context, parsed *url.URL, props map[string]string) (*blob.Bucket, error) { - adlsSasTokens := propertiesWithPrefix(props, io.AdlsSasTokenPrefix) - adlsConnectionStrings := propertiesWithPrefix(props, io.AdlsConnectionStringPrefix) + adlsSasTokens := propertiesWithPrefix(props, io.ADLSSasTokenPrefix) + adlsConnectionStrings := propertiesWithPrefix(props, io.ADLSConnectionStringPrefix) // Construct the client location, err := newAdlsLocation(parsed) @@ -115,16 +115,16 @@ func createAzureBucket(ctx context.Context, parsed *url.URL, props map[string]st return nil, err } - sharedKeyAccountName := props[io.AdlsSharedKeyAccountName] - endpoint := props[io.AdlsEndpoint] - protocol := props[io.AdlsProtocol] + sharedKeyAccountName := props[io.ADLSSharedKeyAccountName] + endpoint := props[io.ADLSEndpoint] + protocol := props[io.ADLSProtocol] var client *container.Client if sharedKeyAccountName != "" { - sharedKeyAccountKey, ok := props[io.AdlsSharedKeyAccountKey] + sharedKeyAccountKey, ok := props[io.ADLSSharedKeyAccountKey] if !ok || sharedKeyAccountKey == "" { - return nil, fmt.Errorf("azure authentication: shared-key requires both %s and %s", io.AdlsSharedKeyAccountName, io.AdlsSharedKeyAccountKey) + return nil, fmt.Errorf("azure authentication: shared-key requires both %s and %s", io.ADLSSharedKeyAccountName, io.ADLSSharedKeyAccountKey) } containerURL, err := createContainerURL(location.accountName, protocol, endpoint, "", location.containerName) diff --git a/io/gocloud/azure_integration_test.go b/io/gocloud/azure_integration_test.go index a4c51a41a..cd6998400 100644 --- a/io/gocloud/azure_integration_test.go +++ b/io/gocloud/azure_integration_test.go @@ -65,10 +65,10 @@ func (s *AzureBlobIOTestSuite) TestAzureBlobWarehouseKey() { sqlcat.DriverKey: sqliteshim.ShimName, sqlcat.DialectKey: string(sqlcat.SQLite), "type": "sql", - io.AdlsSharedKeyAccountName: accountName, - io.AdlsSharedKeyAccountKey: accountKey, - io.AdlsEndpoint: endpoint, - io.AdlsProtocol: protocol, + io.ADLSSharedKeyAccountName: accountName, + io.ADLSSharedKeyAccountKey: accountKey, + io.ADLSEndpoint: endpoint, + io.ADLSProtocol: protocol, } cat, err := catalog.Load(context.Background(), "default", properties) @@ -100,7 +100,7 @@ func (s *AzureBlobIOTestSuite) TestAzuriteWarehouseConnectionString() { sqlcat.DriverKey: sqliteshim.ShimName, sqlcat.DialectKey: string(sqlcat.SQLite), "type": "sql", - io.AdlsConnectionStringPrefix + accountName: connectionString, + io.ADLSConnectionStringPrefix + accountName: connectionString, } cat, err := catalog.Load(context.Background(), "default", properties) diff --git a/io/gocloud/gcs.go b/io/gocloud/gcs.go index ad7b1bdf6..96e5e1dc4 100644 --- a/io/gocloud/gcs.go +++ b/io/gocloud/gcs.go @@ -55,7 +55,7 @@ func ParseGCSConfig(props map[string]string) *gcsblob.Options { if path := props[io.GCSKeyPath]; path != "" { o = append(o, option.WithAuthCredentialsFile(credType, path)) } - if _, ok := props[io.GCSUseJsonAPI]; ok { + if _, ok := props[io.GCSUseJSONAPI]; ok { o = append(o, storage.WithJSONReads()) } diff --git a/io/gocloud/gcs_integration_test.go b/io/gocloud/gcs_integration_test.go index b20e04e41..0b86628df 100644 --- a/io/gocloud/gcs_integration_test.go +++ b/io/gocloud/gcs_integration_test.go @@ -115,7 +115,7 @@ func (s *GCSIOTestSuite) TestGCSWarehouse() { "type": "sql", "warehouse": fmt.Sprintf("gs://%s/iceberg/", gcsBucketName), io.GCSEndpoint: fmt.Sprintf("http://%s/", gcsEndpoint), - io.GCSUseJsonAPI: "true", + io.GCSUseJSONAPI: "true", } cat, err := catalog.Load(context.Background(), "default", properties) diff --git a/io/gocloud/s3.go b/io/gocloud/s3.go index 4adb3ee40..81c630573 100644 --- a/io/gocloud/s3.go +++ b/io/gocloud/s3.go @@ -40,7 +40,7 @@ import ( var unsupportedS3Props = []string{ io.S3ConnectTimeout, - io.S3SignerUri, + io.S3SignerURI, } // ParseAWSConfig parses S3 properties and returns a configuration.