diff --git a/hub/internal/server/digest_fold_strip_test.go b/hub/internal/server/digest_fold_strip_test.go new file mode 100644 index 00000000..0854a639 --- /dev/null +++ b/hub/internal/server/digest_fold_strip_test.go @@ -0,0 +1,75 @@ +package server + +import ( + "context" + "encoding/json" + "reflect" + "strings" + "testing" +) + +// TestFoldStripsBodiesWithoutChangingDigest pins #118 §3: the fold loaders +// strip the large display bodies (text/content/message/…) server-side, and +// doing so must NOT change the computed digest — the fold only reads small +// structured keys. The brute==incremental tests can't catch a wrongly-dropped +// field (both paths share the same projection and would break identically), so +// this compares the stripped-from-DB digest against the full-payload reference. +func TestFoldStripsBodiesWithoutChangingDigest(t *testing.T) { + s, _ := newA2ATestServer(t) + ctx := context.Background() + + v, memEvents := loadDigestVector(t) + // Reference digest: the canonical vector folded with FULL in-memory payloads. + wantD, _ := computeAgentDigest("a", v.TeamID, memEvents) + + agentID := seedAgentRow(t, s, defaultTeamID, "stripper", "claude-code") + ew := evWForTeam(t, s, defaultTeamID) + big := strings.Repeat("x", 50000) // simulate an accumulated-transcript body + for _, e := range v.Events { + p := map[string]any{} + for k, val := range e.Payload { + p[k] = val + } + // Heavy bodies that must be stripped. None is read by the fold, so a + // correct strip leaves the digest identical to the reference above. + p["text"] = big + p["content"] = big + p["message"] = big + pj, _ := json.Marshal(p) + if _, err := ew.Exec( + `INSERT INTO agent_events (id, agent_id, seq, ts, kind, producer, payload_json) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + "e-"+itoaInt(int(e.Seq)), agentID, e.Seq, e.TS, e.Kind, e.Producer, string(pj), + ); err != nil { + t.Fatalf("insert event seq=%d: %v", e.Seq, err) + } + } + + er, err := s.eventsReader(defaultTeamID) + if err != nil { + t.Fatalf("eventsReader: %v", err) + } + got, err := loadFoldEvents(ctx, er, agentID) + if err != nil { + t.Fatalf("loadFoldEvents: %v", err) + } + if len(got) != len(v.Events) { + t.Fatalf("loaded %d events, want %d", len(got), len(v.Events)) + } + + // The bodies must actually be gone — guards against json_remove silently + // no-op'ing (e.g. a JSON1 regression), which would defeat the optimization. + for _, fe := range got { + for _, k := range []string{"text", "content", "message"} { + if _, ok := fe.Payload[k]; ok { + t.Fatalf("seq=%d: %q body not stripped", fe.Seq, k) + } + } + } + + gotD, _ := computeAgentDigest("a", v.TeamID, got) + if !reflect.DeepEqual(digestJSON(gotD), digestJSON(wantD)) { + t.Errorf("stripped digest != full-payload digest\n got: %+v\nwant: %+v", + digestJSON(gotD), digestJSON(wantD)) + } +} diff --git a/hub/internal/server/digest_store.go b/hub/internal/server/digest_store.go index cec18ca3..d219083a 100644 --- a/hub/internal/server/digest_store.go +++ b/hub/internal/server/digest_store.go @@ -446,10 +446,30 @@ func (s *Server) backfillAgentDigest(ctx context.Context, agentID, teamID string return d, nil } +// foldEventCols is the agent_events projection every fold loader shares. The +// payload is shrunk server-side: the fold (brute + incremental) only reads small +// structured keys — tokens, cost_usd, by_model, tool ids/names, status, +// is_error, type, turn_id — and never the large display bodies (#118 §1). So +// json_remove strips those bodies before the row crosses into Go, cutting the +// bulk of the payload I/O + json.Unmarshal cost the text-heavy events carry (a +// single `text` event can hold the run's whole accumulated transcript). +// +// SQLite silently ignores json_remove paths that don't exist, so listing a body +// field an event lacks is a no-op — and removing a field the fold doesn't read +// can never change the digest (the equivalence is pinned by +// TestFoldStripsBodiesWithoutChangingDigest). json_valid guards a NULL/malformed +// payload through untouched; scanFoldEvents already tolerates a non-JSON blob. +const foldEventCols = `seq, session_ordinal, kind, ts, producer, session_id, + CASE WHEN json_valid(payload_json) + THEN json_remove(payload_json, + '$.text', '$.content', '$.message', '$.delta', '$.output', + '$.thinking', '$.thought', '$.reasoning') + ELSE payload_json END AS payload_json` + // loadFoldEvents reads an agent's full ordered event log as foldEvents. func loadFoldEvents(ctx context.Context, q digestStore, agentID string) ([]foldEvent, error) { return scanFoldEvents(q.QueryContext(ctx, ` - SELECT seq, session_ordinal, kind, ts, producer, session_id, payload_json + SELECT `+foldEventCols+` FROM agent_events WHERE agent_id = ? ORDER BY seq ASC`, agentID)) } @@ -457,7 +477,7 @@ func loadFoldEvents(ctx context.Context, q digestStore, agentID string) ([]foldE // in-tx prefix backfill of a pre-existing agent. func loadFoldEventsBefore(ctx context.Context, q digestStore, agentID string, beforeSeq int64) ([]foldEvent, error) { return scanFoldEvents(q.QueryContext(ctx, ` - SELECT seq, session_ordinal, kind, ts, producer, session_id, payload_json + SELECT `+foldEventCols+` FROM agent_events WHERE agent_id = ? AND seq < ? ORDER BY seq ASC`, agentID, beforeSeq)) } @@ -465,7 +485,7 @@ func loadFoldEventsBefore(ctx context.Context, q digestStore, agentID string, be // deferred-fold worker still has to fold past the digest watermark. func loadFoldEventsAfter(ctx context.Context, q digestStore, agentID string, afterSeq int64) ([]foldEvent, error) { return scanFoldEvents(q.QueryContext(ctx, ` - SELECT seq, session_ordinal, kind, ts, producer, session_id, payload_json + SELECT `+foldEventCols+` FROM agent_events WHERE agent_id = ? AND seq > ? ORDER BY seq ASC`, agentID, afterSeq)) }