26 changes: 13 additions & 13 deletions .github/workflows/ci.yml
@@ -43,25 +43,25 @@ jobs:

name: test (${{ matrix.os }}/go-${{ matrix.go-version }})
runs-on: ${{ matrix.os }}-latest
services:
minio:
image: ${{ (matrix.os == 'ubuntu') && 'bitnami/minio:2023.7.18' || ''}}
ports:
- 45677:9000
options: >-
--health-cmd "curl -I http://localhost:9000/minio/health/live -s"
--health-interval 10s
--health-timeout 5s
--health-retries 5
env:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
with:
go-version: ${{ matrix.go-version }}

- name: Start MinIO
if: matrix.os == 'ubuntu'
run: |
docker run -d --name minio \
-p 45677:9000 \
-e MINIO_ROOT_USER=minioadmin \
-e MINIO_ROOT_PASSWORD=minioadmin \
minio/minio:RELEASE.2025-09-07T16-13-09Z server /data
for i in $(seq 1 30); do
curl -sf http://localhost:45677/minio/health/live && break
sleep 1
done

- run: make test
env:
S5CMD_TEST_ENDPOINT_URL: ${{ (matrix.os == 'ubuntu') && 'http://localhost:45677' || '' }}
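
Both the replaced service container and the new docker run step share the same readiness contract: poll /minio/health/live until it answers. For test or tooling code that needs the same guarantee outside the workflow, the check can be written in Go. A minimal sketch, assuming the S5CMD_TEST_ENDPOINT_URL variable set above; the helper name is illustrative:

package main

import (
	"fmt"
	"net/http"
	"os"
	"time"
)

// waitForMinIO polls the MinIO liveness endpoint until it returns 200 OK
// or the retry budget runs out, mirroring the shell loop in the workflow.
func waitForMinIO(endpoint string, retries int) error {
	healthURL := endpoint + "/minio/health/live"
	for i := 0; i < retries; i++ {
		resp, err := http.Get(healthURL)
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusOK {
				return nil
			}
		}
		time.Sleep(time.Second)
	}
	return fmt.Errorf("MinIO did not become healthy at %s", healthURL)
}

func main() {
	if err := waitForMinIO(os.Getenv("S5CMD_TEST_ENDPOINT_URL"), 30); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
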
177 changes: 175 additions & 2 deletions storage/s3.go
@@ -50,6 +50,9 @@ const (

// the key of the object metadata which is used to handle retry decision on NoSuchUpload error
metadataKeyRetryID = "s5cmd-upload-retry-id"

// defaultCopyPartSize is the part size for multipart copy operations:
// 5 GiB, the largest part size S3 allows, so copies use the fewest parts.
defaultCopyPartSize = 5 * 1024 * 1024 * 1024
)

// Re-used AWS sessions dramatically improve performance.
@@ -476,12 +479,41 @@ func (s *S3) listObjects(ctx context.Context, url *url.URL) <-chan *Object {
}

// Copy is a single-object copy operation which copies objects to S3
// destination from another S3 source.
// destination from another S3 source. For objects larger than 5 GiB,
// it automatically falls back to multipart copy.
func (s *S3) Copy(ctx context.Context, from, to *url.URL, metadata Metadata) error {
if s.dryRun {
return nil
}

err := s.singleCopy(ctx, from, to, metadata)
if err == nil {
return nil
}

// CopyObject fails for objects >5 GiB. S3 may return EntityTooLarge
// or InvalidRequest with a "copy source is larger" message.
// Fall back to multipart copy.
if !isCopySourceTooLargeError(err) {
return err
}

obj, statErr := s.Stat(ctx, from)
if statErr != nil {
return err // return the original copy error
}

msg := log.DebugMessage{
Err: fmt.Sprintf("object too large for single copy (%d bytes), using multipart copy: %v", obj.Size, from),
}
log.Debug(msg)

return s.multipartCopy(ctx, from, to, metadata, obj.Size)
}
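
From the caller's side this fallback is invisible: Copy is invoked the same way whether the source is below or above the 5 GiB limit. A hedged usage sketch; the import paths and the already-constructed client are assumptions for illustration, not part of this diff:

package example

import (
	"context"

	"github.com/peak/s5cmd/storage"     // import path assumed; adjust to the module's actual layout
	"github.com/peak/s5cmd/storage/url" // import path assumed
)

// copyObject copies src to dst with an existing client. Sources over the
// 5 GiB CopyObject limit transparently take the multipart path.
func copyObject(ctx context.Context, s *storage.S3, src, dst string) error {
	from, err := url.New(src)
	if err != nil {
		return err
	}
	to, err := url.New(dst)
	if err != nil {
		return err
	}
	return s.Copy(ctx, from, to, storage.Metadata{})
}
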

// singleCopy copies an object using a single CopyObject API call.
// This supports objects up to 5 GiB.
func (s *S3) singleCopy(ctx context.Context, from, to *url.URL, metadata Metadata) error {
// SDK expects CopySource like "bucket[/key]"
copySource := from.EscapedPath()

@@ -564,10 +596,131 @@ func (s *S3) Copy(ctx context.Context, from, to *url.URL, metadata Metadata) err
input.Metadata = m
}

_, err := s.api.CopyObject(input)
_, err := s.api.CopyObjectWithContext(ctx, input)
return err
}

// multipartCopy copies an object using multipart upload with UploadPartCopy.
// This is required for objects larger than 5 GiB, which exceed the CopyObject
// API limit.
func (s *S3) multipartCopy(ctx context.Context, from, to *url.URL, metadata Metadata, srcSize int64) error {
copySource := from.EscapedPath()
if from.VersionID != "" {
copySource += "?versionId=" + from.VersionID
}

partSize := int64(defaultCopyPartSize)
numParts := (srcSize + partSize - 1) / partSize

createInput := &s3.CreateMultipartUploadInput{
Bucket: aws.String(to.Bucket),
Key: aws.String(to.Path),
RequestPayer: s.RequestPayer(),
}

if metadata.StorageClass != "" {
createInput.StorageClass = aws.String(metadata.StorageClass)
}
if metadata.ACL != "" {
createInput.ACL = aws.String(metadata.ACL)
}
if metadata.CacheControl != "" {
createInput.CacheControl = aws.String(metadata.CacheControl)
}
if metadata.Expires != "" {
t, err := time.Parse(time.RFC3339, metadata.Expires)
if err != nil {
return err
}
createInput.Expires = aws.Time(t)
}
if metadata.EncryptionMethod != "" {
createInput.ServerSideEncryption = aws.String(metadata.EncryptionMethod)
if metadata.EncryptionKeyID != "" {
createInput.SSEKMSKeyId = aws.String(metadata.EncryptionKeyID)
}
}
if metadata.ContentEncoding != "" {
createInput.ContentEncoding = aws.String(metadata.ContentEncoding)
}
if metadata.ContentDisposition != "" {
createInput.ContentDisposition = aws.String(metadata.ContentDisposition)
}
if metadata.ContentType != "" {
createInput.ContentType = aws.String(metadata.ContentType)
}
if len(metadata.UserDefined) != 0 {
m := make(map[string]*string)
for k, v := range metadata.UserDefined {
m[k] = aws.String(v)
}
createInput.Metadata = m
}

createOutput, err := s.api.CreateMultipartUploadWithContext(ctx, createInput)
if err != nil {
return err
}

uploadID := aws.StringValue(createOutput.UploadId)

// abort cleans up the multipart upload on failure.
abort := func() {
// Use background context for cleanup in case the original is cancelled.
s.api.AbortMultipartUploadWithContext(context.Background(), &s3.AbortMultipartUploadInput{
Bucket: aws.String(to.Bucket),
Key: aws.String(to.Path),
UploadId: aws.String(uploadID),
RequestPayer: s.RequestPayer(),
})
}

completedParts := make([]*s3.CompletedPart, 0, numParts)

for i := int64(0); i < numParts; i++ {
start := i * partSize
end := start + partSize - 1
if end >= srcSize {
end = srcSize - 1
}

partOutput, err := s.api.UploadPartCopyWithContext(ctx, &s3.UploadPartCopyInput{
Bucket: aws.String(to.Bucket),
Key: aws.String(to.Path),
CopySource: aws.String(copySource),
CopySourceRange: aws.String(fmt.Sprintf("bytes=%d-%d", start, end)),
PartNumber: aws.Int64(i + 1),
UploadId: aws.String(uploadID),
RequestPayer: s.RequestPayer(),
})
if err != nil {
abort()
return err
}

completedParts = append(completedParts, &s3.CompletedPart{
ETag: partOutput.CopyPartResult.ETag,
PartNumber: aws.Int64(i + 1),
})
}

_, err = s.api.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(to.Bucket),
Key: aws.String(to.Path),
UploadId: aws.String(uploadID),
MultipartUpload: &s3.CompletedMultipartUpload{
Parts: completedParts,
},
RequestPayer: s.RequestPayer(),
})
if err != nil {
abort()
return err
}

return nil
}
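
The part arithmetic above is ceiling division for the part count, then byte ranges clamped to the object size. A standalone sketch of the same math for a 6 GiB source (the size used in the test below), assuming only the 5 GiB defaultCopyPartSize defined earlier:

package main

import "fmt"

func main() {
	const partSize = int64(5 * 1024 * 1024 * 1024) // defaultCopyPartSize
	srcSize := int64(6 * 1024 * 1024 * 1024)       // 6 GiB source object

	numParts := (srcSize + partSize - 1) / partSize // ceiling division -> 2

	for i := int64(0); i < numParts; i++ {
		start := i * partSize
		end := start + partSize - 1
		if end >= srcSize {
			end = srcSize - 1 // clamp the final part to the object size
		}
		// Prints the CopySourceRange sent with each UploadPartCopy:
		//   part 1: bytes=0-5368709119
		//   part 2: bytes=5368709120-6442450943
		fmt.Printf("part %d: bytes=%d-%d\n", i+1, start, end)
	}
}
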

// Read fetches the remote object and returns its contents as an io.ReadCloser.
func (s *S3) Read(ctx context.Context, src *url.URL) (io.ReadCloser, error) {
input := &s3.GetObjectInput{
Expand Down Expand Up @@ -1429,6 +1582,26 @@ func isVirtualHostStyle(endpoint urlpkg.URL) bool {
return endpoint == sentinelURL || supportsTransferAcceleration(endpoint) || IsGoogleEndpoint(endpoint)
}

// isCopySourceTooLargeError reports whether the error indicates that a
// CopyObject call failed because the source exceeds the 5 GiB single-copy
// limit. S3 may return either "EntityTooLarge" or "InvalidRequest" with a
// message mentioning the copy source size limit.
func isCopySourceTooLargeError(err error) bool {
if errHasCode(err, "EntityTooLarge") {
return true
}

var awsErr awserr.Error
if errors.As(err, &awsErr) {
if awsErr.Code() == "InvalidRequest" &&
strings.Contains(awsErr.Message(), "copy source is larger") {
return true
}
}

return false
}

func errHasCode(err error, code string) bool {
if err == nil || code == "" {
return false
132 changes: 132 additions & 0 deletions storage/s3_test.go
@@ -1341,3 +1341,135 @@ func (e tempError) Error() string { return e.err.Error() }
func (e tempError) Temporary() bool { return e.temp }

func (e *tempError) Unwrap() error { return e.err }

func TestIsCopySourceTooLargeError(t *testing.T) {
testcases := []struct {
name string
err error
expected bool
}{
{
name: "EntityTooLarge",
err: awserr.New("EntityTooLarge", "entity too large", nil),
expected: true,
},
{
name: "InvalidRequest with copy source message",
err: awserr.New("InvalidRequest", "The specified copy source is larger than the maximum allowable size for a copy source: 5368709120", nil),
expected: true,
},
{
name: "InvalidRequest with different message",
err: awserr.New("InvalidRequest", "some other problem", nil),
expected: false,
},
{
name: "other error code",
err: awserr.New("AccessDenied", "access denied", nil),
expected: false,
},
{
name: "nil error",
err: nil,
expected: false,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
got := isCopySourceTooLargeError(tc.err)
assert.Equal(t, got, tc.expected)
})
}
}

func TestS3MultipartCopyFallback(t *testing.T) {
u, err := url.New("s3://bucket/key")
if err != nil {
t.Fatal(err)
}

var operations []string

mockAPI := s3.New(unit.Session)
mockAPI.Handlers.Unmarshal.Clear()
mockAPI.Handlers.UnmarshalMeta.Clear()
mockAPI.Handlers.UnmarshalError.Clear()
mockAPI.Handlers.Send.Clear()

mockAPI.Handlers.Send.PushBack(func(r *request.Request) {
operations = append(operations, r.Operation.Name)

switch r.Operation.Name {
case "CopyObject":
r.Error = awserr.New("InvalidRequest",
"The specified copy source is larger than the maximum allowable size for a copy source: 5368709120", nil)
r.HTTPResponse = &http.Response{
StatusCode: http.StatusBadRequest,
Body: io.NopCloser(strings.NewReader("")),
}
case "HeadObject":
r.HTTPResponse = &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("")),
}
// Modify the existing output struct (the SDK returns the original pointer).
r.Data.(*s3.HeadObjectOutput).ContentLength = aws.Int64(6 * 1024 * 1024 * 1024) // 6 GiB
case "CreateMultipartUpload":
r.HTTPResponse = &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("")),
}
r.Data.(*s3.CreateMultipartUploadOutput).UploadId = aws.String("test-upload-id")
case "UploadPartCopy":
r.HTTPResponse = &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("")),
}
r.Data.(*s3.UploadPartCopyOutput).CopyPartResult = &s3.CopyPartResult{
ETag: aws.String("\"etag123\""),
}
case "CompleteMultipartUpload":
r.HTTPResponse = &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("")),
}
default:
r.HTTPResponse = &http.Response{
StatusCode: http.StatusOK,
Body: io.NopCloser(strings.NewReader("")),
}
}
})

mockAPI.Handlers.Unmarshal.PushBack(func(r *request.Request) {
if r.Error != nil {
if awsErr, ok := r.Error.(awserr.Error); ok {
if awsErr.Code() == request.ErrCodeSerialization {
r.Error = nil
}
}
}
})

log.Init("debug", false)

mockS3 := &S3{api: mockAPI}
err = mockS3.Copy(context.Background(), u, u, Metadata{})
if err != nil {
t.Fatalf("expected no error, got: %v", err)
}

// Expect: CopyObject (fail) -> HeadObject -> CreateMultipartUpload -> UploadPartCopy x2 -> CompleteMultipartUpload
expectedOps := []string{
"CopyObject",
"HeadObject",
"CreateMultipartUpload",
"UploadPartCopy",
"UploadPartCopy",
"CompleteMultipartUpload",
}
if diff := cmp.Diff(expectedOps, operations); diff != "" {
t.Errorf("unexpected API operations (-want +got):\n%s", diff)
}
}