Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/memory/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ Trigger-boundary dedup — policy in Architectural Invariant #18, table DDL unde

### Sequential Agent Loops (#740, UI #1106)

Bounded sequential task execution against one agent. Runner is an in-process `asyncio.Task` spawned by `loop_service.py`; each iteration dispatches through `task_execution_service.execute_task()` with `triggered_by="loop"` and the parent `loop_id` carried on the resulting `schedule_executions` row — iterations go through the standard `capacity_manager` admit/slot path, sharing the agent's `max_parallel_tasks` budget. Message template supports `{{run}}` and `{{previous_response}}`; `max_runs` 1–100 hard cap; optional `stop_signal` (until-mode), `delay_seconds`, `timeout_per_run`, `model`, `allowed_tools`. Stop is cooperative: `POST /api/loops/{id}/stop` flips an in-process `should_stop` flag; the current iteration finishes and the runner exits with `stop_reason="user_stopped"`. Restart recovery via the cleanup-service startup hook (above); no auto-resume. WS events `loop_run_completed`/`loop_completed`.
Bounded sequential task execution against one agent. Runner is an in-process `asyncio.Task` spawned by `loop_service.py`; each iteration dispatches through `task_execution_service.execute_task()` with `triggered_by="loop"` and the parent `loop_id` carried on the resulting `schedule_executions` row — iterations go through the standard `capacity_manager` admit/slot path, sharing the agent's `max_parallel_tasks` budget. Message template supports `{{run}}` and `{{previous_response}}`; `max_runs` 1–100 hard cap; optional `stop_signal` (until-mode), `delay_seconds`, `timeout_per_run`, `max_duration_seconds`, `model`, `allowed_tools`. Stop is cooperative: `POST /api/loops/{id}/stop` flips an in-process `should_stop` flag; the current iteration finishes and the runner exits with `stop_reason="user_stopped"`. **Wall-clock deadline (#1156):** optional `max_duration_seconds` (≤7 days) measured from `started_at`, checked only at iteration boundaries (before the next run and before/after the inter-run delay, which is capped to the remaining budget) — an in-flight run is never killed mid-turn, so overshoot is bounded by one `timeout_per_run`; expiry stops the loop with `stop_reason="deadline_exceeded"`. Rejected at create (400) when smaller than the effective per-run timeout (`timeout_per_run`, else the agent's `execution_timeout_seconds`). `GET /api/loops/{id}` returns `max_duration_seconds` + computed `elapsed_seconds`. Restart recovery via the cleanup-service startup hook (above); no auto-resume. WS events `loop_run_completed`/`loop_completed`.

**Web UI (#1106):** a **Loops** tab on Agent Detail (`components/LoopsPanel.vue` + agent-scoped `stores/loops.js`; `setAgent(name)` on mount, `clear()` on unmount). The global WS handler routes the fleet-wide loop events to the store, which filters by mounted agent and targeted-refreshes only the affected loop; a 12s backstop poll runs while any loop is `queued`/`running` to recover a missed terminal event. Last full response rendered via `utils/markdown.js` (DOMPurify).

Expand Down Expand Up @@ -930,11 +930,12 @@ CREATE TABLE agent_loops (
stop_signal TEXT, -- NULL = fixed mode; set = until mode
delay_seconds INTEGER NOT NULL DEFAULT 0,
timeout_per_run INTEGER, -- NULL = agent's execution_timeout_seconds
max_duration_seconds INTEGER, -- #1156: NULL = no wall-clock deadline (≤7d when set)
model TEXT,
allowed_tools TEXT, -- JSON array
status TEXT NOT NULL, -- queued | running | completed | stopped | failed | interrupted
runs_completed INTEGER NOT NULL DEFAULT 0,
stop_reason TEXT, -- max_runs_reached | stop_signal_matched | user_stopped | error | interrupted
stop_reason TEXT, -- max_runs_reached | stop_signal_matched | user_stopped | deadline_exceeded | error | interrupted
last_response TEXT,
error TEXT,
started_by_user_id INTEGER,
Expand Down
30 changes: 30 additions & 0 deletions docs/memory/requirements.md
Original file line number Diff line number Diff line change
Expand Up @@ -2777,6 +2777,36 @@ Standalone mobile-friendly admin page for managing agents on the go. Designed as
auto-resume after restart; cross-agent loops (`agent` parameter
is `"self"` only for v1, matching `fan_out`).

### 38.2 Loop-level wall-clock deadline (#1156)
- **Status**: ✅ Implemented
- **Implements**: Issue #1156
- **Description**: A third hard stop alongside the `max_runs` iteration
cap and the (separately tracked) cost budget: an optional total
wall-clock deadline so a loop legally configured today (`max_runs=100`
× `timeout_per_run` up to 2h + `delay_seconds`) cannot run for days.
- **Parameter**: optional `max_duration_seconds` (int, 1 – 604800 = 7d;
NULL/omitted disables). Accepted on `POST /api/agents/{name}/loops`,
persisted on `agent_loops.max_duration_seconds`, exposed via the
`run_agent_loop` MCP tool.
- **Enforcement**: deadline measured from `started_at`; checked only at
iteration boundaries — before starting the next run and before/after
the inter-run delay (the `delay_seconds` sleep is capped to the
remaining budget, never sleeping past the deadline). An in-flight run
is never killed mid-turn, so actual overshoot is bounded by one
`timeout_per_run`.
- **Terminal state**: expiry stops the loop with terminal status
`stopped` and `stop_reason="deadline_exceeded"`.
- **Validation**: reject (400) `max_duration_seconds` smaller than the
effective per-run timeout (`timeout_per_run`, else the agent's
`execution_timeout_seconds`) — otherwise no iteration could finish
before the deadline.
- **Observability**: `GET /api/loops/{loop_id}` returns
`max_duration_seconds` and a computed `elapsed_seconds` (from
`started_at` to `completed_at` or now); the Loops UI shows the
deadline + elapsed when set.
- **Out of scope**: interrupting an in-flight run mid-turn; persisting
elapsed across a backend restart (loops do not auto-resume).

---

## 39. VoIP Telephony (VOIP-001)
Expand Down
4 changes: 4 additions & 0 deletions src/backend/db/loops.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def _loop_row_to_dict(row) -> dict:
"stop_signal": row["stop_signal"],
"delay_seconds": row["delay_seconds"],
"timeout_per_run": row["timeout_per_run"],
"max_duration_seconds": row["max_duration_seconds"],
"model": row["model"],
"allowed_tools": json.loads(row["allowed_tools"]) if row["allowed_tools"] else None,
"status": row["status"],
Expand Down Expand Up @@ -78,6 +79,7 @@ def create_loop(
stop_signal: Optional[str] = None,
delay_seconds: int = 0,
timeout_per_run: Optional[int] = None,
max_duration_seconds: Optional[int] = None,
model: Optional[str] = None,
allowed_tools: Optional[List[str]] = None,
started_by_user_id: Optional[int] = None,
Expand All @@ -99,6 +101,7 @@ def create_loop(
stop_signal=stop_signal,
delay_seconds=delay_seconds,
timeout_per_run=timeout_per_run,
max_duration_seconds=max_duration_seconds,
model=model,
allowed_tools=allowed_tools_json,
status="queued",
Expand Down Expand Up @@ -126,6 +129,7 @@ def create_loop(
"stop_signal": stop_signal,
"delay_seconds": delay_seconds,
"timeout_per_run": timeout_per_run,
"max_duration_seconds": max_duration_seconds,
"model": model,
"allowed_tools": allowed_tools,
"status": "queued",
Expand Down
18 changes: 18 additions & 0 deletions src/backend/db/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2402,6 +2402,23 @@ def _migrate_operator_queue_cleared_at(cursor, conn):
conn.commit()


def _migrate_agent_loops_max_duration(cursor, conn):
"""#1156 — loop-level wall-clock deadline.

Adds `max_duration_seconds INTEGER` (NULL = no deadline) to `agent_loops`.
The runner stops the loop at the next iteration boundary once the deadline
measured from `started_at` is exceeded (stop_reason='deadline_exceeded'),
bounding total loop duration alongside the existing `max_runs` cap.
"""
_safe_add_column(
cursor,
"agent_loops",
"max_duration_seconds",
"ALTER TABLE agent_loops ADD COLUMN max_duration_seconds INTEGER",
)
conn.commit()


MIGRATIONS = [
("agent_sharing", _migrate_agent_sharing_table),
("schedule_executions_observability", _migrate_schedule_executions_observability),
Expand Down Expand Up @@ -2475,4 +2492,5 @@ def _migrate_operator_queue_cleared_at(cursor, conn):
("agent_ownership_circuit_breaker", _migrate_agent_ownership_circuit_breaker),
("voip_tables", _migrate_voip_tables),
("operator_queue_cleared_at", _migrate_operator_queue_cleared_at),
("agent_loops_max_duration", _migrate_agent_loops_max_duration),
]
1 change: 1 addition & 0 deletions src/backend/db/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@
stop_signal TEXT,
delay_seconds INTEGER NOT NULL DEFAULT 0,
timeout_per_run INTEGER,
max_duration_seconds INTEGER,
model TEXT,
allowed_tools TEXT,
status TEXT NOT NULL,
Expand Down
1 change: 1 addition & 0 deletions src/backend/db/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ def process_bind_param(self, value, dialect):
Column("stop_signal", Text),
Column("delay_seconds", Integer),
Column("timeout_per_run", Integer),
Column("max_duration_seconds", Integer), # #1156 — wall-clock deadline
Column("model", Text),
Column("allowed_tools", Text),
Column("status", Text),
Expand Down
59 changes: 59 additions & 0 deletions src/backend/routers/loops.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"""

import logging
from datetime import datetime, timezone
from typing import List, Optional

from fastapi import APIRouter, Depends, HTTPException, Header
Expand Down Expand Up @@ -36,6 +37,10 @@
MAX_DELAY_SECONDS = 3600
MAX_TIMEOUT_PER_RUN = 7200
MAX_STOP_SIGNAL_LEN = 200
MAX_DURATION_SECONDS = 604_800 # 7 days — hard ceiling on the wall-clock deadline
# Fallback per-run timeout used for deadline validation when neither
# timeout_per_run nor an agent-specific timeout is available.
DEFAULT_PER_RUN_TIMEOUT = 3600


class StartLoopRequest(BaseModel):
Expand All @@ -44,6 +49,10 @@ class StartLoopRequest(BaseModel):
stop_signal: Optional[str] = Field(default=None, max_length=MAX_STOP_SIGNAL_LEN)
delay_seconds: int = Field(default=0, ge=0, le=MAX_DELAY_SECONDS)
timeout_per_run: Optional[int] = Field(default=None, ge=10, le=MAX_TIMEOUT_PER_RUN)
# #1156: optional loop-level wall-clock deadline. NULL = unbounded
# (max_runs is still the hard stop). Lower bound vs the per-run timeout
# is validated in the endpoint (needs the agent's configured timeout).
max_duration_seconds: Optional[int] = Field(default=None, ge=1, le=MAX_DURATION_SECONDS)
model: Optional[str] = None
allowed_tools: Optional[List[str]] = None

Expand Down Expand Up @@ -88,6 +97,10 @@ class LoopStatusResponse(BaseModel):
created_at: str
started_at: Optional[str] = None
completed_at: Optional[str] = None
# #1156: wall-clock deadline (NULL = unbounded) + elapsed since started_at
# (frozen at completed_at once terminal). Both NULL before the loop runs.
max_duration_seconds: Optional[int] = None
elapsed_seconds: Optional[int] = None


class StopLoopResponse(BaseModel):
Expand All @@ -102,6 +115,29 @@ class StopLoopResponse(BaseModel):
RESPONSE_PREVIEW_CHARS = 500


def _parse_iso(ts: Optional[str]) -> Optional[datetime]:
"""Parse a utc_now_iso() timestamp (ISO-Z) to an aware UTC datetime."""
if not ts:
return None
try:
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
except ValueError:
return None


def _elapsed_seconds(loop: dict) -> Optional[int]:
"""Whole seconds from started_at to completed_at (terminal) or now.

None until the loop has started. Powers the GET deadline/elapsed view
(#1156) so operators can see how close a running loop is to its bound.
"""
started = _parse_iso(loop.get("started_at"))
if started is None:
return None
end = _parse_iso(loop.get("completed_at")) or datetime.now(timezone.utc)
return max(0, int((end - started).total_seconds()))


def _build_status_response(loop: dict) -> LoopStatusResponse:
runs_raw = db.list_loop_runs(loop["id"])
runs: List[LoopRunResponse] = []
Expand Down Expand Up @@ -133,6 +169,8 @@ def _build_status_response(loop: dict) -> LoopStatusResponse:
created_at=loop["created_at"],
started_at=loop["started_at"],
completed_at=loop["completed_at"],
max_duration_seconds=loop.get("max_duration_seconds"),
elapsed_seconds=_elapsed_seconds(loop),
)


Expand Down Expand Up @@ -162,6 +200,26 @@ async def start_loop(
x_mcp_key_name: Optional[str] = Header(None),
):
"""Start a sequential agent loop; return loop_id immediately (202)."""
# #1156: a deadline shorter than a single run can never let even one
# iteration finish — reject it. Compare against the effective per-run
# timeout (explicit override, else the agent's configured timeout).
if payload.max_duration_seconds is not None:
effective_per_run = payload.timeout_per_run
if effective_per_run is None:
try:
effective_per_run = db.get_execution_timeout(name)
except Exception:
effective_per_run = DEFAULT_PER_RUN_TIMEOUT
if payload.max_duration_seconds < effective_per_run:
raise HTTPException(
status_code=400,
detail=(
f"max_duration_seconds ({payload.max_duration_seconds}s) must be "
f">= the per-run timeout ({effective_per_run}s); otherwise no "
f"iteration could complete before the deadline."
),
)

service = get_loop_service()
loop_row = await service.start_loop(
agent_name=name,
Expand All @@ -170,6 +228,7 @@ async def start_loop(
stop_signal=payload.stop_signal,
delay_seconds=payload.delay_seconds,
timeout_per_run=payload.timeout_per_run,
max_duration_seconds=payload.max_duration_seconds,
model=payload.model,
allowed_tools=payload.allowed_tools,
started_by_user_id=current_user.id,
Expand Down
34 changes: 32 additions & 2 deletions src/backend/services/loop_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from datetime import datetime, timedelta
from typing import Optional

from database import db
Expand Down Expand Up @@ -97,6 +97,7 @@ async def start_loop(
stop_signal: Optional[str] = None,
delay_seconds: int = 0,
timeout_per_run: Optional[int] = None,
max_duration_seconds: Optional[int] = None,
model: Optional[str] = None,
allowed_tools: Optional[list] = None,
started_by_user_id: Optional[int] = None,
Expand All @@ -116,6 +117,7 @@ async def start_loop(
stop_signal=stop_signal,
delay_seconds=delay_seconds,
timeout_per_run=timeout_per_run,
max_duration_seconds=max_duration_seconds,
model=model,
allowed_tools=allowed_tools,
started_by_user_id=started_by_user_id,
Expand Down Expand Up @@ -186,6 +188,17 @@ async def _run(self, loop_id: str) -> None:
stop_reason = "max_runs_reached"
terminal_error: Optional[str] = None

# #1156: optional wall-clock deadline measured from loop start
# (≈ started_at, just stamped by mark_loop_running above). NULL/0
# disables. Enforced only at iteration boundaries, so an in-flight
# run is never killed mid-turn — overshoot is bounded by one
# timeout_per_run.
max_duration = loop.get("max_duration_seconds")
deadline = (
datetime.utcnow() + timedelta(seconds=max_duration)
if max_duration else None
)

try:
for run_number in range(1, loop["max_runs"] + 1):
# Cooperative stop check BEFORE starting the next iteration.
Expand All @@ -196,6 +209,12 @@ async def _run(self, loop_id: str) -> None:
stop_reason = "user_stopped"
break

# #1156: deadline check at the iteration boundary.
if deadline is not None and datetime.utcnow() >= deadline:
terminal_status = "stopped"
stop_reason = "deadline_exceeded"
break

rendered = _render_template(
loop["message_template"], run_number, previous_response,
)
Expand Down Expand Up @@ -300,8 +319,19 @@ async def _run(self, loop_id: str) -> None:

# Inter-run delay — also a stop point.
if loop["delay_seconds"] and run_number < loop["max_runs"]:
sleep_for = loop["delay_seconds"]
# #1156: never sleep past the deadline — cap the delay to
# the remaining budget; the next boundary check then stops
# the loop with deadline_exceeded.
if deadline is not None:
remaining = (deadline - datetime.utcnow()).total_seconds()
if remaining <= 0:
terminal_status = "stopped"
stop_reason = "deadline_exceeded"
break
sleep_for = min(sleep_for, remaining)
try:
await asyncio.sleep(loop["delay_seconds"])
await asyncio.sleep(sleep_for)
except asyncio.CancelledError:
terminal_status = "stopped"
stop_reason = "user_stopped"
Expand Down
Loading
Loading