From 79e843ef17d242797e8475185b66663881032397 Mon Sep 17 00:00:00 2001 From: David Gageot Date: Thu, 19 Mar 2026 18:55:27 +0100 Subject: [PATCH] Fix data races with shared bytes.Buffer using concurrent.Buffer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two independent data races were caused by unsynchronized bytes.Buffer access: 1. pkg/agent: TestModel_LogsSelection sets the global slog default to a handler backed by a bytes.Buffer. Parallel tests (TestModelOverride, TestModelOverride_ConcurrentAccess) also call Agent.Model() which triggers slog.Info(), racing on the shared buffer. 2. pkg/tools/builtin: startLocked() uses a bytes.Buffer as cmd.Stderr. The os/exec goroutine writes to it while readNotifications reads and resets it on a ticker. Introduce concurrent.Buffer — a mutex-protected bytes.Buffer — in the existing pkg/concurrent package (alongside Map and Slice) and use it in both locations. Assisted-By: docker-agent --- pkg/agent/agent_test.go | 4 ++-- pkg/concurrent/buffer.go | 44 ++++++++++++++++++++++++++++++++++++++++ pkg/tools/builtin/lsp.go | 15 +++++++------- 3 files changed, 53 insertions(+), 10 deletions(-) create mode 100644 pkg/concurrent/buffer.go diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 9eb2f1783..ee3e221c0 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -1,7 +1,6 @@ package agent import ( - "bytes" "context" "errors" "log/slog" @@ -11,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "github.com/docker/docker-agent/pkg/chat" + "github.com/docker/docker-agent/pkg/concurrent" "github.com/docker/docker-agent/pkg/model/provider/base" "github.com/docker/docker-agent/pkg/tools" ) @@ -144,7 +144,7 @@ func TestModelOverride(t *testing.T) { func TestModel_LogsSelection(t *testing.T) { t.Parallel() - var buf bytes.Buffer + var buf concurrent.Buffer handler := slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelInfo}) prev := slog.Default() slog.SetDefault(slog.New(handler)) diff --git a/pkg/concurrent/buffer.go b/pkg/concurrent/buffer.go new file mode 100644 index 000000000..44d8b9745 --- /dev/null +++ b/pkg/concurrent/buffer.go @@ -0,0 +1,44 @@ +package concurrent + +import ( + "bytes" + "sync" +) + +// Buffer is a concurrency-safe [bytes.Buffer]. +// It implements [io.Writer] so it can be used anywhere a plain buffer would, +// e.g. as the output target for a log handler or as subprocess stderr. +type Buffer struct { + mu sync.Mutex + buf bytes.Buffer +} + +// Write appends p to the buffer. +func (b *Buffer) Write(p []byte) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + return b.buf.Write(p) +} + +// String returns the buffered content. +func (b *Buffer) String() string { + b.mu.Lock() + defer b.mu.Unlock() + return b.buf.String() +} + +// Reset clears the buffer. +func (b *Buffer) Reset() { + b.mu.Lock() + defer b.mu.Unlock() + b.buf.Reset() +} + +// Drain returns the buffered content and resets the buffer atomically. +func (b *Buffer) Drain() string { + b.mu.Lock() + defer b.mu.Unlock() + s := b.buf.String() + b.buf.Reset() + return s +} diff --git a/pkg/tools/builtin/lsp.go b/pkg/tools/builtin/lsp.go index f6450bbaa..ff2d5f6fc 100644 --- a/pkg/tools/builtin/lsp.go +++ b/pkg/tools/builtin/lsp.go @@ -2,7 +2,6 @@ package builtin import ( "bufio" - "bytes" "cmp" "context" "encoding/json" @@ -20,6 +19,7 @@ import ( "sync/atomic" "time" + "github.com/docker/docker-agent/pkg/concurrent" "github.com/docker/docker-agent/pkg/tools" ) @@ -492,8 +492,8 @@ func (h *lspHandler) startLocked() error { return fmt.Errorf("failed to create stdout pipe: %w", err) } - var stderrBuf bytes.Buffer - cmd.Stderr = &stderrBuf + stderrBuf := &concurrent.Buffer{} + cmd.Stderr = stderrBuf if err := cmd.Start(); err != nil { stdin.Close() @@ -506,7 +506,7 @@ func (h *lspHandler) startLocked() error { h.stdin = stdin h.stdout = bufio.NewReader(stdout) - go h.readNotifications(processCtx, &stderrBuf) + go h.readNotifications(processCtx, stderrBuf) slog.Debug("LSP server started successfully") return nil @@ -1432,7 +1432,7 @@ func (h *lspHandler) readMessageLocked() ([]byte, error) { return body, nil } -func (h *lspHandler) readNotifications(ctx context.Context, stderrBuf *bytes.Buffer) { +func (h *lspHandler) readNotifications(ctx context.Context, stderrBuf *concurrent.Buffer) { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() @@ -1441,9 +1441,8 @@ func (h *lspHandler) readNotifications(ctx context.Context, stderrBuf *bytes.Buf case <-ctx.Done(): return case <-ticker.C: - if stderrBuf.Len() > 0 { - slog.Debug("LSP stderr", "content", stderrBuf.String()) - stderrBuf.Reset() + if content := stderrBuf.Drain(); content != "" { + slog.Debug("LSP stderr", "content", content) } } }