diff --git a/hub/internal/server/handlers_agent_digest.go b/hub/internal/server/handlers_agent_digest.go index f554bce0..9fc24b00 100644 --- a/hub/internal/server/handlers_agent_digest.go +++ b/hub/internal/server/handlers_agent_digest.go @@ -2,6 +2,8 @@ package server import ( "context" + "database/sql" + "encoding/json" "net/http" "strings" @@ -113,7 +115,29 @@ func (s *Server) sumTurnActiveMs(ctx context.Context, agentIDs []string) (int64, // URL, or (the OTLP export) by resolving it from the session row. Resolving it // here from control would add a round-trip and fail for a session with events // but no control row. +// +// #118 §1: the GROUP BY scan runs on every digest/turns read, so for a terminal +// (archived) session — whose agent set can no longer grow — the result is cached +// on sessions.agent_ids_json and served O(1) thereafter. A paused session can +// still resume and bind a new agent, so non-archived sessions always re-scan. func (s *Server) sessionAgentIDs(ctx context.Context, team, session string) ([]string, error) { + // Control-store probe: is this session sealed, and do we already have its + // agent set cached? A session with events but no control row (the doc above) + // scans the same as before — status stays "" so the cache path is skipped. + var status string + var cached sql.NullString + _ = s.db.QueryRowContext(ctx, + `SELECT status, agent_ids_json FROM sessions WHERE id = ?`, session, + ).Scan(&status, &cached) + sealed := status == "archived" + if sealed && cached.Valid && cached.String != "" { + var ids []string + if err := json.Unmarshal([]byte(cached.String), &ids); err == nil { + return ids, nil + } + // A malformed blob falls through to the authoritative scan. + } + er, err := s.eventsReader(team) if err != nil { return nil, err @@ -135,7 +159,22 @@ func (s *Server) sessionAgentIDs(ctx context.Context, team, session string) ([]s } out = append(out, id) } - return out, rows.Err() + if err := rows.Err(); err != nil { + return nil, err + } + + // Read-repair: materialize the set onto the archived session so the next + // read skips the scan. Best-effort — a write failure just means we scan + // again next time. Only when non-empty (an empty set would re-scan anyway, + // and avoids caching a transient "no events yet" state). + if sealed && !cached.Valid && len(out) > 0 { + if blob, err := json.Marshal(out); err == nil { + _, _ = s.db.ExecContext(ctx, + `UPDATE sessions SET agent_ids_json = ? WHERE id = ? AND agent_ids_json IS NULL`, + string(blob), session) + } + } + return out, nil } // mergeDigest folds src into dst (the session rollup): counts sum, taxonomies diff --git a/hub/internal/server/session_agent_ids_cache_test.go b/hub/internal/server/session_agent_ids_cache_test.go new file mode 100644 index 00000000..49d1fdf9 --- /dev/null +++ b/hub/internal/server/session_agent_ids_cache_test.go @@ -0,0 +1,106 @@ +package server + +import ( + "context" + "testing" +) + +// insertEventAtTS writes one agent_events row with an explicit ts so tests can +// pin the MIN(ts) ordering sessionAgentIDs depends on. +func insertEventAtTS(t *testing.T, s *Server, agentID, sesID string, seq int, ts string) { + t.Helper() + if _, err := evWForTeam(t, s, defaultTeamID).Exec( + `INSERT INTO agent_events + (id, agent_id, seq, ts, kind, producer, payload_json, session_id) + VALUES (?, ?, ?, ?, 'text', 'agent', '{}', ?)`, + "evt-"+sesID+"-"+agentID+"-"+itoaInt(seq), + agentID, seq, ts, sesID, + ); err != nil { + t.Fatalf("insert event: %v", err) + } +} + +func seedAgentRowWithID(t *testing.T, s *Server, team, agentID string) { + t.Helper() + if _, err := s.db.Exec( + `INSERT INTO agents (id, team_id, handle, kind, created_at) + VALUES (?, ?, ?, 'claude-code', ?)`, + agentID, team, "h-"+agentID, NowUTC()); err != nil { + t.Fatalf("insert agent: %v", err) + } +} + +// TestSessionAgentIDs_CachesOnlyWhenArchived verifies the #118 §1 +// denormalization: the agent set is scanned (authoritatively) for a live +// session and served from sessions.agent_ids_json for an archived one. +func TestSessionAgentIDs_CachesOnlyWhenArchived(t *testing.T) { + s, _ := newA2ATestServer(t) + ctx := context.Background() + const sesID = "ses-cache" + const a1, a2, a3 = "agent-1", "agent-2", "agent-3" + + seedAgentRowWithID(t, s, defaultTeamID, a1) + seedAgentRowWithID(t, s, defaultTeamID, a2) + seedAgentRowWithID(t, s, defaultTeamID, a3) + if _, err := s.db.ExecContext(ctx, + `INSERT INTO sessions + (id, team_id, title, scope_kind, current_agent_id, + status, opened_at, last_active_at) + VALUES (?, ?, 'cache test', 'team', ?, 'active', ?, ?)`, + sesID, defaultTeamID, a1, NowUTC(), NowUTC()); err != nil { + t.Fatalf("insert session: %v", err) + } + + // a1 active first, then a2 (resume). Ordered by MIN(ts). + insertEventAtTS(t, s, a1, sesID, 1, "2026-06-27T00:00:01Z") + insertEventAtTS(t, s, a2, sesID, 1, "2026-06-27T00:00:02Z") + + // While active: scan, do not cache. + got, err := s.sessionAgentIDs(ctx, defaultTeamID, sesID) + if err != nil { + t.Fatalf("sessionAgentIDs (active): %v", err) + } + if len(got) != 2 || got[0] != a1 || got[1] != a2 { + t.Fatalf("active scan = %v, want [%s %s]", got, a1, a2) + } + var cached *string + if err := s.db.QueryRowContext(ctx, + `SELECT agent_ids_json FROM sessions WHERE id = ?`, sesID).Scan(&cached); err != nil { + t.Fatalf("read cache col: %v", err) + } + if cached != nil { + t.Fatalf("agent_ids_json materialized for a live session: %q", *cached) + } + + // Archive the session — now the set is immutable and may be cached. + if _, err := s.db.ExecContext(ctx, + `UPDATE sessions SET status = 'archived' WHERE id = ?`, sesID); err != nil { + t.Fatalf("archive: %v", err) + } + got, err = s.sessionAgentIDs(ctx, defaultTeamID, sesID) + if err != nil { + t.Fatalf("sessionAgentIDs (archived, first): %v", err) + } + if len(got) != 2 || got[0] != a1 || got[1] != a2 { + t.Fatalf("archived scan = %v, want [%s %s]", got, a1, a2) + } + if err := s.db.QueryRowContext(ctx, + `SELECT agent_ids_json FROM sessions WHERE id = ?`, sesID).Scan(&cached); err != nil { + t.Fatalf("read cache col after archive: %v", err) + } + if cached == nil { + t.Fatal("agent_ids_json not materialized after archived read") + } + + // Prove the cache is now served O(1): add a third agent's events to the + // session's shard, then read again — the result must still be the cached + // (sealed) set, NOT a fresh scan that would include a3. + insertEventAtTS(t, s, a3, sesID, 1, "2026-06-27T00:00:03Z") + got, err = s.sessionAgentIDs(ctx, defaultTeamID, sesID) + if err != nil { + t.Fatalf("sessionAgentIDs (archived, cached): %v", err) + } + if len(got) != 2 || got[0] != a1 || got[1] != a2 { + t.Fatalf("cached read = %v, want sealed [%s %s] (not a re-scan)", got, a1, a2) + } +} diff --git a/hub/migrations/0060_sessions_agent_ids.down.sql b/hub/migrations/0060_sessions_agent_ids.down.sql new file mode 100644 index 00000000..4452e77a --- /dev/null +++ b/hub/migrations/0060_sessions_agent_ids.down.sql @@ -0,0 +1 @@ +ALTER TABLE sessions DROP COLUMN agent_ids_json; diff --git a/hub/migrations/0060_sessions_agent_ids.up.sql b/hub/migrations/0060_sessions_agent_ids.up.sql new file mode 100644 index 00000000..2d129015 --- /dev/null +++ b/hub/migrations/0060_sessions_agent_ids.up.sql @@ -0,0 +1,14 @@ +-- Denormalize the per-session agent set onto the session row (#118 §1). +-- +-- sessionAgentIDs (the agents that produced events in a session, ordered by +-- first activity) was a `GROUP BY agent_id ORDER BY MIN(ts)` over the session's +-- full agent_events span — run on EVERY digest/turns read, even when the digest +-- itself is already fresh. For a 10k-event session that is a 10k-row index scan +-- per Insight open. +-- +-- This column caches that result as a JSON array of agent ids. It is populated +-- lazily and ONLY for terminal (archived) sessions, whose agent set can no +-- longer grow — a paused session can resume and bind a new agent, so those keep +-- scanning. NULL means "not yet materialized"; the read path falls back to the +-- authoritative scan and writes the result back once the session is archived. +ALTER TABLE sessions ADD COLUMN agent_ids_json TEXT;