Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion hub/internal/hostrunner/driver_acp.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,11 @@ func (d *ACPDriver) Stop() {
if closer != nil {
closer()
}
d.wg.Wait()
// Bounded so a nil/ineffective Closer (the readLoop only unwinds on pipe
// EOF, it doesn't select on ctx) can't hang Stop forever (#77.3).
if !waitTimeout(&d.wg, driverStopDrainTimeout) {
d.Log.Warn("acp readLoop did not drain on Stop; abandoning", "agent", d.AgentID)
}

// Drain any stragglers waiting on responses so callers don't leak.
d.pendingMu.Lock()
Expand Down
6 changes: 5 additions & 1 deletion hub/internal/hostrunner/driver_stdio.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ func (d *StdioDriver) Stop() {
if closer != nil {
closer()
}
d.wg.Wait()
// Bounded so a nil/ineffective Closer (the readLoop only unwinds on pipe
// EOF, it doesn't select on ctx) can't hang Stop forever (#77.3).
if !waitTimeout(&d.wg, driverStopDrainTimeout) {
d.Log.Warn("stdio readLoop did not drain on Stop; abandoning", "agent", d.AgentID)
}

shutCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
Expand Down
26 changes: 26 additions & 0 deletions hub/internal/hostrunner/input_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,21 @@ type inputAgentLoop struct {
cancel context.CancelFunc
done chan struct{}
lastSeq int64
// dispatch tracks the per-event goroutines tick() spawns so the loop can
// drain them on teardown instead of letting them outlive Detach (#77.2).
dispatch sync.WaitGroup
}

// inputDispatchDrainTimeout caps the time run() spends, once its context has
// been cancelled, waiting for dispatch goroutines that are still in flight.
//
// Why a cap is needed: a dispatch may be sitting inside a driver.Input that
// does not honour ctx (StdioDriver.Input intentionally leaves its stdin Write
// un-preempted). Such a straggler is released shortly afterwards, when
// stopDriver calls the driver's Stop() — closing the transport — immediately
// after Detach. The cap therefore only stops Detach from blocking on a wedged
// child; it can be short, and letting it elapse leaks nothing.
//
// Declared as a var rather than a const so teardown tests can shorten it.
var inputDispatchDrainTimeout time.Duration = 2 * time.Second

// NewInputRouter returns a router wired against the host-runner client.
// Log defaults to slog.Default() if nil.
func NewInputRouter(client InputLister, log *slog.Logger) *InputRouter {
Expand Down Expand Up @@ -145,6 +158,17 @@ func (r *InputRouter) StopAll() {

func (r *InputRouter) run(ctx context.Context, agentID string, driver Inputter, loop *inputAgentLoop) {
defer close(loop.done)
// Drain in-flight dispatch goroutines before signalling done, so Detach /
// StopAll (which block on loop.done) observe a torn-down agent's dispatches
// having finished rather than racing them (#77.2). Bounded so a wedged
// driver.Input can't hang the stop path — see inputDispatchDrainTimeout.
// Registered after `defer close(loop.done)` so it runs first (LIFO).
defer func() {
if !waitTimeout(&loop.dispatch, inputDispatchDrainTimeout) {
r.Log.Warn("input dispatch drain timed out; abandoning straggler",
"agent", agentID)
}
}()
t := time.NewTicker(inputPollInterval)
defer t.Stop()
for {
Expand Down Expand Up @@ -236,7 +260,9 @@ func (r *InputRouter) tick(ctx context.Context, agentID string, driver Inputter,
// Spawning per-event is fine at user-input cardinality.
seq := ev.Seq
eventID := ev.ID
loop.dispatch.Add(1)
go func() {
defer loop.dispatch.Done()
if err := driver.Input(ctx, kind, payload); err != nil {
r.Log.Warn("input dispatch failed",
"agent", agentID, "seq", seq, "kind", kind, "err", err)
Expand Down
39 changes: 39 additions & 0 deletions hub/internal/hostrunner/lifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package hostrunner

import (
"sync"
"time"
)

// driverStopDrainTimeout caps the time a driver's Stop() spends waiting for
// its readLoop to exit.
//
// In the normal case Closer closes the pipe, the scanner sees EOF, and the
// loop exits at once. The cap is only a backstop for a Closer that is nil or
// does not actually close the pipe: without it Stop() — and with it the whole
// host-runner stop path — would block forever (#77.3).
//
// Declared as a var rather than a const so teardown tests can shorten it.
var driverStopDrainTimeout time.Duration = 5 * time.Second

// waitTimeout waits for wg to reach zero, but for no longer than d. The result
// is true when the group drained in time and false when the timer fired first.
//
// It exists because this package's teardown paths wait on goroutines that may
// be stuck in a blocking call that cancelling a context cannot interrupt: a
// stdin Write to a child whose buffer is full (StdioDriver.Input deliberately
// does not abort it), or a readLoop scanner that only returns once its pipe is
// closed. A bare wg.Wait() in those paths could block the host-runner's stop
// path forever (#77); capping the wait keeps teardown moving.
//
// On timeout the internal watcher goroutine is left running until wg finally
// drains, which happens once the driver's transport is closed.
func waitTimeout(wg *sync.WaitGroup, d time.Duration) bool {
	// Bridge the blocking Wait onto a channel so it can take part in a select.
	drained := make(chan struct{})
	go func() {
		defer close(drained)
		wg.Wait()
	}()

	deadline := time.NewTimer(d)
	defer deadline.Stop()

	select {
	case <-deadline.C:
		return false
	case <-drained:
		return true
	}
}
157 changes: 157 additions & 0 deletions hub/internal/hostrunner/teardown_harden_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package hostrunner

import (
"context"
"encoding/json"
"io"
"sync"
"sync/atomic"
"testing"
"time"
)

// wedgedInputter parks every Input call until release is closed and, crucially,
// IGNORES ctx — mirroring StdioDriver.Input, whose stdin Write isn't preempted
// by a context cancel. That's the case that would deadlock Detach if the drain
// weren't bounded (#77.2).
//
// started is closed when the first Input call begins; done is closed when the
// first Input call gets past release. Each close is guarded by its own
// sync.Once, so a second dispatch through the same inputter cannot panic with
// "close of closed channel".
type wedgedInputter struct {
	started  chan struct{}
	release  chan struct{}
	once     sync.Once
	done     chan struct{}
	doneOnce sync.Once
}

// Input signals started, blocks until release is closed, then signals done.
// The context, kind and payload arguments are deliberately ignored. It always
// returns nil.
func (b *wedgedInputter) Input(_ context.Context, _ string, _ map[string]any) error {
	b.once.Do(func() { close(b.started) })
	<-b.release
	// Guarded like started: more than one call may reach this point.
	b.doneOnce.Do(func() { close(b.done) })
	return nil
}

// TestInputRouter_DetachBoundedByDispatchDrain pins the bounded half of #77.2.
// Detach waits for per-event dispatch goroutines so they do not outlive
// teardown, but that wait has a ceiling: a dispatch stuck in a driver.Input
// that ignores ctx must not hang the stop path. An unbounded wait would
// deadlock, because stopDriver only closes the transport that frees the
// dispatch AFTER Detach has returned.
func TestInputRouter_DetachBoundedByDispatchDrain(t *testing.T) {
	restore := swapDrainTimeout(&inputDispatchDrainTimeout, 150*time.Millisecond)
	defer restore()

	stuck := &wedgedInputter{
		started: make(chan struct{}),
		release: make(chan struct{}),
		done:    make(chan struct{}),
	}
	queued := []AgentEvent{{
		Seq:      1,
		Producer: "user",
		Kind:     "input.answer",
		Payload:  json.RawMessage(`{"body":"hi"}`),
	}}
	router := NewInputRouter(&fakeInputLister{first: queued}, silentLogger())
	router.Attach(context.Background(), "a1", stuck, 0)

	// Wait until the dispatch goroutine is actually parked inside Input.
	select {
	case <-time.After(2 * time.Second):
		t.Fatal("dispatch never started")
	case <-stuck.started:
	}

	// With Input still wedged, Detach has to come back within roughly the
	// drain bound.
	detachReturned := make(chan struct{})
	go func() {
		defer close(detachReturned)
		router.Detach("a1")
	}()
	select {
	case <-time.After(inputDispatchDrainTimeout + 2*time.Second):
		t.Fatal("Detach hung past the dispatch drain bound (#77.2 deadlock)")
	case <-detachReturned:
	}

	// Free the straggler the way the real driver's Stop() would right after
	// Detach; it must then run to completion rather than leak.
	close(stuck.release)
	select {
	case <-time.After(2 * time.Second):
		t.Fatal("dispatch goroutine leaked after release")
	case <-stuck.done:
	}
}

// TestInputRouter_DetachWaitsForFastDispatch pins the normal-path guarantee:
// Detach does not return until an in-flight dispatch finishes (#77.2). The
// dispatch signals it has started, then does measurable work; Detach is called
// while it's mid-flight, and must have waited for completion when it returns.
// Without the drain, Detach would return during the work and `completed` would
// still be false.
func TestInputRouter_DetachWaitsForFastDispatch(t *testing.T) {
started := make(chan struct{})
var completed atomic.Bool
lister := &fakeInputLister{first: []AgentEvent{{
Seq: 1, Producer: "user", Kind: "input.answer",
Payload: json.RawMessage(`{"body":"hi"}`),
}}}
r := NewInputRouter(lister, silentLogger())
r.Attach(context.Background(), "a1", inputterFunc(func() error {
close(started)
time.Sleep(100 * time.Millisecond) // in-flight work, ignores ctx
completed.Store(true)
return nil
}), 0)

select {
case <-started:
case <-time.After(2 * time.Second):
t.Fatal("dispatch never started")
}
r.Detach("a1")
if !completed.Load() {
t.Fatal("Detach returned before the in-flight dispatch completed (#77.2)")
}
}

// inputterFunc adapts a plain func() error so it can be passed wherever a
// value with an Input method is expected.
type inputterFunc func() error

// Input discards all of its arguments and returns whatever the wrapped
// function returns.
func (fn inputterFunc) Input(context.Context, string, map[string]any) error {
	return fn()
}

// blockingReader is a reader whose Read parks until unblock is closed and then
// reports EOF. It stands in for a child stdout pipe that never closes: a
// driver's readLoop scanner stays stuck on it, so only the bound in Stop() can
// end the wait (#77.3).
type blockingReader struct {
	unblock chan struct{}
}

// Read never fills the buffer: it waits for unblock to be closed and then
// returns (0, io.EOF).
func (r *blockingReader) Read(_ []byte) (int, error) {
	<-r.unblock
	return 0, io.EOF
}

// TestStdioDriverStopBoundedWithoutCloser pins #77.3: with a nil Closer and a
// readLoop wedged on a never-closing pipe, Stop() must still return (bounded)
// instead of hanging the host-runner stop path on wg.Wait() forever.
func TestStdioDriverStopBoundedWithoutCloser(t *testing.T) {
	defer swapDrainTimeout(&driverStopDrainTimeout, 150*time.Millisecond)()

	rd := &blockingReader{unblock: make(chan struct{})}
	// Cleanup: let the abandoned readLoop unwind. Deferred rather than placed
	// at the end of the test so it also runs when an assertion below fails —
	// t.Fatal exits via runtime.Goexit, which still runs deferred calls —
	// instead of leaving the reader parked for the rest of the test binary.
	// Registered after the timeout swap, so (LIFO) the reader is released
	// before the timeout is restored, matching the success-path order.
	defer close(rd.unblock)

	d := &StdioDriver{
		AgentID: "a1",
		Poster:  &capturingPoster{},
		Stdout:  rd,
		Closer:  nil, // the bug condition: nothing unblocks readLoop
		Log:     silentLogger(),
	}
	if err := d.Start(context.Background()); err != nil {
		t.Fatalf("Start: %v", err)
	}

	// Run Stop on its own goroutine so a hang is reported as a test failure
	// rather than hanging the test itself.
	stopped := make(chan struct{})
	go func() { d.Stop(); close(stopped) }()
	select {
	case <-stopped:
	case <-time.After(driverStopDrainTimeout + 2*time.Second):
		t.Fatal("Stop hung past the readLoop drain bound (#77.3)")
	}
}

// swapDrainTimeout overwrites *p with v and hands back a function that puts
// the previous value back. Intended use is a single line:
//
//	defer swapDrainTimeout(&someTimeout, short)()
func swapDrainTimeout(p *time.Duration, v time.Duration) func() {
	previous := *p
	restore := func() {
		*p = previous
	}
	*p = v
	return restore
}
Loading