Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions hub/internal/server/digest_fold_strip_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package server

import (
"context"
"encoding/json"
"reflect"
"strings"
"testing"
)

// TestFoldStripsBodiesWithoutChangingDigest pins #118 §3: the fold loaders
// strip the large display bodies (text/content/message/…) server-side, and
// doing so must NOT change the computed digest — the fold only reads small
// structured keys. The brute==incremental tests can't catch a wrongly-dropped
// field (both paths share the same projection and would break identically), so
// this compares the stripped-from-DB digest against the full-payload reference.
func TestFoldStripsBodiesWithoutChangingDigest(t *testing.T) {
s, _ := newA2ATestServer(t)
ctx := context.Background()

v, memEvents := loadDigestVector(t)
// Reference digest: the canonical vector folded with FULL in-memory payloads.
wantD, _ := computeAgentDigest("a", v.TeamID, memEvents)

agentID := seedAgentRow(t, s, defaultTeamID, "stripper", "claude-code")
ew := evWForTeam(t, s, defaultTeamID)
big := strings.Repeat("x", 50000) // simulate an accumulated-transcript body
for _, e := range v.Events {
p := map[string]any{}
for k, val := range e.Payload {
p[k] = val
}
// Heavy bodies that must be stripped. None is read by the fold, so a
// correct strip leaves the digest identical to the reference above.
p["text"] = big
p["content"] = big
p["message"] = big
pj, _ := json.Marshal(p)
if _, err := ew.Exec(
`INSERT INTO agent_events (id, agent_id, seq, ts, kind, producer, payload_json)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
"e-"+itoaInt(int(e.Seq)), agentID, e.Seq, e.TS, e.Kind, e.Producer, string(pj),
); err != nil {
t.Fatalf("insert event seq=%d: %v", e.Seq, err)
}
}

er, err := s.eventsReader(defaultTeamID)
if err != nil {
t.Fatalf("eventsReader: %v", err)
}
got, err := loadFoldEvents(ctx, er, agentID)
if err != nil {
t.Fatalf("loadFoldEvents: %v", err)
}
if len(got) != len(v.Events) {
t.Fatalf("loaded %d events, want %d", len(got), len(v.Events))
}

// The bodies must actually be gone — guards against json_remove silently
// no-op'ing (e.g. a JSON1 regression), which would defeat the optimization.
for _, fe := range got {
for _, k := range []string{"text", "content", "message"} {
if _, ok := fe.Payload[k]; ok {
t.Fatalf("seq=%d: %q body not stripped", fe.Seq, k)
}
}
}

gotD, _ := computeAgentDigest("a", v.TeamID, got)
if !reflect.DeepEqual(digestJSON(gotD), digestJSON(wantD)) {
t.Errorf("stripped digest != full-payload digest\n got: %+v\nwant: %+v",
digestJSON(gotD), digestJSON(wantD))
}
}
26 changes: 23 additions & 3 deletions hub/internal/server/digest_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,26 +446,46 @@ func (s *Server) backfillAgentDigest(ctx context.Context, agentID, teamID string
return d, nil
}

// foldEventCols is the agent_events projection every fold loader shares. The
// payload is shrunk server-side: the fold (brute + incremental) only reads small
// structured keys — tokens, cost_usd, by_model, tool ids/names, status,
// is_error, type, turn_id — and never the large display bodies (#118 §1). So
// json_remove strips those bodies before the row crosses into Go, cutting the
// bulk of the payload I/O + json.Unmarshal cost the text-heavy events carry (a
// single `text` event can hold the run's whole accumulated transcript).
//
// SQLite silently ignores json_remove paths that don't exist, so listing a body
// field an event lacks is a no-op — and removing a field the fold doesn't read
// can never change the digest (the equivalence is pinned by
// TestFoldStripsBodiesWithoutChangingDigest). json_valid guards a NULL/malformed
// payload through untouched; scanFoldEvents already tolerates a non-JSON blob.
const foldEventCols = `seq, session_ordinal, kind, ts, producer, session_id,
CASE WHEN json_valid(payload_json)
THEN json_remove(payload_json,
'$.text', '$.content', '$.message', '$.delta', '$.output',
'$.thinking', '$.thought', '$.reasoning')
ELSE payload_json END AS payload_json`

// loadFoldEvents reads an agent's full ordered event log as foldEvents.
func loadFoldEvents(ctx context.Context, q digestStore, agentID string) ([]foldEvent, error) {
return scanFoldEvents(q.QueryContext(ctx, `
SELECT seq, session_ordinal, kind, ts, producer, session_id, payload_json
SELECT `+foldEventCols+`
FROM agent_events WHERE agent_id = ? ORDER BY seq ASC`, agentID))
}

// loadFoldEventsBefore reads the ordered prefix [1, beforeSeq) — used by the
// in-tx prefix backfill of a pre-existing agent.
func loadFoldEventsBefore(ctx context.Context, q digestStore, agentID string, beforeSeq int64) ([]foldEvent, error) {
return scanFoldEvents(q.QueryContext(ctx, `
SELECT seq, session_ordinal, kind, ts, producer, session_id, payload_json
SELECT `+foldEventCols+`
FROM agent_events WHERE agent_id = ? AND seq < ? ORDER BY seq ASC`, agentID, beforeSeq))
}

// loadFoldEventsAfter reads the ordered suffix (afterSeq, ∞) — the events the
// deferred-fold worker still has to fold past the digest watermark.
func loadFoldEventsAfter(ctx context.Context, q digestStore, agentID string, afterSeq int64) ([]foldEvent, error) {
return scanFoldEvents(q.QueryContext(ctx, `
SELECT seq, session_ordinal, kind, ts, producer, session_id, payload_json
SELECT `+foldEventCols+`
FROM agent_events WHERE agent_id = ? AND seq > ? ORDER BY seq ASC`, agentID, afterSeq))
}

Expand Down
Loading