diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index eceb7f72f..5a5571149 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -43,25 +43,25 @@ jobs:
     name: test (${{ matrix.os }}/go-${{ matrix.go-version }})
     runs-on: ${{ matrix.os }}-latest
-    services:
-      minio:
-        image: ${{ (matrix.os == 'ubuntu') && 'bitnami/minio:2023.7.18' || ''}}
-        ports:
-          - 45677:9000
-        options: >-
-          --health-cmd "curl -I http://localhost:9000/minio/health/live -s"
-          --health-interval 10s
-          --health-timeout 5s
-          --health-retries 5
-        env:
-          MINIO_ROOT_USER: minioadmin
-          MINIO_ROOT_PASSWORD: minioadmin
     steps:
       - uses: actions/checkout@v2
       - uses: actions/setup-go@v2
        with:
          go-version: ${{ matrix.go-version }}
+      - name: Start MinIO
+        if: matrix.os == 'ubuntu'
+        run: |
+          docker run -d --name minio \
+            -p 45677:9000 \
+            -e MINIO_ROOT_USER=minioadmin \
+            -e MINIO_ROOT_PASSWORD=minioadmin \
+            minio/minio:RELEASE.2025-09-07T16-13-09Z server /data
+          for i in $(seq 1 30); do
+            curl -sf http://localhost:45677/minio/health/live && break
+            sleep 1
+          done
      - run: make test
        env:
          S5CMD_TEST_ENDPOINT_URL: ${{ (matrix.os == 'ubuntu') && 'http://localhost:45677' || '' }}
diff --git a/storage/s3.go b/storage/s3.go
index 3313be66e..4c1c32d8b 100644
--- a/storage/s3.go
+++ b/storage/s3.go
@@ -50,6 +50,9 @@ const (
 	// the key of the object metadata which is used to handle retry decision on NoSuchUpload error
 	metadataKeyRetryID = "s5cmd-upload-retry-id"
+
+	// defaultCopyPartSize is the part size for multipart copy operations (5 GiB).
+	defaultCopyPartSize = 5 * 1024 * 1024 * 1024
 )
 
 // Re-used AWS sessions dramatically improve performance.
@@ -476,12 +479,41 @@ func (s *S3) listObjects(ctx context.Context, url *url.URL) <-chan *Object {
 }
 
 // Copy is a single-object copy operation which copies objects to S3
-// destination from another S3 source.
+// destination from another S3 source. For objects larger than 5 GiB,
+// it automatically falls back to multipart copy.
 func (s *S3) Copy(ctx context.Context, from, to *url.URL, metadata Metadata) error {
 	if s.dryRun {
 		return nil
 	}
 
+	err := s.singleCopy(ctx, from, to, metadata)
+	if err == nil {
+		return nil
+	}
+
+	// CopyObject fails for objects >5 GiB. S3 may return EntityTooLarge
+	// or InvalidRequest with a "copy source is larger" message.
+	// Fall back to multipart copy.
+	if !isCopySourceTooLargeError(err) {
+		return err
+	}
+
+	obj, statErr := s.Stat(ctx, from)
+	if statErr != nil {
+		return err // return the original copy error
+	}
+
+	msg := log.DebugMessage{
+		Err: fmt.Sprintf("object too large for single copy (%d bytes), using multipart copy: %v", obj.Size, from),
+	}
+	log.Debug(msg)
+
+	return s.multipartCopy(ctx, from, to, metadata, obj.Size)
+}
+
+// singleCopy copies an object using a single CopyObject API call.
+// It supports objects up to 5 GiB.
+func (s *S3) singleCopy(ctx context.Context, from, to *url.URL, metadata Metadata) error {
 	// SDK expects CopySource like "bucket[/key]"
 	copySource := from.EscapedPath()
@@ -564,10 +596,131 @@ func (s *S3) Copy(ctx context.Context, from, to *url.URL, metadata Metadata) err
 		input.Metadata = m
 	}
 
-	_, err := s.api.CopyObject(input)
+	_, err := s.api.CopyObjectWithContext(ctx, input)
 	return err
 }
 
+// multipartCopy copies an object using multipart upload with UploadPartCopy.
+// This is required for objects larger than 5 GiB, which exceed the CopyObject
+// API limit.
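+// Parts are copied sequentially with CopySourceRange in chunks of
+// defaultCopyPartSize; if any part or the completion call fails, the
+// multipart upload is aborted.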
+func (s *S3) multipartCopy(ctx context.Context, from, to *url.URL, metadata Metadata, srcSize int64) error {
+	copySource := from.EscapedPath()
+	if from.VersionID != "" {
+		copySource += "?versionId=" + from.VersionID
+	}
+
+	partSize := int64(defaultCopyPartSize)
+	numParts := (srcSize + partSize - 1) / partSize
+
+	createInput := &s3.CreateMultipartUploadInput{
+		Bucket:       aws.String(to.Bucket),
+		Key:          aws.String(to.Path),
+		RequestPayer: s.RequestPayer(),
+	}
+
+	if metadata.StorageClass != "" {
+		createInput.StorageClass = aws.String(metadata.StorageClass)
+	}
+	if metadata.ACL != "" {
+		createInput.ACL = aws.String(metadata.ACL)
+	}
+	if metadata.CacheControl != "" {
+		createInput.CacheControl = aws.String(metadata.CacheControl)
+	}
+	if metadata.Expires != "" {
+		t, err := time.Parse(time.RFC3339, metadata.Expires)
+		if err != nil {
+			return err
+		}
+		createInput.Expires = aws.Time(t)
+	}
+	if metadata.EncryptionMethod != "" {
+		createInput.ServerSideEncryption = aws.String(metadata.EncryptionMethod)
+		if metadata.EncryptionKeyID != "" {
+			createInput.SSEKMSKeyId = aws.String(metadata.EncryptionKeyID)
+		}
+	}
+	if metadata.ContentEncoding != "" {
+		createInput.ContentEncoding = aws.String(metadata.ContentEncoding)
+	}
+	if metadata.ContentDisposition != "" {
+		createInput.ContentDisposition = aws.String(metadata.ContentDisposition)
+	}
+	if metadata.ContentType != "" {
+		createInput.ContentType = aws.String(metadata.ContentType)
+	}
+	if len(metadata.UserDefined) != 0 {
+		m := make(map[string]*string)
+		for k, v := range metadata.UserDefined {
+			m[k] = aws.String(v)
+		}
+		createInput.Metadata = m
+	}
+
+	createOutput, err := s.api.CreateMultipartUploadWithContext(ctx, createInput)
+	if err != nil {
+		return err
+	}
+
+	uploadID := aws.StringValue(createOutput.UploadId)
+
+	// abort cleans up the multipart upload on failure.
+	abort := func() {
+		// Use background context for cleanup in case the original is cancelled.
+		s.api.AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{
+			Bucket:       aws.String(to.Bucket),
+			Key:          aws.String(to.Path),
+			UploadId:     aws.String(uploadID),
+			RequestPayer: s.RequestPayer(),
+		})
+	}
+
+	completedParts := make([]*s3.CompletedPart, 0, numParts)
+
+	for i := int64(0); i < numParts; i++ {
+		start := i * partSize
+		end := start + partSize - 1
+		if end >= srcSize {
+			end = srcSize - 1
+		}
+
+		partOutput, err := s.api.UploadPartCopyWithContext(ctx, &s3.UploadPartCopyInput{
+			Bucket:          aws.String(to.Bucket),
+			Key:             aws.String(to.Path),
+			CopySource:      aws.String(copySource),
+			CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", start, end)),
+			PartNumber:      aws.Int64(i + 1),
+			UploadId:        aws.String(uploadID),
+			RequestPayer:    s.RequestPayer(),
+		})
+		if err != nil {
+			abort()
+			return err
+		}
+
+		completedParts = append(completedParts, &s3.CompletedPart{
+			ETag:       partOutput.CopyPartResult.ETag,
+			PartNumber: aws.Int64(i + 1),
+		})
+	}
+
+	_, err = s.api.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
+		Bucket:   aws.String(to.Bucket),
+		Key:      aws.String(to.Path),
+		UploadId: aws.String(uploadID),
+		MultipartUpload: &s3.CompletedMultipartUpload{
+			Parts: completedParts,
+		},
+		RequestPayer: s.RequestPayer(),
+	})
+	if err != nil {
+		abort()
+		return err
+	}
+
+	return nil
+}
+
 // Read fetches the remote object and returns its contents as an io.ReadCloser.
 func (s *S3) Read(ctx context.Context, src *url.URL) (io.ReadCloser, error) {
 	input := &s3.GetObjectInput{
@@ -1429,6 +1582,26 @@ func isVirtualHostStyle(endpoint urlpkg.URL) bool {
 	return endpoint == sentinelURL || supportsTransferAcceleration(endpoint) || IsGoogleEndpoint(endpoint)
 }
 
+// isCopySourceTooLargeError reports whether the error indicates that a
+// CopyObject call failed because the source exceeds the 5 GiB single-copy
+// limit. S3 may return either "EntityTooLarge" or "InvalidRequest" with a
+// message mentioning the copy source size limit.
+func isCopySourceTooLargeError(err error) bool {
+	if errHasCode(err, "EntityTooLarge") {
+		return true
+	}
+
+	var awsErr awserr.Error
+	if errors.As(err, &awsErr) {
+		if awsErr.Code() == "InvalidRequest" &&
+			strings.Contains(awsErr.Message(), "copy source is larger") {
+			return true
+		}
+	}
+
+	return false
+}
+
 func errHasCode(err error, code string) bool {
 	if err == nil || code == "" {
 		return false
diff --git a/storage/s3_test.go b/storage/s3_test.go
index 11b52ebc3..2515cb8d6 100644
--- a/storage/s3_test.go
+++ b/storage/s3_test.go
@@ -1341,3 +1341,135 @@ func (e tempError) Error() string { return e.err.Error() }
 func (e tempError) Temporary() bool { return e.temp }
 func (e *tempError) Unwrap() error { return e.err }
+
+func TestIsCopySourceTooLargeError(t *testing.T) {
+	testcases := []struct {
+		name     string
+		err      error
+		expected bool
+	}{
+		{
+			name:     "EntityTooLarge",
+			err:      awserr.New("EntityTooLarge", "entity too large", nil),
+			expected: true,
+		},
+		{
+			name:     "InvalidRequest with copy source message",
+			err:      awserr.New("InvalidRequest", "The specified copy source is larger than the maximum allowable size for a copy source: 5368709120", nil),
+			expected: true,
+		},
+		{
+			name:     "InvalidRequest with different message",
+			err:      awserr.New("InvalidRequest", "some other problem", nil),
+			expected: false,
+		},
+		{
+			name:     "other error code",
+			err:      awserr.New("AccessDenied", "access denied", nil),
+			expected: false,
+		},
+		{
+			name:     "nil error",
+			err:      nil,
+			expected: false,
+		},
+	}
+
+	for _, tc := range testcases {
+		t.Run(tc.name, func(t *testing.T) {
+			got := isCopySourceTooLargeError(tc.err)
+			assert.Equal(t, got, tc.expected)
+		})
+	}
+}
+
+func TestS3MultipartCopyFallback(t *testing.T) {
+	u, err := url.New("s3://bucket/key")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var operations []string
+
+	mockAPI := s3.New(unit.Session)
+	mockAPI.Handlers.Unmarshal.Clear()
+	mockAPI.Handlers.UnmarshalMeta.Clear()
+	mockAPI.Handlers.UnmarshalError.Clear()
+	mockAPI.Handlers.Send.Clear()
+
+	mockAPI.Handlers.Send.PushBack(func(r *request.Request) {
+		operations = append(operations, r.Operation.Name)
+
+		switch r.Operation.Name {
+		case "CopyObject":
+			r.Error = awserr.New("InvalidRequest",
+				"The specified copy source is larger than the maximum allowable size for a copy source: 5368709120", nil)
+			r.HTTPResponse = &http.Response{
+				StatusCode: http.StatusBadRequest,
+				Body:       io.NopCloser(strings.NewReader("")),
+			}
+		case "HeadObject":
+			r.HTTPResponse = &http.Response{
+				StatusCode: http.StatusOK,
+				Body:       io.NopCloser(strings.NewReader("")),
+			}
+			// Modify the existing output struct (the SDK returns the original pointer).
+			r.Data.(*s3.HeadObjectOutput).ContentLength = aws.Int64(6 * 1024 * 1024 * 1024) // 6 GiB
+		case "CreateMultipartUpload":
+			r.HTTPResponse = &http.Response{
+				StatusCode: http.StatusOK,
+				Body:       io.NopCloser(strings.NewReader("")),
+			}
+			r.Data.(*s3.CreateMultipartUploadOutput).UploadId = aws.String("test-upload-id")
+		case "UploadPartCopy":
+			r.HTTPResponse = &http.Response{
+				StatusCode: http.StatusOK,
+				Body:       io.NopCloser(strings.NewReader("")),
+			}
+			r.Data.(*s3.UploadPartCopyOutput).CopyPartResult = &s3.CopyPartResult{
+				ETag: aws.String("\"etag123\""),
+			}
+		case "CompleteMultipartUpload":
+			r.HTTPResponse = &http.Response{
+				StatusCode: http.StatusOK,
+				Body:       io.NopCloser(strings.NewReader("")),
+			}
+		default:
+			r.HTTPResponse = &http.Response{
+				StatusCode: http.StatusOK,
+				Body:       io.NopCloser(strings.NewReader("")),
+			}
+		}
+	})
+
+	mockAPI.Handlers.Unmarshal.PushBack(func(r *request.Request) {
+		if r.Error != nil {
+			if awsErr, ok := r.Error.(awserr.Error); ok {
+				if awsErr.Code() == request.ErrCodeSerialization {
+					r.Error = nil
+				}
+			}
+		}
+	})
+
+	log.Init("debug", false)
+
+	mockS3 := &S3{api: mockAPI}
+	err = mockS3.Copy(context.Background(), u, u, Metadata{})
+	if err != nil {
+		t.Fatalf("expected no error, got: %v", err)
+	}
+
+	// Expect: CopyObject (fail) -> HeadObject -> CreateMultipartUpload -> UploadPartCopy x2 -> CompleteMultipartUpload
+	expectedOps := []string{
+		"CopyObject",
+		"HeadObject",
+		"CreateMultipartUpload",
+		"UploadPartCopy",
+		"UploadPartCopy",
+		"CompleteMultipartUpload",
+	}
+	if diff := cmp.Diff(expectedOps, operations); diff != "" {
+		t.Errorf("unexpected API operations (-want +got):\n%s", diff)
+	}
+}