Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion hub/internal/server/handlers_agent_digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package server

import (
"context"
"database/sql"
"encoding/json"
"net/http"
"strings"

Expand Down Expand Up @@ -113,7 +115,29 @@ func (s *Server) sumTurnActiveMs(ctx context.Context, agentIDs []string) (int64,
// URL, or (the OTLP export) by resolving it from the session row. Resolving it
// here from control would add a round-trip and fail for a session with events
// but no control row.
//
// #118 §1: the GROUP BY scan runs on every digest/turns read, so for a terminal
// (archived) session — whose agent set can no longer grow — the result is cached
// on sessions.agent_ids_json and served O(1) thereafter. A paused session can
// still resume and bind a new agent, so non-archived sessions always re-scan.
func (s *Server) sessionAgentIDs(ctx context.Context, team, session string) ([]string, error) {
// Control-store probe: is this session sealed, and do we already have its
// agent set cached? A session with events but no control row (the doc above)
// scans the same as before — status stays "" so the cache path is skipped.
var status string
var cached sql.NullString
_ = s.db.QueryRowContext(ctx,
`SELECT status, agent_ids_json FROM sessions WHERE id = ?`, session,
).Scan(&status, &cached)
sealed := status == "archived"
if sealed && cached.Valid && cached.String != "" {
var ids []string
if err := json.Unmarshal([]byte(cached.String), &ids); err == nil {
return ids, nil
}
// A malformed blob falls through to the authoritative scan.
}

er, err := s.eventsReader(team)
if err != nil {
return nil, err
Expand All @@ -135,7 +159,22 @@ func (s *Server) sessionAgentIDs(ctx context.Context, team, session string) ([]s
}
out = append(out, id)
}
return out, rows.Err()
if err := rows.Err(); err != nil {
return nil, err
}

// Read-repair: materialize the set onto the archived session so the next
// read skips the scan. Best-effort — a write failure just means we scan
// again next time. Only when non-empty (an empty set would re-scan anyway,
// and avoids caching a transient "no events yet" state).
if sealed && !cached.Valid && len(out) > 0 {
if blob, err := json.Marshal(out); err == nil {
_, _ = s.db.ExecContext(ctx,
`UPDATE sessions SET agent_ids_json = ? WHERE id = ? AND agent_ids_json IS NULL`,
string(blob), session)
}
}
return out, nil
}

// mergeDigest folds src into dst (the session rollup): counts sum, taxonomies
Expand Down
106 changes: 106 additions & 0 deletions hub/internal/server/session_agent_ids_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package server

import (
"context"
"testing"
)

// insertEventAtTS inserts a single agent_events row for (agentID, sesID) into
// the default team's event store, using the caller-supplied ts verbatim. Tests
// use it to control the MIN(ts) ordering that sessionAgentIDs relies on.
//
// The row id is derived as "evt-<sesID>-<agentID>-<seq>"; kind, producer and
// payload_json are fixed to 'text', 'agent' and '{}'. Any insert error fails
// the test immediately.
func insertEventAtTS(t *testing.T, s *Server, agentID, sesID string, seq int, ts string) {
	t.Helper()

	const insertSQL = `INSERT INTO agent_events
			(id, agent_id, seq, ts, kind, producer, payload_json, session_id)
		VALUES (?, ?, ?, ?, 'text', 'agent', '{}', ?)`

	writer := evWForTeam(t, s, defaultTeamID)
	eventID := "evt-" + sesID + "-" + agentID + "-" + itoaInt(seq)

	_, err := writer.Exec(insertSQL, eventID, agentID, seq, ts, sesID)
	if err != nil {
		t.Fatalf("insert event: %v", err)
	}
}

// seedAgentRowWithID inserts an agents row with the given id and team into the
// control store. The handle is "h-<agentID>", kind is 'claude-code', and
// created_at is the current NowUTC() value. Any insert error fails the test.
func seedAgentRowWithID(t *testing.T, s *Server, team, agentID string) {
	t.Helper()

	const insertSQL = `INSERT INTO agents (id, team_id, handle, kind, created_at)
		VALUES (?, ?, ?, 'claude-code', ?)`

	_, err := s.db.Exec(insertSQL, agentID, team, "h-"+agentID, NowUTC())
	if err != nil {
		t.Fatalf("insert agent: %v", err)
	}
}

// TestSessionAgentIDs_CachesOnlyWhenArchived walks the #118 §1 denormalization
// through three phases:
//
//  1. active session   — the agent set comes from the scan and
//     sessions.agent_ids_json stays NULL;
//  2. first archived read — the same set is returned and the column is now
//     populated;
//  3. later archived read — events for a third agent are added, yet the result
//     is still the two-agent set, showing it was served from the column rather
//     than re-scanned.
func TestSessionAgentIDs_CachesOnlyWhenArchived(t *testing.T) {
	const (
		sesID = "ses-cache"
		a1    = "agent-1"
		a2    = "agent-2"
		a3    = "agent-3"
	)
	s, _ := newA2ATestServer(t)
	ctx := context.Background()

	// isExpectedPair reports whether ids is exactly [a1 a2], in that order.
	isExpectedPair := func(ids []string) bool {
		return len(ids) == 2 && ids[0] == a1 && ids[1] == a2
	}
	// cacheColumn returns the raw agent_ids_json value; nil means SQL NULL.
	cacheColumn := func() (*string, error) {
		var blob *string
		row := s.db.QueryRowContext(ctx,
			`SELECT agent_ids_json FROM sessions WHERE id = ?`, sesID)
		scanErr := row.Scan(&blob)
		return blob, scanErr
	}

	// Fixture: three agents and one active session currently bound to a1.
	for _, id := range []string{a1, a2, a3} {
		seedAgentRowWithID(t, s, defaultTeamID, id)
	}
	_, err := s.db.ExecContext(ctx,
		`INSERT INTO sessions
			(id, team_id, title, scope_kind, current_agent_id,
			 status, opened_at, last_active_at)
		VALUES (?, ?, 'cache test', 'team', ?, 'active', ?, ?)`,
		sesID, defaultTeamID, a1, NowUTC(), NowUTC())
	if err != nil {
		t.Fatalf("insert session: %v", err)
	}

	// a1 is active first, a2 follows (resume); the expected order is by MIN(ts).
	insertEventAtTS(t, s, a1, sesID, 1, "2026-06-27T00:00:01Z")
	insertEventAtTS(t, s, a2, sesID, 1, "2026-06-27T00:00:02Z")

	// Phase 1 — active: the scan answers and nothing is written back.
	ids, err := s.sessionAgentIDs(ctx, defaultTeamID, sesID)
	if err != nil {
		t.Fatalf("sessionAgentIDs (active): %v", err)
	}
	if !isExpectedPair(ids) {
		t.Fatalf("active scan = %v, want [%s %s]", ids, a1, a2)
	}
	blob, err := cacheColumn()
	if err != nil {
		t.Fatalf("read cache col: %v", err)
	}
	if blob != nil {
		t.Fatalf("agent_ids_json materialized for a live session: %q", *blob)
	}

	// Phase 2 — archive the session; the set is now immutable and the first
	// read is expected to materialize it.
	_, err = s.db.ExecContext(ctx,
		`UPDATE sessions SET status = 'archived' WHERE id = ?`, sesID)
	if err != nil {
		t.Fatalf("archive: %v", err)
	}
	ids, err = s.sessionAgentIDs(ctx, defaultTeamID, sesID)
	if err != nil {
		t.Fatalf("sessionAgentIDs (archived, first): %v", err)
	}
	if !isExpectedPair(ids) {
		t.Fatalf("archived scan = %v, want [%s %s]", ids, a1, a2)
	}
	blob, err = cacheColumn()
	if err != nil {
		t.Fatalf("read cache col after archive: %v", err)
	}
	if blob == nil {
		t.Fatal("agent_ids_json not materialized after archived read")
	}

	// Phase 3 — give a3 an event in the same session and read again. A fresh
	// scan would now include a3, so getting the two-agent set back shows the
	// answer came from the sealed column.
	insertEventAtTS(t, s, a3, sesID, 1, "2026-06-27T00:00:03Z")
	ids, err = s.sessionAgentIDs(ctx, defaultTeamID, sesID)
	if err != nil {
		t.Fatalf("sessionAgentIDs (archived, cached): %v", err)
	}
	if !isExpectedPair(ids) {
		t.Fatalf("cached read = %v, want sealed [%s %s] (not a re-scan)", ids, a1, a2)
	}
}
1 change: 1 addition & 0 deletions hub/migrations/0060_sessions_agent_ids.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Down migration for 0060: remove the agent_ids_json column from sessions.
-- Any values stored in the column are discarded along with it.
-- NOTE(review): if this runs on SQLite, ALTER TABLE ... DROP COLUMN needs
-- SQLite 3.35.0 or newer — confirm the engine version the hub ships with.
ALTER TABLE sessions DROP COLUMN agent_ids_json;
14 changes: 14 additions & 0 deletions hub/migrations/0060_sessions_agent_ids.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Denormalize the per-session agent set onto the session row (#118 §1).
--
-- sessionAgentIDs (the agents that produced events in a session, ordered by
-- first activity) was a `GROUP BY agent_id ORDER BY MIN(ts)` over the session's
-- full agent_events span — run on EVERY digest/turns read, even when the digest
-- itself is already fresh. For a 10k-event session that is a 10k-row index scan
-- per Insight open.
--
-- This column caches that result as a JSON array of agent ids. It is populated
-- lazily and ONLY for terminal (archived) sessions, whose agent set can no
-- longer grow — a paused session can resume and bind a new agent, so those keep
-- scanning. NULL means "not yet materialized"; the read path falls back to the
-- authoritative scan and writes the result back once the session is archived.
--
-- The column is added as nullable TEXT with no default, so every existing row
-- starts out NULL (i.e. "not yet materialized") and no backfill is performed
-- here. The matching down migration drops the column.
ALTER TABLE sessions ADD COLUMN agent_ids_json TEXT;
Loading