fix(headless): harden drain/finalize against leaked reader threads (#1025)#1078
Conversation
| # done is set: _target finished, so ``errored`` is fully settled. | ||
| if errored.is_set(): | ||
| return "errored" | ||
| return "completed" |
There was a problem hiding this comment.
The snapshot trigger misses the leaked drain outcome — the exact race this PR fixes still fires on the "completed" path.
_drain_bounded only returns "budget_exceeded" (timeout) or "errored" (raise). But utils/subprocess_pgroup.py:drain_reader_threads has a third leak case: after force-close it computes leaked = [t for t in still_stuck if t.is_alive()], logs outcome=leaked, and returns normally (return [], well within the 90s budget). So a confirmed-alive leaked reader makes _drain_bounded return "completed" → ctx.drain_budget_exceeded stays False → _finalize_headless_result skips _snapshot_for_finalize and reads raw_messages / response_parts / execution_log / metadata while the leaked reader is still mutating them. metadata.model_copy(deep=True) (or iteration) can then raise RuntimeError: dictionary changed size during iteration with no retry guard, surfacing as an opaque HTTP 500.
The correct "a reader may still mutate ctx" signal is drain_outcome != "completed" including the leaked case — surface a "leaked" member on DrainOutcome fed from that branch, or treat it like the other two. As written the fix has a hole on the very path it declares safe.
| last_exc: Optional[RuntimeError] = None | ||
| for _ in range(_SNAPSHOT_RETRY_ATTEMPTS): | ||
| try: | ||
| return replace( |
There was a problem hiding this comment.
The snapshot doesn't isolate auth_abort_event / auth_abort_reason, which finalize reads off the still-live objects.
replace(ctx, ...) only overrides raw_messages, response_parts, execution_log, verbose_output_lines, metadata. The new context keeps the same auth_abort_event (Event) and auth_abort_reason (list) — both mutated by the read_stderr() reader thread (lines 438-439). On the leaked-reader path, finalize then reads ctx.auth_abort_event.is_set() (line 737) and ctx.auth_abort_reason[0] (738) on those live objects.
So if the leaked thread is the stderr reader and it matches an auth pattern in a late line after the snapshot but before line 737, an otherwise-successful run is converted to a spurious HTTP 503 — which task_execution_service (SUB-003) treats as an auth failure and uses to auto-switch the subscription. Narrow, but real.
At minimum this contradicts the helper's own docstring ("Snapshot EVERY field finalize reads"). Either snapshot the auth fields too, or document explicitly why they're exempt (as parse_failure_count/_sample already are).
| drain_outcome = _drain_bounded(process, stdout_thread, stderr_thread, | ||
| grace=3, pgid=ctx.process_pgid, | ||
| execution_tag=ctx.task_session_id) | ||
| ctx.drain_budget_exceeded = drain_outcome in ("budget_exceeded", "errored") |
There was a problem hiding this comment.
This drain_budget_exceeded write is dead for its stated purpose.
The TimeoutExpired branch sets the flag and then immediately raises. execute_headless_task catches TimeoutExpired and raises HTTPException(504) without calling _finalize_headless_result, so the flag set here is never read — only the post-exit drain at line 612 feeds finalize. Harmless today, but it reads as if the snapshot protects the timeout path when it does not, and it's a maintenance trap if the timeout branch is ever wired into finalize later. Consider dropping the assignment here (with a comment) or routing the timeout path through finalize so the snapshot actually applies.
| "never swallow exceptions silently" rule). | ||
| """ | ||
| done = threading.Event() | ||
| errored = threading.Event() |
There was a problem hiding this comment.
Minor (cleanup): the second errored Event is redundant. It's written by one thread and read by one thread only after the done barrier, so a single write-once cell captures the same thing with the ordering self-evident:
outcome = "completed"
def _target():
nonlocal outcome
try:
asyncio.run(_drain_reader_threads(...))
except Exception:
outcome = "errored"
logger.exception(...)
finally:
done.set()
...
if not done.wait(timeout=_DRAIN_BUDGET_SECONDS):
return "budget_exceeded"
return outcomeRemoves one Event allocation per drain and the load-bearing "errored is fully settled" comment. (A concurrent.futures Future would fold the timeout + exception-capture + outcome together even more cleanly.)
| response_parts=list(ctx.response_parts), | ||
| execution_log=list(ctx.execution_log), | ||
| verbose_output_lines=list(ctx.verbose_output_lines), | ||
| metadata=ctx.metadata.model_copy(deep=True), |
There was a problem hiding this comment.
Minor (cleanup): model_copy(deep=True) — ExecutionMetadata's only nested mutable field is compact_events, which finalize overwrites from the JSONL on the success path (_extract_compact_events_from_jsonl). Everything else is scalars, where deep=True == shallow. The deep copy is also precisely what can raise the "changed size during iteration" the 3-attempt retry loop exists to absorb. A deep=False (or model_copy(update={'compact_events': list(...)}) if a tear-safe copy of that one list is actually wanted) narrows the race window the retry is guarding.
Code review —
|
Review findings addressed —
|
| # | Finding | Fix |
|---|---|---|
| 1 | 🔴 leaked drain outcome missed → snapshot skipped while a confirmed-alive reader mutates ctx |
_drain_bounded adds a "leaked" DrainOutcome: after a within-budget, non-raising drain it checks whether any reader thread it was handed is still alive (no dependency on drain_reader_threads internals). Flag renamed reader_may_be_live, set on drain_outcome != "completed" so budget_exceeded / errored / leaked all snapshot. |
| 2 | 🟠 auth_abort_* not snapshotted → possible spurious 503 / SUB-003 auto-switch |
_snapshot_for_finalize now freezes both: a fresh Event mirroring is_set() (_freeze_event) + a list(...) copy of auth_abort_reason. The "snapshots every field finalize reads" claim now holds. |
| 3 | Dead flag on the timeout path | Dropped — the TimeoutExpired path re-raises to HTTP 504 without finalizing, so nothing reads it (replaced with a comment). |
| 4 | Redundant errored Event |
Collapsed to a single write-once outcome cell set by the daemon thread, read after the done barrier. |
| 5 | model_copy(deep=True) |
Kept deliberately — compact_events is a mutable list a leaked reader can append to, so the deep copy is the correct isolation on the snapshot path; the retry guard is needed regardless of depth (it covers __pydantic_fields_set__ growth). |
| — | Chat path (claude_code.py) has the same race |
Left as a follow-up — out of #1025's headless scope, and findings 1+2 argue the real fix is to lift this onto the shared _drain_bounded/finalize seam both callers cross rather than re-implement per-caller. Worth its own issue. |
New tests: "leaked"-outcome (within-budget drain leaving a live reader) + auth-abort freeze isolation; 21 passed locally. Re-verified on a live agent-server — clean task still 200s through the fast path with zero leaked/snapshot warnings.
Second review pass (post-fix
|
|
Resolve by running |
|
Validation: approve the fix on its merits, but please rebase before merge. The D19/D20 hardening itself looks solid — typed One blocking concern — the branch is stale and predates the fix it's meant to complement. It was cut from a
Please rebase onto current (For reference: the |
…1025) Salvages the two orthogonal robustness improvements from the closed PR #980 that the #973 fix did not include. Pure hardening of the already-bounded drain/finalize path — defense-in-depth, not a behavioural change on the clean path. D20 — capture daemon-thread exceptions in `_drain_bounded` `subprocess_lifecycle._drain_bounded` swallowed every drain exception via `except Exception: pass`, making a drain that raised indistinguishable from a clean completion (and hiding the leaked reader). It now captures + logs the exception and returns a `DrainOutcome` (`completed` | `budget_exceeded` | `errored`) so the caller can tell the leaked-reader cases apart. Existing callers that ignore the return value are unaffected. D19 — `_finalize_headless_result` snapshot isolation On the budget-exceeded / errored drain path a reader thread is leaked and may still be mutating ctx's shared buffers / metadata. The headless run context now carries `drain_budget_exceeded`; when set, finalize rebinds `ctx` to a deep snapshot (`list(...)` of each buffer + `metadata.model_copy(deep=True)`) via `_snapshot_for_finalize`, which is retry-guarded against the "changed size during iteration" race and falls back to the live ctx if every attempt loses. Clean drains keep the zero-copy fast path. Out of scope (per the issue): the #980 asyncio.wait_for margin widening and the #970 lsof/proc pipe-holder hunt — both correctly excluded. Tests: `_drain_bounded` outcome matrix (completed/budget_exceeded/errored + no-swallow logging) in test_drain_bounded.py; snapshot list/metadata isolation, retry-exhaustion fallback, and default-off fast path in test_headless_finalize_snapshot.py. Full related suite: 130 passed. Related to #1025 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
#1025) Self-review of the #1025 PR surfaced two correctness gaps in the hardening itself, plus a dead assignment and a redundant Event. Fixed: 1. Leaked-reader outcome was missed (strongest finding). drain_reader_threads force-closes and *returns normally* when a grandchild-held reader won't EOF (its own outcome=leaked METRIC, #586), so a within-budget, non-raising drain did NOT prove the readers were dead — _drain_bounded returned "completed", finalize skipped the snapshot, and read buffers a confirmed-alive reader was still mutating. _drain_bounded now adds a "leaked" DrainOutcome: after a within-budget return it checks whether any reader thread it was handed is still alive (no dependency on drain_reader_threads internals). The headless flag is renamed reader_may_be_live and set on `drain_outcome != "completed"` so budget_exceeded / errored / leaked all trigger the snapshot. 2. Snapshot didn't isolate the auth-abort signal. finalize reads auth_abort_event.is_set() / auth_abort_reason[0], but the stderr reader (the thread that leaks) mutates them — a late auth match could flip a success to a spurious 503 (→ SUB-003 auto-switch). _snapshot_for_finalize now freezes both (a fresh Event mirroring is_set() via _freeze_event + a list copy), so the snapshot's "every field finalize reads" claim holds. 3. Dropped the dead reader_may_be_live assignment on the TimeoutExpired path — that path re-raises to HTTP 504 without finalizing, so nothing reads it (replaced with a comment). 4. Collapsed the redundant `errored` Event in _drain_bounded to a single write-once `outcome` cell set by the daemon thread and read after the `done` barrier — removes the load-bearing two-Event ordering invariant. Tests: new "leaked"-outcome case (within-budget drain leaving a live reader) and auth-abort freeze isolation; renamed the flag test. 21 passed locally. Re-verified against a live agent-server: clean task still 200s through the fast path with zero leaked/snapshot warnings. Related to #1025 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
e65238a to
b5daa8b
Compare
vybe
left a comment
There was a problem hiding this comment.
Approving. Verified the load-bearing claim independently: _snapshot_for_finalize captures exactly the 7 ctx buffers _finalize_headless_result reads (lists, metadata deep-copy, frozen auth-abort event/reason); the only iterating read (execution_log) is over the captured copy; tool_start_times is correctly omitted (read only by the #970 stall-watchdog, never by finalize). Outcome matrix + no-swallow logging + snapshot isolation/fallback/auth-freeze are well-tested and CI is green.
reader_may_be_live on the kill path because "the 504 path never calls _finalize_headless_result." #1229 adds _timeout_504_detail(ctx, …) which reads ctx.metadata/buffers on exactly that kill/504 path. Whichever lands second must reconcile — read 504 telemetry from a snapshot (or set reader_may_be_live on the kill path) so the leaked-reader race isn't reopened on the timeout path.
Summary
Salvages the two orthogonal robustness improvements from the closed PR #980 that the #973 fix (the actual 2h false-timeout bug) did not include. Pure hardening of the already-bounded drain/finalize path — defense-in-depth, no behavioural change on the clean path.
D20 — capture daemon-thread exceptions in
_drain_boundedsubprocess_lifecycle._drain_boundedswallowed every drain exception viaexcept Exception: pass, making a drain that raised indistinguishable from a clean completion (and hiding the leaked reader). It now captures + logs the exception and returns aDrainOutcome(completed|budget_exceeded|errored). Existing callers that ignore the return value (claude_code.py) are unaffected.D19 —
_finalize_headless_resultsnapshot isolationOn the budget-exceeded / errored drain path a reader thread is leaked and may still be mutating ctx's shared buffers / metadata. The run context now carries
drain_budget_exceeded; when set, finalize rebindsctxto a deep snapshot (list(...)of each buffer +metadata.model_copy(deep=True)) via_snapshot_for_finalize, retry-guarded against the "changed size during iteration" race with a live-ctx fallback. Clean drains keep the zero-copy fast path.Explicitly out of scope (per the issue)
asyncio.wait_forouter-margin widening (fix(#970): bound headless wait on result message + tool stall #973 leaves it as-is)/procpipe-holder hunt (reintroduces the D-state deadlock bug: Runaway subprocess after task failure makes agent-server unresponsive, triggering persistent CB cascade #817 removed)Tests
test_drain_bounded.py: outcome matrix —completed/budget_exceeded/errored, plus no-swallow logging assertiontest_headless_finalize_snapshot.py(new): list + metadata isolation from post-snapshot mutation, retry-exhaustion fallback to live ctx, default-off fast path-k "drain or headless or claude or subprocess or pgroup or orphan"): 130 passedRelated to #1025
🤖 Generated with Claude Code