From 489c6e23bea27aa3367d8af3ddb43ba5cbc38595 Mon Sep 17 00:00:00 2001 From: Clayton Date: Tue, 16 Jun 2026 21:52:12 -0500 Subject: [PATCH] feat(volunteer): add confidence-gated kb.volunteer_context push channel --- CHANGELOG.md | 8 +- src/vouch/capabilities.py | 1 + src/vouch/cli.py | 22 ++- src/vouch/hot_memory.py | 89 +++++++++ src/vouch/jsonl_server.py | 11 +- src/vouch/server.py | 20 +- src/vouch/sessions.py | 4 +- src/vouch/volunteer_context.py | 341 ++++++++++++++++++++++++++++++++ templates/config.template.yaml | 10 + tests/test_volunteer_context.py | 190 ++++++++++++++++++ 10 files changed, 689 insertions(+), 7 deletions(-) create mode 100644 src/vouch/hot_memory.py create mode 100644 src/vouch/volunteer_context.py create mode 100644 tests/test_volunteer_context.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 63c3d93f..2eb0eb15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,13 @@ All notable changes to vouch are documented here. Format follows - `vouch serve` now fails fast with a clear `vouch init` hint when no `.vouch/` KB is present, instead of starting a server that immediately misbehaves (#95). ### Added -### Added +- `kb.volunteer_context` — confidence-gated push context for active sessions. + `kb.session_start(task=…)` opens a background watch on retrieval salience; + when an approved claim's normalized relevance exceeds the configured + threshold (default `0.85`), vouch queues `{claim_id, relevance, why}` and + emits an MCP notification (`kb.volunteer_context`). JSONL and CLI clients + poll via `kb.volunteer_context` / `vouch session volunteer`. Pushes are + throttled (default 30s) and respect scope visibility (#236). - `vouch sync --vault ` — bidirectional sync between the KB and an Obsidian/Logseq-style markdown vault. Forward (vault → KB): edits to `/vouch/pages/.md` become page-edit proposals citing a diff --git a/src/vouch/capabilities.py b/src/vouch/capabilities.py index a5bad382..1f7b7954 100644 --- a/src/vouch/capabilities.py +++ b/src/vouch/capabilities.py @@ -46,6 +46,7 @@ "kb.source_verify", "kb.session_start", "kb.session_end", + "kb.volunteer_context", "kb.crystallize", "kb.index_rebuild", "kb.lint", diff --git a/src/vouch/cli.py b/src/vouch/cli.py index 5157badd..0a2f2c4a 100644 --- a/src/vouch/cli.py +++ b/src/vouch/cli.py @@ -20,7 +20,7 @@ import click import yaml -from . import __version__, bundle, health +from . import __version__, bundle, health, volunteer_context from . import audit as audit_mod from . import install_adapter as install_mod from . import lifecycle as life @@ -1153,6 +1153,26 @@ def session_start_cmd(agent: str | None, task: str | None, note: str | None) -> click.echo(sess.id) +@session.command("volunteer") +@click.argument("session_id") +@click.option("--no-clear", is_flag=True, help="Peek without draining the queue.") +@click.option("--json", "as_json", is_flag=True, help="Emit JSON.") +def session_volunteer_cmd(session_id: str, no_clear: bool, as_json: bool) -> None: + """Poll volunteered context for an active session.""" + offers = volunteer_context.drain_pending(session_id, clear=not no_clear) + payload = {"volunteers": [o.to_dict() for o in offers]} + if as_json: + _emit_json(payload) + return + if not offers: + click.echo("(no volunteered context)") + return + for offer in offers: + click.echo( + f"{offer.claim_id} relevance={offer.relevance:.2f} {offer.why}" + ) + + @session.command("end") @click.argument("session_id") @click.option("--note", default=None) diff --git a/src/vouch/hot_memory.py b/src/vouch/hot_memory.py new file mode 100644 index 00000000..1af9c588 --- /dev/null +++ b/src/vouch/hot_memory.py @@ -0,0 +1,89 @@ +"""Per-session hot memory — task query and salience snapshots for push context. + +Tracks what the active session is working on and the last relevance scores +seen for approved claims. ``volunteer_context`` diffs snapshots to decide when +a claim newly crosses the confidence threshold. +""" + +from __future__ import annotations + +import threading +from dataclasses import dataclass, field + + +@dataclass +class SalienceSnapshot: + """Relevance scores for a single evaluation pass.""" + + scores: dict[str, float] = field(default_factory=dict) + + +@dataclass +class HotMemory: + """In-memory state for one active session watch.""" + + session_id: str + query: str + agent: str + project: str | None = None + last_snapshot: SalienceSnapshot = field(default_factory=SalienceSnapshot) + last_push_at: float | None = None + push_count: int = 0 + volunteered: set[str] = field(default_factory=set) + active: bool = True + + +_registry: dict[str, HotMemory] = {} +_lock = threading.Lock() + + +def register( + *, + session_id: str, + query: str, + agent: str, + project: str | None = None, +) -> HotMemory: + """Create or replace hot memory for *session_id*.""" + mem = HotMemory( + session_id=session_id, + query=query, + agent=agent, + project=project, + ) + with _lock: + _registry[session_id] = mem + return mem + + +def get(session_id: str) -> HotMemory | None: + with _lock: + return _registry.get(session_id) + + +def unregister(session_id: str) -> None: + with _lock: + mem = _registry.pop(session_id, None) + if mem is not None: + mem.active = False + + +def update_snapshot(session_id: str, scores: dict[str, float]) -> SalienceSnapshot | None: + """Store *scores* and return the previous snapshot (for delta detection).""" + with _lock: + mem = _registry.get(session_id) + if mem is None: + return None + prev = mem.last_snapshot + mem.last_snapshot = SalienceSnapshot(scores=dict(scores)) + return prev + + +def mark_volunteered(session_id: str, claim_id: str, *, pushed_at: float) -> None: + with _lock: + mem = _registry.get(session_id) + if mem is None: + return + mem.volunteered.add(claim_id) + mem.last_push_at = pushed_at + mem.push_count += 1 diff --git a/src/vouch/jsonl_server.py b/src/vouch/jsonl_server.py index b6eca5bc..4fe16767 100644 --- a/src/vouch/jsonl_server.py +++ b/src/vouch/jsonl_server.py @@ -26,7 +26,7 @@ from pathlib import Path from typing import Any -from . import audit, bundle, health +from . import audit, bundle, health, volunteer_context from . import lifecycle as life from . import sessions as sess_mod from . import verify as verify_mod @@ -412,6 +412,14 @@ def _h_crystallize(p: dict) -> dict: ) +def _h_volunteer_context(p: dict) -> dict: + offers = volunteer_context.drain_pending( + p["session_id"], + clear=bool(p.get("clear", True)), + ) + return {"volunteers": [o.to_dict() for o in offers]} + + def _h_index_rebuild(_: dict) -> dict: return health.rebuild_index(_store()) @@ -595,6 +603,7 @@ def _h_provenance_rebuild(_: dict) -> dict: "kb.source_verify": _h_source_verify, "kb.session_start": _h_session_start, "kb.session_end": _h_session_end, + "kb.volunteer_context": _h_volunteer_context, "kb.crystallize": _h_crystallize, "kb.index_rebuild": _h_index_rebuild, "kb.lint": _h_lint, diff --git a/src/vouch/server.py b/src/vouch/server.py index fae54851..7be19963 100644 --- a/src/vouch/server.py +++ b/src/vouch/server.py @@ -11,13 +11,14 @@ from __future__ import annotations +import asyncio import os from pathlib import Path from typing import Any from mcp.server.fastmcp import FastMCP -from . import audit, bundle, health +from . import audit, bundle, health, volunteer_context from . import lifecycle as life from . import sessions as sess_mod from . import verify as verify_mod @@ -536,11 +537,24 @@ def kb_source_verify() -> list[dict[str, Any]]: @mcp.tool() -def kb_session_start(task: str | None = None, note: str | None = None) -> dict[str, Any]: - sess = sess_mod.session_start(_store(), agent=_agent(), task=task, note=note) +async def kb_session_start(task: str | None = None, note: str | None = None) -> dict[str, Any]: + store = _store() + sess = sess_mod.session_start(store, agent=_agent(), task=task, note=note) + ctx = mcp.get_context() + if ctx.session is not None: + volunteer_context.register_mcp_push( + sess.id, ctx.session, asyncio.get_running_loop(), + ) return sess.model_dump(mode="json") +@mcp.tool() +def kb_volunteer_context(session_id: str, *, clear: bool = True) -> dict[str, Any]: + """Poll confidence-gated context volunteered for an active session.""" + offers = volunteer_context.drain_pending(session_id, clear=clear) + return {"volunteers": [o.to_dict() for o in offers]} + + @mcp.tool() def kb_session_end(session_id: str, note: str | None = None) -> dict[str, Any]: sess = sess_mod.session_end(_store(), session_id, note=note) diff --git a/src/vouch/sessions.py b/src/vouch/sessions.py index 71fcaa93..6a39d8b0 100644 --- a/src/vouch/sessions.py +++ b/src/vouch/sessions.py @@ -12,7 +12,7 @@ import uuid from datetime import UTC, datetime -from . import audit, index_db +from . import audit, index_db, volunteer_context from .models import Page, PageType, ProposalStatus, Session from .proposals import approve from .storage import KBStore @@ -33,6 +33,7 @@ def session_start(store: KBStore, *, agent: str, task: str | None = None, store.kb_dir, event="session.start", actor=agent, object_ids=[sess.id], data={"task": task}, ) + volunteer_context.on_session_start(store, sess) return sess @@ -52,6 +53,7 @@ def session_end(store: KBStore, session_id: str, *, note: str | None = None) -> store.kb_dir, event="session.end", actor=sess.agent, object_ids=[sess.id], data={"proposals": len(sess.proposal_ids)}, ) + volunteer_context.on_session_end(session_id) return sess diff --git a/src/vouch/volunteer_context.py b/src/vouch/volunteer_context.py new file mode 100644 index 00000000..eef2f6bc --- /dev/null +++ b/src/vouch/volunteer_context.py @@ -0,0 +1,341 @@ +"""Confidence-gated push context — ``kb.volunteer_context`` (#236). + +When a session opens with a task, vouch watches retrieval salience for +approved claims. Claims whose normalized relevance exceeds a configurable +threshold are queued and optionally pushed as MCP notifications. +""" + +from __future__ import annotations + +import asyncio +import logging +import threading +import time +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +import yaml + +from . import hot_memory +from .context import _RETRACTED_CLAIM_STATUSES +from .models import Session +from .scoping import ViewerContext, viewer_from +from .storage import ArtifactNotFoundError, KBStore + +if TYPE_CHECKING: + from mcp.server.session import ServerSession + +logger = logging.getLogger(__name__) + +DEFAULT_THRESHOLD = 0.85 +DEFAULT_THROTTLE_SECONDS = 30.0 +DEFAULT_POLL_INTERVAL = 2.0 +DEFAULT_MAX_PER_SESSION = 50 + +_mcp_push: dict[str, tuple[ServerSession, asyncio.AbstractEventLoop]] = {} +_pending: dict[str, list[VolunteerOffer]] = {} +_watch_threads: dict[str, threading.Thread] = {} +_state_lock = threading.Lock() + + +@dataclass(frozen=True) +class VolunteerConfig: + enabled: bool = True + threshold: float = DEFAULT_THRESHOLD + throttle_seconds: float = DEFAULT_THROTTLE_SECONDS + poll_interval_seconds: float = DEFAULT_POLL_INTERVAL + max_per_session: int = DEFAULT_MAX_PER_SESSION + + +@dataclass(frozen=True) +class VolunteerOffer: + claim_id: str + relevance: float + why: str + session_id: str + + def to_dict(self) -> dict[str, Any]: + return { + "claim_id": self.claim_id, + "relevance": self.relevance, + "why": self.why, + "session_id": self.session_id, + } + + +def load_config(store: KBStore) -> VolunteerConfig: + """Read ``volunteer:`` from config.yaml; fall back to defaults.""" + try: + loaded = yaml.safe_load(store.config_path.read_text()) + except (OSError, yaml.YAMLError): + return VolunteerConfig() + if not isinstance(loaded, dict): + return VolunteerConfig() + raw = loaded.get("volunteer") + if not isinstance(raw, dict): + return VolunteerConfig() + enabled = bool(raw.get("enabled", True)) + threshold = float(raw.get("threshold", DEFAULT_THRESHOLD)) + throttle = float(raw.get("throttle_seconds", DEFAULT_THROTTLE_SECONDS)) + poll = float(raw.get("poll_interval_seconds", DEFAULT_POLL_INTERVAL)) + max_per = int(raw.get("max_per_session", DEFAULT_MAX_PER_SESSION)) + return VolunteerConfig( + enabled=enabled, + threshold=threshold, + throttle_seconds=throttle, + poll_interval_seconds=poll, + max_per_session=max_per, + ) + + +def session_query(sess: Session) -> str | None: + parts: list[str] = [] + if sess.task: + parts.append(sess.task.strip()) + if sess.note: + parts.append(sess.note.strip()) + joined = " ".join(parts).strip() + return joined or None + + +def normalize_relevance(raw: float, backend: str, *, batch_max: float) -> float: + if backend in ("embedding", "hybrid"): + return max(0.0, min(1.0, raw)) + if batch_max <= 0.0: + return 0.0 + return max(0.0, min(1.0, raw / batch_max)) + + +def _retrieve_claim_scores( + store: KBStore, + query: str, + viewer: ViewerContext, + *, + limit: int = 10, +) -> list[tuple[str, float, str, str]]: + """Return (claim_id, raw_score, snippet, backend) for visible claims.""" + from .context import _retrieve + + hits = _retrieve(store, query, limit, viewer) + claim_hits = [(k, i, s, sc, be) for k, i, s, sc, be in hits if k == "claim"] + if not claim_hits: + return [] + + batch_max = max(sc for _, _, _, sc, _ in claim_hits) + out: list[tuple[str, float, str, str]] = [] + for _, claim_id, snippet, score, backend in claim_hits: + try: + claim = store.get_claim(claim_id) + except ArtifactNotFoundError: + continue + if claim.status in _RETRACTED_CLAIM_STATUSES: + continue + rel = normalize_relevance(score, backend, batch_max=batch_max) + out.append((claim_id, rel, snippet, backend)) + out.sort(key=lambda row: row[1], reverse=True) + return out + + +def _build_why(*, claim_id: str, query: str, relevance: float, backend: str, snippet: str) -> str: + preview = snippet.replace("«", "").replace("»", "").strip() + if preview: + return ( + f"task mentions {query!r}; approved claim {claim_id!r} " + f"matches ({backend} relevance {relevance:.2f}): {preview[:120]}" + ) + return ( + f"task mentions {query!r}; approved claim {claim_id!r} " + f"matches with {backend} relevance {relevance:.2f}" + ) + + +def evaluate_session( + store: KBStore, + sess: Session, + *, + config: VolunteerConfig | None = None, + project: str | None = None, +) -> VolunteerOffer | None: + """Return the best new volunteer offer for *sess*, or ``None``.""" + cfg = config or load_config(store) + if not cfg.enabled: + return None + query = session_query(sess) + if not query: + return None + + mem = hot_memory.get(sess.id) + if mem is None: + return None + if mem.push_count >= cfg.max_per_session: + return None + + viewer = viewer_from( + config_path=store.config_path, + project=project or mem.project, + agent=mem.agent, + ) + scored = _retrieve_claim_scores(store, query, viewer) + if not scored: + return None + + scores = {cid: rel for cid, rel, _, _ in scored} + hot_memory.update_snapshot(sess.id, scores) + + now = time.monotonic() + if mem.last_push_at is not None and (now - mem.last_push_at) < cfg.throttle_seconds: + return None + + for claim_id, relevance, snippet, backend in scored: + if claim_id in mem.volunteered: + continue + if relevance < cfg.threshold: + continue + why = _build_why( + claim_id=claim_id, + query=query, + relevance=relevance, + backend=backend, + snippet=snippet, + ) + return VolunteerOffer( + claim_id=claim_id, + relevance=relevance, + why=why, + session_id=sess.id, + ) + return None + + +def enqueue_offer(offer: VolunteerOffer) -> None: + """Queue an offer and optionally push over MCP (used by tests).""" + _enqueue_offer(offer) + + +def _enqueue_offer(offer: VolunteerOffer) -> None: + with _state_lock: + _pending.setdefault(offer.session_id, []).append(offer) + hot_memory.mark_volunteered(offer.session_id, offer.claim_id, pushed_at=time.monotonic()) + _maybe_mcp_push(offer) + + +def _maybe_mcp_push(offer: VolunteerOffer) -> None: + with _state_lock: + push = _mcp_push.get(offer.session_id) + if push is None: + return + session, loop = push + + async def _send() -> None: + from typing import cast + + from mcp.types import Notification, ServerNotification + + try: + await session.send_notification( + cast( + ServerNotification, + Notification( + method="kb.volunteer_context", + params=offer.to_dict(), + ), + ) + ) + except Exception: + logger.exception("MCP volunteer_context push failed for %s", offer.session_id) + + try: + asyncio.run_coroutine_threadsafe(_send(), loop) + except RuntimeError: + logger.exception("no event loop for MCP volunteer push") + + +def register_mcp_push( + session_id: str, + session: ServerSession, + loop: asyncio.AbstractEventLoop, +) -> None: + with _state_lock: + _mcp_push[session_id] = (session, loop) + + +def drain_pending(session_id: str, *, clear: bool = True) -> list[VolunteerOffer]: + """Return queued offers for *session_id* (poll surface for JSONL / CLI).""" + with _state_lock: + if clear: + return _pending.pop(session_id, []) + return list(_pending.get(session_id, [])) + + +def on_session_start(store: KBStore, sess: Session) -> None: + """Register hot memory and start the background watch when *sess* has a task.""" + cfg = load_config(store) + if not cfg.enabled: + return + query = session_query(sess) + if not query: + return + + hot_memory.register( + session_id=sess.id, + query=query, + agent=sess.agent, + ) + try: + offer = evaluate_session(store, sess, config=cfg) + if offer is not None: + _enqueue_offer(offer) + except Exception: + logger.exception("initial volunteer evaluation failed for %s", sess.id) + _start_watch(store, sess.id, cfg) + + +def on_session_end(session_id: str) -> None: + hot_memory.unregister(session_id) + with _state_lock: + _mcp_push.pop(session_id, None) + _pending.pop(session_id, None) + thread = _watch_threads.pop(session_id, None) + if thread is not None and thread.is_alive(): + # ``unregister`` sets ``active=False``; the loop exits on next check. + thread.join(timeout=0.1) + + +def _start_watch(store: KBStore, session_id: str, cfg: VolunteerConfig) -> None: + existing = _watch_threads.get(session_id) + if existing is not None and existing.is_alive(): + return + + def _loop() -> None: + while True: + mem = hot_memory.get(session_id) + if mem is None or not mem.active: + break + try: + sess = store.get_session(session_id) + offer = evaluate_session(store, sess, config=cfg) + if offer is not None: + _enqueue_offer(offer) + except Exception: + logger.exception("volunteer watch failed for session %s", session_id) + mem = hot_memory.get(session_id) + if mem is None or not mem.active: + break + time.sleep(cfg.poll_interval_seconds) + + thread = threading.Thread( + target=_loop, + name=f"vouch-volunteer-{session_id}", + daemon=True, + ) + _watch_threads[session_id] = thread + thread.start() + + +def evaluate_now(store: KBStore, session_id: str) -> VolunteerOffer | None: + """Synchronous single-shot evaluation (tests and immediate poll).""" + sess = store.get_session(session_id) + offer = evaluate_session(store, sess) + if offer is not None: + _enqueue_offer(offer) + return offer diff --git a/templates/config.template.yaml b/templates/config.template.yaml index d76164ce..1f9968d7 100644 --- a/templates/config.template.yaml +++ b/templates/config.template.yaml @@ -19,6 +19,16 @@ review: expire_pending_after_days: 90 # GC threshold for stale proposals (0 = never) similarity_threshold: 0.95 # cosine warn on propose (needs embeddings extra) +# Confidence-gated push context (kb.volunteer_context — #236). +# When a session opens with a task, vouch watches retrieval salience and +# surfaces high-relevance approved claims via MCP notification + poll. +# volunteer: +# enabled: true +# threshold: 0.85 # normalized relevance (0–1) +# throttle_seconds: 30 # min gap between pushes per session +# poll_interval_seconds: 2 # background watch interval +# max_per_session: 50 + # Optional per-agent identity overrides. Keyed by VOUCH_AGENT value. agents: claude-code: diff --git a/tests/test_volunteer_context.py b/tests/test_volunteer_context.py new file mode 100644 index 00000000..2529a162 --- /dev/null +++ b/tests/test_volunteer_context.py @@ -0,0 +1,190 @@ +"""Confidence-gated push context — kb.volunteer_context (#236).""" + +from __future__ import annotations + +import time +from pathlib import Path + +import pytest + +from vouch import health, volunteer_context +from vouch import sessions as sess_mod +from vouch.hot_memory import get as hot_memory_get +from vouch.jsonl_server import handle_request +from vouch.models import ArtifactScope, Claim, Visibility +from vouch.storage import KBStore + + +@pytest.fixture +def store(tmp_path: Path) -> KBStore: + s = KBStore.init(tmp_path) + s.config_path.write_text( + "volunteer:\n poll_interval_seconds: 999\n throttle_seconds: 0\n" + ) + return s + + +def test_jwt_claim_volunteered_on_session_start(store: KBStore, monkeypatch) -> None: + src = store.put_source(b"jwt spec") + store.put_claim(Claim( + id="auth-uses-jwt", + text="Authentication uses JWT bearer tokens for API access", + evidence=[src.id], + )) + health.rebuild_index(store) + monkeypatch.chdir(store.root) + + sess = sess_mod.session_start(store, agent="test-agent", task="implement jwt auth") + pending = volunteer_context.drain_pending(sess.id) + + assert len(pending) == 1 + offer = pending[0] + assert offer.claim_id == "auth-uses-jwt" + assert offer.relevance >= volunteer_context.DEFAULT_THRESHOLD + assert "jwt" in offer.why.lower() + + assert volunteer_context.drain_pending(sess.id) == [] + + sess_mod.session_end(store, sess.id) + + +def test_pending_proposal_not_volunteered(store: KBStore, monkeypatch) -> None: + from vouch.proposals import propose_claim + + src = store.put_source(b"e") + sess = sess_mod.session_start(store, agent="a", task="jwt") + propose_claim( + store, + text="JWT draft — not yet approved", + evidence=[src.id], + proposed_by="a", + session_id=sess.id, + ) + health.rebuild_index(store) + monkeypatch.chdir(store.root) + + offer = volunteer_context.evaluate_now(store, sess.id) + assert offer is None + sess_mod.session_end(store, sess.id) + + +def test_private_claim_respects_scope(store: KBStore, monkeypatch) -> None: + src = store.put_source(b"e") + store.put_claim(Claim( + id="secret-jwt", + text="JWT signing key rotation policy", + evidence=[src.id], + scope=ArtifactScope(visibility=Visibility.PRIVATE, agent="alice"), + )) + health.rebuild_index(store) + monkeypatch.chdir(store.root) + monkeypatch.setenv("VOUCH_AGENT", "bob") + + sess = sess_mod.session_start(store, agent="bob", task="jwt rotation") + offer = volunteer_context.evaluate_now(store, sess.id) + assert offer is None + sess_mod.session_end(store, sess.id) + + +def test_throttle_blocks_second_push(store: KBStore, monkeypatch) -> None: + from vouch import hot_memory + from vouch.models import Session + + src = store.put_source(b"e") + store.put_claim(Claim( + id="auth-uses-jwt", + text="JWT tokens", + evidence=[src.id], + )) + store.put_claim(Claim( + id="jwt-refresh", + text="JWT refresh rotation", + evidence=[src.id], + )) + health.rebuild_index(store) + monkeypatch.chdir(store.root) + + cfg = volunteer_context.VolunteerConfig(throttle_seconds=60.0) + sess = Session(id="sess-throttle-test", agent="a", task="jwt") + store.put_session(sess) + hot_memory.register(session_id=sess.id, query="jwt", agent="a") + + first = volunteer_context.evaluate_session(store, sess, config=cfg) + assert first is not None + volunteer_context.enqueue_offer(first) + + second = volunteer_context.evaluate_session(store, sess, config=cfg) + assert second is None + + +def test_jsonl_volunteer_context_poll(store: KBStore, monkeypatch) -> None: + src = store.put_source(b"e") + store.put_claim(Claim( + id="auth-uses-jwt", + text="JWT bearer auth", + evidence=[src.id], + )) + health.rebuild_index(store) + monkeypatch.chdir(store.root) + + start = handle_request({ + "id": "1", + "method": "kb.session_start", + "params": {"task": "jwt"}, + }) + assert start["ok"] + session_id = start["result"]["id"] + volunteer_context.evaluate_now(store, session_id) + + poll = handle_request({ + "id": "2", + "method": "kb.volunteer_context", + "params": {"session_id": session_id}, + }) + assert poll["ok"] + assert len(poll["result"]["volunteers"]) == 1 + assert poll["result"]["volunteers"][0]["claim_id"] == "auth-uses-jwt" + + empty = handle_request({ + "id": "3", + "method": "kb.volunteer_context", + "params": {"session_id": session_id}, + }) + assert empty["ok"] + assert empty["result"]["volunteers"] == [] + + handle_request({"id": "4", "method": "kb.session_end", "params": {"session_id": session_id}}) + + +def test_session_without_task_skips_watch(store: KBStore) -> None: + sess = sess_mod.session_start(store, agent="a") + assert hot_memory_get(sess.id) is None + sess_mod.session_end(store, sess.id) + + +def test_watch_delivers_within_five_seconds(store: KBStore, monkeypatch) -> None: + src = store.put_source(b"e") + store.put_claim(Claim( + id="auth-uses-jwt", + text="JWT authentication", + evidence=[src.id], + )) + health.rebuild_index(store) + monkeypatch.chdir(store.root) + + store.config_path.write_text( + "volunteer:\n poll_interval_seconds: 1\n throttle_seconds: 0\n" + ) + + sess = sess_mod.session_start(store, agent="a", task="jwt") + deadline = time.monotonic() + 5.0 + got: list[volunteer_context.VolunteerOffer] = [] + while time.monotonic() < deadline: + got = volunteer_context.drain_pending(sess.id, clear=False) + if got: + break + time.sleep(0.2) + + assert got, "expected volunteered claim within 5 seconds" + assert got[0].claim_id == "auth-uses-jwt" + sess_mod.session_end(store, sess.id)