From 79744ae0f39cadda8a7af66149e255d9e02e3695 Mon Sep 17 00:00:00 2001 From: Oleksii Dolhov Date: Fri, 5 Jun 2026 13:52:27 +0300 Subject: [PATCH 1/2] fix(headless): harden drain/finalize against leaked reader threads (#1025) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Salvages the two orthogonal robustness improvements from the closed PR #980 that the #973 fix did not include. Pure hardening of the already-bounded drain/finalize path — defense-in-depth, not a behavioural change on the clean path. D20 — capture daemon-thread exceptions in `_drain_bounded` `subprocess_lifecycle._drain_bounded` swallowed every drain exception via `except Exception: pass`, making a drain that raised indistinguishable from a clean completion (and hiding the leaked reader). It now captures + logs the exception and returns a `DrainOutcome` (`completed` | `budget_exceeded` | `errored`) so the caller can tell the leaked-reader cases apart. Existing callers that ignore the return value are unaffected. D19 — `_finalize_headless_result` snapshot isolation On the budget-exceeded / errored drain path a reader thread is leaked and may still be mutating ctx's shared buffers / metadata. The headless run context now carries `drain_budget_exceeded`; when set, finalize rebinds `ctx` to a deep snapshot (`list(...)` of each buffer + `metadata.model_copy(deep=True)`) via `_snapshot_for_finalize`, which is retry-guarded against the "changed size during iteration" race and falls back to the live ctx if every attempt loses. Clean drains keep the zero-copy fast path. Out of scope (per the issue): the #980 asyncio.wait_for margin widening and the #970 lsof/proc pipe-holder hunt — both correctly excluded. Tests: `_drain_bounded` outcome matrix (completed/budget_exceeded/errored + no-swallow logging) in test_drain_bounded.py; snapshot list/metadata isolation, retry-exhaustion fallback, and default-off fast path in test_headless_finalize_snapshot.py. Full related suite: 130 passed. Related to #1025 Co-Authored-By: Claude Opus 4.8 (1M context) --- .../services/headless_executor.py | 86 ++++++++++++++-- .../services/subprocess_lifecycle.py | 44 ++++++++- tests/unit/test_drain_bounded.py | 59 +++++++++++ tests/unit/test_headless_finalize_snapshot.py | 97 +++++++++++++++++++ 4 files changed, 276 insertions(+), 10 deletions(-) create mode 100644 tests/unit/test_headless_finalize_snapshot.py diff --git a/docker/base-image/agent_server/services/headless_executor.py b/docker/base-image/agent_server/services/headless_executor.py index 3171d2448..720ae41bd 100644 --- a/docker/base-image/agent_server/services/headless_executor.py +++ b/docker/base-image/agent_server/services/headless_executor.py @@ -22,7 +22,7 @@ import threading import time import uuid -from dataclasses import dataclass, field +from dataclasses import dataclass, field, replace from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Tuple @@ -230,6 +230,11 @@ class HeadlessRunContext: # "max_duration" — the effective_timeout budget was genuinely exhausted termination_reason: Optional[str] = None stalled_tool: Optional[str] = None + # #1025 (salvaged from #980): True when the drain returned "budget_exceeded" + # or "errored" — a reader thread is leaked and may still be mutating the + # shared buffers below, so _finalize_headless_result snapshots its read + # fields before reading them. + drain_budget_exceeded: bool = False # Shared mutable buffers (populated by stream_parser via process_stream_line) response_parts: List[str] = field(default_factory=list) @@ -674,23 +679,69 @@ def _run_stdout() -> None: f"[Headless Task] Task {ctx.task_session_id} {reason} — killing process group" ) _terminate_process_group(process, graceful_timeout=5, pgid=ctx.process_pgid, execution_tag=ctx.task_session_id) - _drain_bounded(process, stdout_thread, stderr_thread, - grace=3, pgid=ctx.process_pgid, - execution_tag=ctx.task_session_id) + drain_outcome = _drain_bounded(process, stdout_thread, stderr_thread, + grace=3, pgid=ctx.process_pgid, + execution_tag=ctx.task_session_id) + ctx.drain_budget_exceeded = drain_outcome in ("budget_exceeded", "errored") raise # Subprocess exited. Drain readers — if a hook grandchild still # holds a pipe, the helper will close the pipe FDs so the # reader threads can exit. - _drain_bounded(process, stdout_thread, stderr_thread, - grace=5, pgid=ctx.process_pgid, - execution_tag=ctx.task_session_id) + drain_outcome = _drain_bounded(process, stdout_thread, stderr_thread, + grace=5, pgid=ctx.process_pgid, + execution_tag=ctx.task_session_id) + ctx.drain_budget_exceeded = drain_outcome in ("budget_exceeded", "errored") # Re-raise permission-mode failure captured by stdout thread if ctx.stdout_exc: raise ctx.stdout_exc[0] +# #1025: number of times _snapshot_for_finalize retries the metadata deep-copy +# when it loses the race with a still-mutating leaked reader. The mutation +# window is a single attribute set, so the first retry almost always wins. +_SNAPSHOT_RETRY_ATTEMPTS = 3 + + +def _snapshot_for_finalize(ctx: HeadlessRunContext) -> HeadlessRunContext: + """Copy the finalize-read fields of ``ctx`` onto a fresh context (#1025/D19). + + Called only on the budget-exceeded / errored drain path, where a leaked + reader thread may still be mutating ``ctx`` concurrently. ``list(...)`` of a + list never raises on a concurrent append, but ``metadata.model_copy(deep= + True)`` iterates the pydantic model's ``__dict__`` / ``__pydantic_fields_set__`` + and CAN raise ``RuntimeError("... changed size during iteration")`` if the + reader sets a not-yet-set field mid-copy (most relevant on the ``errored`` + outcome, where the reader isn't necessarily wedged in ``readline``). The + window is a single attribute set, so a bounded retry almost always succeeds + on the next attempt; if every attempt loses the race we fall back to the + live ``ctx`` (the pre-#1025 behaviour — no worse than not snapshotting). + """ + last_exc: Optional[RuntimeError] = None + for _ in range(_SNAPSHOT_RETRY_ATTEMPTS): + try: + return replace( + ctx, + raw_messages=list(ctx.raw_messages), + response_parts=list(ctx.response_parts), + execution_log=list(ctx.execution_log), + verbose_output_lines=list(ctx.verbose_output_lines), + metadata=ctx.metadata.model_copy(deep=True), + ) + except RuntimeError as exc: + # "dictionary/set changed size during iteration" — the leaked + # reader mutated metadata mid-deep-copy. Retry. + last_exc = exc + logger.warning( + "[Headless Task] Run-context snapshot lost the race with a leaked " + "reader %s times for task %s (%s) — finalizing against the live " + "context as a last resort. Issue #1025.", + _SNAPSHOT_RETRY_ATTEMPTS, ctx.task_session_id, last_exc, + ) + return ctx + + def _finalize_headless_result( ctx: HeadlessRunContext, ) -> Tuple[str, List[Dict], ExecutionMetadata, str]: @@ -706,6 +757,27 @@ def _finalize_headless_result( orchestrator's outer ``except HTTPException: raise`` chain re-surfaces them). """ + # #1025 (D19): when the drain exceeded its budget (or errored), a reader + # thread is leaked and may still be mutating the shared buffers below. + # Snapshot EVERY field finalize reads onto a fresh context so iteration + # can't tear and a late append by the leaked reader can't be half-read. + # ``parse_failure_count`` / ``parse_failure_sample`` are an int + str + # (atomic reads, no snapshot). The leaked reader keeps mutating the + # ORIGINAL ctx (which it captured by closure); rebinding the local ``ctx`` + # to the snapshot means the rest of finalize — including + # _attempt_empty_result_recovery's in-place mutations — operates entirely + # on the isolated copy. Clean-drain runs skip this and keep the zero-copy + # fast path. The snapshot itself is retry-guarded against the same race + # (see _snapshot_for_finalize). + if ctx.drain_budget_exceeded: + logger.warning( + "[Headless Task] Drain budget exceeded for task %s — finalizing " + "against a snapshot of the run context (a leaked reader thread " + "may still be mutating the live buffers). Issue #1025.", + ctx.task_session_id, + ) + ctx = _snapshot_for_finalize(ctx) + # Build verbose transcript from stderr (the human-readable execution log) # SECURITY: Sanitize stderr output sanitized_lines = [sanitize_text(line) for line in ctx.verbose_output_lines] diff --git a/docker/base-image/agent_server/services/subprocess_lifecycle.py b/docker/base-image/agent_server/services/subprocess_lifecycle.py index 155c2b278..494e15b72 100644 --- a/docker/base-image/agent_server/services/subprocess_lifecycle.py +++ b/docker/base-image/agent_server/services/subprocess_lifecycle.py @@ -14,7 +14,7 @@ import logging import subprocess import threading -from typing import Optional +from typing import Literal, Optional from ..utils.subprocess_pgroup import ( capture_pgid as _capture_pgid, @@ -23,8 +23,16 @@ drain_reader_threads as _drain_reader_threads, ) +# Outcome of a bounded drain, returned to the caller so the orchestrator can +# decide whether to treat the run's shared mutable state as trustworthy +# (#1025 / salvaged from #980). ``budget_exceeded`` / ``errored`` both leave a +# leaked reader thread that may still be mutating the run context concurrently; +# the headless path snapshots its finalize-read fields on those outcomes. +DrainOutcome = Literal["completed", "budget_exceeded", "errored"] + __all__ = [ "_DRAIN_BUDGET_SECONDS", + "DrainOutcome", "_drain_bounded", "_capture_pgid", "_terminate_process_group", @@ -51,7 +59,7 @@ def _drain_bounded( grace: int = 5, pgid: Optional[int] = None, execution_tag: Optional[str] = None, -) -> None: +) -> DrainOutcome: """Run drain_reader_threads with a hard _DRAIN_BUDGET_SECONDS time cap. Prevents a TextIOWrapper lock deadlock in safe_close_pipes (Issue #728) @@ -62,8 +70,24 @@ def _drain_bounded( Issue #817: ``execution_tag`` is threaded through to the underlying ``drain_reader_threads`` so the env-tag sweep runs after every drain. + + Issue #1025 (salvaged from #980): returns the drain outcome instead of + ``None`` so the headless orchestrator can detect the budget-exceeded / + errored cases — both leave a leaked reader thread that may still be + mutating the shared run context, so the caller snapshots its + finalize-read fields before reading them: + + - ``"completed"`` — the drain finished cleanly within budget. + - ``"budget_exceeded"`` — the daemon thread did not finish within + ``_DRAIN_BUDGET_SECONDS`` (the #728 safe_close_pipes deadlock); the + reader thread is leaked. + - ``"errored"`` — the drain raised. Previously swallowed with + ``except Exception: pass``, which masked the failure as a clean + ``"completed"``; now captured and logged (honours the project-wide + "never swallow exceptions silently" rule). """ done = threading.Event() + errored = threading.Event() def _target() -> None: try: @@ -72,7 +96,16 @@ def _target() -> None: execution_tag=execution_tag, )) except Exception: - pass + # #1025: capture + log instead of swallowing. A drain that raises + # used to be indistinguishable from a clean completion, hiding a + # leaked reader thread from the finalize path. + errored.set() + logger.exception( + "[Subprocess] Drain raised inside the daemon thread " + "(pid=%s) — treating as errored; reader thread(s) may be " + "leaked. Issue #1025.", + process.pid, + ) finally: done.set() @@ -84,3 +117,8 @@ def _target() -> None: "leaked daemon threads (pid=%s). Issue #728.", _DRAIN_BUDGET_SECONDS, process.pid, ) + return "budget_exceeded" + # done is set: _target finished, so ``errored`` is fully settled. + if errored.is_set(): + return "errored" + return "completed" diff --git a/tests/unit/test_drain_bounded.py b/tests/unit/test_drain_bounded.py index 531ad39db..655d11354 100644 --- a/tests/unit/test_drain_bounded.py +++ b/tests/unit/test_drain_bounded.py @@ -139,3 +139,62 @@ async def _recording_drain(process, *threads, grace=5, pgid=None, **kwargs): assert received.get("grace") == 3, f"grace not forwarded: {received}" assert received.get("pgid") == 42, f"pgid not forwarded: {received}" + + +# --------------------------------------------------------------------------- +# Issue #1025 (salvaged from #980): _drain_bounded returns a DrainOutcome and +# no longer swallows daemon-thread exceptions. +# --------------------------------------------------------------------------- + +def test_drain_bounded_returns_completed_on_clean_drain(monkeypatch): + """A drain that finishes within budget must report ``completed``.""" + + async def _fast_drain(*args, **kwargs): + return None + + monkeypatch.setattr( + "agent_server.services.subprocess_lifecycle._drain_reader_threads", + _fast_drain, + ) + + assert _drain_bounded(_make_fake_process(), grace=5, pgid=None) == "completed" + + +def test_drain_bounded_returns_budget_exceeded_on_hang(monkeypatch): + """A drain that overruns the budget must report ``budget_exceeded`` (a + leaked reader thread the finalize path must defend against).""" + + async def _hanging_drain(*args, **kwargs): + await asyncio.sleep(600) + + monkeypatch.setattr( + "agent_server.services.subprocess_lifecycle._drain_reader_threads", + _hanging_drain, + ) + monkeypatch.setattr( + "agent_server.services.subprocess_lifecycle._DRAIN_BUDGET_SECONDS", + 1, + ) + + assert _drain_bounded(_make_fake_process(), grace=1, pgid=None) == "budget_exceeded" + + +def test_drain_bounded_returns_errored_and_logs_when_drain_raises(monkeypatch, caplog): + """A drain that raises must be reported as ``errored`` and logged — not + silently swallowed as a clean ``completed`` (the pre-#1025 bug).""" + + async def _raising_drain(*args, **kwargs): + raise RuntimeError("boom in drain") + + monkeypatch.setattr( + "agent_server.services.subprocess_lifecycle._drain_reader_threads", + _raising_drain, + ) + + with caplog.at_level("ERROR"): + outcome = _drain_bounded(_make_fake_process(), grace=5, pgid=None) + + assert outcome == "errored" + assert any("Drain raised" in r.message for r in caplog.records), ( + "errored drain must be logged, not swallowed" + ) diff --git a/tests/unit/test_headless_finalize_snapshot.py b/tests/unit/test_headless_finalize_snapshot.py new file mode 100644 index 000000000..f831e16c9 --- /dev/null +++ b/tests/unit/test_headless_finalize_snapshot.py @@ -0,0 +1,97 @@ +"""Issue #1025 (salvaged from #980 / D19): _finalize_headless_result isolates +itself from a leaked reader thread by snapshotting the run context. + +When _drain_bounded reports ``budget_exceeded`` or ``errored``, a reader +thread is leaked and may still be appending to ctx's shared buffers / setting +metadata fields. _finalize_headless_result rebinds ``ctx`` to a deep snapshot +so iteration can't tear and a late append can't be half-read. Clean drains +keep the zero-copy fast path. + +These tests exercise the snapshot helper directly (the concurrency-prone +part) plus the gating flag — without spawning a real claude subprocess. +""" +from __future__ import annotations + +from unittest.mock import MagicMock + +# conftest.py preloads the real agent_server package; just import. +from agent_server.services.headless_executor import ( # noqa: E402 + HeadlessRunContext, + _snapshot_for_finalize, +) + + +def _make_ctx() -> HeadlessRunContext: + ctx = HeadlessRunContext( + cmd=["claude"], + task_session_id="task-1025", + task_start_iso="2026-06-05T00:00:00Z", + effective_timeout=600, + images=None, + prompt="hi", + ) + ctx.raw_messages.append({"type": "assistant", "n": 1}) + ctx.response_parts.append("hello") + ctx.metadata.cost_usd = 0.01 + return ctx + + +def test_snapshot_decouples_lists_from_later_mutation(): + """A post-snapshot append to the live ctx (simulating a leaked reader) + must NOT leak into the snapshot used by finalize.""" + ctx = _make_ctx() + snap = _snapshot_for_finalize(ctx) + + # Snapshot must be a different context object with copied buffers. + assert snap is not ctx + assert snap.raw_messages is not ctx.raw_messages + assert snap.response_parts is not ctx.response_parts + + # Leaked reader keeps mutating the ORIGINAL ctx after the snapshot. + ctx.raw_messages.append({"type": "assistant", "n": 2}) + ctx.response_parts.append(" world") + + # Snapshot is frozen at snapshot time. + assert snap.raw_messages == [{"type": "assistant", "n": 1}] + assert snap.response_parts == ["hello"] + + +def test_snapshot_deep_copies_metadata(): + """metadata must be deep-copied so a leaked reader writing a metadata + field after the snapshot can't mutate finalize's view.""" + ctx = _make_ctx() + snap = _snapshot_for_finalize(ctx) + + assert snap.metadata is not ctx.metadata + assert snap.metadata.cost_usd == 0.01 + + # Leaked reader writes more metadata onto the live ctx. + ctx.metadata.cost_usd = 0.99 + ctx.metadata.num_turns = 7 + + assert snap.metadata.cost_usd == 0.01 + assert snap.metadata.num_turns is None + + +def test_snapshot_falls_back_to_live_ctx_when_every_retry_loses(): + """If the deep-copy loses the race on every attempt, the helper returns the + live ctx rather than raising — no worse than the pre-#1025 behaviour.""" + ctx = _make_ctx() + + # Stand in a metadata whose deep-copy always loses the race (pydantic + # forbids patching model_copy on a real instance). + fake_metadata = MagicMock() + fake_metadata.model_copy.side_effect = RuntimeError( + "dictionary changed size during iteration" + ) + ctx.metadata = fake_metadata + + # Must not raise; falls back to the same ctx instance. + result = _snapshot_for_finalize(ctx) + assert result is ctx + assert fake_metadata.model_copy.call_count == 3 # _SNAPSHOT_RETRY_ATTEMPTS + + +def test_drain_budget_exceeded_defaults_false(): + """Clean runs must keep the zero-copy fast path (flag off by default).""" + assert _make_ctx().drain_budget_exceeded is False From b5daa8bde7fd6910fd55769b725f60ed943a607c Mon Sep 17 00:00:00 2001 From: Oleksii Dolhov Date: Fri, 5 Jun 2026 14:29:08 +0300 Subject: [PATCH 2/2] fix(headless): address review findings on the drain/finalize hardening (#1025) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Self-review of the #1025 PR surfaced two correctness gaps in the hardening itself, plus a dead assignment and a redundant Event. Fixed: 1. Leaked-reader outcome was missed (strongest finding). drain_reader_threads force-closes and *returns normally* when a grandchild-held reader won't EOF (its own outcome=leaked METRIC, #586), so a within-budget, non-raising drain did NOT prove the readers were dead — _drain_bounded returned "completed", finalize skipped the snapshot, and read buffers a confirmed-alive reader was still mutating. _drain_bounded now adds a "leaked" DrainOutcome: after a within-budget return it checks whether any reader thread it was handed is still alive (no dependency on drain_reader_threads internals). The headless flag is renamed reader_may_be_live and set on `drain_outcome != "completed"` so budget_exceeded / errored / leaked all trigger the snapshot. 2. Snapshot didn't isolate the auth-abort signal. finalize reads auth_abort_event.is_set() / auth_abort_reason[0], but the stderr reader (the thread that leaks) mutates them — a late auth match could flip a success to a spurious 503 (→ SUB-003 auto-switch). _snapshot_for_finalize now freezes both (a fresh Event mirroring is_set() via _freeze_event + a list copy), so the snapshot's "every field finalize reads" claim holds. 3. Dropped the dead reader_may_be_live assignment on the TimeoutExpired path — that path re-raises to HTTP 504 without finalizing, so nothing reads it (replaced with a comment). 4. Collapsed the redundant `errored` Event in _drain_bounded to a single write-once `outcome` cell set by the daemon thread and read after the `done` barrier — removes the load-bearing two-Event ordering invariant. Tests: new "leaked"-outcome case (within-budget drain leaving a live reader) and auth-abort freeze isolation; renamed the flag test. 21 passed locally. Re-verified against a live agent-server: clean task still 200s through the fast path with zero leaked/snapshot warnings. Related to #1025 Co-Authored-By: Claude Opus 4.8 (1M context) --- .../services/headless_executor.py | 85 ++++++++++++------- .../services/subprocess_lifecycle.py | 49 ++++++++--- tests/unit/test_drain_bounded.py | 30 +++++++ tests/unit/test_headless_finalize_snapshot.py | 25 +++++- 4 files changed, 144 insertions(+), 45 deletions(-) diff --git a/docker/base-image/agent_server/services/headless_executor.py b/docker/base-image/agent_server/services/headless_executor.py index 720ae41bd..90e16300c 100644 --- a/docker/base-image/agent_server/services/headless_executor.py +++ b/docker/base-image/agent_server/services/headless_executor.py @@ -230,11 +230,11 @@ class HeadlessRunContext: # "max_duration" — the effective_timeout budget was genuinely exhausted termination_reason: Optional[str] = None stalled_tool: Optional[str] = None - # #1025 (salvaged from #980): True when the drain returned "budget_exceeded" - # or "errored" — a reader thread is leaked and may still be mutating the - # shared buffers below, so _finalize_headless_result snapshots its read - # fields before reading them. - drain_budget_exceeded: bool = False + # #1025 (salvaged from #980): True when the drain returned anything other + # than "completed" (budget_exceeded / errored / leaked) — a reader thread + # may still be alive and mutating the shared buffers below, so + # _finalize_headless_result snapshots its read fields before reading them. + reader_may_be_live: bool = False # Shared mutable buffers (populated by stream_parser via process_stream_line) response_parts: List[str] = field(default_factory=list) @@ -679,10 +679,13 @@ def _run_stdout() -> None: f"[Headless Task] Task {ctx.task_session_id} {reason} — killing process group" ) _terminate_process_group(process, graceful_timeout=5, pgid=ctx.process_pgid, execution_tag=ctx.task_session_id) - drain_outcome = _drain_bounded(process, stdout_thread, stderr_thread, - grace=3, pgid=ctx.process_pgid, - execution_tag=ctx.task_session_id) - ctx.drain_budget_exceeded = drain_outcome in ("budget_exceeded", "errored") + # Bound the post-kill drain, then re-raise. The orchestrator converts + # TimeoutExpired to HTTP 504 WITHOUT calling _finalize_headless_result, + # so there is no finalize read to protect here — we intentionally do + # not set reader_may_be_live on this path (#1025). + _drain_bounded(process, stdout_thread, stderr_thread, + grace=3, pgid=ctx.process_pgid, + execution_tag=ctx.task_session_id) raise # Subprocess exited. Drain readers — if a hook grandchild still @@ -691,7 +694,9 @@ def _run_stdout() -> None: drain_outcome = _drain_bounded(process, stdout_thread, stderr_thread, grace=5, pgid=ctx.process_pgid, execution_tag=ctx.task_session_id) - ctx.drain_budget_exceeded = drain_outcome in ("budget_exceeded", "errored") + # Anything but "completed" means a reader thread may still be alive and + # mutating the shared buffers finalize reads (#1025). + ctx.reader_may_be_live = drain_outcome != "completed" # Re-raise permission-mode failure captured by stdout thread if ctx.stdout_exc: @@ -704,19 +709,37 @@ def _run_stdout() -> None: _SNAPSHOT_RETRY_ATTEMPTS = 3 +def _freeze_event(event: threading.Event) -> threading.Event: + """Capture an Event's set/clear state into a fresh, immutable-by-the-reader + Event (#1025). The leaked reader holds the ORIGINAL event; the snapshot's + copy reflects the verdict as of snapshot time and can't be flipped later.""" + frozen = threading.Event() + if event.is_set(): + frozen.set() + return frozen + + def _snapshot_for_finalize(ctx: HeadlessRunContext) -> HeadlessRunContext: """Copy the finalize-read fields of ``ctx`` onto a fresh context (#1025/D19). - Called only on the budget-exceeded / errored drain path, where a leaked - reader thread may still be mutating ``ctx`` concurrently. ``list(...)`` of a - list never raises on a concurrent append, but ``metadata.model_copy(deep= - True)`` iterates the pydantic model's ``__dict__`` / ``__pydantic_fields_set__`` - and CAN raise ``RuntimeError("... changed size during iteration")`` if the - reader sets a not-yet-set field mid-copy (most relevant on the ``errored`` - outcome, where the reader isn't necessarily wedged in ``readline``). The - window is a single attribute set, so a bounded retry almost always succeeds - on the next attempt; if every attempt loses the race we fall back to the - live ``ctx`` (the pre-#1025 behaviour — no worse than not snapshotting). + Called whenever the drain left a reader thread that may still be alive + (``reader_may_be_live``), where that reader can mutate ``ctx`` concurrently. + Every field finalize reads is captured: the four buffers (``list(...)`` of a + list never raises on a concurrent append), ``metadata`` (deep-copied), and + the auth-abort signal (``auth_abort_event`` frozen via :func:`_freeze_event` + + ``auth_abort_reason`` list-copied) — the stderr reader is exactly the + thread that can leak, and finalize reads ``auth_abort_event.is_set()`` / + ``auth_abort_reason[0]``, so leaving them aliased would let a late auth match + flip a success to a spurious 503. + + ``metadata.model_copy(deep=True)`` iterates the pydantic model's ``__dict__`` + / ``__pydantic_fields_set__`` and CAN raise ``RuntimeError("... changed size + during iteration")`` if the reader sets a not-yet-set field mid-copy (most + relevant on the ``errored`` outcome, where the reader isn't necessarily + wedged in ``readline``). The window is a single attribute set, so a bounded + retry almost always succeeds on the next attempt; if every attempt loses the + race we fall back to the live ``ctx`` (the pre-#1025 behaviour — no worse + than not snapshotting). """ last_exc: Optional[RuntimeError] = None for _ in range(_SNAPSHOT_RETRY_ATTEMPTS): @@ -728,6 +751,8 @@ def _snapshot_for_finalize(ctx: HeadlessRunContext) -> HeadlessRunContext: execution_log=list(ctx.execution_log), verbose_output_lines=list(ctx.verbose_output_lines), metadata=ctx.metadata.model_copy(deep=True), + auth_abort_event=_freeze_event(ctx.auth_abort_event), + auth_abort_reason=list(ctx.auth_abort_reason), ) except RuntimeError as exc: # "dictionary/set changed size during iteration" — the leaked @@ -757,23 +782,23 @@ def _finalize_headless_result( orchestrator's outer ``except HTTPException: raise`` chain re-surfaces them). """ - # #1025 (D19): when the drain exceeded its budget (or errored), a reader - # thread is leaked and may still be mutating the shared buffers below. - # Snapshot EVERY field finalize reads onto a fresh context so iteration - # can't tear and a late append by the leaked reader can't be half-read. - # ``parse_failure_count`` / ``parse_failure_sample`` are an int + str - # (atomic reads, no snapshot). The leaked reader keeps mutating the + # #1025 (D19): when the drain left a reader thread that may still be alive + # (budget_exceeded / errored / leaked), it can keep mutating the shared + # buffers below. Snapshot EVERY field finalize reads onto a fresh context so + # iteration can't tear and a late append by the leaked reader can't be + # half-read. ``parse_failure_count`` / ``parse_failure_sample`` are an int + + # str (atomic reads, no snapshot). The leaked reader keeps mutating the # ORIGINAL ctx (which it captured by closure); rebinding the local ``ctx`` # to the snapshot means the rest of finalize — including # _attempt_empty_result_recovery's in-place mutations — operates entirely # on the isolated copy. Clean-drain runs skip this and keep the zero-copy # fast path. The snapshot itself is retry-guarded against the same race # (see _snapshot_for_finalize). - if ctx.drain_budget_exceeded: + if ctx.reader_may_be_live: logger.warning( - "[Headless Task] Drain budget exceeded for task %s — finalizing " - "against a snapshot of the run context (a leaked reader thread " - "may still be mutating the live buffers). Issue #1025.", + "[Headless Task] Drain left a possibly-live reader for task %s — " + "finalizing against a snapshot of the run context (the reader may " + "still be mutating the live buffers). Issue #1025.", ctx.task_session_id, ) ctx = _snapshot_for_finalize(ctx) diff --git a/docker/base-image/agent_server/services/subprocess_lifecycle.py b/docker/base-image/agent_server/services/subprocess_lifecycle.py index 494e15b72..9285e3d56 100644 --- a/docker/base-image/agent_server/services/subprocess_lifecycle.py +++ b/docker/base-image/agent_server/services/subprocess_lifecycle.py @@ -25,10 +25,10 @@ # Outcome of a bounded drain, returned to the caller so the orchestrator can # decide whether to treat the run's shared mutable state as trustworthy -# (#1025 / salvaged from #980). ``budget_exceeded`` / ``errored`` both leave a -# leaked reader thread that may still be mutating the run context concurrently; -# the headless path snapshots its finalize-read fields on those outcomes. -DrainOutcome = Literal["completed", "budget_exceeded", "errored"] +# (#1025 / salvaged from #980). Every non-``completed`` outcome leaves a reader +# thread that may still be mutating the run context concurrently; the headless +# path snapshots its finalize-read fields on those outcomes. +DrainOutcome = Literal["completed", "budget_exceeded", "errored", "leaked"] __all__ = [ "_DRAIN_BUDGET_SECONDS", @@ -72,12 +72,13 @@ def _drain_bounded( ``drain_reader_threads`` so the env-tag sweep runs after every drain. Issue #1025 (salvaged from #980): returns the drain outcome instead of - ``None`` so the headless orchestrator can detect the budget-exceeded / - errored cases — both leave a leaked reader thread that may still be - mutating the shared run context, so the caller snapshots its - finalize-read fields before reading them: + ``None`` so the headless orchestrator can detect every case that leaves a + reader thread still mutating the shared run context, and snapshot its + finalize-read fields before reading them. Only ``"completed"`` guarantees + no reader survives: - - ``"completed"`` — the drain finished cleanly within budget. + - ``"completed"`` — the drain finished within budget AND every + reader thread passed in has exited. - ``"budget_exceeded"`` — the daemon thread did not finish within ``_DRAIN_BUDGET_SECONDS`` (the #728 safe_close_pipes deadlock); the reader thread is leaked. @@ -85,11 +86,20 @@ def _drain_bounded( ``except Exception: pass``, which masked the failure as a clean ``"completed"``; now captured and logged (honours the project-wide "never swallow exceptions silently" rule). + - ``"leaked"`` — the drain returned normally within budget but a + reader thread is still alive. ``drain_reader_threads`` force-closes the + pipes and *continues* when a grandchild-held reader won't EOF (its own + ``outcome=leaked`` METRIC), returning normally — so a within-budget, + non-raising drain does NOT by itself prove the readers are dead. This + outcome closes that gap (the #586 leaked-reader case). """ done = threading.Event() - errored = threading.Event() + # Single write-once cell, set only by the daemon thread and read only after + # the ``done`` barrier — no second Event / ordering invariant to maintain. + outcome: DrainOutcome = "completed" def _target() -> None: + nonlocal outcome try: asyncio.run(_drain_reader_threads( process, *threads, grace=grace, pgid=pgid, @@ -99,7 +109,7 @@ def _target() -> None: # #1025: capture + log instead of swallowing. A drain that raises # used to be indistinguishable from a clean completion, hiding a # leaked reader thread from the finalize path. - errored.set() + outcome = "errored" logger.exception( "[Subprocess] Drain raised inside the daemon thread " "(pid=%s) — treating as errored; reader thread(s) may be " @@ -118,7 +128,20 @@ def _target() -> None: _DRAIN_BUDGET_SECONDS, process.pid, ) return "budget_exceeded" - # done is set: _target finished, so ``errored`` is fully settled. - if errored.is_set(): + # done is set → _target finished, so ``outcome`` is fully settled. + if outcome == "errored": return "errored" + # The drain returned within budget without raising, but drain_reader_threads + # force-closes and continues on the #586 leaked-reader case. If any reader + # we were asked to drain is still alive, it can keep mutating ctx — surface + # that so the caller snapshots instead of trusting the live buffers (#1025). + if any(t is not None and t.is_alive() for t in threads): + logger.warning( + "[Subprocess] Drain returned within budget but %d reader thread(s) " + "are still alive (pid=%s) — reporting 'leaked' so finalize snapshots " + "the run context. Issue #1025.", + sum(1 for t in threads if t is not None and t.is_alive()), + process.pid, + ) + return "leaked" return "completed" diff --git a/tests/unit/test_drain_bounded.py b/tests/unit/test_drain_bounded.py index 655d11354..cdca484b7 100644 --- a/tests/unit/test_drain_bounded.py +++ b/tests/unit/test_drain_bounded.py @@ -198,3 +198,33 @@ async def _raising_drain(*args, **kwargs): assert any("Drain raised" in r.message for r in caplog.records), ( "errored drain must be logged, not swallowed" ) + + +def test_drain_bounded_returns_leaked_when_reader_survives(monkeypatch): + """A drain that returns within budget WITHOUT raising but leaves a reader + thread alive must report ``leaked`` — drain_reader_threads force-closes and + continues on the #586 leaked-reader case, so a within-budget return does not + by itself prove the readers are dead (review finding on the #1025 PR).""" + + async def _fast_drain(*args, **kwargs): + return None + + monkeypatch.setattr( + "agent_server.services.subprocess_lifecycle._drain_reader_threads", + _fast_drain, + ) + + # A real, still-alive reader thread (the leaked case). + stop = threading.Event() + leaked_reader = threading.Thread(target=stop.wait, daemon=True) + leaked_reader.start() + try: + outcome = _drain_bounded(_make_fake_process(), leaked_reader, grace=5, pgid=None) + finally: + stop.set() + leaked_reader.join(timeout=2) + + assert outcome == "leaked", ( + "a within-budget drain that leaves a reader alive must surface 'leaked' " + "so finalize snapshots instead of trusting the live buffers" + ) diff --git a/tests/unit/test_headless_finalize_snapshot.py b/tests/unit/test_headless_finalize_snapshot.py index f831e16c9..2f025ad31 100644 --- a/tests/unit/test_headless_finalize_snapshot.py +++ b/tests/unit/test_headless_finalize_snapshot.py @@ -92,6 +92,27 @@ def test_snapshot_falls_back_to_live_ctx_when_every_retry_loses(): assert fake_metadata.model_copy.call_count == 3 # _SNAPSHOT_RETRY_ATTEMPTS -def test_drain_budget_exceeded_defaults_false(): +def test_reader_may_be_live_defaults_false(): """Clean runs must keep the zero-copy fast path (flag off by default).""" - assert _make_ctx().drain_budget_exceeded is False + assert _make_ctx().reader_may_be_live is False + + +def test_snapshot_freezes_auth_abort_signal(): + """A leaked stderr reader that sets auth-abort AFTER the snapshot must not + flip finalize's view — auth_abort_event/reason are frozen at snapshot time + (regression guard for the review finding on spurious 503s).""" + ctx = _make_ctx() + snap = _snapshot_for_finalize(ctx) + + # Snapshot copies must be distinct objects, frozen at the not-set state. + assert snap.auth_abort_event is not ctx.auth_abort_event + assert snap.auth_abort_reason is not ctx.auth_abort_reason + assert snap.auth_abort_event.is_set() is False + + # Leaked reader fires an auth abort on the live ctx after the snapshot. + ctx.auth_abort_reason.append("Not logged in") + ctx.auth_abort_event.set() + + # Finalize's view (the snapshot) is unaffected. + assert snap.auth_abort_event.is_set() is False + assert snap.auth_abort_reason == []