From 130d4adf0620806644f1d2cab4e786e57821f130 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sat, 14 Feb 2026 19:13:40 +0000 Subject: [PATCH 1/3] Fix race condition in BestEffort PutObject and prevent data corruption in Fallback This commit addresses several critical issues with the `BestEffort` and `Fallback` routing strategies for `PutObject` and `UploadPart` operations: 1. **BestEffort Race Condition & Deadlock**: - Previously, `ActBestEffort` used a shared `io.Reader` for both primary and secondary uploads, leading to a race condition where one upload would consume the stream, causing the other to fail or upload partial data. - Also fixed a deadlock in `doParallel` where the primary operation would block waiting for the secondary operation to start consuming the piped stream, but the secondary operation was only started *after* the primary finished. The secondary operation is now started concurrently. - Introduced `tolerantWriter` in `router.go` and updated `teeBody` to support a tolerant mode where write errors to the secondary stream are ignored (for `ActBestEffort`), allowing the primary stream to proceed uninterrupted. 2. **Fallback Safety**: - `ActFallback` for large streaming bodies is unsafe because the stream cannot be rewound after a failed primary attempt. This would result in an empty body being sent to the secondary. - Added explicit checks in `ops.go` and `multipart_ops.go` to return an error if `ActFallback` is used with a large or unknown-length stream, preventing silent data corruption. - Small bodies (buffered via `drainBody`) continue to support fallback correctly. 3. **Test Improvements**: - Added `router_ops_test.go` with integration tests for `ActBestEffort` (concurrent write verification) and `ActFallback` (small body retry). - Fixed test stability by using `sync.WaitGroup` instead of `time.Sleep`. - Updated `router_test.go` to match new `teeBody` signature. 
Co-authored-by: wilbeibi <659203+wilbeibi@users.noreply.github.com> --- multipart_ops.go | 8 +- ops.go | 18 +++-- router.go | 70 ++++++++++++++--- router_ops_test.go | 190 +++++++++++++++++++++++++++++++++++++++++++++ router_test.go | 2 +- 5 files changed, 270 insertions(+), 18 deletions(-) create mode 100644 router_ops_test.go diff --git a/multipart_ops.go b/multipart_ops.go index ee56f1a..3f5ec30 100644 --- a/multipart_ops.go +++ b/multipart_ops.go @@ -49,7 +49,13 @@ func (c *router) UploadPart(ctx context.Context, in *s3.UploadPartInput, optFns ) // UploadPart typically handles large chunks (5MB-5GB), use streaming if in.ContentLength == nil || *in.ContentLength >= c.maxBufferBytes { - r1, r2, err = teeBody(ctx, in.Body) + // For large bodies, we can only support parallel actions (Mirror, BestEffort). + // Fallback requires buffering or seeking, which we can't do for generic streams. + if action == config.ActFallback { + return nil, fmt.Errorf("%s: fallback strategy is not supported for large or unknown-length streams", op) + } + tolerant := (action == config.ActBestEffort) + r1, r2, err = teeBody(ctx, in.Body, tolerant) } else { r1, r2, err = drainBody(ctx, in.Body) } diff --git a/ops.go b/ops.go index 7c2bd7c..502ad2c 100644 --- a/ops.go +++ b/ops.go @@ -56,22 +56,30 @@ func (c *router) PutObject( primB, secB := c.cfg.PhysicalBuckets(bucket) inPrimary, inSecondary := *in, *in inPrimary.Bucket, inSecondary.Bucket = aws.String(primB), aws.String(secB) - if action == config.ActMirror && in.Body != nil { + if (action == config.ActMirror || action == config.ActBestEffort || action == config.ActFallback) && in.Body != nil { var ( r1, r2 io.Reader err error ) // If ContentLength is not provided, S3 use chunked transfer encoding. if in.ContentLength == nil || *in.ContentLength >= c.maxBufferBytes { - r1, r2, err = teeBody(ctx, in.Body) + // For large bodies, we can only support parallel actions (Mirror, BestEffort). 
+ // Fallback requires buffering or seeking, which we can't do for generic streams. + if action == config.ActFallback { + return nil, fmt.Errorf("%s: fallback strategy is not supported for large or unknown-length streams", op) + } + tolerant := (action == config.ActBestEffort) + r1, r2, err = teeBody(ctx, in.Body, tolerant) } else { r1, r2, err = drainBody(ctx, in.Body) } if err != nil { - return nil, fmt.Errorf("%s: failed to split body for mirror: %w", op, err) + return nil, fmt.Errorf("%s: failed to split body for %s: %w", op, action, err) + } + if r1 != nil && r2 != nil { + inPrimary.Body = r1 + inSecondary.Body = r2 } - inPrimary.Body = r1 - inSecondary.Body = r2 } return dispatch(ctx, action, func(ctx context.Context, st store.Store, in *s3.PutObjectInput) (*s3.PutObjectOutput, error) { diff --git a/router.go b/router.go index 3192c38..fc612ee 100644 --- a/router.go +++ b/router.go @@ -116,10 +116,10 @@ func doParallel[I any, T any]( return out, nil } // best-effort: fire-and-forget secondary - out, err := op(ctx, s1, in1) go func() { _, _ = op(ctx, s2, in2) }() + out, err := op(ctx, s1, in1) return out, err } @@ -134,24 +134,72 @@ func drainBody(ctx context.Context, r io.Reader) (io.ReadSeeker, io.ReadSeeker, return bytes.NewReader(data), bytes.NewReader(data), nil } -func teeBody(ctx context.Context, r io.Reader) (io.ReadCloser, io.ReadCloser, error) { +// tolerantWriter wraps an io.Writer and ignores errors from the underlying writer. +type tolerantWriter struct { + w io.Writer + failed bool +} + +func (t *tolerantWriter) Write(p []byte) (n int, err error) { + if t.failed { + return len(p), nil + } + n, err = t.w.Write(p) + if err != nil { + t.failed = true + // Act as if we wrote everything to keep MultiWriter happy. 
+ return len(p), nil + } + return n, nil +} + +func teeBody(ctx context.Context, r io.Reader, tolerantSecondary bool) (io.ReadCloser, io.ReadCloser, error) { pr1, pw1 := io.Pipe() pr2, pw2 := io.Pipe() go func() { defer pw1.Close() defer pw2.Close() - select { - case <-ctx.Done(): - err := ctx.Err() - pw1.CloseWithError(err) - pw2.CloseWithError(err) - return - default: - _, err := io.Copy(io.MultiWriter(pw1, pw2), r) - if err != nil { + var w2 io.Writer = pw2 + if tolerantSecondary { + w2 = &tolerantWriter{w: pw2} + } + mw := io.MultiWriter(pw1, w2) + + buf := make([]byte, 32*1024) + for { + select { + case <-ctx.Done(): + err := ctx.Err() pw1.CloseWithError(err) pw2.CloseWithError(err) + return + default: + } + + nr, er := r.Read(buf) + if nr > 0 { + nw, ew := mw.Write(buf[0:nr]) + if ew != nil { + // Check if error is from pw1 + // If tolerantWriter wrapped pw2, it won't return error. + // So any error here means pw1 failed (or tolerantWriter itself failed unexpectedly). + pw1.CloseWithError(ew) + pw2.CloseWithError(ew) + return + } + if nr != nw { + pw1.CloseWithError(io.ErrShortWrite) + pw2.CloseWithError(io.ErrShortWrite) + return + } + } + if er != nil { + if er != io.EOF { + pw1.CloseWithError(er) + pw2.CloseWithError(er) + } + return } } }() diff --git a/router_ops_test.go b/router_ops_test.go new file mode 100644 index 0000000..9aba5cd --- /dev/null +++ b/router_ops_test.go @@ -0,0 +1,190 @@ +package s3router + +import ( + "context" + "io" + "strings" + "sync" + "testing" + + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/wilbeibi/s3router/config" +) + +type mockBodyStore struct { + name string + t *testing.T + // Optional: Simulate error for primary/secondary + fail bool + // Optional: WaitGroup to signal completion + wg *sync.WaitGroup +} + +func (m *mockBodyStore) GetObject(ctx context.Context, in *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + return nil, nil +} +func (m *mockBodyStore) PutObject(ctx 
context.Context, in *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) { + if m.wg != nil { + defer m.wg.Done() + } + if in.Body == nil { + m.t.Errorf("[%s] PutObject body is nil", m.name) + return nil, nil + } + // Simulate reading the body + data, err := io.ReadAll(in.Body) + if err != nil { + m.t.Errorf("[%s] failed to read body: %v", m.name, err) + return nil, err + } + + if m.fail { + return nil, io.EOF // Simulate error + } + + expected := "hello world" + if string(data) != expected { + m.t.Errorf("[%s] got body %q, want %q", m.name, string(data), expected) + } else { + // m.t.Logf("[%s] successfully read body", m.name) + } + return &s3.PutObjectOutput{}, nil +} +func (m *mockBodyStore) HeadObject(ctx context.Context, in *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + return nil, nil +} +func (m *mockBodyStore) DeleteObject(ctx context.Context, in *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) { + return nil, nil +} +func (m *mockBodyStore) DeleteObjects(ctx context.Context, in *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) { + return nil, nil +} +func (m *mockBodyStore) ListObjectsV2(ctx context.Context, in *s3.ListObjectsV2Input, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { + return nil, nil +} +func (m *mockBodyStore) CreateMultipartUpload(ctx context.Context, in *s3.CreateMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error) { + return nil, nil +} +func (m *mockBodyStore) UploadPart(ctx context.Context, in *s3.UploadPartInput, optFns ...func(*s3.Options)) (*s3.UploadPartOutput, error) { + return nil, nil +} +func (m *mockBodyStore) CompleteMultipartUpload(ctx context.Context, in *s3.CompleteMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error) { + return nil, nil +} +func (m *mockBodyStore) ListParts(ctx context.Context, in 
*s3.ListPartsInput, optFns ...func(*s3.Options)) (*s3.ListPartsOutput, error) { + return nil, nil +} +func (m *mockBodyStore) AbortMultipartUpload(ctx context.Context, in *s3.AbortMultipartUploadInput, optFns ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error) { + return nil, nil +} + +func stringPtr(s string) *string { + return &s +} + +func TestPutObject_BestEffort_Concurrent(t *testing.T) { + cfgYaml := ` +endpoints: + primary: http://p + secondary: http://s +buckets: + testbucket: + primary: p + secondary: s +rules: + - bucket: testbucket + prefix: + "*": + PutObject: best-effort + "*": primary +` + cfg, err := config.Load(strings.NewReader(cfgYaml)) + if err != nil { + t.Fatalf("failed to load config: %v", err) + } + + var wg sync.WaitGroup + wg.Add(2) + + primary := &mockBodyStore{name: "primary", t: t, wg: &wg} + secondary := &mockBodyStore{name: "secondary", t: t, wg: &wg} + + r, err := New(cfg, primary, secondary) + if err != nil { + t.Fatalf("New failed: %v", err) + } + + // Use a pipe to simulate a stream + pr, pw := io.Pipe() + go func() { + pw.Write([]byte("hello world")) + pw.Close() + }() + + ctx := context.Background() + _, err = r.PutObject(ctx, &s3.PutObjectInput{ + Bucket: stringPtr("testbucket"), + Key: stringPtr("obj"), + Body: pr, + }) + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } + + // Wait for both primary (which finishes before PutObject returns) + // and secondary (which finishes async) to complete. 
+ wg.Wait() +} + +func TestPutObject_Fallback_SmallBody(t *testing.T) { + // Fallback should work for small bodies by buffering + cfgYaml := ` +endpoints: + primary: http://p + secondary: http://s +buckets: + testbucket: + primary: p + secondary: s +rules: + - bucket: testbucket + prefix: + "*": + PutObject: fallback + "*": primary +` + cfg, err := config.Load(strings.NewReader(cfgYaml)) + if err != nil { + t.Fatalf("failed to load config: %v", err) + } + + primary := &mockBodyStore{name: "primary", t: t, fail: true} // Primary fails + secondary := &mockBodyStore{name: "secondary", t: t} + + r, err := New(cfg, primary, secondary) + if err != nil { + t.Fatalf("New failed: %v", err) + } + + bodyContent := "hello world" + contentLength := int64(len(bodyContent)) + // ContentLength must be set and below maxBufferBytes for PutObject to buffer the body via drainBody. + // If it is nil (or too large), PutObject takes the streaming teeBody path, where fallback is rejected with an error. + // So "hello world" is buffered, and can be replayed to the secondary, only because ContentLength is set and small. + + ctx := context.Background() + _, err = r.PutObject(ctx, &s3.PutObjectInput{ + Bucket: stringPtr("testbucket"), + Key: stringPtr("obj"), + Body: strings.NewReader(bodyContent), + ContentLength: &contentLength, + }) + + // Even if it is strings.NewReader (seekable), drainBody reads it all. + // Primary mock reads it all (fails). + // Secondary mock should receive a fresh reader with same content. 
+ + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } +} diff --git a/router_test.go b/router_test.go index 8958de4..5449ef2 100644 --- a/router_test.go +++ b/router_test.go @@ -124,7 +124,7 @@ func TestTeeBody(t *testing.T) { ctx := context.Background() want := []byte("stream‑content") - pr1, pr2, err := teeBody(ctx, bytes.NewReader(want)) + pr1, pr2, err := teeBody(ctx, bytes.NewReader(want), false) if err != nil { t.Fatalf("teeBody error: %v", err) } From 2607ae61aeebec0e0de1e069e688ab067a0c1b76 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sat, 14 Feb 2026 19:15:37 +0000 Subject: [PATCH 2/3] Fix CI failure by removing invalid uv config in go.yml The CI workflow was failing because: 1. `enable-cache: true` was set for `astral-sh/setup-uv`, but no `uv.lock` file exists in the repository, causing a failure. 2. `python-version` was provided as an input, but `astral-sh/setup-uv` does not support this input. This change removes the invalid configuration to allow the workflow to proceed. `uv run` will handle Python dependencies as specified in `scripts/validate_config.py`. 
Co-authored-by: wilbeibi <659203+wilbeibi@users.noreply.github.com> --- .github/workflows/go.yml | 3 --- 1 file changed, 3 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 68151fb..91e4dd7 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -19,9 +19,6 @@ jobs: - name: Set up uv uses: astral-sh/setup-uv@v3 - with: - enable-cache: true - python-version: "3.11" - name: Validate router configs run: | From 78d0bba4046f8cbf8cc7b095679e81e5053ae13d Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sun, 15 Feb 2026 00:10:10 +0000 Subject: [PATCH 3/3] Fix BestEffort race condition and Fallback safety This commit addresses several critical issues with the `BestEffort` and `Fallback` routing strategies for `PutObject` and `UploadPart` operations: 1. **BestEffort Race Condition & Deadlock**: - Previously, `ActBestEffort` used a shared `io.Reader` for both primary and secondary uploads, leading to a race condition where one upload would consume the stream, causing the other to fail or upload partial data. - Also fixed a deadlock in `doParallel` where the primary operation would block waiting for the secondary operation to start consuming the piped stream, but the secondary operation was only started *after* the primary finished. The secondary operation is now started concurrently. - Introduced `tolerantWriter` in `router.go` and updated `teeBody` to support a tolerant mode where write errors to the secondary stream are ignored (for `ActBestEffort`), allowing the primary stream to proceed uninterrupted. 2. **Fallback Safety**: - `ActFallback` for large streaming bodies is unsafe because the stream cannot be rewound after a failed primary attempt. This would result in an empty body being sent to the secondary. 
- Added explicit checks in `ops.go` and `multipart_ops.go` to return an error if `ActFallback` is used with a large or unknown-length stream, preventing silent data corruption. - Small bodies (buffered via `drainBody`) continue to support fallback correctly. 3. **Test Improvements**: - Added `router_ops_test.go` with integration tests for `ActBestEffort` (concurrent write verification) and `ActFallback` (small body retry). - Fixed test stability by using `sync.WaitGroup` instead of `time.Sleep`. - Updated `router_test.go` to match new `teeBody` signature. 4. **CI Fix**: - Removed invalid `enable-cache` and `python-version` inputs from the `astral-sh/setup-uv` step in `.github/workflows/go.yml` to fix CI failures. Co-authored-by: wilbeibi <659203+wilbeibi@users.noreply.github.com>