diff --git a/docker/base-image/agent_server/services/headless_executor.py b/docker/base-image/agent_server/services/headless_executor.py index 3171d244..90e16300 100644 --- a/docker/base-image/agent_server/services/headless_executor.py +++ b/docker/base-image/agent_server/services/headless_executor.py @@ -22,7 +22,7 @@ import threading import time import uuid -from dataclasses import dataclass, field +from dataclasses import dataclass, field, replace from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Tuple @@ -230,6 +230,11 @@ class HeadlessRunContext: # "max_duration" — the effective_timeout budget was genuinely exhausted termination_reason: Optional[str] = None stalled_tool: Optional[str] = None + # #1025 (salvaged from #980): True when the drain returned anything other + # than "completed" (budget_exceeded / errored / leaked) — a reader thread + # may still be alive and mutating the shared buffers below, so + # _finalize_headless_result snapshots its read fields before reading them. + reader_may_be_live: bool = False # Shared mutable buffers (populated by stream_parser via process_stream_line) response_parts: List[str] = field(default_factory=list) @@ -674,6 +679,10 @@ def _run_stdout() -> None: f"[Headless Task] Task {ctx.task_session_id} {reason} — killing process group" ) _terminate_process_group(process, graceful_timeout=5, pgid=ctx.process_pgid, execution_tag=ctx.task_session_id) + # Bound the post-kill drain, then re-raise. The orchestrator converts + # TimeoutExpired to HTTP 504 WITHOUT calling _finalize_headless_result, + # so there is no finalize read to protect here — we intentionally do + # not set reader_may_be_live on this path (#1025). _drain_bounded(process, stdout_thread, stderr_thread, grace=3, pgid=ctx.process_pgid, execution_tag=ctx.task_session_id) @@ -682,15 +691,82 @@ def _run_stdout() -> None: # Subprocess exited. 
Drain readers — if a hook grandchild still # holds a pipe, the helper will close the pipe FDs so the # reader threads can exit. - _drain_bounded(process, stdout_thread, stderr_thread, - grace=5, pgid=ctx.process_pgid, - execution_tag=ctx.task_session_id) + drain_outcome = _drain_bounded(process, stdout_thread, stderr_thread, + grace=5, pgid=ctx.process_pgid, + execution_tag=ctx.task_session_id) + # Anything but "completed" means a reader thread may still be alive and + # mutating the shared buffers finalize reads (#1025). + ctx.reader_may_be_live = drain_outcome != "completed" # Re-raise permission-mode failure captured by stdout thread if ctx.stdout_exc: raise ctx.stdout_exc[0] +# #1025: number of times _snapshot_for_finalize retries the metadata deep-copy +# when it loses the race with a still-mutating leaked reader. The mutation +# window is a single attribute set, so the first retry almost always wins. +_SNAPSHOT_RETRY_ATTEMPTS = 3 + + +def _freeze_event(event: threading.Event) -> threading.Event: + """Capture an Event's set/clear state into a fresh, immutable-by-the-reader + Event (#1025). The leaked reader holds the ORIGINAL event; the snapshot's + copy reflects the verdict as of snapshot time and can't be flipped later.""" + frozen = threading.Event() + if event.is_set(): + frozen.set() + return frozen + + +def _snapshot_for_finalize(ctx: HeadlessRunContext) -> HeadlessRunContext: + """Copy the finalize-read fields of ``ctx`` onto a fresh context (#1025/D19). + + Called whenever the drain left a reader thread that may still be alive + (``reader_may_be_live``), where that reader can mutate ``ctx`` concurrently. 
+ Every field finalize reads is captured: the four buffers (``list(...)`` of a + list never raises on a concurrent append), ``metadata`` (deep-copied), and + the auth-abort signal (``auth_abort_event`` frozen via :func:`_freeze_event` + + ``auth_abort_reason`` list-copied) — the stderr reader is exactly the + thread that can leak, and finalize reads ``auth_abort_event.is_set()`` / + ``auth_abort_reason[0]``, so leaving them aliased would let a late auth match + flip a success to a spurious 503. + + ``metadata.model_copy(deep=True)`` iterates the pydantic model's ``__dict__`` + / ``__pydantic_fields_set__`` and CAN raise ``RuntimeError("... changed size + during iteration")`` if the reader sets a not-yet-set field mid-copy (most + relevant on the ``errored`` outcome, where the reader isn't necessarily + wedged in ``readline``). The window is a single attribute set, so a bounded + retry almost always succeeds on the next attempt; if every attempt loses the + race we fall back to the live ``ctx`` (the pre-#1025 behaviour — no worse + than not snapshotting). + """ + last_exc: Optional[RuntimeError] = None + for _ in range(_SNAPSHOT_RETRY_ATTEMPTS): + try: + return replace( + ctx, + raw_messages=list(ctx.raw_messages), + response_parts=list(ctx.response_parts), + execution_log=list(ctx.execution_log), + verbose_output_lines=list(ctx.verbose_output_lines), + metadata=ctx.metadata.model_copy(deep=True), + auth_abort_event=_freeze_event(ctx.auth_abort_event), + auth_abort_reason=list(ctx.auth_abort_reason), + ) + except RuntimeError as exc: + # "dictionary/set changed size during iteration" — the leaked + # reader mutated metadata mid-deep-copy. Retry. + last_exc = exc + logger.warning( + "[Headless Task] Run-context snapshot lost the race with a leaked " + "reader %s times for task %s (%s) — finalizing against the live " + "context as a last resort. 
Issue #1025.", + _SNAPSHOT_RETRY_ATTEMPTS, ctx.task_session_id, last_exc, + ) + return ctx + + def _finalize_headless_result( ctx: HeadlessRunContext, ) -> Tuple[str, List[Dict], ExecutionMetadata, str]: @@ -706,6 +782,27 @@ def _finalize_headless_result( orchestrator's outer ``except HTTPException: raise`` chain re-surfaces them). """ + # #1025 (D19): when the drain left a reader thread that may still be alive + # (budget_exceeded / errored / leaked), it can keep mutating the shared + # buffers below. Snapshot EVERY field finalize reads onto a fresh context so + # iteration can't tear and a late append by the leaked reader can't be + # half-read. ``parse_failure_count`` / ``parse_failure_sample`` are an int + + # str (atomic reads, no snapshot). The leaked reader keeps mutating the + # ORIGINAL ctx (which it captured by closure); rebinding the local ``ctx`` + # to the snapshot means the rest of finalize — including + # _attempt_empty_result_recovery's in-place mutations — operates entirely + # on the isolated copy. Clean-drain runs skip this and keep the zero-copy + # fast path. The snapshot itself is retry-guarded against the same race + # (see _snapshot_for_finalize). + if ctx.reader_may_be_live: + logger.warning( + "[Headless Task] Drain left a possibly-live reader for task %s — " + "finalizing against a snapshot of the run context (the reader may " + "still be mutating the live buffers). 
Issue #1025.", + ctx.task_session_id, + ) + ctx = _snapshot_for_finalize(ctx) + # Build verbose transcript from stderr (the human-readable execution log) # SECURITY: Sanitize stderr output sanitized_lines = [sanitize_text(line) for line in ctx.verbose_output_lines] diff --git a/docker/base-image/agent_server/services/subprocess_lifecycle.py b/docker/base-image/agent_server/services/subprocess_lifecycle.py index 155c2b27..9285e3d5 100644 --- a/docker/base-image/agent_server/services/subprocess_lifecycle.py +++ b/docker/base-image/agent_server/services/subprocess_lifecycle.py @@ -14,7 +14,7 @@ import logging import subprocess import threading -from typing import Optional +from typing import Literal, Optional from ..utils.subprocess_pgroup import ( capture_pgid as _capture_pgid, @@ -23,8 +23,16 @@ drain_reader_threads as _drain_reader_threads, ) +# Outcome of a bounded drain, returned to the caller so the orchestrator can +# decide whether to treat the run's shared mutable state as trustworthy +# (#1025 / salvaged from #980). Every non-``completed`` outcome leaves a reader +# thread that may still be mutating the run context concurrently; the headless +# path snapshots its finalize-read fields on those outcomes. +DrainOutcome = Literal["completed", "budget_exceeded", "errored", "leaked"] + __all__ = [ "_DRAIN_BUDGET_SECONDS", + "DrainOutcome", "_drain_bounded", "_capture_pgid", "_terminate_process_group", @@ -51,7 +59,7 @@ def _drain_bounded( grace: int = 5, pgid: Optional[int] = None, execution_tag: Optional[str] = None, -) -> None: +) -> DrainOutcome: """Run drain_reader_threads with a hard _DRAIN_BUDGET_SECONDS time cap. Prevents a TextIOWrapper lock deadlock in safe_close_pipes (Issue #728) @@ -62,17 +70,52 @@ def _drain_bounded( Issue #817: ``execution_tag`` is threaded through to the underlying ``drain_reader_threads`` so the env-tag sweep runs after every drain. 
+ + Issue #1025 (salvaged from #980): returns the drain outcome instead of + ``None`` so the headless orchestrator can detect every case that leaves a + reader thread still mutating the shared run context, and snapshot its + finalize-read fields before reading them. Only ``"completed"`` guarantees + no reader survives: + + - ``"completed"`` — the drain finished within budget AND every + reader thread passed in has exited. + - ``"budget_exceeded"`` — the daemon thread did not finish within + ``_DRAIN_BUDGET_SECONDS`` (the #728 safe_close_pipes deadlock); the + reader thread is leaked. + - ``"errored"`` — the drain raised. Previously swallowed with + ``except Exception: pass``, which masked the failure as a clean + ``"completed"``; now captured and logged (honours the project-wide + "never swallow exceptions silently" rule). + - ``"leaked"`` — the drain returned normally within budget but a + reader thread is still alive. ``drain_reader_threads`` force-closes the + pipes and *continues* when a grandchild-held reader won't EOF (its own + ``outcome=leaked`` METRIC), returning normally — so a within-budget, + non-raising drain does NOT by itself prove the readers are dead. This + outcome closes that gap (the #586 leaked-reader case). """ done = threading.Event() + # Single write-once cell, set only by the daemon thread and read only after + # the ``done`` barrier — no second Event / ordering invariant to maintain. + outcome: DrainOutcome = "completed" def _target() -> None: + nonlocal outcome try: asyncio.run(_drain_reader_threads( process, *threads, grace=grace, pgid=pgid, execution_tag=execution_tag, )) except Exception: - pass + # #1025: capture + log instead of swallowing. A drain that raises + # used to be indistinguishable from a clean completion, hiding a + # leaked reader thread from the finalize path. 
+ outcome = "errored" + logger.exception( + "[Subprocess] Drain raised inside the daemon thread " + "(pid=%s) — treating as errored; reader thread(s) may be " + "leaked. Issue #1025.", + process.pid, + ) finally: done.set() @@ -84,3 +127,21 @@ def _target() -> None: "leaked daemon threads (pid=%s). Issue #728.", _DRAIN_BUDGET_SECONDS, process.pid, ) + return "budget_exceeded" + # done is set → _target finished, so ``outcome`` is fully settled. + if outcome == "errored": + return "errored" + # The drain returned within budget without raising, but drain_reader_threads + # force-closes and continues on the #586 leaked-reader case. If any reader + # we were asked to drain is still alive, it can keep mutating ctx — surface + # that so the caller snapshots instead of trusting the live buffers (#1025). + if any(t is not None and t.is_alive() for t in threads): + logger.warning( + "[Subprocess] Drain returned within budget but %d reader thread(s) " + "are still alive (pid=%s) — reporting 'leaked' so finalize snapshots " + "the run context. Issue #1025.", + sum(1 for t in threads if t is not None and t.is_alive()), + process.pid, + ) + return "leaked" + return "completed" diff --git a/tests/unit/test_drain_bounded.py b/tests/unit/test_drain_bounded.py index 531ad39d..cdca484b 100644 --- a/tests/unit/test_drain_bounded.py +++ b/tests/unit/test_drain_bounded.py @@ -139,3 +139,92 @@ async def _recording_drain(process, *threads, grace=5, pgid=None, **kwargs): assert received.get("grace") == 3, f"grace not forwarded: {received}" assert received.get("pgid") == 42, f"pgid not forwarded: {received}" + + +# --------------------------------------------------------------------------- +# Issue #1025 (salvaged from #980): _drain_bounded returns a DrainOutcome and +# no longer swallows daemon-thread exceptions. 
+# --------------------------------------------------------------------------- + +def test_drain_bounded_returns_completed_on_clean_drain(monkeypatch): + """A drain that finishes within budget must report ``completed``.""" + + async def _fast_drain(*args, **kwargs): + return None + + monkeypatch.setattr( + "agent_server.services.subprocess_lifecycle._drain_reader_threads", + _fast_drain, + ) + + assert _drain_bounded(_make_fake_process(), grace=5, pgid=None) == "completed" + + +def test_drain_bounded_returns_budget_exceeded_on_hang(monkeypatch): + """A drain that overruns the budget must report ``budget_exceeded`` (a + leaked reader thread the finalize path must defend against).""" + + async def _hanging_drain(*args, **kwargs): + await asyncio.sleep(600) + + monkeypatch.setattr( + "agent_server.services.subprocess_lifecycle._drain_reader_threads", + _hanging_drain, + ) + monkeypatch.setattr( + "agent_server.services.subprocess_lifecycle._DRAIN_BUDGET_SECONDS", + 1, + ) + + assert _drain_bounded(_make_fake_process(), grace=1, pgid=None) == "budget_exceeded" + + +def test_drain_bounded_returns_errored_and_logs_when_drain_raises(monkeypatch, caplog): + """A drain that raises must be reported as ``errored`` and logged — not + silently swallowed as a clean ``completed`` (the pre-#1025 bug).""" + + async def _raising_drain(*args, **kwargs): + raise RuntimeError("boom in drain") + + monkeypatch.setattr( + "agent_server.services.subprocess_lifecycle._drain_reader_threads", + _raising_drain, + ) + + with caplog.at_level("ERROR"): + outcome = _drain_bounded(_make_fake_process(), grace=5, pgid=None) + + assert outcome == "errored" + assert any("Drain raised" in r.message for r in caplog.records), ( + "errored drain must be logged, not swallowed" + ) + + +def test_drain_bounded_returns_leaked_when_reader_survives(monkeypatch): + """A drain that returns within budget WITHOUT raising but leaves a reader + thread alive must report ``leaked`` — drain_reader_threads force-closes 
and + continues on the #586 leaked-reader case, so a within-budget return does not + by itself prove the readers are dead (review finding on the #1025 PR).""" + + async def _fast_drain(*args, **kwargs): + return None + + monkeypatch.setattr( + "agent_server.services.subprocess_lifecycle._drain_reader_threads", + _fast_drain, + ) + + # A real, still-alive reader thread (the leaked case). + stop = threading.Event() + leaked_reader = threading.Thread(target=stop.wait, daemon=True) + leaked_reader.start() + try: + outcome = _drain_bounded(_make_fake_process(), leaked_reader, grace=5, pgid=None) + finally: + stop.set() + leaked_reader.join(timeout=2) + + assert outcome == "leaked", ( + "a within-budget drain that leaves a reader alive must surface 'leaked' " + "so finalize snapshots instead of trusting the live buffers" + ) diff --git a/tests/unit/test_headless_finalize_snapshot.py b/tests/unit/test_headless_finalize_snapshot.py new file mode 100644 index 00000000..2f025ad3 --- /dev/null +++ b/tests/unit/test_headless_finalize_snapshot.py @@ -0,0 +1,118 @@ +"""Issue #1025 (salvaged from #980 / D19): _finalize_headless_result isolates +itself from a leaked reader thread by snapshotting the run context. + +When _drain_bounded reports ``budget_exceeded``, ``errored`` or ``leaked``, a +reader thread may still be alive, appending to ctx's shared buffers / setting +metadata fields. _finalize_headless_result rebinds ``ctx`` to a snapshot +so iteration can't tear and a late append can't be half-read. Clean drains +keep the zero-copy fast path. + +These tests exercise the snapshot helper directly (the concurrency-prone +part) plus the gating flag — without spawning a real claude subprocess. +""" +from __future__ import annotations + +from unittest.mock import MagicMock + +# conftest.py preloads the real agent_server package; just import. 
+from agent_server.services.headless_executor import ( # noqa: E402 + HeadlessRunContext, + _snapshot_for_finalize, +) + + +def _make_ctx() -> HeadlessRunContext: + ctx = HeadlessRunContext( + cmd=["claude"], + task_session_id="task-1025", + task_start_iso="2026-06-05T00:00:00Z", + effective_timeout=600, + images=None, + prompt="hi", + ) + ctx.raw_messages.append({"type": "assistant", "n": 1}) + ctx.response_parts.append("hello") + ctx.metadata.cost_usd = 0.01 + return ctx + + +def test_snapshot_decouples_lists_from_later_mutation(): + """A post-snapshot append to the live ctx (simulating a leaked reader) + must NOT leak into the snapshot used by finalize.""" + ctx = _make_ctx() + snap = _snapshot_for_finalize(ctx) + + # Snapshot must be a different context object with copied buffers. + assert snap is not ctx + assert snap.raw_messages is not ctx.raw_messages + assert snap.response_parts is not ctx.response_parts + + # Leaked reader keeps mutating the ORIGINAL ctx after the snapshot. + ctx.raw_messages.append({"type": "assistant", "n": 2}) + ctx.response_parts.append(" world") + + # Snapshot is frozen at snapshot time. + assert snap.raw_messages == [{"type": "assistant", "n": 1}] + assert snap.response_parts == ["hello"] + + +def test_snapshot_deep_copies_metadata(): + """metadata must be deep-copied so a leaked reader writing a metadata + field after the snapshot can't mutate finalize's view.""" + ctx = _make_ctx() + snap = _snapshot_for_finalize(ctx) + + assert snap.metadata is not ctx.metadata + assert snap.metadata.cost_usd == 0.01 + + # Leaked reader writes more metadata onto the live ctx. 
+ ctx.metadata.cost_usd = 0.99 + ctx.metadata.num_turns = 7 + + assert snap.metadata.cost_usd == 0.01 + assert snap.metadata.num_turns is None + + +def test_snapshot_falls_back_to_live_ctx_when_every_retry_loses(): + """If the deep-copy loses the race on every attempt, the helper returns the + live ctx rather than raising — no worse than the pre-#1025 behaviour.""" + ctx = _make_ctx() + + # Stand in a metadata whose deep-copy always loses the race (pydantic + # forbids patching model_copy on a real instance). + fake_metadata = MagicMock() + fake_metadata.model_copy.side_effect = RuntimeError( + "dictionary changed size during iteration" + ) + ctx.metadata = fake_metadata + + # Must not raise; falls back to the same ctx instance. + result = _snapshot_for_finalize(ctx) + assert result is ctx + assert fake_metadata.model_copy.call_count == 3 # _SNAPSHOT_RETRY_ATTEMPTS + + +def test_reader_may_be_live_defaults_false(): + """Clean runs must keep the zero-copy fast path (flag off by default).""" + assert _make_ctx().reader_may_be_live is False + + +def test_snapshot_freezes_auth_abort_signal(): + """A leaked stderr reader that sets auth-abort AFTER the snapshot must not + flip finalize's view — auth_abort_event/reason are frozen at snapshot time + (regression guard for the review finding on spurious 503s).""" + ctx = _make_ctx() + snap = _snapshot_for_finalize(ctx) + + # Snapshot copies must be distinct objects, frozen at the not-set state. + assert snap.auth_abort_event is not ctx.auth_abort_event + assert snap.auth_abort_reason is not ctx.auth_abort_reason + assert snap.auth_abort_event.is_set() is False + + # Leaked reader fires an auth abort on the live ctx after the snapshot. + ctx.auth_abort_reason.append("Not logged in") + ctx.auth_abort_event.set() + + # Finalize's view (the snapshot) is unaffected. + assert snap.auth_abort_event.is_set() is False + assert snap.auth_abort_reason == []