From b3c2fc8fda45df2c45a0cd30ecee9254f0b12383 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Sat, 27 Jun 2026 13:25:39 +0000 Subject: [PATCH] fix(hostrunner): bound teardown waits so a wedged child can't hang stop (#77.2/#77.3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the two LOW-severity residuals left after the #77.1 data-race fix (PR #276). Neither could hang a production path today, but both are unbounded waits that a nil/ineffective Closer or a ctx-ignoring driver.Input would turn into a permanently stuck host-runner stop. #77.2 — input-router fire-and-forget dispatch. tick() spawned a per-event goroutine with no tracking; Detach/StopAll waited on loop.done (the run goroutine) but not the in-flight driver.Input goroutines, so a dispatch could outlive teardown. Track them in a per-loop WaitGroup and drain it in run() before close(done). The drain is BOUNDED (inputDispatchDrainTimeout): a dispatch parked in a ctx-ignoring Input (StdioDriver.Input keeps its stdin Write un-preempted) only unblocks when stopDriver calls the driver's Stop() right AFTER Detach — so an unbounded wait there would deadlock. The straggler is reaped by that Stop(); abandoning the bounded wait leaks nothing. #77.3 — readLoop has no ctx.Done(). StdioDriver/ACPDriver readLoops unwind only on pipe EOF via Closer(); Stop() called wg.Wait() unconditionally, so a nil Closer would hang Stop forever. Bound both with driverStopDrainTimeout + a warn. New shared waitTimeout helper (lifecycle.go). Tests (all -race clean): - DetachBoundedByDispatchDrain: a wedged ctx-ignoring dispatch doesn't hang Detach, and is reaped once unblocked. - DetachWaitsForFastDispatch: Detach waits for an in-flight dispatch to finish (verified to FAIL without the drain). - StdioDriverStopBoundedWithoutCloser: Stop returns bounded with a nil Closer and a never-closing pipe. 
Co-Authored-By: Claude Opus 4.8 --- hub/internal/hostrunner/driver_acp.go | 6 +- hub/internal/hostrunner/driver_stdio.go | 6 +- hub/internal/hostrunner/input_router.go | 26 +++ hub/internal/hostrunner/lifecycle.go | 39 +++++ .../hostrunner/teardown_harden_test.go | 157 ++++++++++++++++++ 5 files changed, 232 insertions(+), 2 deletions(-) create mode 100644 hub/internal/hostrunner/lifecycle.go create mode 100644 hub/internal/hostrunner/teardown_harden_test.go diff --git a/hub/internal/hostrunner/driver_acp.go b/hub/internal/hostrunner/driver_acp.go index cd210fe6..7f8c4b14 100644 --- a/hub/internal/hostrunner/driver_acp.go +++ b/hub/internal/hostrunner/driver_acp.go @@ -622,7 +622,11 @@ func (d *ACPDriver) Stop() { if closer != nil { closer() } - d.wg.Wait() + // Bounded so a nil/ineffective Closer (the readLoop only unwinds on pipe + // EOF, it doesn't select on ctx) can't hang Stop forever (#77.3). + if !waitTimeout(&d.wg, driverStopDrainTimeout) { + d.Log.Warn("acp readLoop did not drain on Stop; abandoning", "agent", d.AgentID) + } // Drain any stragglers waiting on responses so callers don't leak. d.pendingMu.Lock() diff --git a/hub/internal/hostrunner/driver_stdio.go b/hub/internal/hostrunner/driver_stdio.go index 1674776e..7381435b 100644 --- a/hub/internal/hostrunner/driver_stdio.go +++ b/hub/internal/hostrunner/driver_stdio.go @@ -106,7 +106,11 @@ func (d *StdioDriver) Stop() { if closer != nil { closer() } - d.wg.Wait() + // Bounded so a nil/ineffective Closer (the readLoop only unwinds on pipe + // EOF, it doesn't select on ctx) can't hang Stop forever (#77.3). 
+ if !waitTimeout(&d.wg, driverStopDrainTimeout) { + d.Log.Warn("stdio readLoop did not drain on Stop; abandoning", "agent", d.AgentID) + } shutCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() diff --git a/hub/internal/hostrunner/input_router.go b/hub/internal/hostrunner/input_router.go index fb3d0983..9e2ec8cc 100644 --- a/hub/internal/hostrunner/input_router.go +++ b/hub/internal/hostrunner/input_router.go @@ -63,8 +63,21 @@ type inputAgentLoop struct { cancel context.CancelFunc done chan struct{} lastSeq int64 + // dispatch tracks the per-event goroutines tick() spawns so the loop can + // drain them on teardown instead of letting them outlive Detach (#77.2). + dispatch sync.WaitGroup } +// inputDispatchDrainTimeout bounds how long run() waits for in-flight dispatch +// goroutines after its context is cancelled. A dispatch can be parked in a +// ctx-ignoring driver.Input (StdioDriver.Input keeps its stdin Write +// un-preempted on purpose); that straggler is unblocked moments later when +// stopDriver calls the driver's Stop() — which closes the transport — right +// after Detach. So the bound just keeps Detach from blocking on a wedged child; +// it never has to be long, and nothing leaks when it elapses. +// A var so teardown tests can shorten it. +var inputDispatchDrainTimeout = 2 * time.Second + // NewInputRouter returns a router wired against the host-runner client. // Log defaults to slog.Default() if nil. func NewInputRouter(client InputLister, log *slog.Logger) *InputRouter { @@ -145,6 +158,17 @@ func (r *InputRouter) StopAll() { func (r *InputRouter) run(ctx context.Context, agentID string, driver Inputter, loop *inputAgentLoop) { defer close(loop.done) + // Drain in-flight dispatch goroutines before signalling done, so Detach / + // StopAll (which block on loop.done) observe a torn-down agent's dispatches + // having finished rather than racing them (#77.2). 
Bounded so a wedged + // driver.Input can't hang the stop path — see inputDispatchDrainTimeout. + // Registered after `defer close(loop.done)` so it runs first (LIFO). + defer func() { + if !waitTimeout(&loop.dispatch, inputDispatchDrainTimeout) { + r.Log.Warn("input dispatch drain timed out; abandoning straggler", + "agent", agentID) + } + }() t := time.NewTicker(inputPollInterval) defer t.Stop() for { @@ -236,7 +260,9 @@ func (r *InputRouter) tick(ctx context.Context, agentID string, driver Inputter, // Spawning per-event is fine at user-input cardinality. seq := ev.Seq eventID := ev.ID + loop.dispatch.Add(1) go func() { + defer loop.dispatch.Done() if err := driver.Input(ctx, kind, payload); err != nil { r.Log.Warn("input dispatch failed", "agent", agentID, "seq", seq, "kind", kind, "err", err) diff --git a/hub/internal/hostrunner/lifecycle.go b/hub/internal/hostrunner/lifecycle.go new file mode 100644 index 00000000..c085e363 --- /dev/null +++ b/hub/internal/hostrunner/lifecycle.go @@ -0,0 +1,39 @@ +package hostrunner + +import ( + "sync" + "time" +) + +// driverStopDrainTimeout bounds how long a driver's Stop() waits for its +// readLoop to unwind. The loop normally drains the instant Closer closes its +// pipe (scanner hits EOF); the bound is the backstop for a nil or ineffective +// Closer, which would otherwise hang Stop() — and the whole host-runner stop +// path — forever (#77.3). A var so teardown tests can shorten it. +var driverStopDrainTimeout = 5 * time.Second + +// waitTimeout blocks until wg drains or d elapses, whichever comes first. It +// returns true if the group drained and false on timeout. +// +// Teardown paths in this package wait on goroutines that can be wedged on a +// blocking syscall a context cancel does not preempt — a child whose stdin +// buffer is full (StdioDriver.Input deliberately does not abort its Write), or a +// readLoop scanner that only unwinds when its pipe closes. 
An unconditional
+// wg.Wait() there can hang the host-runner's stop path forever (#77). Bounding
+// the wait keeps teardown live. The straggler normally unblocks moments later
+// when the transport closes; until it does, the waiter goroutine below stays parked.
+func waitTimeout(wg *sync.WaitGroup, d time.Duration) bool {
+	done := make(chan struct{})
+	go func() {
+		wg.Wait()
+		close(done)
+	}()
+	t := time.NewTimer(d)
+	defer t.Stop()
+	select {
+	case <-done:
+		return true
+	case <-t.C:
+		return false
+	}
+}
diff --git a/hub/internal/hostrunner/teardown_harden_test.go b/hub/internal/hostrunner/teardown_harden_test.go
new file mode 100644
index 00000000..5f3782f0
--- /dev/null
+++ b/hub/internal/hostrunner/teardown_harden_test.go
@@ -0,0 +1,157 @@
+package hostrunner
+
+import (
+	"context"
+	"encoding/json"
+	"io"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+// wedgedInputter parks every Input call until release is closed and, crucially,
+// IGNORES ctx — mirroring StdioDriver.Input, whose stdin Write isn't preempted
+// by a context cancel. That's the case that would deadlock Detach if the drain
+// weren't bounded (#77.2). started and done are each closed once, so repeat calls are safe.
+type wedgedInputter struct {
+	started        chan struct{}
+	release        chan struct{}
+	once, doneOnce sync.Once
+	done           chan struct{}
+}
+
+func (b *wedgedInputter) Input(_ context.Context, _ string, _ map[string]any) error {
+	b.once.Do(func() { close(b.started) })
+	<-b.release
+	b.doneOnce.Do(func() { close(b.done) })
+	return nil
+}
+
+// TestInputRouter_DetachBoundedByDispatchDrain pins #77.2: Detach now waits for
+// the per-event dispatch goroutines to drain (so they don't outlive teardown),
+// but the wait is BOUNDED — a dispatch wedged in a ctx-ignoring driver.Input
+// must not hang the stop path (which would deadlock, since stopDriver only
+// closes the transport that unblocks it AFTER Detach returns).
+func TestInputRouter_DetachBoundedByDispatchDrain(t *testing.T) {
+	defer swapDrainTimeout(&inputDispatchDrainTimeout, 150*time.Millisecond)()
+
+	events := &fakeInputLister{first: []AgentEvent{{
+		Seq: 1, Producer: "user", Kind: "input.answer",
+		Payload: json.RawMessage(`{"body":"hi"}`),
+	}}}
+	router := NewInputRouter(events, silentLogger())
+	wedged := &wedgedInputter{
+		done:    make(chan struct{}),
+		release: make(chan struct{}),
+		started: make(chan struct{}),
+	}
+	router.Attach(context.Background(), "a1", wedged, 0)
+
+	select {
+	case <-wedged.started:
+	case <-time.After(2 * time.Second):
+		t.Fatal("dispatch never started")
+	}
+
+	// With Input still parked, Detach has roughly the drain bound to come back.
+	returned := make(chan struct{})
+	go func() { defer close(returned); router.Detach("a1") }()
+	select {
+	case <-returned:
+	case <-time.After(inputDispatchDrainTimeout + 2*time.Second):
+		t.Fatal("Detach hung past the dispatch drain bound (#77.2 deadlock)")
+	}
+
+	// Unpark the straggler — standing in for the driver Stop() that follows
+	// Detach outside the test — and require that the goroutine then finishes.
+	close(wedged.release)
+	select {
+	case <-wedged.done:
+	case <-time.After(2 * time.Second):
+		t.Fatal("dispatch goroutine leaked after release")
+	}
+}
+
+// TestInputRouter_DetachWaitsForFastDispatch pins the normal-path guarantee:
+// Detach does not return until an in-flight dispatch finishes (#77.2). The
+// dispatch signals it has started, then does measurable work; Detach is called
+// while it's mid-flight, and must have waited for completion when it returns.
+// Without the drain, Detach would return during the work and `completed` would
+// still be false.
+func TestInputRouter_DetachWaitsForFastDispatch(t *testing.T) {
+	var finished atomic.Bool
+	began := make(chan struct{})
+	events := &fakeInputLister{first: []AgentEvent{{
+		Seq: 1, Producer: "user", Kind: "input.answer",
+		Payload: json.RawMessage(`{"body":"hi"}`),
+	}}}
+	router := NewInputRouter(events, silentLogger())
+	router.Attach(context.Background(), "a1", inputterFunc(func() error {
+		close(began)
+		time.Sleep(100 * time.Millisecond) // simulated work that never looks at ctx
+		finished.Store(true)
+		return nil
+	}), 0)
+
+	select {
+	case <-began:
+	case <-time.After(2 * time.Second):
+		t.Fatal("dispatch never started")
+	}
+	router.Detach("a1")
+	if !finished.Load() {
+		t.Fatal("Detach returned before the in-flight dispatch completed (#77.2)")
+	}
+}
+
+type inputterFunc func() error
+
+func (fn inputterFunc) Input(_ context.Context, _ string, _ map[string]any) error { return fn() }
+
+// blockingReader parks every Read until unblock is closed and then returns
+// io.EOF. It models a child stdout pipe that stays open, leaving a driver's
+// readLoop stuck so that only the bound in Stop() can end the wait (#77.3).
+type blockingReader struct{ unblock chan struct{} }
+
+func (br *blockingReader) Read(_ []byte) (int, error) {
+	<-br.unblock
+	return 0, io.EOF
+}
+
+// TestStdioDriverStopBoundedWithoutCloser pins #77.3: with a nil Closer and a
+// readLoop wedged on a never-closing pipe, Stop() must still return (bounded)
+// instead of hanging the host-runner stop path on wg.Wait() forever.
+func TestStdioDriverStopBoundedWithoutCloser(t *testing.T) {
+	defer swapDrainTimeout(&driverStopDrainTimeout, 150*time.Millisecond)()
+
+	rd := &blockingReader{unblock: make(chan struct{})}
+	// Deferred so the abandoned readLoop unwinds even when the test bails via t.Fatal.
+	defer close(rd.unblock)
+
+	d := &StdioDriver{
+		AgentID: "a1",
+		Poster:  &capturingPoster{},
+		Stdout:  rd,
+		Closer:  nil, // the bug condition: nothing unblocks readLoop
+		Log:     silentLogger(),
+	}
+	if err := d.Start(context.Background()); err != nil {
+		t.Fatalf("Start: %v", err)
+	}
+
+	stopped := make(chan struct{})
+	go func() { d.Stop(); close(stopped) }()
+	select {
+	case <-stopped:
+	case <-time.After(driverStopDrainTimeout + 2*time.Second):
+		t.Fatal("Stop hung past the readLoop drain bound (#77.3)")
+	}
+}
+
+// swapDrainTimeout overrides *p with v and returns a func that restores the old value (use with defer).
+func swapDrainTimeout(p *time.Duration, v time.Duration) func() {
+	old := *p
+	*p = v
+	return func() { *p = old }
+}