Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 101 additions & 4 deletions docker/base-image/agent_server/services/headless_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import threading
import time
import uuid
from dataclasses import dataclass, field
from dataclasses import dataclass, field, replace
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
Expand Down Expand Up @@ -230,6 +230,11 @@ class HeadlessRunContext:
# "max_duration" — the effective_timeout budget was genuinely exhausted
termination_reason: Optional[str] = None
stalled_tool: Optional[str] = None
# #1025 (salvaged from #980): True when the drain returned anything other
# than "completed" (budget_exceeded / errored / leaked) — a reader thread
# may still be alive and mutating the shared buffers below, so
# _finalize_headless_result snapshots its read fields before reading them.
reader_may_be_live: bool = False

# Shared mutable buffers (populated by stream_parser via process_stream_line)
response_parts: List[str] = field(default_factory=list)
Expand Down Expand Up @@ -674,6 +679,10 @@ def _run_stdout() -> None:
f"[Headless Task] Task {ctx.task_session_id} {reason} — killing process group"
)
_terminate_process_group(process, graceful_timeout=5, pgid=ctx.process_pgid, execution_tag=ctx.task_session_id)
# Bound the post-kill drain, then re-raise. The orchestrator converts
# TimeoutExpired to HTTP 504 WITHOUT calling _finalize_headless_result,
# so there is no finalize read to protect here — we intentionally do
# not set reader_may_be_live on this path (#1025).
_drain_bounded(process, stdout_thread, stderr_thread,
grace=3, pgid=ctx.process_pgid,
execution_tag=ctx.task_session_id)
Expand All @@ -682,15 +691,82 @@ def _run_stdout() -> None:
# Subprocess exited. Drain readers — if a hook grandchild still
# holds a pipe, the helper will close the pipe FDs so the
# reader threads can exit.
_drain_bounded(process, stdout_thread, stderr_thread,
grace=5, pgid=ctx.process_pgid,
execution_tag=ctx.task_session_id)
drain_outcome = _drain_bounded(process, stdout_thread, stderr_thread,
grace=5, pgid=ctx.process_pgid,
execution_tag=ctx.task_session_id)
# Anything but "completed" means a reader thread may still be alive and
# mutating the shared buffers finalize reads (#1025).
ctx.reader_may_be_live = drain_outcome != "completed"

# Re-raise permission-mode failure captured by stdout thread
if ctx.stdout_exc:
raise ctx.stdout_exc[0]


# #1025: number of times _snapshot_for_finalize retries the metadata deep-copy
# when it loses the race with a still-mutating leaked reader. The mutation
# window is a single attribute set, so the first retry almost always wins.
_SNAPSHOT_RETRY_ATTEMPTS = 3


def _freeze_event(event: threading.Event) -> threading.Event:
"""Capture an Event's set/clear state into a fresh, immutable-by-the-reader
Event (#1025). The leaked reader holds the ORIGINAL event; the snapshot's
copy reflects the verdict as of snapshot time and can't be flipped later."""
frozen = threading.Event()
if event.is_set():
frozen.set()
return frozen


def _snapshot_for_finalize(ctx: HeadlessRunContext) -> HeadlessRunContext:
"""Copy the finalize-read fields of ``ctx`` onto a fresh context (#1025/D19).

Called whenever the drain left a reader thread that may still be alive
(``reader_may_be_live``), where that reader can mutate ``ctx`` concurrently.
Every field finalize reads is captured: the four buffers (``list(...)`` of a
list never raises on a concurrent append), ``metadata`` (deep-copied), and
the auth-abort signal (``auth_abort_event`` frozen via :func:`_freeze_event`
+ ``auth_abort_reason`` list-copied) — the stderr reader is exactly the
thread that can leak, and finalize reads ``auth_abort_event.is_set()`` /
``auth_abort_reason[0]``, so leaving them aliased would let a late auth match
flip a success to a spurious 503.

``metadata.model_copy(deep=True)`` iterates the pydantic model's ``__dict__``
/ ``__pydantic_fields_set__`` and CAN raise ``RuntimeError("... changed size
during iteration")`` if the reader sets a not-yet-set field mid-copy (most
relevant on the ``errored`` outcome, where the reader isn't necessarily
wedged in ``readline``). The window is a single attribute set, so a bounded
retry almost always succeeds on the next attempt; if every attempt loses the
race we fall back to the live ``ctx`` (the pre-#1025 behaviour — no worse
than not snapshotting).
"""
last_exc: Optional[RuntimeError] = None
for _ in range(_SNAPSHOT_RETRY_ATTEMPTS):
try:
return replace(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The snapshot doesn't isolate auth_abort_event / auth_abort_reason, which finalize reads off the still-live objects.

replace(ctx, ...) only overrides raw_messages, response_parts, execution_log, verbose_output_lines, metadata. The new context keeps the same auth_abort_event (Event) and auth_abort_reason (list) — both mutated by the read_stderr() reader thread (lines 438-439). On the leaked-reader path, finalize then reads ctx.auth_abort_event.is_set() (line 737) and ctx.auth_abort_reason[0] (738) on those live objects.

So if the leaked thread is the stderr reader and it matches an auth pattern in a late line after the snapshot but before line 737, an otherwise-successful run is converted to a spurious HTTP 503 — which task_execution_service (SUB-003) treats as an auth failure and uses to auto-switch the subscription. Narrow, but real.

At minimum this contradicts the helper's own docstring ("Snapshot EVERY field finalize reads"). Either snapshot the auth fields too, or document explicitly why they're exempt (as parse_failure_count/_sample already are).

ctx,
raw_messages=list(ctx.raw_messages),
response_parts=list(ctx.response_parts),
execution_log=list(ctx.execution_log),
verbose_output_lines=list(ctx.verbose_output_lines),
metadata=ctx.metadata.model_copy(deep=True),

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor (cleanup): model_copy(deep=True)ExecutionMetadata's only nested mutable field is compact_events, which finalize overwrites from the JSONL on the success path (_extract_compact_events_from_jsonl). Everything else is scalars, where deep=True == shallow. The deep copy is also precisely what can raise the "changed size during iteration" the 3-attempt retry loop exists to absorb. A deep=False (or model_copy(update={'compact_events': list(...)}) if a tear-safe copy of that one list is actually wanted) narrows the race window the retry is guarding.

auth_abort_event=_freeze_event(ctx.auth_abort_event),
auth_abort_reason=list(ctx.auth_abort_reason),
)
except RuntimeError as exc:
# "dictionary/set changed size during iteration" — the leaked
# reader mutated metadata mid-deep-copy. Retry.
last_exc = exc
logger.warning(
"[Headless Task] Run-context snapshot lost the race with a leaked "
"reader %s times for task %s (%s) — finalizing against the live "
"context as a last resort. Issue #1025.",
_SNAPSHOT_RETRY_ATTEMPTS, ctx.task_session_id, last_exc,
)
return ctx


def _finalize_headless_result(
ctx: HeadlessRunContext,
) -> Tuple[str, List[Dict], ExecutionMetadata, str]:
Expand All @@ -706,6 +782,27 @@ def _finalize_headless_result(
orchestrator's outer ``except HTTPException: raise`` chain re-surfaces
them).
"""
# #1025 (D19): when the drain left a reader thread that may still be alive
# (budget_exceeded / errored / leaked), it can keep mutating the shared
# buffers below. Snapshot EVERY field finalize reads onto a fresh context so
# iteration can't tear and a late append by the leaked reader can't be
# half-read. ``parse_failure_count`` / ``parse_failure_sample`` are an int +
# str (atomic reads, no snapshot). The leaked reader keeps mutating the
# ORIGINAL ctx (which it captured by closure); rebinding the local ``ctx``
# to the snapshot means the rest of finalize — including
# _attempt_empty_result_recovery's in-place mutations — operates entirely
# on the isolated copy. Clean-drain runs skip this and keep the zero-copy
# fast path. The snapshot itself is retry-guarded against the same race
# (see _snapshot_for_finalize).
if ctx.reader_may_be_live:
logger.warning(
"[Headless Task] Drain left a possibly-live reader for task %s — "
"finalizing against a snapshot of the run context (the reader may "
"still be mutating the live buffers). Issue #1025.",
ctx.task_session_id,
)
ctx = _snapshot_for_finalize(ctx)

# Build verbose transcript from stderr (the human-readable execution log)
# SECURITY: Sanitize stderr output
sanitized_lines = [sanitize_text(line) for line in ctx.verbose_output_lines]
Expand Down
67 changes: 64 additions & 3 deletions docker/base-image/agent_server/services/subprocess_lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import logging
import subprocess
import threading
from typing import Optional
from typing import Literal, Optional

from ..utils.subprocess_pgroup import (
capture_pgid as _capture_pgid,
Expand All @@ -23,8 +23,16 @@
drain_reader_threads as _drain_reader_threads,
)

# Outcome of a bounded drain, returned to the caller so the orchestrator can
# decide whether to treat the run's shared mutable state as trustworthy
# (#1025 / salvaged from #980). Every non-``completed`` outcome leaves a reader
# thread that may still be mutating the run context concurrently; the headless
# path snapshots its finalize-read fields on those outcomes.
DrainOutcome = Literal["completed", "budget_exceeded", "errored", "leaked"]

__all__ = [
"_DRAIN_BUDGET_SECONDS",
"DrainOutcome",
"_drain_bounded",
"_capture_pgid",
"_terminate_process_group",
Expand All @@ -51,7 +59,7 @@ def _drain_bounded(
grace: int = 5,
pgid: Optional[int] = None,
execution_tag: Optional[str] = None,
) -> None:
) -> DrainOutcome:
"""Run drain_reader_threads with a hard _DRAIN_BUDGET_SECONDS time cap.

Prevents a TextIOWrapper lock deadlock in safe_close_pipes (Issue #728)
Expand All @@ -62,17 +70,52 @@ def _drain_bounded(

Issue #817: ``execution_tag`` is threaded through to the underlying
``drain_reader_threads`` so the env-tag sweep runs after every drain.

Issue #1025 (salvaged from #980): returns the drain outcome instead of
``None`` so the headless orchestrator can detect every case that leaves a
reader thread still mutating the shared run context, and snapshot its
finalize-read fields before reading them. Only ``"completed"`` guarantees
no reader survives:

- ``"completed"`` — the drain finished within budget AND every
reader thread passed in has exited.
- ``"budget_exceeded"`` — the daemon thread did not finish within
``_DRAIN_BUDGET_SECONDS`` (the #728 safe_close_pipes deadlock); the
reader thread is leaked.
- ``"errored"`` — the drain raised. Previously swallowed with
``except Exception: pass``, which masked the failure as a clean
``"completed"``; now captured and logged (honours the project-wide
"never swallow exceptions silently" rule).
- ``"leaked"`` — the drain returned normally within budget but a
reader thread is still alive. ``drain_reader_threads`` force-closes the
pipes and *continues* when a grandchild-held reader won't EOF (its own
``outcome=leaked`` METRIC), returning normally — so a within-budget,
non-raising drain does NOT by itself prove the readers are dead. This
outcome closes that gap (the #586 leaked-reader case).
"""
done = threading.Event()
# Single write-once cell, set only by the daemon thread and read only after
# the ``done`` barrier — no second Event / ordering invariant to maintain.
outcome: DrainOutcome = "completed"

def _target() -> None:
nonlocal outcome
try:
asyncio.run(_drain_reader_threads(
process, *threads, grace=grace, pgid=pgid,
execution_tag=execution_tag,
))
except Exception:
pass
# #1025: capture + log instead of swallowing. A drain that raises
# used to be indistinguishable from a clean completion, hiding a
# leaked reader thread from the finalize path.
outcome = "errored"
logger.exception(
"[Subprocess] Drain raised inside the daemon thread "
"(pid=%s) — treating as errored; reader thread(s) may be "
"leaked. Issue #1025.",
process.pid,
)
finally:
done.set()

Expand All @@ -84,3 +127,21 @@ def _target() -> None:
"leaked daemon threads (pid=%s). Issue #728.",
_DRAIN_BUDGET_SECONDS, process.pid,
)
return "budget_exceeded"
# done is set → _target finished, so ``outcome`` is fully settled.
if outcome == "errored":
return "errored"
# The drain returned within budget without raising, but drain_reader_threads
# force-closes and continues on the #586 leaked-reader case. If any reader
# we were asked to drain is still alive, it can keep mutating ctx — surface
# that so the caller snapshots instead of trusting the live buffers (#1025).
if any(t is not None and t.is_alive() for t in threads):
logger.warning(
"[Subprocess] Drain returned within budget but %d reader thread(s) "
"are still alive (pid=%s) — reporting 'leaked' so finalize snapshots "
"the run context. Issue #1025.",
sum(1 for t in threads if t is not None and t.is_alive()),
process.pid,
)
return "leaked"
return "completed"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The snapshot trigger misses the leaked drain outcome — the exact race this PR fixes still fires on the "completed" path.

_drain_bounded only returns "budget_exceeded" (timeout) or "errored" (raise). But utils/subprocess_pgroup.py:drain_reader_threads has a third leak case: after force-close it computes leaked = [t for t in still_stuck if t.is_alive()], logs outcome=leaked, and returns normally (return [], well within the 90s budget). So a confirmed-alive leaked reader makes _drain_bounded return "completed"ctx.drain_budget_exceeded stays False_finalize_headless_result skips _snapshot_for_finalize and reads raw_messages / response_parts / execution_log / metadata while the leaked reader is still mutating them. metadata.model_copy(deep=True) (or iteration) can then raise RuntimeError: dictionary changed size during iteration with no retry guard, surfacing as an opaque HTTP 500.

The correct "a reader may still mutate ctx" signal is drain_outcome != "completed" including the leaked case — surface a "leaked" member on DrainOutcome fed from that branch, or treat it like the other two. As written the fix has a hole on the very path it declares safe.

89 changes: 89 additions & 0 deletions tests/unit/test_drain_bounded.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,92 @@ async def _recording_drain(process, *threads, grace=5, pgid=None, **kwargs):

assert received.get("grace") == 3, f"grace not forwarded: {received}"
assert received.get("pgid") == 42, f"pgid not forwarded: {received}"


# ---------------------------------------------------------------------------
# Issue #1025 (salvaged from #980): _drain_bounded returns a DrainOutcome and
# no longer swallows daemon-thread exceptions.
# ---------------------------------------------------------------------------

def test_drain_bounded_returns_completed_on_clean_drain(monkeypatch):
"""A drain that finishes within budget must report ``completed``."""

async def _fast_drain(*args, **kwargs):
return None

monkeypatch.setattr(
"agent_server.services.subprocess_lifecycle._drain_reader_threads",
_fast_drain,
)

assert _drain_bounded(_make_fake_process(), grace=5, pgid=None) == "completed"


def test_drain_bounded_returns_budget_exceeded_on_hang(monkeypatch):
"""A drain that overruns the budget must report ``budget_exceeded`` (a
leaked reader thread the finalize path must defend against)."""

async def _hanging_drain(*args, **kwargs):
await asyncio.sleep(600)

monkeypatch.setattr(
"agent_server.services.subprocess_lifecycle._drain_reader_threads",
_hanging_drain,
)
monkeypatch.setattr(
"agent_server.services.subprocess_lifecycle._DRAIN_BUDGET_SECONDS",
1,
)

assert _drain_bounded(_make_fake_process(), grace=1, pgid=None) == "budget_exceeded"


def test_drain_bounded_returns_errored_and_logs_when_drain_raises(monkeypatch, caplog):
"""A drain that raises must be reported as ``errored`` and logged — not
silently swallowed as a clean ``completed`` (the pre-#1025 bug)."""

async def _raising_drain(*args, **kwargs):
raise RuntimeError("boom in drain")

monkeypatch.setattr(
"agent_server.services.subprocess_lifecycle._drain_reader_threads",
_raising_drain,
)

with caplog.at_level("ERROR"):
outcome = _drain_bounded(_make_fake_process(), grace=5, pgid=None)

assert outcome == "errored"
assert any("Drain raised" in r.message for r in caplog.records), (
"errored drain must be logged, not swallowed"
)


def test_drain_bounded_returns_leaked_when_reader_survives(monkeypatch):
"""A drain that returns within budget WITHOUT raising but leaves a reader
thread alive must report ``leaked`` — drain_reader_threads force-closes and
continues on the #586 leaked-reader case, so a within-budget return does not
by itself prove the readers are dead (review finding on the #1025 PR)."""

async def _fast_drain(*args, **kwargs):
return None

monkeypatch.setattr(
"agent_server.services.subprocess_lifecycle._drain_reader_threads",
_fast_drain,
)

# A real, still-alive reader thread (the leaked case).
stop = threading.Event()
leaked_reader = threading.Thread(target=stop.wait, daemon=True)
leaked_reader.start()
try:
outcome = _drain_bounded(_make_fake_process(), leaked_reader, grace=5, pgid=None)
finally:
stop.set()
leaked_reader.join(timeout=2)

assert outcome == "leaked", (
"a within-budget drain that leaves a reader alive must surface 'leaked' "
"so finalize snapshots instead of trusting the live buffers"
)
Loading
Loading