Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 106 additions & 35 deletions src/backend/services/subscription_auto_switch.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
tests pinning that contract.
"""

import asyncio
import logging
from typing import Optional

Expand All @@ -29,6 +30,55 @@
logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# Per-agent switch lock (#799)
# ---------------------------------------------------------------------------
#
# Concurrent subscription failures on the SAME agent (two chat requests, or a
# chat overlapping a scheduled task) both enter `handle_subscription_failure`
# and, without mutual exclusion, both pick the same alternative, both
# `assign_subscription_to_agent`, and both fire `_restart_agent` — the second
# `container_stop` racing the first `start_agent_internal` wedges the container,
# duplicates the switch notification, or trips the #421 `was_already_running`
# ambiguity. A per-agent lock serializes the read→decide→assign→restart window.
#
# Mirrors the event-loop-safe lazy pattern in `services/agent_call_limiter.py`
# (module dict + a lazily-created guard) rather than `defaultdict(asyncio.Lock)`:
# a defaultdict binds each lock to whatever event loop is current at first key
# access and persists it, which breaks across pytest's per-test loops
# ("Future attached to a different loop"). Creating the locks lazily on the
# running loop avoids that.
#
# INVARIANT: process-local. Correct only while (a) the backend runs a single
# process and (b) the scheduler delegates execution to the backend via
# `/api/internal/execute-task` rather than calling this module in its own
# process — both true today. If the backend ever runs multiple workers, escalate
# to a Redis `SETNX` lock keyed `auto_switch:{agent_name}` (TTL ≥ longest
# plausible container start, ~60s).
_AGENT_SWITCH_LOCKS: dict[str, asyncio.Lock] = {}
_AGENT_SWITCH_LOCKS_GUARD: Optional[asyncio.Lock] = None


async def agent_switch_lock(agent_name: str) -> asyncio.Lock:
"""Return the per-agent switch lock, creating it lazily on the running loop."""
global _AGENT_SWITCH_LOCKS_GUARD
if _AGENT_SWITCH_LOCKS_GUARD is None:
_AGENT_SWITCH_LOCKS_GUARD = asyncio.Lock()
lock = _AGENT_SWITCH_LOCKS.get(agent_name)
if lock is None:
async with _AGENT_SWITCH_LOCKS_GUARD:
lock = _AGENT_SWITCH_LOCKS.setdefault(agent_name, asyncio.Lock())
return lock


def _reset_locks_for_test() -> None:
"""Test hook: drop all per-agent locks + the guard so each test's event loop
starts clean (locks are loop-bound)."""
global _AGENT_SWITCH_LOCKS_GUARD
_AGENT_SWITCH_LOCKS.clear()
_AGENT_SWITCH_LOCKS_GUARD = None


# Substrings that indicate an auth-class subscription failure. Mirrors the
# scheduler's classification at `src/scheduler/service.py` (which now imports
# this same list to keep the two surfaces from drifting).
Expand Down Expand Up @@ -101,50 +151,71 @@ async def handle_subscription_failure(
Returns:
dict with switch details if auto-switch occurred, None otherwise.
"""
# 1. Check if auto-switch is enabled (default: on, #441)
# 1. Check if auto-switch is enabled (default: on, #441). Cheap, lock-free —
# a disabled platform never contends for the per-agent lock.
enabled = db.get_setting_value("auto_switch_subscriptions", default="true") == "true"
if not enabled:
return None

# 2. Check if agent has a subscription assigned
current_sub_id = db.get_agent_subscription_id(agent_name)
if not current_sub_id:
# 2. Snapshot the agent's subscription BEFORE acquiring the lock. This is the
# subscription our failure was (approximately) about. If a concurrent failure
# switches the agent off it while we wait for the lock, our failure is stale.
sub_at_entry = db.get_agent_subscription_id(agent_name)
if not sub_at_entry:
return None

# 3. Record the failure event in the rate-limit table. Auth-class events
# share the same table — the table tracks "subscription failure events"
# generically; `is_subscription_rate_limited` treats any event in the 2h
# window as a reason to skip the subscription as a candidate, which is the
# behavior we want for both kinds of failure.
consecutive_count = db.record_rate_limit_event(
agent_name=agent_name,
subscription_id=current_sub_id,
error_message=error_message,
)

# 4. Find a viable alternative subscription
alternative = db.select_best_alternative_subscription(current_sub_id)
if not alternative:
logger.warning(
f"[SUB-003] Agent '{agent_name}' hit a {failure_kind} failure on "
f"subscription {current_sub_id} (event #{consecutive_count}) "
f"but no viable alternative subscription is available"
# #799: serialize the read→decide→assign→restart window per agent so two
# concurrent failures on the same agent can't both switch + restart it.
async with await agent_switch_lock(agent_name):
# Re-read under the lock. If another coroutine already switched the agent
# off `sub_at_entry`, this failure is stale — return rather than switch
# again. This is what makes the fix correct for 3+ subscriptions: without
# it, a loser whose failure was about sub-A would attribute it to the new
# current sub-B and cascade A→B→C (#799 / Codex C8).
current_sub_id = db.get_agent_subscription_id(agent_name)
if current_sub_id != sub_at_entry:
logger.info(
f"[SUB-003] Agent '{agent_name}' already switched off subscription "
f"{sub_at_entry} (now {current_sub_id}) before this {failure_kind} "
f"failure acquired the lock — stale failure, skipping"
)
return None

# 3. Record the failure event in the rate-limit table. Auth-class events
# share the same table — the table tracks "subscription failure events"
# generically; `is_subscription_rate_limited` treats any event in the 2h
# window as a reason to skip the subscription as a candidate, which is the
# behavior we want for both kinds of failure.
consecutive_count = db.record_rate_limit_event(
agent_name=agent_name,
subscription_id=current_sub_id,
error_message=error_message,
)
return None

# Get current subscription name for logging / notification
current_sub = db.get_subscription(current_sub_id)
old_name = current_sub.name if current_sub else current_sub_id
# 4. Find a viable alternative subscription
alternative = db.select_best_alternative_subscription(current_sub_id)
if not alternative:
logger.warning(
f"[SUB-003] Agent '{agent_name}' hit a {failure_kind} failure on "
f"subscription {current_sub_id} (event #{consecutive_count}) "
f"but no viable alternative subscription is available"
)
return None

# 5. Perform the switch
return await _perform_auto_switch(
agent_name=agent_name,
old_subscription_id=current_sub_id,
old_subscription_name=old_name,
new_subscription=alternative,
failure_kind=failure_kind,
event_count=consecutive_count,
)
# Get current subscription name for logging / notification
current_sub = db.get_subscription(current_sub_id)
old_name = current_sub.name if current_sub else current_sub_id

# 5. Perform the switch (still under the lock — the assign + restart must
# not interleave with a concurrent switch for this agent).
return await _perform_auto_switch(
agent_name=agent_name,
old_subscription_id=current_sub_id,
old_subscription_name=old_name,
new_subscription=alternative,
failure_kind=failure_kind,
event_count=consecutive_count,
)


async def handle_rate_limit_error(
Expand Down
Loading
Loading