diff --git a/agent/go.mod b/agent/go.mod index 950dc9e7a270..7db1b52865aa 100644 --- a/agent/go.mod +++ b/agent/go.mod @@ -6,7 +6,6 @@ replace github.com/moby/go-archive => github.com/moby/go-archive v0.1.0 require ( github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible - github.com/aws/aws-sdk-go v1.55.0 github.com/compose-spec/compose-go/v2 v2.9.1 github.com/creack/pty v1.1.24 github.com/docker/cli v29.2.1+incompatible @@ -57,6 +56,7 @@ require ( golang.org/x/oauth2 v0.35.0 golang.org/x/sys v0.41.0 golang.org/x/text v0.35.0 + golang.org/x/time v0.14.0 google.golang.org/genproto v0.0.0-20241021214115-324edc3d5d38 gopkg.in/ini.v1 v1.67.1 gopkg.in/yaml.v3 v3.0.1 @@ -165,7 +165,6 @@ require ( github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect - github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/json-iterator/go v1.1.13-0.20220915233716-71ac16282d12 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/klauspost/pgzip v1.2.5 // indirect @@ -267,7 +266,6 @@ require ( golang.org/x/mod v0.33.0 // indirect golang.org/x/sync v0.20.0 // indirect golang.org/x/term v0.40.0 // indirect - golang.org/x/time v0.14.0 // indirect golang.org/x/tools v0.42.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect diff --git a/agent/go.sum b/agent/go.sum index dd4f11d3a507..97c8e34a4905 100644 --- a/agent/go.sum +++ b/agent/go.sum @@ -135,8 +135,6 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3d github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/aws/aws-sdk-go v1.40.45/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= -github.com/aws/aws-sdk-go v1.55.0 h1:hVALKPjXz33kP1R9nTyJpUK7qF59dO2mleQxUW9mCVE= -github.com/aws/aws-sdk-go v1.55.0/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= github.com/aws/aws-sdk-go-v2 v1.9.1/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= github.com/aws/aws-sdk-go-v2 v1.41.1 h1:ABlyEARCDLN034NhxlRUSZr4l71mh+T5KAeGh6cerhU= github.com/aws/aws-sdk-go-v2 v1.41.1/go.mod h1:MayyLB8y+buD9hZqkCW3kX1AKq07Y5pXxtgB+rRFhz0= @@ -650,9 +648,7 @@ github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkr github.com/jinzhu/now v1.0.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= -github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= -github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= diff --git a/agent/utils/cloud_storage/client/s3.go b/agent/utils/cloud_storage/client/s3.go index 3a00647146d7..2f85bda3cd52 100644 --- a/agent/utils/cloud_storage/client/s3.go +++ b/agent/utils/cloud_storage/client/s3.go @@ -1,20 +1,30 @@ package client import ( + "context" + "crypto/tls" + "net/http" "os" + "strings" + "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3" - "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" +) + +const ( + s3DefaultTimeout = 30 * time.Second + s3TransferTimeout = 24 * time.Hour ) type s3Client struct { scType string bucket string - Sess session.Session + client *minio.Client +} + +func (s *s3Client) ctx(timeout time.Duration) (context.Context, context.CancelFunc) { + return context.WithTimeout(context.Background(), timeout) } func NewS3Client(vars map[string]interface{}) (*s3Client, error) { @@ -31,69 +41,80 @@ func NewS3Client(vars map[string]interface{}) (*s3Client, error) { if len(mode) == 0 { mode = "virtual hosted" } - sess, err := session.NewSession(&aws.Config{ - Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""), - Endpoint: aws.String(endpoint), - Region: aws.String(region), - DisableSSL: aws.Bool(true), S3ForcePathStyle: aws.Bool(mode == "path"), + + lookupStyle := minio.BucketLookupDNS + if mode == "path" { + lookupStyle = minio.BucketLookupPath + } + + ssl := strings.Split(endpoint, ":")[0] + secure := false + tlsConfig := &tls.Config{} + if ssl == "https" { + secure = true + tlsConfig.InsecureSkipVerify = true + } + var transport http.RoundTripper = &http.Transport{ + TLSClientConfig: tlsConfig, + } + + endpoint = strings.TrimPrefix(endpoint, ssl+"://") + + client, err := minio.New(endpoint, &minio.Options{ + Creds: credentials.NewStaticV4(accessKey, secretKey, ""), + Secure: secure, + Region: region, + BucketLookup: lookupStyle, + Transport: transport, }) if err != nil { return nil, err } - return &s3Client{scType: scType, bucket: bucket, Sess: *sess}, nil + return &s3Client{scType: scType, bucket: bucket, client: client}, nil } func (s s3Client) ListBuckets() ([]interface{}, error) { - var result []interface{} - svc := s3.New(&s.Sess) - res, err := svc.ListBuckets(nil) + ctx, cancel := s.ctx(s3DefaultTimeout) + defer cancel() + buckets, err := s.client.ListBuckets(ctx) if err != nil { return nil, err } - for _, b := range res.Buckets { + var result []interface{} + for _, b := range buckets { result = append(result, b.Name) } return result, nil } func (s s3Client) Exist(path string) (bool, error) { - svc := s3.New(&s.Sess) - if _, err := svc.HeadObject(&s3.HeadObjectInput{ - Bucket: &s.bucket, - Key: &path, - }); err != nil { - if aerr, ok := err.(awserr.RequestFailure); ok { - if aerr.StatusCode() == 404 { - return false, nil - } - } else { - return false, aerr + ctx, cancel := s.ctx(s3DefaultTimeout) + defer cancel() + _, err := s.client.StatObject(ctx, s.bucket, path, minio.StatObjectOptions{}) + if err != nil { + resp := minio.ToErrorResponse(err) + if resp.StatusCode == 404 { + return false, nil } + return false, err } return true, nil } func (s *s3Client) Size(path string) (int64, error) { - svc := s3.New(&s.Sess) - file, err := svc.GetObject(&s3.GetObjectInput{ - Bucket: &s.bucket, - Key: &path, - }) + ctx, cancel := s.ctx(s3DefaultTimeout) + defer cancel() + info, err := s.client.StatObject(ctx, s.bucket, path, minio.StatObjectOptions{}) if err != nil { return 0, err } - return *file.ContentLength, nil + return info.Size, nil } func (s s3Client) Delete(path string) (bool, error) { - svc := s3.New(&s.Sess) - if _, err := svc.DeleteObject(&s3.DeleteObjectInput{Bucket: aws.String(s.bucket), Key: aws.String(path)}); err != nil { - return false, err - } - if err := svc.WaitUntilObjectNotExists(&s3.HeadObjectInput{ - Bucket: aws.String(s.bucket), - Key: aws.String(path), - }); err != nil { + ctx, cancel := s.ctx(s3DefaultTimeout) + defer cancel() + if err := s.client.RemoveObject(ctx, s.bucket, path, minio.RemoveObjectOptions{}); err != nil { return false, err } return true, nil @@ -110,16 +131,21 @@ func (s s3Client) Upload(src, target string) (bool, error) { } defer file.Close() - uploader := s3manager.NewUploader(&s.Sess) - if fileInfo.Size() > s3manager.MaxUploadParts*s3manager.DefaultUploadPartSize { - uploader.PartSize = fileInfo.Size() / (s3manager.MaxUploadParts - 1) + opts := minio.PutObjectOptions{ + StorageClass: s.scType, + } + + const maxParts = 10000 + const defaultPartSize = 64 * 1024 * 1024 // 64 MiB + partSize := uint64(defaultPartSize) + if fileInfo.Size() > int64(maxParts)*int64(defaultPartSize) { + partSize = uint64(fileInfo.Size()) / (maxParts - 1) } - if _, err := uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(s.bucket), - Key: aws.String(target), - Body: file, - StorageClass: &s.scType, - }); err != nil { + opts.PartSize = partSize + + ctx, cancel := s.ctx(s3TransferTimeout) + defer cancel() + if _, err := s.client.PutObject(ctx, s.bucket, target, file, fileInfo.Size(), opts); err != nil { return false, err } return true, nil @@ -129,16 +155,9 @@ func (s s3Client) Download(src, target string) (bool, error) { if _, err := os.Stat(target); err == nil { _ = os.Remove(target) } - file, err := os.Create(target) - if err != nil { - return false, err - } - defer file.Close() - downloader := s3manager.NewDownloader(&s.Sess) - if _, err = downloader.Download(file, &s3.GetObjectInput{ - Bucket: aws.String(s.bucket), - Key: aws.String(src), - }); err != nil { + ctx, cancel := s.ctx(s3TransferTimeout) + defer cancel() + if err := s.client.FGetObject(ctx, s.bucket, src, target, minio.GetObjectOptions{}); err != nil { os.Remove(target) return false, err } @@ -146,17 +165,18 @@ func (s s3Client) Download(src, target string) (bool, error) { } func (s *s3Client) ListObjects(prefix string) ([]string, error) { - svc := s3.New(&s.Sess) - var result []string - outputs, err := svc.ListObjects(&s3.ListObjectsInput{ - Bucket: &s.bucket, - Prefix: &prefix, - }) - if err != nil { - return result, err + opts := minio.ListObjectsOptions{ + Recursive: true, + Prefix: prefix, } - for _, item := range outputs.Contents { - result = append(result, *item.Key) + var result []string + ctx, cancel := s.ctx(s3DefaultTimeout) + defer cancel() + for object := range s.client.ListObjects(ctx, s.bucket, opts) { + if object.Err != nil { + return result, object.Err + } + result = append(result, object.Key) } return result, nil }