-
Notifications
You must be signed in to change notification settings - Fork 48
fix(headless): harden drain/finalize against leaked reader threads (#1025) #1078
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,7 +22,7 @@ | |
| import threading | ||
| import time | ||
| import uuid | ||
| from dataclasses import dataclass, field | ||
| from dataclasses import dataclass, field, replace | ||
| from datetime import datetime | ||
| from pathlib import Path | ||
| from typing import Dict, List, Optional, Tuple | ||
|
|
@@ -230,6 +230,11 @@ class HeadlessRunContext: | |
| # "max_duration" — the effective_timeout budget was genuinely exhausted | ||
| termination_reason: Optional[str] = None | ||
| stalled_tool: Optional[str] = None | ||
| # #1025 (salvaged from #980): True when the drain returned anything other | ||
| # than "completed" (budget_exceeded / errored / leaked) — a reader thread | ||
| # may still be alive and mutating the shared buffers below, so | ||
| # _finalize_headless_result snapshots its read fields before reading them. | ||
| reader_may_be_live: bool = False | ||
|
|
||
| # Shared mutable buffers (populated by stream_parser via process_stream_line) | ||
| response_parts: List[str] = field(default_factory=list) | ||
|
|
@@ -674,6 +679,10 @@ def _run_stdout() -> None: | |
| f"[Headless Task] Task {ctx.task_session_id} {reason} — killing process group" | ||
| ) | ||
| _terminate_process_group(process, graceful_timeout=5, pgid=ctx.process_pgid, execution_tag=ctx.task_session_id) | ||
| # Bound the post-kill drain, then re-raise. The orchestrator converts | ||
| # TimeoutExpired to HTTP 504 WITHOUT calling _finalize_headless_result, | ||
| # so there is no finalize read to protect here — we intentionally do | ||
| # not set reader_may_be_live on this path (#1025). | ||
| _drain_bounded(process, stdout_thread, stderr_thread, | ||
| grace=3, pgid=ctx.process_pgid, | ||
| execution_tag=ctx.task_session_id) | ||
|
|
@@ -682,15 +691,82 @@ def _run_stdout() -> None: | |
| # Subprocess exited. Drain readers — if a hook grandchild still | ||
| # holds a pipe, the helper will close the pipe FDs so the | ||
| # reader threads can exit. | ||
| _drain_bounded(process, stdout_thread, stderr_thread, | ||
| grace=5, pgid=ctx.process_pgid, | ||
| execution_tag=ctx.task_session_id) | ||
| drain_outcome = _drain_bounded(process, stdout_thread, stderr_thread, | ||
| grace=5, pgid=ctx.process_pgid, | ||
| execution_tag=ctx.task_session_id) | ||
| # Anything but "completed" means a reader thread may still be alive and | ||
| # mutating the shared buffers finalize reads (#1025). | ||
| ctx.reader_may_be_live = drain_outcome != "completed" | ||
|
|
||
| # Re-raise permission-mode failure captured by stdout thread | ||
| if ctx.stdout_exc: | ||
| raise ctx.stdout_exc[0] | ||
|
|
||
|
|
||
| # #1025: number of times _snapshot_for_finalize retries the metadata deep-copy | ||
| # when it loses the race with a still-mutating leaked reader. The mutation | ||
| # window is a single attribute set, so the first retry almost always wins. | ||
| _SNAPSHOT_RETRY_ATTEMPTS = 3 | ||
|
|
||
|
|
||
| def _freeze_event(event: threading.Event) -> threading.Event: | ||
| """Capture an Event's set/clear state into a fresh, immutable-by-the-reader | ||
| Event (#1025). The leaked reader holds the ORIGINAL event; the snapshot's | ||
| copy reflects the verdict as of snapshot time and can't be flipped later.""" | ||
| frozen = threading.Event() | ||
| if event.is_set(): | ||
| frozen.set() | ||
| return frozen | ||
|
|
||
|
|
||
| def _snapshot_for_finalize(ctx: HeadlessRunContext) -> HeadlessRunContext: | ||
| """Copy the finalize-read fields of ``ctx`` onto a fresh context (#1025/D19). | ||
|
|
||
| Called whenever the drain left a reader thread that may still be alive | ||
| (``reader_may_be_live``), where that reader can mutate ``ctx`` concurrently. | ||
| Every field finalize reads is captured: the four buffers (``list(...)`` of a | ||
| list never raises on a concurrent append), ``metadata`` (deep-copied), and | ||
| the auth-abort signal (``auth_abort_event`` frozen via :func:`_freeze_event` | ||
| + ``auth_abort_reason`` list-copied) — the stderr reader is exactly the | ||
| thread that can leak, and finalize reads ``auth_abort_event.is_set()`` / | ||
| ``auth_abort_reason[0]``, so leaving them aliased would let a late auth match | ||
| flip a success to a spurious 503. | ||
|
|
||
| ``metadata.model_copy(deep=True)`` iterates the pydantic model's ``__dict__`` | ||
| / ``__pydantic_fields_set__`` and CAN raise ``RuntimeError("... changed size | ||
| during iteration")`` if the reader sets a not-yet-set field mid-copy (most | ||
| relevant on the ``errored`` outcome, where the reader isn't necessarily | ||
| wedged in ``readline``). The window is a single attribute set, so a bounded | ||
| retry almost always succeeds on the next attempt; if every attempt loses the | ||
| race we fall back to the live ``ctx`` (the pre-#1025 behaviour — no worse | ||
| than not snapshotting). | ||
| """ | ||
| last_exc: Optional[RuntimeError] = None | ||
| for _ in range(_SNAPSHOT_RETRY_ATTEMPTS): | ||
| try: | ||
| return replace( | ||
| ctx, | ||
| raw_messages=list(ctx.raw_messages), | ||
| response_parts=list(ctx.response_parts), | ||
| execution_log=list(ctx.execution_log), | ||
| verbose_output_lines=list(ctx.verbose_output_lines), | ||
| metadata=ctx.metadata.model_copy(deep=True), | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Minor (cleanup): |
||
| auth_abort_event=_freeze_event(ctx.auth_abort_event), | ||
| auth_abort_reason=list(ctx.auth_abort_reason), | ||
| ) | ||
| except RuntimeError as exc: | ||
| # "dictionary/set changed size during iteration" — the leaked | ||
| # reader mutated metadata mid-deep-copy. Retry. | ||
| last_exc = exc | ||
| logger.warning( | ||
| "[Headless Task] Run-context snapshot lost the race with a leaked " | ||
| "reader %s times for task %s (%s) — finalizing against the live " | ||
| "context as a last resort. Issue #1025.", | ||
| _SNAPSHOT_RETRY_ATTEMPTS, ctx.task_session_id, last_exc, | ||
| ) | ||
| return ctx | ||
|
|
||
|
|
||
| def _finalize_headless_result( | ||
| ctx: HeadlessRunContext, | ||
| ) -> Tuple[str, List[Dict], ExecutionMetadata, str]: | ||
|
|
@@ -706,6 +782,27 @@ def _finalize_headless_result( | |
| orchestrator's outer ``except HTTPException: raise`` chain re-surfaces | ||
| them). | ||
| """ | ||
| # #1025 (D19): when the drain left a reader thread that may still be alive | ||
| # (budget_exceeded / errored / leaked), it can keep mutating the shared | ||
| # buffers below. Snapshot EVERY field finalize reads onto a fresh context so | ||
| # iteration can't tear and a late append by the leaked reader can't be | ||
| # half-read. ``parse_failure_count`` / ``parse_failure_sample`` are an int + | ||
| # str (atomic reads, no snapshot). The leaked reader keeps mutating the | ||
| # ORIGINAL ctx (which it captured by closure); rebinding the local ``ctx`` | ||
| # to the snapshot means the rest of finalize — including | ||
| # _attempt_empty_result_recovery's in-place mutations — operates entirely | ||
| # on the isolated copy. Clean-drain runs skip this and keep the zero-copy | ||
| # fast path. The snapshot itself is retry-guarded against the same race | ||
| # (see _snapshot_for_finalize). | ||
| if ctx.reader_may_be_live: | ||
| logger.warning( | ||
| "[Headless Task] Drain left a possibly-live reader for task %s — " | ||
| "finalizing against a snapshot of the run context (the reader may " | ||
| "still be mutating the live buffers). Issue #1025.", | ||
| ctx.task_session_id, | ||
| ) | ||
| ctx = _snapshot_for_finalize(ctx) | ||
|
|
||
| # Build verbose transcript from stderr (the human-readable execution log) | ||
| # SECURITY: Sanitize stderr output | ||
| sanitized_lines = [sanitize_text(line) for line in ctx.verbose_output_lines] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,7 +14,7 @@ | |
| import logging | ||
| import subprocess | ||
| import threading | ||
| from typing import Optional | ||
| from typing import Literal, Optional | ||
|
|
||
| from ..utils.subprocess_pgroup import ( | ||
| capture_pgid as _capture_pgid, | ||
|
|
@@ -23,8 +23,16 @@ | |
| drain_reader_threads as _drain_reader_threads, | ||
| ) | ||
|
|
||
| # Outcome of a bounded drain, returned to the caller so the orchestrator can | ||
| # decide whether to treat the run's shared mutable state as trustworthy | ||
| # (#1025 / salvaged from #980). Every non-``completed`` outcome leaves a reader | ||
| # thread that may still be mutating the run context concurrently; the headless | ||
| # path snapshots its finalize-read fields on those outcomes. | ||
| DrainOutcome = Literal["completed", "budget_exceeded", "errored", "leaked"] | ||
|
|
||
| __all__ = [ | ||
| "_DRAIN_BUDGET_SECONDS", | ||
| "DrainOutcome", | ||
| "_drain_bounded", | ||
| "_capture_pgid", | ||
| "_terminate_process_group", | ||
|
|
@@ -51,7 +59,7 @@ def _drain_bounded( | |
| grace: int = 5, | ||
| pgid: Optional[int] = None, | ||
| execution_tag: Optional[str] = None, | ||
| ) -> None: | ||
| ) -> DrainOutcome: | ||
| """Run drain_reader_threads with a hard _DRAIN_BUDGET_SECONDS time cap. | ||
|
|
||
| Prevents a TextIOWrapper lock deadlock in safe_close_pipes (Issue #728) | ||
|
|
@@ -62,17 +70,52 @@ def _drain_bounded( | |
|
|
||
| Issue #817: ``execution_tag`` is threaded through to the underlying | ||
| ``drain_reader_threads`` so the env-tag sweep runs after every drain. | ||
|
|
||
| Issue #1025 (salvaged from #980): returns the drain outcome instead of | ||
| ``None`` so the headless orchestrator can detect every case that leaves a | ||
| reader thread still mutating the shared run context, and snapshot its | ||
| finalize-read fields before reading them. Only ``"completed"`` guarantees | ||
| no reader survives: | ||
|
|
||
| - ``"completed"`` — the drain finished within budget AND every | ||
| reader thread passed in has exited. | ||
| - ``"budget_exceeded"`` — the daemon thread did not finish within | ||
| ``_DRAIN_BUDGET_SECONDS`` (the #728 safe_close_pipes deadlock); the | ||
| reader thread is leaked. | ||
| - ``"errored"`` — the drain raised. Previously swallowed with | ||
| ``except Exception: pass``, which masked the failure as a clean | ||
| ``"completed"``; now captured and logged (honours the project-wide | ||
| "never swallow exceptions silently" rule). | ||
| - ``"leaked"`` — the drain returned normally within budget but a | ||
| reader thread is still alive. ``drain_reader_threads`` force-closes the | ||
| pipes and *continues* when a grandchild-held reader won't EOF (its own | ||
| ``outcome=leaked`` METRIC), returning normally — so a within-budget, | ||
| non-raising drain does NOT by itself prove the readers are dead. This | ||
| outcome closes that gap (the #586 leaked-reader case). | ||
| """ | ||
| done = threading.Event() | ||
| # Single write-once cell, set only by the daemon thread and read only after | ||
| # the ``done`` barrier — no second Event / ordering invariant to maintain. | ||
| outcome: DrainOutcome = "completed" | ||
|
|
||
| def _target() -> None: | ||
| nonlocal outcome | ||
| try: | ||
| asyncio.run(_drain_reader_threads( | ||
| process, *threads, grace=grace, pgid=pgid, | ||
| execution_tag=execution_tag, | ||
| )) | ||
| except Exception: | ||
| pass | ||
| # #1025: capture + log instead of swallowing. A drain that raises | ||
| # used to be indistinguishable from a clean completion, hiding a | ||
| # leaked reader thread from the finalize path. | ||
| outcome = "errored" | ||
| logger.exception( | ||
| "[Subprocess] Drain raised inside the daemon thread " | ||
| "(pid=%s) — treating as errored; reader thread(s) may be " | ||
| "leaked. Issue #1025.", | ||
| process.pid, | ||
| ) | ||
| finally: | ||
| done.set() | ||
|
|
||
|
|
@@ -84,3 +127,21 @@ def _target() -> None: | |
| "leaked daemon threads (pid=%s). Issue #728.", | ||
| _DRAIN_BUDGET_SECONDS, process.pid, | ||
| ) | ||
| return "budget_exceeded" | ||
| # done is set → _target finished, so ``outcome`` is fully settled. | ||
| if outcome == "errored": | ||
| return "errored" | ||
| # The drain returned within budget without raising, but drain_reader_threads | ||
| # force-closes and continues on the #586 leaked-reader case. If any reader | ||
| # we were asked to drain is still alive, it can keep mutating ctx — surface | ||
| # that so the caller snapshots instead of trusting the live buffers (#1025). | ||
| if any(t is not None and t.is_alive() for t in threads): | ||
| logger.warning( | ||
| "[Subprocess] Drain returned within budget but %d reader thread(s) " | ||
| "are still alive (pid=%s) — reporting 'leaked' so finalize snapshots " | ||
| "the run context. Issue #1025.", | ||
| sum(1 for t in threads if t is not None and t.is_alive()), | ||
| process.pid, | ||
| ) | ||
| return "leaked" | ||
| return "completed" | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The snapshot trigger misses the
The correct "a reader may still mutate ctx" signal is |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The snapshot doesn't isolate `auth_abort_event` / `auth_abort_reason`, which finalize reads off the still-live objects.

`replace(ctx, ...)` only overrides `raw_messages`, `response_parts`, `execution_log`, `verbose_output_lines`, `metadata`. The new context keeps the same `auth_abort_event` (Event) and `auth_abort_reason` (list) — both mutated by the `read_stderr()` reader thread (lines 438-439). On the leaked-reader path, finalize then reads `ctx.auth_abort_event.is_set()` (line 737) and `ctx.auth_abort_reason[0]` (738) on those live objects.

So if the leaked thread is the stderr reader and it matches an auth pattern in a late line after the snapshot but before line 737, an otherwise-successful run is converted to a spurious HTTP 503 — which `task_execution_service` (SUB-003) treats as an auth failure and uses to auto-switch the subscription. Narrow, but real.

At minimum this contradicts the helper's own docstring ("Snapshot EVERY field finalize reads"). Either snapshot the auth fields too, or document explicitly why they're exempt (as `parse_failure_count` / `_sample` already are).