From bc23675f13976466657fa54604b9ca1ca2a40146 Mon Sep 17 00:00:00 2001 From: Oleksii Dolhov Date: Fri, 12 Jun 2026 13:49:46 +0300 Subject: [PATCH] feat(loops): loop-level wall-clock deadline (max_duration_seconds) (#1156) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an optional total-duration bound to sequential agent loops, the third hard stop alongside the max_runs iteration cap. A loop legally configured today (max_runs=100 × timeout_per_run up to 2h + delays) could run for days; max_duration_seconds caps total wall-clock time. - Schema: agent_loops.max_duration_seconds INTEGER (nullable) + idempotent migration (agent_loops_max_duration). - Runner (loop_service): deadline measured from started_at, checked only at iteration boundaries (before the next run + before/after the inter-run delay, which is capped to the remaining budget). An in-flight run is never killed mid-turn — overshoot is bounded by one timeout_per_run. Expiry stops with stop_reason="deadline_exceeded", terminal status "stopped". - Router: optional max_duration_seconds (1..604800); 400 when smaller than the effective per-run timeout (timeout_per_run, else agent execution_timeout). GET /api/loops/{id} returns max_duration_seconds + computed elapsed_seconds. - MCP run_agent_loop + UI Loops form/detail expose the parameter and deadline. - Tests: deadline stop at boundary, in-flight run completes (not killed), delay capped to remaining budget, no-deadline regression, and the four validation paths. Docs: architecture.md (feature + schema) and requirements.md §38.2. Closes #1156 Co-Authored-By: Claude Opus 4.8 (1M context) --- docs/memory/architecture.md | 5 +- docs/memory/requirements.md | 30 ++++ src/backend/db/loops.py | 4 + src/backend/db/migrations.py | 18 +++ src/backend/db/schema.py | 1 + src/backend/db/tables.py | 1 + src/backend/routers/loops.py | 59 ++++++++ src/backend/services/loop_service.py | 34 ++++- src/frontend/src/components/LoopsPanel.vue | 32 +++++ src/mcp-server/src/client.ts | 1 + src/mcp-server/src/tools/loops.ts | 15 ++ tests/unit/test_loop_service.py | 155 +++++++++++++++++++++ tests/unit/test_loops_router_validation.py | 98 +++++++++++++ 13 files changed, 449 insertions(+), 4 deletions(-) create mode 100644 tests/unit/test_loops_router_validation.py diff --git a/docs/memory/architecture.md b/docs/memory/architecture.md index cd7f5d04..0045176d 100644 --- a/docs/memory/architecture.md +++ b/docs/memory/architecture.md @@ -387,7 +387,7 @@ Trigger-boundary dedup — policy in Architectural Invariant #18, table DDL unde ### Sequential Agent Loops (#740, UI #1106) -Bounded sequential task execution against one agent. Runner is an in-process `asyncio.Task` spawned by `loop_service.py`; each iteration dispatches through `task_execution_service.execute_task()` with `triggered_by="loop"` and the parent `loop_id` carried on the resulting `schedule_executions` row — iterations go through the standard `capacity_manager` admit/slot path, sharing the agent's `max_parallel_tasks` budget. Message template supports `{{run}}` and `{{previous_response}}`; `max_runs` 1–100 hard cap; optional `stop_signal` (until-mode), `delay_seconds`, `timeout_per_run`, `model`, `allowed_tools`. Stop is cooperative: `POST /api/loops/{id}/stop` flips an in-process `should_stop` flag; the current iteration finishes and the runner exits with `stop_reason="user_stopped"`. Restart recovery via the cleanup-service startup hook (above); no auto-resume. WS events `loop_run_completed`/`loop_completed`. +Bounded sequential task execution against one agent. Runner is an in-process `asyncio.Task` spawned by `loop_service.py`; each iteration dispatches through `task_execution_service.execute_task()` with `triggered_by="loop"` and the parent `loop_id` carried on the resulting `schedule_executions` row — iterations go through the standard `capacity_manager` admit/slot path, sharing the agent's `max_parallel_tasks` budget. Message template supports `{{run}}` and `{{previous_response}}`; `max_runs` 1–100 hard cap; optional `stop_signal` (until-mode), `delay_seconds`, `timeout_per_run`, `max_duration_seconds`, `model`, `allowed_tools`. Stop is cooperative: `POST /api/loops/{id}/stop` flips an in-process `should_stop` flag; the current iteration finishes and the runner exits with `stop_reason="user_stopped"`. **Wall-clock deadline (#1156):** optional `max_duration_seconds` (≤7 days) measured from `started_at`, checked only at iteration boundaries (before the next run and before/after the inter-run delay, which is capped to the remaining budget) — an in-flight run is never killed mid-turn, so overshoot is bounded by one `timeout_per_run`; expiry stops the loop with `stop_reason="deadline_exceeded"`. Rejected at create (400) when smaller than the effective per-run timeout (`timeout_per_run`, else the agent's `execution_timeout_seconds`). `GET /api/loops/{id}` returns `max_duration_seconds` + computed `elapsed_seconds`. Restart recovery via the cleanup-service startup hook (above); no auto-resume. WS events `loop_run_completed`/`loop_completed`. **Web UI (#1106):** a **Loops** tab on Agent Detail (`components/LoopsPanel.vue` + agent-scoped `stores/loops.js`; `setAgent(name)` on mount, `clear()` on unmount). The global WS handler routes the fleet-wide loop events to the store, which filters by mounted agent and targeted-refreshes only the affected loop; a 12s backstop poll runs while any loop is `queued`/`running` to recover a missed terminal event. Last full response rendered via `utils/markdown.js` (DOMPurify). @@ -930,11 +930,12 @@ CREATE TABLE agent_loops ( stop_signal TEXT, -- NULL = fixed mode; set = until mode delay_seconds INTEGER NOT NULL DEFAULT 0, timeout_per_run INTEGER, -- NULL = agent's execution_timeout_seconds + max_duration_seconds INTEGER, -- #1156: NULL = no wall-clock deadline (≤7d when set) model TEXT, allowed_tools TEXT, -- JSON array status TEXT NOT NULL, -- queued | running | completed | stopped | failed | interrupted runs_completed INTEGER NOT NULL DEFAULT 0, - stop_reason TEXT, -- max_runs_reached | stop_signal_matched | user_stopped | error | interrupted + stop_reason TEXT, -- max_runs_reached | stop_signal_matched | user_stopped | deadline_exceeded | error | interrupted last_response TEXT, error TEXT, started_by_user_id INTEGER, diff --git a/docs/memory/requirements.md b/docs/memory/requirements.md index 78d44834..dfd11e59 100644 --- a/docs/memory/requirements.md +++ b/docs/memory/requirements.md @@ -2777,6 +2777,36 @@ Standalone mobile-friendly admin page for managing agents on the go. Designed as auto-resume after restart; cross-agent loops (`agent` parameter is `"self"` only for v1, matching `fan_out`). +### 38.2 Loop-level wall-clock deadline (#1156) +- **Status**: ✅ Implemented +- **Implements**: Issue #1156 +- **Description**: A third hard stop alongside the `max_runs` iteration + cap and the (separately tracked) cost budget: an optional total + wall-clock deadline so a loop legally configured today (`max_runs=100` + × `timeout_per_run` up to 2h + `delay_seconds`) cannot run for days. +- **Parameter**: optional `max_duration_seconds` (int, 1 – 604800 = 7d; + NULL/omitted disables). Accepted on `POST /api/agents/{name}/loops`, + persisted on `agent_loops.max_duration_seconds`, exposed via the + `run_agent_loop` MCP tool. +- **Enforcement**: deadline measured from `started_at`; checked only at + iteration boundaries — before starting the next run and before/after + the inter-run delay (the `delay_seconds` sleep is capped to the + remaining budget, never sleeping past the deadline). An in-flight run + is never killed mid-turn, so actual overshoot is bounded by one + `timeout_per_run`. +- **Terminal state**: expiry stops the loop with terminal status + `stopped` and `stop_reason="deadline_exceeded"`. +- **Validation**: reject (400) `max_duration_seconds` smaller than the + effective per-run timeout (`timeout_per_run`, else the agent's + `execution_timeout_seconds`) — otherwise no iteration could finish + before the deadline. +- **Observability**: `GET /api/loops/{loop_id}` returns + `max_duration_seconds` and a computed `elapsed_seconds` (from + `started_at` to `completed_at` or now); the Loops UI shows the + deadline + elapsed when set. +- **Out of scope**: interrupting an in-flight run mid-turn; persisting + elapsed across a backend restart (loops do not auto-resume). + --- ## 39. VoIP Telephony (VOIP-001) diff --git a/src/backend/db/loops.py b/src/backend/db/loops.py index b4aa5d88..dc20d453 100644 --- a/src/backend/db/loops.py +++ b/src/backend/db/loops.py @@ -30,6 +30,7 @@ def _loop_row_to_dict(row) -> dict: "stop_signal": row["stop_signal"], "delay_seconds": row["delay_seconds"], "timeout_per_run": row["timeout_per_run"], + "max_duration_seconds": row["max_duration_seconds"], "model": row["model"], "allowed_tools": json.loads(row["allowed_tools"]) if row["allowed_tools"] else None, "status": row["status"], @@ -78,6 +79,7 @@ def create_loop( stop_signal: Optional[str] = None, delay_seconds: int = 0, timeout_per_run: Optional[int] = None, + max_duration_seconds: Optional[int] = None, model: Optional[str] = None, allowed_tools: Optional[List[str]] = None, started_by_user_id: Optional[int] = None, @@ -99,6 +101,7 @@ def create_loop( stop_signal=stop_signal, delay_seconds=delay_seconds, timeout_per_run=timeout_per_run, + max_duration_seconds=max_duration_seconds, model=model, allowed_tools=allowed_tools_json, status="queued", @@ -126,6 +129,7 @@ def create_loop( "stop_signal": stop_signal, "delay_seconds": delay_seconds, "timeout_per_run": timeout_per_run, + "max_duration_seconds": max_duration_seconds, "model": model, "allowed_tools": allowed_tools, "status": "queued", diff --git a/src/backend/db/migrations.py b/src/backend/db/migrations.py index c8a83d61..15152e5c 100644 --- a/src/backend/db/migrations.py +++ b/src/backend/db/migrations.py @@ -2402,6 +2402,23 @@ def _migrate_operator_queue_cleared_at(cursor, conn): conn.commit() +def _migrate_agent_loops_max_duration(cursor, conn): + """#1156 — loop-level wall-clock deadline. + + Adds `max_duration_seconds INTEGER` (NULL = no deadline) to `agent_loops`. + The runner stops the loop at the next iteration boundary once the deadline + measured from `started_at` is exceeded (stop_reason='deadline_exceeded'), + bounding total loop duration alongside the existing `max_runs` cap. + """ + _safe_add_column( + cursor, + "agent_loops", + "max_duration_seconds", + "ALTER TABLE agent_loops ADD COLUMN max_duration_seconds INTEGER", + ) + conn.commit() + + MIGRATIONS = [ ("agent_sharing", _migrate_agent_sharing_table), ("schedule_executions_observability", _migrate_schedule_executions_observability), @@ -2475,4 +2492,5 @@ def _migrate_operator_queue_cleared_at(cursor, conn): ("agent_ownership_circuit_breaker", _migrate_agent_ownership_circuit_breaker), ("voip_tables", _migrate_voip_tables), ("operator_queue_cleared_at", _migrate_operator_queue_cleared_at), + ("agent_loops_max_duration", _migrate_agent_loops_max_duration), ] diff --git a/src/backend/db/schema.py b/src/backend/db/schema.py index 35ee0600..66e97940 100644 --- a/src/backend/db/schema.py +++ b/src/backend/db/schema.py @@ -247,6 +247,7 @@ stop_signal TEXT, delay_seconds INTEGER NOT NULL DEFAULT 0, timeout_per_run INTEGER, + max_duration_seconds INTEGER, model TEXT, allowed_tools TEXT, status TEXT NOT NULL, diff --git a/src/backend/db/tables.py b/src/backend/db/tables.py index 774426e7..c7baccbf 100644 --- a/src/backend/db/tables.py +++ b/src/backend/db/tables.py @@ -232,6 +232,7 @@ def process_bind_param(self, value, dialect): Column("stop_signal", Text), Column("delay_seconds", Integer), Column("timeout_per_run", Integer), + Column("max_duration_seconds", Integer), # #1156 — wall-clock deadline Column("model", Text), Column("allowed_tools", Text), Column("status", Text), diff --git a/src/backend/routers/loops.py b/src/backend/routers/loops.py index 86f14cf4..cb03995b 100644 --- a/src/backend/routers/loops.py +++ b/src/backend/routers/loops.py @@ -8,6 +8,7 @@ """ import logging +from datetime import datetime, timezone from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, Header @@ -36,6 +37,10 @@ MAX_DELAY_SECONDS = 3600 MAX_TIMEOUT_PER_RUN = 7200 MAX_STOP_SIGNAL_LEN = 200 +MAX_DURATION_SECONDS = 604_800 # 7 days — hard ceiling on the wall-clock deadline +# Fallback per-run timeout used for deadline validation when neither +# timeout_per_run nor an agent-specific timeout is available. +DEFAULT_PER_RUN_TIMEOUT = 3600 class StartLoopRequest(BaseModel): @@ -44,6 +49,10 @@ class StartLoopRequest(BaseModel): stop_signal: Optional[str] = Field(default=None, max_length=MAX_STOP_SIGNAL_LEN) delay_seconds: int = Field(default=0, ge=0, le=MAX_DELAY_SECONDS) timeout_per_run: Optional[int] = Field(default=None, ge=10, le=MAX_TIMEOUT_PER_RUN) + # #1156: optional loop-level wall-clock deadline. NULL = unbounded + # (max_runs is still the hard stop). Lower bound vs the per-run timeout + # is validated in the endpoint (needs the agent's configured timeout). + max_duration_seconds: Optional[int] = Field(default=None, ge=1, le=MAX_DURATION_SECONDS) model: Optional[str] = None allowed_tools: Optional[List[str]] = None @@ -88,6 +97,10 @@ class LoopStatusResponse(BaseModel): created_at: str started_at: Optional[str] = None completed_at: Optional[str] = None + # #1156: wall-clock deadline (NULL = unbounded) + elapsed since started_at + # (frozen at completed_at once terminal). Both NULL before the loop runs. + max_duration_seconds: Optional[int] = None + elapsed_seconds: Optional[int] = None class StopLoopResponse(BaseModel): @@ -102,6 +115,29 @@ class StopLoopResponse(BaseModel): RESPONSE_PREVIEW_CHARS = 500 +def _parse_iso(ts: Optional[str]) -> Optional[datetime]: + """Parse a utc_now_iso() timestamp (ISO-Z) to an aware UTC datetime.""" + if not ts: + return None + try: + return datetime.fromisoformat(ts.replace("Z", "+00:00")) + except ValueError: + return None + + +def _elapsed_seconds(loop: dict) -> Optional[int]: + """Whole seconds from started_at to completed_at (terminal) or now. + + None until the loop has started. Powers the GET deadline/elapsed view + (#1156) so operators can see how close a running loop is to its bound. + """ + started = _parse_iso(loop.get("started_at")) + if started is None: + return None + end = _parse_iso(loop.get("completed_at")) or datetime.now(timezone.utc) + return max(0, int((end - started).total_seconds())) + + def _build_status_response(loop: dict) -> LoopStatusResponse: runs_raw = db.list_loop_runs(loop["id"]) runs: List[LoopRunResponse] = [] @@ -133,6 +169,8 @@ def _build_status_response(loop: dict) -> LoopStatusResponse: created_at=loop["created_at"], started_at=loop["started_at"], completed_at=loop["completed_at"], + max_duration_seconds=loop.get("max_duration_seconds"), + elapsed_seconds=_elapsed_seconds(loop), ) @@ -162,6 +200,26 @@ async def start_loop( x_mcp_key_name: Optional[str] = Header(None), ): """Start a sequential agent loop; return loop_id immediately (202).""" + # #1156: a deadline shorter than a single run can never let even one + # iteration finish — reject it. Compare against the effective per-run + # timeout (explicit override, else the agent's configured timeout). + if payload.max_duration_seconds is not None: + effective_per_run = payload.timeout_per_run + if effective_per_run is None: + try: + effective_per_run = db.get_execution_timeout(name) + except Exception: + effective_per_run = DEFAULT_PER_RUN_TIMEOUT + if payload.max_duration_seconds < effective_per_run: + raise HTTPException( + status_code=400, + detail=( + f"max_duration_seconds ({payload.max_duration_seconds}s) must be " + f">= the per-run timeout ({effective_per_run}s); otherwise no " + f"iteration could complete before the deadline." + ), + ) + service = get_loop_service() loop_row = await service.start_loop( agent_name=name, @@ -170,6 +228,7 @@ async def start_loop( stop_signal=payload.stop_signal, delay_seconds=payload.delay_seconds, timeout_per_run=payload.timeout_per_run, + max_duration_seconds=payload.max_duration_seconds, model=payload.model, allowed_tools=payload.allowed_tools, started_by_user_id=current_user.id, diff --git a/src/backend/services/loop_service.py b/src/backend/services/loop_service.py index 871f6afb..3ceb65f6 100644 --- a/src/backend/services/loop_service.py +++ b/src/backend/services/loop_service.py @@ -28,7 +28,7 @@ import json import logging from dataclasses import dataclass, field -from datetime import datetime +from datetime import datetime, timedelta from typing import Optional from database import db @@ -97,6 +97,7 @@ async def start_loop( stop_signal: Optional[str] = None, delay_seconds: int = 0, timeout_per_run: Optional[int] = None, + max_duration_seconds: Optional[int] = None, model: Optional[str] = None, allowed_tools: Optional[list] = None, started_by_user_id: Optional[int] = None, @@ -116,6 +117,7 @@ async def start_loop( stop_signal=stop_signal, delay_seconds=delay_seconds, timeout_per_run=timeout_per_run, + max_duration_seconds=max_duration_seconds, model=model, allowed_tools=allowed_tools, started_by_user_id=started_by_user_id, @@ -186,6 +188,17 @@ async def _run(self, loop_id: str) -> None: stop_reason = "max_runs_reached" terminal_error: Optional[str] = None + # #1156: optional wall-clock deadline measured from loop start + # (≈ started_at, just stamped by mark_loop_running above). NULL/0 + # disables. Enforced only at iteration boundaries, so an in-flight + # run is never killed mid-turn — overshoot is bounded by one + # timeout_per_run. + max_duration = loop.get("max_duration_seconds") + deadline = ( + datetime.utcnow() + timedelta(seconds=max_duration) + if max_duration else None + ) + try: for run_number in range(1, loop["max_runs"] + 1): # Cooperative stop check BEFORE starting the next iteration. @@ -196,6 +209,12 @@ async def _run(self, loop_id: str) -> None: stop_reason = "user_stopped" break + # #1156: deadline check at the iteration boundary. + if deadline is not None and datetime.utcnow() >= deadline: + terminal_status = "stopped" + stop_reason = "deadline_exceeded" + break + rendered = _render_template( loop["message_template"], run_number, previous_response, ) @@ -300,8 +319,19 @@ async def _run(self, loop_id: str) -> None: # Inter-run delay — also a stop point. if loop["delay_seconds"] and run_number < loop["max_runs"]: + sleep_for = loop["delay_seconds"] + # #1156: never sleep past the deadline — cap the delay to + # the remaining budget; the next boundary check then stops + # the loop with deadline_exceeded. + if deadline is not None: + remaining = (deadline - datetime.utcnow()).total_seconds() + if remaining <= 0: + terminal_status = "stopped" + stop_reason = "deadline_exceeded" + break + sleep_for = min(sleep_for, remaining) try: - await asyncio.sleep(loop["delay_seconds"]) + await asyncio.sleep(sleep_for) except asyncio.CancelledError: terminal_status = "stopped" stop_reason = "user_stopped" diff --git a/src/frontend/src/components/LoopsPanel.vue b/src/frontend/src/components/LoopsPanel.vue index 29ff9801..d2101e6c 100644 --- a/src/frontend/src/components/LoopsPanel.vue +++ b/src/frontend/src/components/LoopsPanel.vue @@ -89,6 +89,20 @@ class="w-full px-3 py-2 border border-gray-300 dark:border-gray-600 dark:bg-gray-700 dark:text-white rounded-md focus:outline-none focus:ring-2 focus:ring-action-primary-500" /> + + +
+ + +

Total wall-clock limit; checked between runs. Must be ≥ the per-run timeout.

+
@@ -224,6 +238,12 @@ {{ loop.error }} + +
+ Deadline: + {{ formatSeconds(loop.elapsed_seconds) }} / {{ formatSeconds(loop.max_duration_seconds) }} elapsed +
+

Runs

@@ -301,6 +321,7 @@ const defaultForm = () => ({ stop_signal: '', delay_seconds: 0, timeout_per_run: null, + max_duration_seconds: null, model: '', allowed_tools: null, }) @@ -354,6 +375,7 @@ async function submit() { if (form.stop_signal && form.stop_signal.trim()) payload.stop_signal = form.stop_signal.trim() if (form.delay_seconds) payload.delay_seconds = form.delay_seconds if (form.timeout_per_run) payload.timeout_per_run = form.timeout_per_run + if (form.max_duration_seconds) payload.max_duration_seconds = form.max_duration_seconds if (form.model) payload.model = form.model if (form.allowed_tools !== null) payload.allowed_tools = form.allowed_tools @@ -397,12 +419,22 @@ function formatStopReason(reason) { max_runs_reached: 'reached max runs', stop_signal_matched: 'stop signal matched', user_stopped: 'stopped by user', + deadline_exceeded: 'deadline exceeded', error: 'error', interrupted: 'interrupted', } return map[reason] || reason } +function formatSeconds(secs) { + if (secs === null || secs === undefined) return '—' + if (secs < 60) return `${secs}s` + if (secs < 3600) return `${Math.floor(secs / 60)}m ${secs % 60}s` + const h = Math.floor(secs / 3600) + const m = Math.floor((secs % 3600) / 60) + return `${h}h ${m}m` +} + function formatCost(cost) { if (cost === null || cost === undefined) return '—' return `$${cost.toFixed(4)}` diff --git a/src/mcp-server/src/client.ts b/src/mcp-server/src/client.ts index 877e6cb0..7cf6461a 100644 --- a/src/mcp-server/src/client.ts +++ b/src/mcp-server/src/client.ts @@ -1778,6 +1778,7 @@ export class TrinityClient { stop_signal?: string; delay_seconds?: number; timeout_per_run?: number; + max_duration_seconds?: number; model?: string; allowed_tools?: string[]; } diff --git a/src/mcp-server/src/tools/loops.ts b/src/mcp-server/src/tools/loops.ts index 8b5c8cae..a331f927 100644 --- a/src/mcp-server/src/tools/loops.ts +++ b/src/mcp-server/src/tools/loops.ts @@ -108,6 +108,19 @@ export function createLoopTools( .describe( "Per-iteration timeout in seconds (defaults to agent's configured execution_timeout_seconds)." ), + max_duration_seconds: z + .number() + .int() + .min(1) + .max(604_800) + .optional() + .describe( + "Optional loop-level wall-clock deadline in seconds (1–604800 = up to 7 days). " + + "Checked at each iteration boundary; an in-flight run is never killed mid-turn. " + + "When the deadline passes the loop stops with stop_reason='deadline_exceeded'. " + + "Must be >= timeout_per_run (or the agent's execution timeout when unset). " + + "Omit for no time bound (max_runs still applies)." + ), model: z .string() .optional() @@ -125,6 +138,7 @@ export function createLoopTools( stop_signal?: string; delay_seconds?: number; timeout_per_run?: number; + max_duration_seconds?: number; model?: string; allowed_tools?: string[]; }, @@ -146,6 +160,7 @@ export function createLoopTools( stop_signal: params.stop_signal, delay_seconds: params.delay_seconds, timeout_per_run: params.timeout_per_run, + max_duration_seconds: params.max_duration_seconds, model: params.model, allowed_tools: params.allowed_tools, }); diff --git a/tests/unit/test_loop_service.py b/tests/unit/test_loop_service.py index 76afda4a..b9352229 100644 --- a/tests/unit/test_loop_service.py +++ b/tests/unit/test_loop_service.py @@ -16,6 +16,7 @@ import asyncio import sys from dataclasses import dataclass, field +from datetime import datetime, timedelta from pathlib import Path from typing import Any, Optional from unittest.mock import AsyncMock, MagicMock @@ -517,6 +518,160 @@ def test_orphan_sweep_idempotent(self, loop_module): assert db.mark_orphan_loops_interrupted() == 0 +# --------------------------------------------------------------------------- +# Runner — wall-clock deadline (#1156) +# --------------------------------------------------------------------------- + +class _FakeClock: + """Controllable stand-in for ``datetime`` inside loop_service. + + Only ``utcnow()`` is exercised by the runner; it returns the current + fake instant. Tests advance ``now`` (directly or via a task that bumps + it each run) to drive the deadline deterministically — no real sleeping. + """ + now = datetime(2026, 1, 1, 0, 0, 0) + + @classmethod + def utcnow(cls): + return cls.now + + +class TestDeadline: + def _install_clock(self, ls, monkeypatch, *, advance_per_run: float): + """Swap in the fake clock; each execute_task advances it.""" + _FakeClock.now = datetime(2026, 1, 1, 0, 0, 0) + monkeypatch.setattr(ls, "datetime", _FakeClock) + + async def _exec(**kwargs): + ts = self._ts + ts.calls.append(kwargs) + result = ts.results[ts._idx] if ts._idx < len(ts.results) else _Result() + ts._idx += 1 + _FakeClock.now = _FakeClock.now + timedelta(seconds=advance_per_run) + return result + + return _exec + + def test_deadline_stops_loop_at_boundary(self, loop_module, monkeypatch): + ls, db, ts = loop_module + self._ts = ts + ts.results = [_Result(response=f"r{i}") for i in range(1, 6)] + ts.execute_task = self._install_clock(ls, monkeypatch, advance_per_run=6) + + async def go(): + service = ls.LoopService() + row = await service.start_loop( + agent_name="a1", + message_template="m", + max_runs=5, + max_duration_seconds=10, # ~1.6 runs fit before the deadline + ) + handle = service._handles.get(row["id"]) + if handle is not None: + await handle.task + return row["id"] + + loop_id = _run(go()) + loop = db.get_loop(loop_id) + assert loop["status"] == "stopped" + assert loop["stop_reason"] == "deadline_exceeded" + # Run 1 (t0→6) and run 2 (t6→12) both started before the deadline; the + # boundary check before run 3 (t12 ≥ 10) trips. max_runs never reached. + assert loop["runs_completed"] == 2 + assert len(ts.calls) == 2 + + def test_in_flight_run_is_not_killed_mid_turn(self, loop_module, monkeypatch): + ls, db, ts = loop_module + self._ts = ts + ts.results = [_Result(response="done-run")] + # One run pushes the clock well past the deadline; that run must still + # finalize as completed (deadline is enforced only at the boundary). + ts.execute_task = self._install_clock(ls, monkeypatch, advance_per_run=999) + + async def go(): + service = ls.LoopService() + row = await service.start_loop( + agent_name="a1", + message_template="m", + max_runs=5, + max_duration_seconds=10, + ) + handle = service._handles.get(row["id"]) + if handle is not None: + await handle.task + return row["id"] + + loop_id = _run(go()) + loop = db.get_loop(loop_id) + assert loop["stop_reason"] == "deadline_exceeded" + assert loop["runs_completed"] == 1 # the in-flight run completed + runs = db.list_loop_runs(loop_id) + assert runs[0]["status"] == "completed" + assert runs[0]["response"] == "done-run" + + def test_delay_does_not_sleep_past_deadline(self, loop_module, monkeypatch): + ls, db, ts = loop_module + self._ts = ts + ts.results = [_Result(response="r1"), _Result(response="r2")] + ts.execute_task = self._install_clock(ls, monkeypatch, advance_per_run=3) + + # Capture sleep durations and advance the fake clock by them instead + # of really sleeping. + slept: list[float] = [] + + async def _fake_sleep(secs): + slept.append(secs) + _FakeClock.now = _FakeClock.now + timedelta(seconds=secs) + + monkeypatch.setattr(ls.asyncio, "sleep", _fake_sleep) + + async def go(): + service = ls.LoopService() + row = await service.start_loop( + agent_name="a1", + message_template="m", + max_runs=5, + delay_seconds=100, # would blow way past the deadline + max_duration_seconds=10, + ) + handle = service._handles.get(row["id"]) + if handle is not None: + await handle.task + return row["id"] + + loop_id = _run(go()) + loop = db.get_loop(loop_id) + assert loop["stop_reason"] == "deadline_exceeded" + # run1 t0→3, then delay capped to remaining (10−3=7), not the full 100. + assert slept == [7] + + def test_no_deadline_runs_all_when_unset(self, loop_module, monkeypatch): + ls, db, ts = loop_module + self._ts = ts + ts.results = [_Result(response=f"r{i}") for i in range(1, 4)] + # Clock jumps far each run; with no deadline it must be ignored. + ts.execute_task = self._install_clock(ls, monkeypatch, advance_per_run=10_000) + + async def go(): + service = ls.LoopService() + row = await service.start_loop( + agent_name="a1", + message_template="m", + max_runs=3, + max_duration_seconds=None, + ) + handle = service._handles.get(row["id"]) + if handle is not None: + await handle.task + return row["id"] + + loop_id = _run(go()) + loop = db.get_loop(loop_id) + assert loop["status"] == "completed" + assert loop["stop_reason"] == "max_runs_reached" + assert loop["runs_completed"] == 3 + + # --------------------------------------------------------------------------- # get_status # --------------------------------------------------------------------------- diff --git a/tests/unit/test_loops_router_validation.py b/tests/unit/test_loops_router_validation.py new file mode 100644 index 00000000..b148e81b --- /dev/null +++ b/tests/unit/test_loops_router_validation.py @@ -0,0 +1,98 @@ +"""Validation tests for the loops router — max_duration_seconds deadline (#1156). + +Calls the ``start_loop`` endpoint coroutine directly with mocked auth/db/service +so the cross-field validation (deadline must be >= the effective per-run +timeout) can be exercised without a live FastAPI app or database. +""" + +from __future__ import annotations + +import sys +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +_BACKEND = Path(__file__).resolve().parent.parent.parent / "src" / "backend" +_BACKEND_STR = str(_BACKEND) +while _BACKEND_STR in sys.path: + sys.path.remove(_BACKEND_STR) +sys.path.insert(0, _BACKEND_STR) + +pytestmark = pytest.mark.unit + + +def _load_router(monkeypatch): + from routers import loops as loops_router + + fake_db = MagicMock() + fake_db.get_execution_timeout.return_value = 900 # agent default for tests + monkeypatch.setattr(loops_router, "db", fake_db) + + fake_service = MagicMock() + fake_service.start_loop = AsyncMock( + return_value={"id": "loop_x", "status": "queued"} + ) + monkeypatch.setattr(loops_router, "get_loop_service", lambda: fake_service) + + return loops_router, fake_db, fake_service + + +def _user(): + u = MagicMock() + u.id = 1 + u.email = "u@example.com" + return u + + +async def _call(loops_router, payload): + return await loops_router.start_loop( + payload=payload, + name="a1", + current_user=_user(), + x_source_agent=None, + x_mcp_key_id=None, + x_mcp_key_name=None, + ) + + +class TestMaxDurationValidation: + def test_rejects_deadline_below_explicit_timeout_per_run(self, monkeypatch): + loops_router, _, service = _load_router(monkeypatch) + payload = loops_router.StartLoopRequest( + message="m", max_runs=5, timeout_per_run=600, max_duration_seconds=300, + ) + with pytest.raises(loops_router.HTTPException) as exc: + __import__("asyncio").run(_call(loops_router, payload)) + assert exc.value.status_code == 400 + assert "max_duration_seconds" in exc.value.detail + service.start_loop.assert_not_called() + + def test_rejects_deadline_below_agent_timeout_when_per_run_unset(self, monkeypatch): + loops_router, fake_db, service = _load_router(monkeypatch) + payload = loops_router.StartLoopRequest( + message="m", max_runs=5, max_duration_seconds=100, # < 900 agent default + ) + with pytest.raises(loops_router.HTTPException) as exc: + __import__("asyncio").run(_call(loops_router, payload)) + assert exc.value.status_code == 400 + fake_db.get_execution_timeout.assert_called_once_with("a1") + service.start_loop.assert_not_called() + + def test_accepts_deadline_at_or_above_timeout(self, monkeypatch): + loops_router, _, service = _load_router(monkeypatch) + payload = loops_router.StartLoopRequest( + message="m", max_runs=5, timeout_per_run=600, max_duration_seconds=600, + ) + resp = __import__("asyncio").run(_call(loops_router, payload)) + assert resp.loop_id == "loop_x" + # deadline threaded through to the service + assert service.start_loop.await_args.kwargs["max_duration_seconds"] == 600 + + def test_no_deadline_skips_validation(self, monkeypatch): + loops_router, fake_db, service = _load_router(monkeypatch) + payload = loops_router.StartLoopRequest(message="m", max_runs=5) + resp = __import__("asyncio").run(_call(loops_router, payload)) + assert resp.loop_id == "loop_x" + fake_db.get_execution_timeout.assert_not_called() + assert service.start_loop.await_args.kwargs["max_duration_seconds"] is None