From 89bfce15ec6a2898e6ef39e8546bbe3a3f42bec9 Mon Sep 17 00:00:00 2001 From: Andrii Pasternak Date: Thu, 11 Jun 2026 16:21:51 +0100 Subject: [PATCH] =?UTF-8?q?fix(sub-003):=20per-agent=20lock=20for=20auto-s?= =?UTF-8?q?witch=20=E2=80=94=20serialize=20concurrent=20failures=20(#799)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Concurrent subscription failures on the same agent (two chat requests, or a chat overlapping a scheduled task) both ran handle_subscription_failure's read→decide→assign→restart sequence with no mutual exclusion: both could pick the same alternative, both call assign_subscription_to_agent, and both fire _restart_agent — wedging the container, duplicating the switch notification, or tripping the #421 was_already_running ambiguity. - Add a per-agent asyncio.Lock (lazy + event-loop-safe, mirroring agent_call_limiter.py rather than defaultdict(asyncio.Lock)) wrapping the whole switch critical section. - Add a stale-failure guard: snapshot the subscription at handler entry and, under the lock, bail when it has changed — so that with 3+ viable subscriptions a loser, whose failure was about the now-previous sub, does not cascade the agent A→B→C. - Tests: concurrent 2-sub (exactly one switch), 3-sub no-cascade regression, distinct-agent locks. These pin the race the #606 carve-out test deferred. Process-local lock is sufficient today (single backend process; the scheduler delegates execution to the backend via /api/internal/execute-task rather than calling this module in its own process). #1037 (drain-aware recreate) is split into its own PR. 
Co-Authored-By: Claude Opus 4.8 --- .../services/subscription_auto_switch.py | 141 ++++++++--- ...st_subscription_auto_switch_concurrency.py | 231 ++++++++++++++++++ 2 files changed, 337 insertions(+), 35 deletions(-) create mode 100644 tests/unit/test_subscription_auto_switch_concurrency.py diff --git a/src/backend/services/subscription_auto_switch.py b/src/backend/services/subscription_auto_switch.py index 6707a7a5..7ab05767 100644 --- a/src/backend/services/subscription_auto_switch.py +++ b/src/backend/services/subscription_auto_switch.py @@ -20,6 +20,7 @@ tests pinning that contract. """ +import asyncio import logging from typing import Optional @@ -29,6 +30,55 @@ logger = logging.getLogger(__name__) +# --------------------------------------------------------------------------- +# Per-agent switch lock (#799) +# --------------------------------------------------------------------------- +# +# Concurrent subscription failures on the SAME agent (two chat requests, or a +# chat overlapping a scheduled task) both enter `handle_subscription_failure` +# and, without mutual exclusion, both pick the same alternative, both +# `assign_subscription_to_agent`, and both fire `_restart_agent` — the second +# `container_stop` racing the first `start_agent_internal` wedges the container, +# duplicates the switch notification, or trips the #421 `was_already_running` +# ambiguity. A per-agent lock serializes the read→decide→assign→restart window. +# +# Mirrors the event-loop-safe lazy pattern in `services/agent_call_limiter.py` +# (module dict + a lazily-created guard) rather than `defaultdict(asyncio.Lock)`: +# a defaultdict binds each lock to whatever event loop is current at first key +# access and persists it, which breaks across pytest's per-test loops +# ("Future attached to a different loop"). Creating the locks lazily on the +# running loop avoids that. +# +# INVARIANT: process-local. 
Correct only while (a) the backend runs a single +# process and (b) the scheduler delegates execution to the backend via +# `/api/internal/execute-task` rather than calling this module in its own +# process — both true today. If the backend ever runs multiple workers, escalate +# to a Redis `SETNX` lock keyed `auto_switch:{agent_name}` (TTL ≥ longest +# plausible container start, ~60s). +_AGENT_SWITCH_LOCKS: dict[str, asyncio.Lock] = {} +_AGENT_SWITCH_LOCKS_GUARD: Optional[asyncio.Lock] = None + + +async def agent_switch_lock(agent_name: str) -> asyncio.Lock: + """Return the per-agent switch lock, creating it lazily on the running loop.""" + global _AGENT_SWITCH_LOCKS_GUARD + if _AGENT_SWITCH_LOCKS_GUARD is None: + _AGENT_SWITCH_LOCKS_GUARD = asyncio.Lock() + lock = _AGENT_SWITCH_LOCKS.get(agent_name) + if lock is None: + async with _AGENT_SWITCH_LOCKS_GUARD: + lock = _AGENT_SWITCH_LOCKS.setdefault(agent_name, asyncio.Lock()) + return lock + + +def _reset_locks_for_test() -> None: + """Test hook: drop all per-agent locks + the guard so each test's event loop + starts clean (locks are loop-bound).""" + global _AGENT_SWITCH_LOCKS_GUARD + _AGENT_SWITCH_LOCKS.clear() + _AGENT_SWITCH_LOCKS_GUARD = None + + # Substrings that indicate an auth-class subscription failure. Mirrors the # scheduler's classification at `src/scheduler/service.py` (which now imports # this same list to keep the two surfaces from drifting). @@ -101,50 +151,71 @@ async def handle_subscription_failure( Returns: dict with switch details if auto-switch occurred, None otherwise. """ - # 1. Check if auto-switch is enabled (default: on, #441) + # 1. Check if auto-switch is enabled (default: on, #441). Cheap, lock-free — + # a disabled platform never contends for the per-agent lock. enabled = db.get_setting_value("auto_switch_subscriptions", default="true") == "true" if not enabled: return None - # 2. 
Check if agent has a subscription assigned - current_sub_id = db.get_agent_subscription_id(agent_name) - if not current_sub_id: + # 2. Snapshot the agent's subscription BEFORE acquiring the lock. This is the + # subscription our failure was (approximately) about. If a concurrent failure + # switches the agent off it while we wait for the lock, our failure is stale. + sub_at_entry = db.get_agent_subscription_id(agent_name) + if not sub_at_entry: return None - # 3. Record the failure event in the rate-limit table. Auth-class events - # share the same table — the table tracks "subscription failure events" - # generically; `is_subscription_rate_limited` treats any event in the 2h - # window as a reason to skip the subscription as a candidate, which is the - # behavior we want for both kinds of failure. - consecutive_count = db.record_rate_limit_event( - agent_name=agent_name, - subscription_id=current_sub_id, - error_message=error_message, - ) - - # 4. Find a viable alternative subscription - alternative = db.select_best_alternative_subscription(current_sub_id) - if not alternative: - logger.warning( - f"[SUB-003] Agent '{agent_name}' hit a {failure_kind} failure on " - f"subscription {current_sub_id} (event #{consecutive_count}) " - f"but no viable alternative subscription is available" + # #799: serialize the read→decide→assign→restart window per agent so two + # concurrent failures on the same agent can't both switch + restart it. + async with await agent_switch_lock(agent_name): + # Re-read under the lock. If another coroutine already switched the agent + # off `sub_at_entry`, this failure is stale — return rather than switch + # again. This is what makes the fix correct for 3+ subscriptions: without + # it, a loser whose failure was about sub-A would attribute it to the new + # current sub-B and cascade A→B→C (#799 / Codex C8). 
+ current_sub_id = db.get_agent_subscription_id(agent_name) + if current_sub_id != sub_at_entry: + logger.info( + f"[SUB-003] Agent '{agent_name}' already switched off subscription " + f"{sub_at_entry} (now {current_sub_id}) before this {failure_kind} " + f"failure acquired the lock — stale failure, skipping" + ) + return None + + # 3. Record the failure event in the rate-limit table. Auth-class events + # share the same table — the table tracks "subscription failure events" + # generically; `is_subscription_rate_limited` treats any event in the 2h + # window as a reason to skip the subscription as a candidate, which is the + # behavior we want for both kinds of failure. + consecutive_count = db.record_rate_limit_event( + agent_name=agent_name, + subscription_id=current_sub_id, + error_message=error_message, ) - return None - # Get current subscription name for logging / notification - current_sub = db.get_subscription(current_sub_id) - old_name = current_sub.name if current_sub else current_sub_id + # 4. Find a viable alternative subscription + alternative = db.select_best_alternative_subscription(current_sub_id) + if not alternative: + logger.warning( + f"[SUB-003] Agent '{agent_name}' hit a {failure_kind} failure on " + f"subscription {current_sub_id} (event #{consecutive_count}) " + f"but no viable alternative subscription is available" + ) + return None - # 5. Perform the switch - return await _perform_auto_switch( - agent_name=agent_name, - old_subscription_id=current_sub_id, - old_subscription_name=old_name, - new_subscription=alternative, - failure_kind=failure_kind, - event_count=consecutive_count, - ) + # Get current subscription name for logging / notification + current_sub = db.get_subscription(current_sub_id) + old_name = current_sub.name if current_sub else current_sub_id + + # 5. Perform the switch (still under the lock — the assign + restart must + # not interleave with a concurrent switch for this agent). 
+ return await _perform_auto_switch( + agent_name=agent_name, + old_subscription_id=current_sub_id, + old_subscription_name=old_name, + new_subscription=alternative, + failure_kind=failure_kind, + event_count=consecutive_count, + ) async def handle_rate_limit_error( diff --git a/tests/unit/test_subscription_auto_switch_concurrency.py b/tests/unit/test_subscription_auto_switch_concurrency.py new file mode 100644 index 00000000..24f50932 --- /dev/null +++ b/tests/unit/test_subscription_auto_switch_concurrency.py @@ -0,0 +1,231 @@ +"""Concurrency tests for SUB-003 auto-switch (#799). + +Pins the per-agent switch lock + stale-failure guard added in +``services/subscription_auto_switch.py``. These are the race the carve-out +test ``test_subscription_auto_switch_no_cred_import.py`` explicitly left out +of scope ("Explicit non-claim: this is NOT a concurrency test"). + +Bug being fixed: two subscription failures on the SAME agent (two chat +requests, or a chat overlapping a scheduled task) both ran the +read→decide→assign→restart sequence with no mutual exclusion, so both could +pick the same alternative, both ``assign_subscription_to_agent``, and both +fire ``_restart_agent`` — wedging the container / duplicating the switch. + +What's pinned here: + - Part A: ``handle_subscription_failure`` is serialized per agent by + ``agent_switch_lock``, so concurrent failures yield exactly ONE switch. + - Part B (Codex C8): the loser snapshots the subscription at entry and, + after taking the lock, bails when it changed — so 3+ viable + subscriptions do NOT cascade A→B→C. + +Determinism: rather than rely on asyncio interleaving (``asyncio.Lock.acquire`` +does not yield on a free lock, so a naive gather lets the winner switch before +the loser snapshots), the tests PRE-HOLD the agent lock. Both coroutines then +park at ``async with lock`` having already snapshotted the old sub; releasing +the lock lets exactly one win and the other observe the post-switch state. 
+ +Modules under test: + src/backend/services/subscription_auto_switch.py::handle_subscription_failure + src/backend/services/subscription_auto_switch.py::agent_switch_lock +""" + +from __future__ import annotations + +import asyncio +import importlib +import importlib.util +import os +import sys +import types +from unittest.mock import MagicMock + +import pytest + + +_BACKEND = os.path.abspath( + os.path.join(os.path.dirname(__file__), "..", "..", "src", "backend") +) + +# Slots this file stubs/loads at import time. The autouse snapshot/restore +# fixture (precedent: test_subscription_auto_switch_no_cred_import.py) bounds +# the blast radius so these don't leak into other files sharing the session. +_STUBBED_MODULE_NAMES = [ + "services", + "database", + "db_models", + "services.subscription_auto_switch", +] + + +@pytest.fixture(autouse=True) +def _restore_sys_modules(): + saved = {name: sys.modules.get(name) for name in _STUBBED_MODULE_NAMES} + try: + yield + finally: + for name, value in saved.items(): + if value is None: + sys.modules.pop(name, None) + else: + sys.modules[name] = value + + +def _load_auto_switch(mock_db): + """Load ``subscription_auto_switch`` in isolation with ``database`` stubbed. + + The module's top-level imports are only ``asyncio``/``logging``/``typing`` + + ``from database import db`` + ``from db_models import NotificationCreate`` + (the ``services.*`` imports are lazy, inside ``_restart_agent`` / + ``_perform_auto_switch``), so loading it by file location under its real + dotted name needs no real ``services`` package boot. + """ + sys.modules.setdefault("services", types.ModuleType("services")) + + db_module = types.ModuleType("database") + db_module.db = mock_db + sys.modules["database"] = db_module + + # db_models is pure Pydantic — import the real one rather than stubbing + # (a bare stub would persist and break unrelated tests). 
+ if "db_models" not in sys.modules: + if _BACKEND not in sys.path: + sys.path.insert(0, _BACKEND) + import db_models # noqa: F401 — registers in sys.modules + + sys.modules.pop("services.subscription_auto_switch", None) + spec = importlib.util.spec_from_file_location( + "services.subscription_auto_switch", + os.path.join(_BACKEND, "services", "subscription_auto_switch.py"), + ) + mod = importlib.util.module_from_spec(spec) + sys.modules["services.subscription_auto_switch"] = mod + spec.loader.exec_module(mod) + return mod + + +def _sub(sub_id: str) -> MagicMock: + """A subscription stand-in with ``.id`` / ``.name`` (``.name`` must be set + after construction — ``MagicMock(name=...)`` is the repr name, not a field).""" + s = MagicMock() + s.id = sub_id + s.name = f"sub-{sub_id}" + return s + + +def _build_db(initial_sub: str, alt_map: dict) -> MagicMock: + """A ``db`` stub whose current subscription is mutable. + + ``alt_map`` maps current-sub-id → the subscription + ``select_best_alternative_subscription`` returns for it (or absent → None). 
+ """ + state = {"current": initial_sub} + db = MagicMock() + db.get_setting_value.return_value = "true" + db.get_agent_subscription_id.side_effect = lambda _agent: state["current"] + db.record_rate_limit_event.return_value = 1 + db.get_subscription.side_effect = lambda sub_id: _sub(sub_id) + db.select_best_alternative_subscription.side_effect = ( + lambda cur: alt_map.get(cur) + ) + db._state = state # expose for assertions / the perform spy + return db + + +def _install_perform_spy(mod, db): + """Replace ``_perform_auto_switch`` with a spy that simulates the assign + (mutates the stub's current sub) and records each switch.""" + calls: list[str] = [] + + async def spy(*, agent_name, old_subscription_id, old_subscription_name, + new_subscription, failure_kind, event_count): + calls.append(new_subscription.id) + db._state["current"] = new_subscription.id # the real assign side effect + return { + "switched": True, + "agent_name": agent_name, + "old_subscription": old_subscription_name, + "new_subscription": new_subscription.name, + } + + mod._perform_auto_switch = spy + return calls + + +@pytest.mark.asyncio +async def test_concurrent_failures_switch_exactly_once_two_subs(): + """Two concurrent failures on one agent (A→B viable) → exactly ONE switch; + the loser returns None.""" + db = _build_db("A", alt_map={"A": _sub("B")}) + mod = _load_auto_switch(db) + mod._reset_locks_for_test() + calls = _install_perform_spy(mod, db) + + # Pre-hold the agent lock so both coroutines snapshot "A" and park at + # `async with lock` before either can switch. 
+ lock = await mod.agent_switch_lock("burst-agent") + await lock.acquire() + + t1 = asyncio.create_task(mod.handle_subscription_failure("burst-agent", "429", "rate_limit")) + t2 = asyncio.create_task(mod.handle_subscription_failure("burst-agent", "429", "rate_limit")) + await asyncio.sleep(0) # let both reach `await lock.acquire()` + + lock.release() + results = await asyncio.gather(t1, t2) + + assert calls == ["B"], f"expected exactly one switch to B, got {calls!r}" + assert db._state["current"] == "B" + switched = [r for r in results if r is not None] + skipped = [r for r in results if r is None] + assert len(switched) == 1 and len(skipped) == 1, results + assert db.assign_subscription_to_agent.call_count == 0 # spy stands in + + +@pytest.mark.asyncio +async def test_concurrent_failures_no_cascade_three_subs(): + """C8 regression (CRITICAL): with A,B,C all viable, two concurrent failures + that both observed sub-A must NOT cascade A→B→C. + + Without the stale-failure guard, the loser would read current=B (after the + winner's A→B switch) and switch B→C. The guard makes it a no-op: it never + even reaches alternative selection for B. 
+ """ + db = _build_db("A", alt_map={"A": _sub("B"), "B": _sub("C")}) + mod = _load_auto_switch(db) + mod._reset_locks_for_test() + calls = _install_perform_spy(mod, db) + + lock = await mod.agent_switch_lock("burst-agent") + await lock.acquire() + + t1 = asyncio.create_task(mod.handle_subscription_failure("burst-agent", "429", "rate_limit")) + t2 = asyncio.create_task(mod.handle_subscription_failure("burst-agent", "429", "rate_limit")) + await asyncio.sleep(0) + + lock.release() + results = await asyncio.gather(t1, t2) + + assert calls == ["B"], f"expected ONLY A→B, got cascade {calls!r}" + assert db._state["current"] == "B" + assert len([r for r in results if r is None]) == 1, "loser must no-op" + # The guard short-circuits BEFORE alternative selection for the new sub: + selected = [c.args[0] for c in db.select_best_alternative_subscription.call_args_list] + assert "B" not in selected, ( + f"loser reached alternative selection for the post-switch sub — the " + f"stale-failure guard did not fire (selected={selected!r})" + ) + + +@pytest.mark.asyncio +async def test_different_agents_use_distinct_locks(): + """Distinct agents get distinct lock objects (so they don't serialize); + the same agent always gets the same lock.""" + db = _build_db("A", alt_map={}) + mod = _load_auto_switch(db) + mod._reset_locks_for_test() + + lock_a1 = await mod.agent_switch_lock("agent-a") + lock_a2 = await mod.agent_switch_lock("agent-a") + lock_b = await mod.agent_switch_lock("agent-b") + + assert lock_a1 is lock_a2, "same agent must reuse its lock" + assert lock_a1 is not lock_b, "different agents must not share a lock"