diff --git a/.github/workflows/go-integration.yml b/.github/workflows/go-integration.yml
index 76941e45f..30e39c6e3 100644
--- a/.github/workflows/go-integration.yml
+++ b/.github/workflows/go-integration.yml
@@ -65,9 +65,9 @@ jobs:
         env:
           AWS_S3_ENDPOINT: "${{ env.AWS_S3_ENDPOINT }}"
           AWS_REGION: "us-east-1"
-        run: |
+        run: |
           go test -tags integration -v -run="^TestScanner" ./table
-          go test -tags integration -v ./io
+          go test -tags integration -v ./io/...
           go test -tags integration -v -run="^TestRestIntegration$" ./catalog/rest
           go test -tags=integration -v ./catalog/hive/...
       - name: Run spark integration tests
diff --git a/catalog/glue/glue_test.go b/catalog/glue/glue_test.go
index c05757f5b..ddadcdf58 100644
--- a/catalog/glue/glue_test.go
+++ b/catalog/glue/glue_test.go
@@ -31,6 +31,7 @@ import (
 
 	"github.com/apache/iceberg-go"
 	"github.com/apache/iceberg-go/catalog"
+	_ "github.com/apache/iceberg-go/io/gocloud"
 	"github.com/apache/iceberg-go/table"
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/config"
diff --git a/catalog/rest/rest_integration_test.go b/catalog/rest/rest_integration_test.go
index 17fe20263..ca156f620 100644
--- a/catalog/rest/rest_integration_test.go
+++ b/catalog/rest/rest_integration_test.go
@@ -32,6 +32,7 @@ import (
 	"github.com/apache/iceberg-go/catalog"
 	"github.com/apache/iceberg-go/catalog/rest"
 	"github.com/apache/iceberg-go/io"
+	_ "github.com/apache/iceberg-go/io/gocloud"
 	"github.com/apache/iceberg-go/table"
 	"github.com/apache/iceberg-go/view"
 	"github.com/stretchr/testify/require"
diff --git a/catalog/sql/sql_integration_test.go b/catalog/sql/sql_integration_test.go
index 5e879e9dc..82f83bbac 100644
--- a/catalog/sql/sql_integration_test.go
+++ b/catalog/sql/sql_integration_test.go
@@ -34,6 +34,7 @@ import (
 	"github.com/apache/iceberg-go/catalog"
 	"github.com/apache/iceberg-go/catalog/sql"
 	"github.com/apache/iceberg-go/io"
+	_ "github.com/apache/iceberg-go/io/gocloud"
 	"github.com/apache/iceberg-go/table"
 	"github.com/stretchr/testify/suite"
 )
diff --git a/catalog/sql/sql_test.go b/catalog/sql/sql_test.go
index 6ac0fdb86..039189bcc 100644
--- a/catalog/sql/sql_test.go
+++ b/catalog/sql/sql_test.go
@@ -36,6 +36,7 @@ import (
 	"github.com/apache/iceberg-go/catalog"
 	"github.com/apache/iceberg-go/catalog/internal"
 	sqlcat "github.com/apache/iceberg-go/catalog/sql"
+	_ "github.com/apache/iceberg-go/io/gocloud"
 	"github.com/apache/iceberg-go/table"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/suite"
diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go
index 128385914..a42e92bbf 100644
--- a/cmd/iceberg/main.go
+++ b/cmd/iceberg/main.go
@@ -31,6 +31,7 @@ import (
 	"github.com/apache/iceberg-go/catalog/hive"
 	"github.com/apache/iceberg-go/catalog/rest"
 	"github.com/apache/iceberg-go/config"
+	_ "github.com/apache/iceberg-go/io/gocloud"
 	"github.com/apache/iceberg-go/table"
 
 	awsconfig "github.com/aws/aws-sdk-go-v2/config"
diff --git a/io/config.go b/io/config.go
new file mode 100644
index 000000000..33feeed2f
--- /dev/null
+++ b/io/config.go
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package io
+
+// Constants for S3 configuration options
+const (
+	S3Region                 = "s3.region"
+	S3SessionToken           = "s3.session-token"
+	S3SecretAccessKey        = "s3.secret-access-key"
+	S3AccessKeyID            = "s3.access-key-id"
+	S3EndpointURL            = "s3.endpoint"
+	S3ProxyURI               = "s3.proxy-uri"
+	S3ConnectTimeout         = "s3.connect-timeout"
+	S3SignerURI              = "s3.signer.uri"
+	S3ForceVirtualAddressing = "s3.force-virtual-addressing"
+)
+
+// Constants for GCS configuration options
+const (
+	GCSEndpoint   = "gcs.endpoint"
+	GCSKeyPath    = "gcs.keypath"
+	GCSJSONKey    = "gcs.jsonkey"
+	GCSCredType   = "gcs.credtype"
+	GCSUseJSONAPI = "gcs.usejsonapi" // set to anything to enable
+)
+
+// Constants for Azure configuration options
+const (
+	ADLSSasTokenPrefix         = "adls.sas-token."
+	ADLSConnectionStringPrefix = "adls.connection-string."
+	ADLSSharedKeyAccountName   = "adls.auth.shared-key.account.name"
+	ADLSSharedKeyAccountKey    = "adls.auth.shared-key.account.key"
+	ADLSEndpoint               = "adls.endpoint"
+	ADLSProtocol               = "adls.protocol"
+
+	// Not in use yet
+	// ADLSReadBlockSize = "adls.read.block-size-bytes"
+	// ADLSWriteBlockSize = "adls.write.block-size-bytes"
+)
diff --git a/io/azure.go b/io/gocloud/azure.go
similarity index 86%
rename from io/azure.go
rename to io/gocloud/azure.go
index a9e71b56b..796b8d38e 100644
--- a/io/azure.go
+++ b/io/gocloud/azure.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package io
+package gocloud
 
 import (
 	"context"
@@ -28,6 +28,7 @@ import (
 	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
+	"github.com/apache/iceberg-go/io"
 	"gocloud.dev/blob"
 	"gocloud.dev/blob/azureblob"
 )
@@ -36,20 +37,6 @@ import (
 // https://github.com/apache/iceberg/blob/2114bf631e49af532d66e2ce148ee49dd1dd1f1f/azure/src/main/java/org/apache/iceberg/azure/adlsv2/ADLSLocation.java#L47
 var adlsURIPattern = regexp.MustCompile(`^(abfss?|wasbs?)://([^/?#]+)(.*)?$`)
 
-// Constants for Azure configuration options
-const (
-	AdlsSasTokenPrefix         = "adls.sas-token."
-	AdlsConnectionStringPrefix = "adls.connection-string."
-	AdlsSharedKeyAccountName   = "adls.auth.shared-key.account.name"
-	AdlsSharedKeyAccountKey    = "adls.auth.shared-key.account.key"
-	AdlsEndpoint               = "adls.endpoint"
-	AdlsProtocol               = "adls.protocol"
-
-	// Not in use yet
-	// AdlsReadBlockSize = "adls.read.block-size-bytes"
-	// AdlsWriteBlockSize = "adls.write.block-size-bytes"
-)
-
 // adlsLocation represents the parsed components of an Azure Data Lake Storage URI
 type adlsLocation struct {
 	accountName   string // Azure storage account name
@@ -119,8 +106,8 @@ func newAdlsLocation(adlsURI *url.URL) (*adlsLocation, error) {
 
 // Construct a Azure bucket from a URL
 func createAzureBucket(ctx context.Context, parsed *url.URL, props map[string]string) (*blob.Bucket, error) {
-	adlsSasTokens := propertiesWithPrefix(props, AdlsSasTokenPrefix)
-	adlsConnectionStrings := propertiesWithPrefix(props, AdlsConnectionStringPrefix)
+	adlsSasTokens := propertiesWithPrefix(props, io.ADLSSasTokenPrefix)
+	adlsConnectionStrings := propertiesWithPrefix(props, io.ADLSConnectionStringPrefix)
 
 	// Construct the client
 	location, err := newAdlsLocation(parsed)
@@ -128,16 +115,16 @@ func createAzureBucket(ctx context.Context, parsed *url.URL, props map[string]st
 		return nil, err
 	}
 
-	sharedKeyAccountName := props[AdlsSharedKeyAccountName]
-	endpoint := props[AdlsEndpoint]
-	protocol := props[AdlsProtocol]
+	sharedKeyAccountName := props[io.ADLSSharedKeyAccountName]
+	endpoint := props[io.ADLSEndpoint]
+	protocol := props[io.ADLSProtocol]
 
 	var client *container.Client
 	if sharedKeyAccountName != "" {
-		sharedKeyAccountKey, ok := props[AdlsSharedKeyAccountKey]
+		sharedKeyAccountKey, ok := props[io.ADLSSharedKeyAccountKey]
 		if !ok || sharedKeyAccountKey == "" {
-			return nil, fmt.Errorf("azure authentication: shared-key requires both %s and %s", AdlsSharedKeyAccountName, AdlsSharedKeyAccountKey)
+			return nil, fmt.Errorf("azure authentication: shared-key requires both %s and %s", io.ADLSSharedKeyAccountName, io.ADLSSharedKeyAccountKey)
 		}
 
 		containerURL, err := createContainerURL(location.accountName, protocol, endpoint, "", location.containerName)
diff --git a/io/azure_integration_test.go b/io/gocloud/azure_integration_test.go
similarity index 94%
rename from io/azure_integration_test.go
rename to io/gocloud/azure_integration_test.go
index cfe5ecb6b..cd6998400 100644
--- a/io/azure_integration_test.go
+++ b/io/gocloud/azure_integration_test.go
@@ -17,7 +17,7 @@
 
 //go:build integration
 
-package io_test
+package gocloud_test
 
 import (
 	"context"
@@ -30,6 +30,7 @@ import (
 	"github.com/apache/iceberg-go/catalog"
 	sqlcat "github.com/apache/iceberg-go/catalog/sql"
 	"github.com/apache/iceberg-go/io"
+	_ "github.com/apache/iceberg-go/io/gocloud"
 	"github.com/stretchr/testify/suite"
 	"github.com/uptrace/bun/driver/sqliteshim"
 	"gocloud.dev/blob/azureblob"
 )
@@ -64,10 +65,10 @@ func (s *AzureBlobIOTestSuite) TestAzureBlobWarehouseKey() {
 		sqlcat.DriverKey:            sqliteshim.ShimName,
 		sqlcat.DialectKey:           string(sqlcat.SQLite),
 		"type":                      "sql",
-		io.AdlsSharedKeyAccountName: accountName,
-		io.AdlsSharedKeyAccountKey:  accountKey,
-		io.AdlsEndpoint:             endpoint,
-		io.AdlsProtocol:             protocol,
+		io.ADLSSharedKeyAccountName: accountName,
+		io.ADLSSharedKeyAccountKey:  accountKey,
+		io.ADLSEndpoint:             endpoint,
+		io.ADLSProtocol:             protocol,
 	}
 
 	cat, err := catalog.Load(context.Background(), "default", properties)
@@ -99,7 +100,7 @@ func (s *AzureBlobIOTestSuite) TestAzuriteWarehouseConnectionString() {
 		sqlcat.DriverKey:  sqliteshim.ShimName,
 		sqlcat.DialectKey: string(sqlcat.SQLite),
 		"type":            "sql",
-		io.AdlsConnectionStringPrefix + accountName: connectionString,
+		io.ADLSConnectionStringPrefix + accountName: connectionString,
 	}
 
 	cat, err := catalog.Load(context.Background(), "default", properties)
diff --git a/io/azure_test.go b/io/gocloud/azure_test.go
similarity index 99%
rename from io/azure_test.go
rename to io/gocloud/azure_test.go
index 27596dbdf..6a650d2ff 100644
--- a/io/azure_test.go
+++ b/io/gocloud/azure_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package io
+package gocloud
 
 import (
 	"context"
diff --git a/io/blob.go b/io/gocloud/blob.go
similarity index 96%
rename from io/blob.go
rename to io/gocloud/blob.go
index 14faf33e5..4c417f5ab 100644
--- a/io/blob.go
+++ b/io/gocloud/blob.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package io
+package gocloud
 
 import (
 	"context"
@@ -26,6 +26,7 @@ import (
 	"path/filepath"
 	"strings"
 
+	icebergio "github.com/apache/iceberg-go/io"
 	"gocloud.dev/blob"
 )
 
@@ -103,7 +104,7 @@ func (bfs *blobFileIO) preprocess(path string) (string, error) {
 	return bfs.keyExtractor(path)
 }
 
-func (bfs *blobFileIO) Open(path string) (File, error) {
+func (bfs *blobFileIO) Open(path string) (icebergio.File, error) {
 	var err error
 	path, err = bfs.preprocess(path)
 	if err != nil {
@@ -133,7 +134,7 @@ func (bfs *blobFileIO) Remove(name string) error {
 	return bfs.Delete(bfs.ctx, name)
 }
 
-func (bfs *blobFileIO) Create(name string) (FileWriter, error) {
+func (bfs *blobFileIO) Create(name string) (icebergio.FileWriter, error) {
 	return bfs.NewWriter(bfs.ctx, name, true, nil)
 }
 
@@ -185,7 +186,7 @@ func (bfs *blobFileIO) NewWriter(ctx context.Context, path string, overwrite boo
 		nil
 }
 
-func createBlobFS(ctx context.Context, bucket *blob.Bucket, keyExtractor KeyExtractor) IO {
+func createBlobFS(ctx context.Context, bucket *blob.Bucket, keyExtractor KeyExtractor) icebergio.IO {
 	return &blobFileIO{Bucket: bucket, keyExtractor: keyExtractor, ctx: ctx}
 }
diff --git a/io/blob_test.go b/io/gocloud/blob_test.go
similarity index 99%
rename from io/blob_test.go
rename to io/gocloud/blob_test.go
index c6029809e..6854be7e4 100644
--- a/io/blob_test.go
+++ b/io/gocloud/blob_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package io
+package gocloud
 
 import (
 	"context"
diff --git a/io/gcs.go b/io/gocloud/gcs.go
similarity index 84%
rename from io/gcs.go
rename to io/gocloud/gcs.go
index fa0de9ef6..96e5e1dc4 100644
--- a/io/gcs.go
+++ b/io/gocloud/gcs.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package io
+package gocloud
 
 import (
 	"context"
@@ -23,21 +23,13 @@ import (
 
 	"cloud.google.com/go/storage"
 
+	"github.com/apache/iceberg-go/io"
 	"gocloud.dev/blob"
 	"gocloud.dev/blob/gcsblob"
 	"gocloud.dev/gcp"
 	"google.golang.org/api/option"
 )
 
-// Constants for GCS configuration options
-const (
-	GCSEndpoint   = "gcs.endpoint"
-	GCSKeyPath    = "gcs.keypath"
-	GCSJSONKey    = "gcs.jsonkey"
-	GCSCredType   = "gcs.credtype"
-	GCSUseJsonAPI = "gcs.usejsonapi" // set to anything to enable
-)
-
 var allowedGCSCredTypes = map[string]option.CredentialsType{
 	"service_account": option.ServiceAccount,
 	"authorized_user": option.AuthorizedUser,
@@ -48,22 +40,22 @@ var allowedGCSCredTypes = map[string]option.CredentialsType{
 // ParseGCSConfig parses GCS properties and returns a configuration.
 func ParseGCSConfig(props map[string]string) *gcsblob.Options {
 	var o []option.ClientOption
-	if url := props[GCSEndpoint]; url != "" {
+	if url := props[io.GCSEndpoint]; url != "" {
 		o = append(o, option.WithEndpoint(url))
 	}
 
 	var credType option.CredentialsType
-	if key := props[GCSCredType]; key != "" {
+	if key := props[io.GCSCredType]; key != "" {
 		if ct, ok := allowedGCSCredTypes[key]; ok {
 			credType = ct
 		}
 	}
-	if key := props[GCSJSONKey]; key != "" {
+	if key := props[io.GCSJSONKey]; key != "" {
 		o = append(o, option.WithAuthCredentialsJSON(credType, []byte(key)))
 	}
-	if path := props[GCSKeyPath]; path != "" {
+	if path := props[io.GCSKeyPath]; path != "" {
 		o = append(o, option.WithAuthCredentialsFile(credType, path))
 	}
-	if _, ok := props[GCSUseJsonAPI]; ok {
+	if _, ok := props[io.GCSUseJSONAPI]; ok {
 		o = append(o, storage.WithJSONReads())
 	}
diff --git a/io/gcs_integration_test.go b/io/gocloud/gcs_integration_test.go
similarity index 97%
rename from io/gcs_integration_test.go
rename to io/gocloud/gcs_integration_test.go
index f2c0556c3..0b86628df 100644
--- a/io/gcs_integration_test.go
+++ b/io/gocloud/gcs_integration_test.go
@@ -17,7 +17,7 @@
 
 //go:build integration
 
-package io_test
+package gocloud_test
 
 import (
 	"bytes"
@@ -32,6 +32,7 @@ import (
 	"github.com/apache/iceberg-go/catalog"
 	sqlcat "github.com/apache/iceberg-go/catalog/sql"
 	"github.com/apache/iceberg-go/io"
+	_ "github.com/apache/iceberg-go/io/gocloud"
 	"github.com/stretchr/testify/suite"
 	"github.com/uptrace/bun/driver/sqliteshim"
 )
@@ -114,7 +115,7 @@ func (s *GCSIOTestSuite) TestGCSWarehouse() {
 		"type":           "sql",
 		"warehouse":      fmt.Sprintf("gs://%s/iceberg/", gcsBucketName),
 		io.GCSEndpoint:   fmt.Sprintf("http://%s/", gcsEndpoint),
-		io.GCSUseJsonAPI: "true",
+		io.GCSUseJSONAPI: "true",
 	}
 
 	cat, err := catalog.Load(context.Background(), "default", properties)
diff --git a/io/gocloud/mem_test.go b/io/gocloud/mem_test.go
new file mode 100644
index 000000000..a7d536ba2
--- /dev/null
+++ b/io/gocloud/mem_test.go
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package gocloud_test
+
+import (
+	"context"
+	"io"
+	"testing"
+
+	icebergio "github.com/apache/iceberg-go/io"
+	_ "github.com/apache/iceberg-go/io/gocloud"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestMemIO_BasicOperations(t *testing.T) {
+	ctx := context.Background()
+
+	memIO, err := icebergio.LoadFS(ctx, map[string]string{}, "mem://bucket/")
+	require.NoError(t, err)
+	require.NotNil(t, memIO)
+
+	writeIO, ok := memIO.(icebergio.WriteFileIO)
+	require.True(t, ok, "mem IO should implement WriteFileIO")
+
+	testData := []byte("Hello, Iceberg!")
+	err = writeIO.WriteFile("test-file.txt", testData)
+	require.NoError(t, err)
+
+	file, err := memIO.Open("test-file.txt")
+	require.NoError(t, err)
+	defer file.Close()
+
+	content, err := io.ReadAll(file)
+	require.NoError(t, err)
+	assert.Equal(t, testData, content)
+
+	err = memIO.Remove("test-file.txt")
+	require.NoError(t, err)
+
+	_, err = memIO.Open("test-file.txt")
+	assert.Error(t, err)
+}
+
+func TestMemIO_Create(t *testing.T) {
+	ctx := context.Background()
+
+	memIO, err := icebergio.LoadFS(ctx, map[string]string{}, "mem://bucket/")
+	require.NoError(t, err)
+
+	writeIO := memIO.(icebergio.WriteFileIO)
+
+	writer, err := writeIO.Create("created-file.txt")
+	require.NoError(t, err)
+	require.NotNil(t, writer)
+
+	testData := []byte("Data written via Create")
+	n, err := writer.Write(testData)
+	require.NoError(t, err)
+	assert.Equal(t, len(testData), n)
+
+	err = writer.Close()
+	require.NoError(t, err)
+
+	file, err := memIO.Open("created-file.txt")
+	require.NoError(t, err)
+	defer file.Close()
+
+	content, err := io.ReadAll(file)
+	require.NoError(t, err)
+	assert.Equal(t, testData, content)
+}
+
+func TestMemIO_MultipleFiles(t *testing.T) {
+	ctx := context.Background()
+
+	memIO, err := icebergio.LoadFS(ctx, map[string]string{}, "mem://bucket/")
+	require.NoError(t, err)
+
+	writeIO := memIO.(icebergio.WriteFileIO)
+
+	files := map[string][]byte{
+		"file1.txt": []byte("Content of file 1"),
+		"file2.txt": []byte("Content of file 2"),
+		"file3.txt": []byte("Content of file 3"),
+	}
+
+	for name, content := range files {
+		err := writeIO.WriteFile(name, content)
+		require.NoError(t, err)
+	}
+
+	for name, expectedContent := range files {
+		file, err := memIO.Open(name)
+		require.NoError(t, err)
+
+		content, err := io.ReadAll(file)
+		require.NoError(t, err)
+		assert.Equal(t, expectedContent, content)
+
+		err = file.Close()
+		require.NoError(t, err)
+	}
+
+	err = memIO.Remove("file2.txt")
+	require.NoError(t, err)
+
+	_, err = memIO.Open("file2.txt")
+	assert.Error(t, err)
+
+	file1, err := memIO.Open("file1.txt")
+	require.NoError(t, err)
+	file1.Close()
+
+	file3, err := memIO.Open("file3.txt")
+	require.NoError(t, err)
+	file3.Close()
+}
diff --git a/io/gocloud/register.go b/io/gocloud/register.go
new file mode 100644
index 000000000..7c2b48580
--- /dev/null
+++ b/io/gocloud/register.go
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package gocloud
+
+import (
+	"context"
+	"net/url"
+
+	icebergio "github.com/apache/iceberg-go/io"
+	"gocloud.dev/blob/memblob"
+)
+
+func init() {
+	registerS3Schemes()
+	registerGCSScheme()
+	registerMemScheme()
+	registerAzureSchemes()
+}
+
+// registerS3Schemes registers S3-compatible storage schemes (s3, s3a, s3n).
+func registerS3Schemes() {
+	s3Factory := func(ctx context.Context, parsed *url.URL, props map[string]string) (icebergio.IO, error) {
+		bucket, err := createS3Bucket(ctx, parsed, props)
+		if err != nil {
+			return nil, err
+		}
+
+		return createBlobFS(ctx, bucket, defaultKeyExtractor(parsed.Host)), nil
+	}
+	icebergio.Register("s3", s3Factory)
+	icebergio.Register("s3a", s3Factory)
+	icebergio.Register("s3n", s3Factory)
+}
+
+// registerGCSScheme registers the Google Cloud Storage scheme (gs).
+func registerGCSScheme() {
+	icebergio.Register("gs", func(ctx context.Context, parsed *url.URL, props map[string]string) (icebergio.IO, error) {
+		bucket, err := createGCSBucket(ctx, parsed, props)
+		if err != nil {
+			return nil, err
+		}
+
+		return createBlobFS(ctx, bucket, defaultKeyExtractor(parsed.Host)), nil
+	})
+}
+
+// registerMemScheme registers the in-memory blob storage scheme (mem).
+func registerMemScheme() {
+	icebergio.Register("mem", func(ctx context.Context, parsed *url.URL, props map[string]string) (icebergio.IO, error) {
+		bucket := memblob.OpenBucket(nil)
+
+		return createBlobFS(ctx, bucket, defaultKeyExtractor(parsed.Host)), nil
+	})
+}
+
+// registerAzureSchemes registers Azure Data Lake Storage schemes (abfs, abfss, wasb, wasbs).
+func registerAzureSchemes() {
+	azureFactory := func(ctx context.Context, parsed *url.URL, props map[string]string) (icebergio.IO, error) {
+		bucket, err := createAzureBucket(ctx, parsed, props)
+		if err != nil {
+			return nil, err
+		}
+
+		return createBlobFS(ctx, bucket, adlsKeyExtractor()), nil
+	}
+	icebergio.Register("abfs", azureFactory)
+	icebergio.Register("abfss", azureFactory)
+	icebergio.Register("wasb", azureFactory)
+	icebergio.Register("wasbs", azureFactory)
+}
diff --git a/io/s3.go b/io/gocloud/s3.go
similarity index 79%
rename from io/s3.go
rename to io/gocloud/s3.go
index 98ad4c634..81c630573 100644
--- a/io/s3.go
+++ b/io/gocloud/s3.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package io
+package gocloud
 
 import (
 	"context"
@@ -26,6 +26,7 @@ import (
 	"slices"
 	"strconv"
 
+	"github.com/apache/iceberg-go/io"
 	"github.com/apache/iceberg-go/utils"
 	"github.com/aws/aws-sdk-go-v2/aws"
 	awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
@@ -37,22 +38,9 @@ import (
 	"gocloud.dev/blob/s3blob"
 )
 
-// Constants for S3 configuration options
-const (
-	S3Region                 = "s3.region"
-	S3SessionToken           = "s3.session-token"
-	S3SecretAccessKey        = "s3.secret-access-key"
-	S3AccessKeyID            = "s3.access-key-id"
-	S3EndpointURL            = "s3.endpoint"
-	S3ProxyURI               = "s3.proxy-uri"
-	S3ConnectTimeout         = "s3.connect-timeout"
-	S3SignerUri              = "s3.signer.uri"
-	S3ForceVirtualAddressing = "s3.force-virtual-addressing"
-)
-
 var unsupportedS3Props = []string{
-	S3ConnectTimeout,
-	S3SignerUri,
+	io.S3ConnectTimeout,
+	io.S3SignerURI,
 }
 
 // ParseAWSConfig parses S3 properties and returns a configuration.
@@ -71,20 +59,20 @@ func ParseAWSConfig(ctx context.Context, props map[string]string) (*aws.Config,
 			&bearer.StaticTokenProvider{Token: bearer.Token{Value: tok}}))
 	}
 
-	if region, ok := props[S3Region]; ok {
+	if region, ok := props[io.S3Region]; ok {
 		opts = append(opts, config.WithRegion(region))
 	} else if region, ok := props["client.region"]; ok {
 		opts = append(opts, config.WithRegion(region))
 	}
 
-	accessKey, secretAccessKey := props[S3AccessKeyID], props[S3SecretAccessKey]
-	token := props[S3SessionToken]
+	accessKey, secretAccessKey := props[io.S3AccessKeyID], props[io.S3SecretAccessKey]
+	token := props[io.S3SessionToken]
 	if accessKey != "" || secretAccessKey != "" || token != "" {
 		opts = append(opts, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
-			props[S3AccessKeyID], props[S3SecretAccessKey], props[S3SessionToken])))
+			props[io.S3AccessKeyID], props[io.S3SecretAccessKey], props[io.S3SessionToken])))
 	}
 
-	if proxy, ok := props[S3ProxyURI]; ok {
+	if proxy, ok := props[io.S3ProxyURI]; ok {
 		proxyURL, err := url.Parse(proxy)
 		if err != nil {
 			return nil, fmt.Errorf("invalid s3 proxy url '%s'", proxy)
@@ -121,13 +109,13 @@ func createS3Bucket(ctx context.Context, parsed *url.URL, props map[string]strin
 		}
 	}
 
-	endpoint, ok := props[S3EndpointURL]
+	endpoint, ok := props[io.S3EndpointURL]
 	if !ok {
 		endpoint = os.Getenv("AWS_S3_ENDPOINT")
 	}
 
 	usePathStyle := true
-	if forceVirtual, ok := props[S3ForceVirtualAddressing]; ok {
+	if forceVirtual, ok := props[io.S3ForceVirtualAddressing]; ok {
 		if cfgForceVirtual, err := strconv.ParseBool(forceVirtual); err == nil {
 			usePathStyle = !cfgForceVirtual
 		}
diff --git a/io/s3_integration_test.go b/io/gocloud/s3_integration_test.go
similarity index 98%
rename from io/s3_integration_test.go
rename to io/gocloud/s3_integration_test.go
index 0e51ad029..c3197edda 100644
--- a/io/s3_integration_test.go
+++ b/io/gocloud/s3_integration_test.go
@@ -17,7 +17,7 @@
 
 //go:build integration
 
-package io_test
+package gocloud_test
 
 import (
 	"context"
@@ -29,6 +29,7 @@ import (
 	"github.com/apache/iceberg-go/catalog"
 	sqlcat "github.com/apache/iceberg-go/catalog/sql"
 	"github.com/apache/iceberg-go/io"
+	_ "github.com/apache/iceberg-go/io/gocloud"
 	"github.com/stretchr/testify/require"
 	"github.com/uptrace/bun/driver/sqliteshim"
 )
diff --git a/io/utils.go b/io/gocloud/utils.go
similarity index 98%
rename from io/utils.go
rename to io/gocloud/utils.go
index 5c7f32142..ba4ed9c6a 100644
--- a/io/utils.go
+++ b/io/gocloud/utils.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package io
+package gocloud
 
 import "strings"
diff --git a/io/io.go b/io/io.go
index 447b515b9..1eac931f0 100644
--- a/io/io.go
+++ b/io/io.go
@@ -15,6 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package io provides an interface for IO implementations along with
+// a registry for registering IO implementations for different URI schemes.
+//
+// Subpackages of this package provide implementations for cloud storage providers
+// which will register themselves if imported. For instance, adding the following
+// import:
+//
+//	import _ "github.com/apache/iceberg-go/io/gocloud"
+//
+// Will register cloud storage implementations for S3, GCS, Azure, and in-memory
+// blob storage. The local filesystem (file:// and empty scheme) is registered
+// by default.
 package io
 
 import (
@@ -23,11 +35,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
 	"io"
 	"io/fs"
-	"net/url"
 	"strings"
-
-	"gocloud.dev/blob"
-	"gocloud.dev/blob/memblob"
 )
 
 // IO is an interface to a hierarchical file system.
@@ -194,6 +202,8 @@ func (f ioFS) Remove(name string) error {
 }
 
 var (
+	ErrIOSchemeNotFound = errors.New("io scheme not registered")
+
 	errMissingReadDir = errors.New("fs.File directory missing ReadDir method")
 	errMissingSeek    = errors.New("fs.File missing Seek method")
 	errMissingReadAt  = errors.New("fs.File missing ReadAt")
@@ -235,60 +245,24 @@ func (f ioFile) ReadDir(count int) ([]fs.DirEntry, error) {
 	return d.ReadDir(count)
 }
 
-func inferFileIOFromSchema(ctx context.Context, path string, props map[string]string) (IO, error) {
-	parsed, err := url.Parse(path)
-	if err != nil {
-		return nil, err
-	}
-	var bucket *blob.Bucket
-	var keyExtractor KeyExtractor
-
-	switch parsed.Scheme {
-	case "s3", "s3a", "s3n":
-		bucket, err = createS3Bucket(ctx, parsed, props)
-		if err != nil {
-			return nil, err
-		}
-		keyExtractor = defaultKeyExtractor(parsed.Host)
-	case "gs":
-		bucket, err = createGCSBucket(ctx, parsed, props)
-		if err != nil {
-			return nil, err
-		}
-		keyExtractor = defaultKeyExtractor(parsed.Host)
-	case "mem":
-		// memblob doesn't use the URL host or path
-		bucket = memblob.OpenBucket(nil)
-		keyExtractor = defaultKeyExtractor(parsed.Host)
-	case "file", "":
-		return LocalFS{}, nil
-	case "abfs", "abfss", "wasb", "wasbs":
-		bucket, err = createAzureBucket(ctx, parsed, props)
-		if err != nil {
-			return nil, err
-		}
-		keyExtractor = adlsKeyExtractor()
-	default:
-		return nil, fmt.Errorf("IO for file '%s' not implemented", path)
-	}
-
-	return createBlobFS(ctx, bucket, keyExtractor), nil
-}
-
 // LoadFS takes a map of properties and an optional URI location
-// and attempts to infer an IO object from it.
+// and attempts to infer an IO object from it using the registered
+// scheme factories.
+//
+// The scheme is extracted from the location URI and used to look up
+// the appropriate factory from the registry. The local filesystem
+// (file:// or empty scheme) is registered by default.
 //
-// A schema of "file://" or an empty string will result in a LocalFS
-// implementation. Otherwise this will return an error if the schema
-// does not yet have an implementation here.
+// Additional schemes can be registered by importing subpackages.
+// For S3, GCS, Azure and in-memory support, import:
 //
-// Currently local, S3, GCS, and In-Memory FSs are implemented.
+//	import _ "github.com/apache/iceberg-go/io/gocloud"
 func LoadFS(ctx context.Context, props map[string]string, location string) (IO, error) {
 	if location == "" {
 		location = props["warehouse"]
 	}
 
-	iofs, err := inferFileIOFromSchema(ctx, location, props)
+	iofs, err := inferFileIOFromScheme(ctx, location, props)
 	if err != nil {
 		return nil, err
 	}
diff --git a/io/registry.go b/io/registry.go
new file mode 100644
index 000000000..f18385fab
--- /dev/null
+++ b/io/registry.go
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package io
+
+import (
+	"context"
+	"fmt"
+	"maps"
+	"net/url"
+	"slices"
+	"sync"
+)
+
+type registry map[string]SchemeFactory
+
+var (
+	regMutex        sync.RWMutex
+	defaultRegistry = registry{}
+)
+
+// SchemeFactory is a function that creates an IO implementation for a given URI and properties.
+type SchemeFactory func(ctx context.Context, parsed *url.URL, props map[string]string) (IO, error)
+
+// Register adds a new scheme factory to the registry. If the scheme is already registered, it will panic.
+func Register(scheme string, factory SchemeFactory) {
+	if factory == nil {
+		panic("io: Register factory is nil")
+	}
+
+	regMutex.Lock()
+	defer regMutex.Unlock()
+
+	if _, dup := defaultRegistry[scheme]; dup {
+		panic("io: Register called twice for scheme " + scheme)
+	}
+	defaultRegistry[scheme] = factory
+}
+
+// Unregister removes the requested scheme factory from the registry.
+func Unregister(scheme string) {
+	regMutex.Lock()
+	defer regMutex.Unlock()
+	delete(defaultRegistry, scheme)
+}
+
+// GetRegisteredSchemes returns the list of registered scheme names.
+func GetRegisteredSchemes() []string {
+	regMutex.RLock()
+	defer regMutex.RUnlock()
+
+	return slices.Collect(maps.Keys(defaultRegistry))
+}
+
+func init() {
+	// Register local filesystem schemes
+	localFSFactory := func(ctx context.Context, parsed *url.URL, props map[string]string) (IO, error) {
+		return LocalFS{}, nil
+	}
+	Register("file", localFSFactory)
+	Register("", localFSFactory)
+}
+
+func inferFileIOFromScheme(ctx context.Context, path string, props map[string]string) (IO, error) {
+	parsed, err := url.Parse(path)
+	if err != nil {
+		return nil, err
+	}
+
+	regMutex.RLock()
+	factory, ok := defaultRegistry[parsed.Scheme]
+	regMutex.RUnlock()
+
+	if !ok {
+		return nil, fmt.Errorf("%w for path %q (scheme: %s)", ErrIOSchemeNotFound, path, parsed.Scheme)
+	}
+
+	return factory(ctx, parsed, props)
+}
diff --git a/io/registry_test.go b/io/registry_test.go
new file mode 100644
index 000000000..bb257daa2
--- /dev/null
+++ b/io/registry_test.go
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package io_test
+
+import (
+	"context"
+	"net/url"
+	"testing"
+
+	"github.com/apache/iceberg-go/io"
+	_ "github.com/apache/iceberg-go/io/gocloud"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestIORegistry(t *testing.T) {
+	ctx := context.Background()
+
+	assert.ElementsMatch(t, []string{
+		"file",
+		"",
+		"s3",
+		"s3a",
+		"s3n",
+		"gs",
+		"mem",
+		"abfs",
+		"abfss",
+		"wasb",
+		"wasbs",
+	}, io.GetRegisteredSchemes())
+
+	customFactoryCalled := false
+	io.Register("custom", func(ctx context.Context, parsed *url.URL, props map[string]string) (io.IO, error) {
+		customFactoryCalled = true
+		assert.Equal(t, "custom", parsed.Scheme)
+		assert.Equal(t, "bucket", parsed.Host)
+
+		return io.LocalFS{}, nil
+	})
+
+	assert.ElementsMatch(t, []string{
+		"file",
+		"",
+		"s3",
+		"s3a",
+		"s3n",
+		"gs",
+		"mem",
+		"abfs",
+		"abfss",
+		"wasb",
+		"wasbs",
+		"custom",
+	}, io.GetRegisteredSchemes())
+
+	customIO, err := io.LoadFS(ctx, map[string]string{}, "custom://bucket/path")
+	assert.NoError(t, err)
+	assert.NotNil(t, customIO)
+	assert.True(t, customFactoryCalled)
+
+	io.Unregister("custom")
+	assert.ElementsMatch(t, []string{
+		"file",
+		"",
+		"s3",
+		"s3a",
+		"s3n",
+		"gs",
+		"mem",
+		"abfs",
+		"abfss",
+		"wasb",
+		"wasbs",
+	}, io.GetRegisteredSchemes())
+
+	_, err = io.LoadFS(ctx, map[string]string{}, "custom://bucket/path")
+	assert.Error(t, err)
+	assert.ErrorIs(t, err, io.ErrIOSchemeNotFound)
+}
+
+func TestRegistryPanic(t *testing.T) {
+	assert.PanicsWithValue(t, "io: Register factory is nil", func() {
+		io.Register("invalid", nil)
+	})
+}
+
+func TestRegisterDuplicatePanic(t *testing.T) {
+	dummyFactory := func(ctx context.Context, parsed *url.URL, props map[string]string) (io.IO, error) {
+		return io.LocalFS{}, nil
+	}
+
+	io.Register("test-duplicate", dummyFactory)
+	defer io.Unregister("test-duplicate")
+
+	assert.PanicsWithValue(t, "io: Register called twice for scheme test-duplicate", func() {
+		io.Register("test-duplicate", dummyFactory)
+	}, "Attempting to register the same scheme twice should panic")
+}
+
+func TestLoadFS(t *testing.T) {
+	ctx := context.Background()
+
+	tests := []struct {
+		name        string
+		location    string
+		expectError bool
+	}{
+		{
+			name:        "file scheme",
+			location:    "file:///tmp/test",
+			expectError: false,
+		},
+		{
+			name:        "empty scheme",
+			location:    "/tmp/test",
+			expectError: false,
+		},
+		{
+			name:        "mem scheme",
+			location:    "mem://bucket/path",
+			expectError: false,
+		},
+		{
+			name:        "s3 scheme",
+			location:    "s3://bucket/path",
+			expectError: false,
+		},
+		{
+			name:        "unsupported scheme",
+			location:    "unsupported://bucket/path",
+			expectError: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			iofs, err := io.LoadFS(ctx, map[string]string{}, tt.location)
+			if tt.expectError {
+				assert.Error(t, err)
+				assert.ErrorIs(t, err, io.ErrIOSchemeNotFound)
+			} else {
+				require.NoError(t, err)
+				assert.NotNil(t, iofs)
+			}
+		})
+	}
+}
+
+func TestLoadFSWithWarehouse(t *testing.T) {
+	ctx := context.Background()
+
+	// Test with warehouse property
+	iofs, err := io.LoadFS(ctx, map[string]string{
+		"warehouse": "file:///tmp/warehouse",
+	}, "")
+	require.NoError(t, err)
+	assert.NotNil(t, iofs)
+	assert.IsType(t, io.LocalFS{}, iofs)
+}
diff --git a/table/orphan_cleanup_integration_test.go b/table/orphan_cleanup_integration_test.go
index bc4d3883e..cce3fd0bb 100644
--- a/table/orphan_cleanup_integration_test.go
+++ b/table/orphan_cleanup_integration_test.go
@@ -34,6 +34,7 @@ import (
 	"github.com/apache/iceberg-go/catalog"
 	"github.com/apache/iceberg-go/catalog/rest"
 	"github.com/apache/iceberg-go/io"
+	_ "github.com/apache/iceberg-go/io/gocloud"
 	"github.com/apache/iceberg-go/table"
 	"github.com/stretchr/testify/suite"
 )
diff --git a/table/scanner_test.go b/table/scanner_test.go
index f3dadb06a..6554c1343 100644
--- a/table/scanner_test.go
+++ b/table/scanner_test.go
@@ -38,6 +38,7 @@ import (
 	"github.com/apache/iceberg-go/catalog/rest"
 	"github.com/apache/iceberg-go/internal/recipe"
 	"github.com/apache/iceberg-go/io"
+	_ "github.com/apache/iceberg-go/io/gocloud"
 	"github.com/apache/iceberg-go/table"
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
diff --git a/table/transaction_test.go b/table/transaction_test.go
index 1f64adf1c..db06d64d1 100644
--- a/table/transaction_test.go
+++ b/table/transaction_test.go
@@ -35,6 +35,7 @@ import (
 	"github.com/apache/iceberg-go/catalog/rest"
 	"github.com/apache/iceberg-go/internal/recipe"
 	iceio "github.com/apache/iceberg-go/io"
+	_ "github.com/apache/iceberg-go/io/gocloud"
 	"github.com/apache/iceberg-go/table"
 	"github.com/stretchr/testify/suite"
 	"github.com/testcontainers/testcontainers-go/modules/compose"
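
A minimal usage sketch of the registry API introduced above (not part of the patch): the blank import of io/gocloud wires up the S3, GCS, Azure, and in-memory schemes exactly as the updated test files do, while io.Register plugs in additional schemes at runtime. The "example" scheme, its trivial factory, and the main package are hypothetical, for illustration only.

package main

import (
	"context"
	"fmt"
	"net/url"

	icebergio "github.com/apache/iceberg-go/io"
	// The blank import registers the s3/s3a/s3n, gs, mem, abfs/abfss, and wasb/wasbs schemes.
	_ "github.com/apache/iceberg-go/io/gocloud"
)

func main() {
	ctx := context.Background()

	// file:// and the empty scheme are registered by default in package io.
	fsys, err := icebergio.LoadFS(ctx, map[string]string{}, "file:///tmp/warehouse")
	if err != nil {
		panic(err)
	}
	_ = fsys

	// Hypothetical custom scheme: the factory receives the parsed URI and the catalog properties.
	icebergio.Register("example", func(ctx context.Context, parsed *url.URL, props map[string]string) (icebergio.IO, error) {
		return icebergio.LocalFS{}, nil
	})
	defer icebergio.Unregister("example")

	fmt.Println(icebergio.GetRegisteredSchemes())
}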