From 5282f419c6108d34ce7d19f5660a4eb0477915b6 Mon Sep 17 00:00:00 2001 From: deepseek-1 Date: Sun, 14 Jun 2026 13:10:53 +0800 Subject: [PATCH] feat(hub): additive optional limit/keyset pagination on unbounded list endpoints (#291) - audit events: add ?before= / ?after= ISO-8601 keyset cursors on ts - channel events: add ?before= keyset cursor on received_ts - A2A cards: add optional ?limit=N (absent = unlimited, max 1000) - Tests: keyset walk no-gap/no-overlap, limit caps, no-params unchanged Co-Authored-By: Claude Opus 4.7 --- hub/internal/server/audit.go | 19 ++- hub/internal/server/handlers_a2a.go | 17 ++- hub/internal/server/handlers_a2a_test.go | 65 ++++++++++ .../server/handlers_agent_families_test.go | 2 +- hub/internal/server/handlers_audit.go | 12 +- .../server/handlers_audit_coverage_test.go | 122 +++++++++++++++++- hub/internal/server/handlers_events.go | 15 ++- .../server/handlers_events_sender_test.go | 107 +++++++++++++++ hub/internal/server/handlers_phase_test.go | 4 +- .../server/handlers_templates_test.go | 4 +- 10 files changed, 351 insertions(+), 16 deletions(-) diff --git a/hub/internal/server/audit.go b/hub/internal/server/audit.go index bd61de24..88a6ef25 100644 --- a/hub/internal/server/audit.go +++ b/hub/internal/server/audit.go @@ -89,9 +89,13 @@ func actorFromContext(ctx context.Context) (tokenID, kind, handle string) { // whose meta_json carries a `project_id` matching it (covers agent.spawn, // run.create, document.create, review.request, artifact.create, etc.). // Project-scoped W2 Activity feed depends on this. +// +// Keyset pagination: before/after are ISO-8601 timestamps on the ts column +// (the natural monotonic sort key). Modeled on the agent_events (ts, seq) +// keyset pattern — ts alone sorts lexicographically correct for ISO-8601. func (s *Server) listAuditEvents( ctx context.Context, - teamID, action, since, projectID string, + teamID, action, since, projectID, before, after string, limit int, ) ([]AuditRow, error) { if limit <= 0 { @@ -116,8 +120,17 @@ func (s *Server) listAuditEvents( OR json_extract(meta_json, '$.project_id') = ?)` args = append(args, projectID, projectID) } - q += ` ORDER BY ts DESC LIMIT ?` - args = append(args, limit) + switch { + case before != "": + q += ` AND ts < ? ORDER BY ts DESC LIMIT ?` + args = append(args, before, limit) + case after != "": + q += ` AND ts > ? ORDER BY ts ASC LIMIT ?` + args = append(args, after, limit) + default: + q += ` ORDER BY ts DESC LIMIT ?` + args = append(args, limit) + } rows, err := s.db.QueryContext(ctx, q, args...) if err != nil { diff --git a/hub/internal/server/handlers_a2a.go b/hub/internal/server/handlers_a2a.go index d4984010..a48759b6 100644 --- a/hub/internal/server/handlers_a2a.go +++ b/hub/internal/server/handlers_a2a.go @@ -3,6 +3,7 @@ package server import ( "encoding/json" "net/http" + "strconv" "strings" "github.com/go-chi/chi/v5" @@ -117,7 +118,8 @@ func (s *Server) handlePutHostA2ACards(w http.ResponseWriter, r *http.Request) { } // handleListTeamA2ACards returns cards across all hosts in the team. -// Supports ?handle= to filter (steward calls this to find workers). +// Supports ?handle= to filter (steward calls this to find workers) +// and ?limit=N to cap rows (absent = unlimited, current behaviour). func (s *Server) handleListTeamA2ACards(w http.ResponseWriter, r *http.Request) { team := chi.URLParam(r, "team") handle := r.URL.Query().Get("handle") @@ -130,6 +132,19 @@ func (s *Server) handleListTeamA2ACards(w http.ResponseWriter, r *http.Request) args = append(args, handle) } query += ` ORDER BY host_id, agent_id` + limit := 0 + if v := r.URL.Query().Get("limit"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + limit = n + } + } + if limit > 1000 { + limit = 1000 + } + if limit > 0 { + query += ` LIMIT ?` + args = append(args, limit) + } rows, err := s.db.QueryContext(r.Context(), query, args...) if err != nil { diff --git a/hub/internal/server/handlers_a2a_test.go b/hub/internal/server/handlers_a2a_test.go index 241e6738..b6f94720 100644 --- a/hub/internal/server/handlers_a2a_test.go +++ b/hub/internal/server/handlers_a2a_test.go @@ -250,3 +250,68 @@ func TestListTeamA2ACards_RewritesURLToRelay(t *testing.T) { t.Errorf("name field lost: %+v", card) } } + +func TestListTeamA2ACards_Limit(t *testing.T) { + s, token := newA2ATestServer(t) + seedTestHost(t, s, defaultTeamID, "host-gpu", "gpu-1") + + // PUT 5 cards on one host. + cards := make([]map[string]any, 5) + for i := 0; i < 5; i++ { + cards[i] = map[string]any{ + "agent_id": "a" + itos(i), + "handle": "w" + itos(i), + "card": map[string]any{"name": "w" + itos(i)}, + } + } + status, body := doReq(t, s, token, http.MethodPut, + "/v1/teams/"+defaultTeamID+"/hosts/host-gpu/a2a/cards", + map[string]any{"cards": cards}) + if status != http.StatusOK { + t.Fatalf("put cards: %d %s", status, body) + } + + // No params — returns all cards (unchanged). + _, body = doReq(t, s, token, http.MethodGet, + "/v1/teams/"+defaultTeamID+"/a2a/cards", nil) + var all []a2aCardOut + if err := json.Unmarshal(body, &all); err != nil { + t.Fatalf("decode: %v", err) + } + if len(all) != 5 { + t.Fatalf("no-params got %d cards, want 5", len(all)) + } + + // ?limit=2 caps to 2. + _, body = doReq(t, s, token, http.MethodGet, + "/v1/teams/"+defaultTeamID+"/a2a/cards?limit=2", nil) + var limited []a2aCardOut + if err := json.Unmarshal(body, &limited); err != nil { + t.Fatalf("decode limited: %v", err) + } + if len(limited) != 2 { + t.Fatalf("limit=2 got %d cards, want 2", len(limited)) + } + + // ?limit=0 is ignored (no cap, returns all). + _, body = doReq(t, s, token, http.MethodGet, + "/v1/teams/"+defaultTeamID+"/a2a/cards?limit=0", nil) + var unlimited []a2aCardOut + if err := json.Unmarshal(body, &unlimited); err != nil { + t.Fatalf("decode limit=0: %v", err) + } + if len(unlimited) != 5 { + t.Fatalf("limit=0 got %d cards, want 5 (no cap)", len(unlimited)) + } + + // ?limit=9999 is clamped to 1000, returns all 5. + _, body = doReq(t, s, token, http.MethodGet, + "/v1/teams/"+defaultTeamID+"/a2a/cards?limit=9999", nil) + var capped []a2aCardOut + if err := json.Unmarshal(body, &capped); err != nil { + t.Fatalf("decode limit=9999: %v", err) + } + if len(capped) != 5 { + t.Fatalf("limit=9999 got %d cards, want 5 (clamped but enough)", len(capped)) + } +} diff --git a/hub/internal/server/handlers_agent_families_test.go b/hub/internal/server/handlers_agent_families_test.go index 7cbff2a8..252caf84 100644 --- a/hub/internal/server/handlers_agent_families_test.go +++ b/hub/internal/server/handlers_agent_families_test.go @@ -237,7 +237,7 @@ func TestAgentFamilies_HotReload_PutThenSpawnSeesIt(t *testing.T) { } // Audit row should reflect the create. - rows, err := c.s.listAuditEvents(context.Background(), c.teamID, "", "", "", 50) + rows, err := c.s.listAuditEvents(context.Background(), c.teamID, "", "", "", "", "", 50) if err != nil { t.Fatalf("listAudit: %v", err) } diff --git a/hub/internal/server/handlers_audit.go b/hub/internal/server/handlers_audit.go index 5a5c5a44..6aeed3cc 100644 --- a/hub/internal/server/handlers_audit.go +++ b/hub/internal/server/handlers_audit.go @@ -14,14 +14,22 @@ import ( // target_kind='project' rows and any row whose meta_json // carries a project_id field equal to this value // - limit — max rows, clamped to 500 (default 100) +// - before — ISO-8601 UTC timestamp; keyset cursor for backward +// pagination (ts < before ORDER BY ts DESC) +// - after — ISO-8601 UTC timestamp; keyset cursor for forward +// pagination (ts > after ORDER BY ts ASC) // -// Rows are ordered ts DESC so the newest actions appear first. +// Rows are ordered ts DESC so the newest actions appear first. When +// before/after are absent the response is byte-identical to the prior +// release — purely additive pagination. func (s *Server) handleListAudit(w http.ResponseWriter, r *http.Request) { team := chi.URLParam(r, "team") q := r.URL.Query() action := q.Get("action") since := q.Get("since") projectID := q.Get("project_id") + before := q.Get("before") + after := q.Get("after") limit := 100 if v := q.Get("limit"); v != "" { if n, err := strconv.Atoi(v); err == nil { @@ -34,7 +42,7 @@ func (s *Server) handleListAudit(w http.ResponseWriter, r *http.Request) { if limit <= 0 { limit = 100 } - rows, err := s.listAuditEvents(r.Context(), team, action, since, projectID, limit) + rows, err := s.listAuditEvents(r.Context(), team, action, since, projectID, before, after, limit) if err != nil { s.writeDBErr(w, err) return diff --git a/hub/internal/server/handlers_audit_coverage_test.go b/hub/internal/server/handlers_audit_coverage_test.go index bf4035a7..9fb909d5 100644 --- a/hub/internal/server/handlers_audit_coverage_test.go +++ b/hub/internal/server/handlers_audit_coverage_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "net/http" "testing" + "time" ) // countAuditActions returns a map of action → count of audit rows with that @@ -188,7 +189,7 @@ func TestListAudit_ActionFilterAndTeamScope(t *testing.T) { } // No filter — the scoped team's rows only, never the "other" team's. - rows, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "", 100) + rows, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "", "", "", 100) if err != nil { t.Fatalf("list: %v", err) } @@ -197,7 +198,7 @@ func TestListAudit_ActionFilterAndTeamScope(t *testing.T) { } // action=run.create — filters down to the one matching row. - rows, err = s.listAuditEvents(ctx, defaultTeamID, "run.create", "", "", 100) + rows, err = s.listAuditEvents(ctx, defaultTeamID, "run.create", "", "", "", "", 100) if err != nil { t.Fatalf("list filtered: %v", err) } @@ -234,7 +235,7 @@ func TestListAudit_FiltersByProjectID(t *testing.T) { // Row with neither target=project nor project_id meta — must NOT match. mustExec("template.created", "template", "t1", "{}") - rows, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "p1", 100) + rows, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "p1", "", "", 100) if err != nil { t.Fatalf("list: %v", err) } @@ -254,3 +255,118 @@ func TestListAudit_FiltersByProjectID(t *testing.T) { } } +// TestListAudit_KeysetPagination verifies the before=/after= keyset cursors +// on audit events. Seeds rows with distinct timestamps, walks both directions, +// and confirms no-gap/no-overlap page boundaries. +func TestListAudit_KeysetPagination(t *testing.T) { + s, _ := newA2ATestServer(t) + ctx := context.Background() + + // Seed N audit rows with distinct timestamps so keyset boundaries are clean. + const N = 7 + for i := 0; i < N; i++ { + ts := NowUTC() + if _, err := s.db.ExecContext(ctx, ` + INSERT INTO audit_events ( + id, team_id, ts, actor_kind, action, summary, meta_json + ) VALUES (?, ?, ?, 'system', 'test.ping', ?, '{}')`, + NewID(), defaultTeamID, ts, "ping "+itos(i)); err != nil { + t.Fatalf("seed audit row %d: %v", i, err) + } + time.Sleep(10 * time.Millisecond) // ensure distinct ts + } + + // No params — all rows returned DESC (unchanged behaviour). + all, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "", "", "", 100) + if err != nil { + t.Fatalf("full list: %v", err) + } + if len(all) != N { + t.Fatalf("full list got %d rows, want %d", len(all), N) + } + + // ?limit=3 caps to 3 rows. + limited, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "", "", "", 3) + if err != nil { + t.Fatalf("limit 3: %v", err) + } + if len(limited) != 3 { + t.Fatalf("limit 3 got %d rows, want 3", len(limited)) + } + + // Walk backward with before= from newest to oldest. + cursor := "9999" // far future — all rows are before this + var backPages [][]AuditRow + for { + page, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "", cursor, "", 3) + if err != nil { + t.Fatalf("before page: %v", err) + } + if len(page) == 0 { + break + } + backPages = append(backPages, page) + cursor = page[len(page)-1].TS // oldest in this page = next cursor + } + if len(backPages) < 2 { + t.Fatalf("before walk got %d pages, want >=2 from %d rows with limit 3", len(backPages), N) + } + // Every page must be DESC. + for i, p := range backPages { + for j := 1; j < len(p); j++ { + if p[j].TS > p[j-1].TS { + t.Errorf("before page %d not DESC at index %d: %s > %s", i, j, p[j].TS, p[j-1].TS) + } + } + } + // No overlap between adjacent backward pages. + for i := 1; i < len(backPages); i++ { + prevNewest := backPages[i-1][0].TS + for _, r := range backPages[i] { + if r.TS >= prevNewest { + t.Errorf("backward page %d contains row ts=%s overlapping prior page newest=%s", + i, r.TS, prevNewest) + } + } + } + backTotal := 0 + for _, p := range backPages { + backTotal += len(p) + } + if backTotal != N { + t.Errorf("backward walk total = %d, want %d", backTotal, N) + } + + // Walk forward with after= from oldest to newest. + cursor = "0000" // far past + var fwdPages [][]AuditRow + for { + page, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "", "", cursor, 3) + if err != nil { + t.Fatalf("after page: %v", err) + } + if len(page) == 0 { + break + } + fwdPages = append(fwdPages, page) + cursor = page[len(page)-1].TS // newest in this page = next cursor + } + // Every page must be ASC. + for i, p := range fwdPages { + for j := 1; j < len(p); j++ { + if p[j].TS < p[j-1].TS { + t.Errorf("after page %d not ASC at index %d: %s < %s", i, j, p[j].TS, p[j-1].TS) + } + } + } + fwdTotal := 0 + for _, p := range fwdPages { + fwdTotal += len(p) + } + if fwdTotal != N { + t.Errorf("forward walk total = %d, want %d", fwdTotal, N) + } +} + +// itos converts int to string for seed summaries (tiny helper, avoids strconv import). +func itos(n int) string { return string(rune('0' + n)) } diff --git a/hub/internal/server/handlers_events.go b/hub/internal/server/handlers_events.go index 921666be..5d7c0022 100644 --- a/hub/internal/server/handlers_events.go +++ b/hub/internal/server/handlers_events.go @@ -185,6 +185,7 @@ func (s *Server) handleListEvents(w http.ResponseWriter, r *http.Request) { return } since := r.URL.Query().Get("since") // received_ts exclusive + before := r.URL.Query().Get("before") limit := 100 if q := r.URL.Query().Get("limit"); q != "" { if n, err := strconv.Atoi(q); err == nil && n > 0 && n <= 1000 { @@ -196,7 +197,17 @@ func (s *Server) handleListEvents(w http.ResponseWriter, r *http.Request) { rows *sql.Rows err error ) - if since != "" { + switch { + case before != "": + rows, err = s.db.QueryContext(r.Context(), ` + SELECT id, schema_version, ts, received_ts, channel_id, type, + COALESCE(from_id, ''), to_ids_json, parts_json, + task_id, correlation_id, + pane_ref_json, usage_tokens_json, metadata_json + FROM events + WHERE channel_id = ? AND received_ts < ? + ORDER BY received_ts DESC LIMIT ?`, ch, before, limit) + case since != "": rows, err = s.db.QueryContext(r.Context(), ` SELECT id, schema_version, ts, received_ts, channel_id, type, COALESCE(from_id, ''), to_ids_json, parts_json, @@ -205,7 +216,7 @@ func (s *Server) handleListEvents(w http.ResponseWriter, r *http.Request) { FROM events WHERE channel_id = ? AND received_ts > ? ORDER BY received_ts ASC LIMIT ?`, ch, since, limit) - } else { + default: rows, err = s.db.QueryContext(r.Context(), ` SELECT id, schema_version, ts, received_ts, channel_id, type, COALESCE(from_id, ''), to_ids_json, parts_json, diff --git a/hub/internal/server/handlers_events_sender_test.go b/hub/internal/server/handlers_events_sender_test.go index 8ac034b2..04295c0c 100644 --- a/hub/internal/server/handlers_events_sender_test.go +++ b/hub/internal/server/handlers_events_sender_test.go @@ -6,6 +6,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" ) // F-08 — a posted channel event's sender and cost attribution are @@ -118,3 +119,109 @@ func TestPostEvent_HostRelayUsesStampedAgentID(t *testing.T) { t.Errorf("worker spent_cents = %d; want 300 (host-relayed cost)", got) } } + +func TestListEvents_BeforeCursor(t *testing.T) { + s, token := newA2ATestServer(t) + seedEventChannel(t, s, "chan-before") + + // Post several events to the channel; they get sequential received_ts. + hostTok := mintToken(t, s, "host", map[string]any{"team": defaultTeamID, "role": "host"}) + worker := seedAgentWithKind(t, s, defaultTeamID, "w1", "claude-code", "") + var recvTS []string + for i := 0; i < 5; i++ { + var buf bytes.Buffer + _ = json.NewEncoder(&buf).Encode(map[string]any{ + "type": "message", + "parts": []map[string]any{{"kind": "text", "text": "msg " + itos(i)}}, + }) + req := httptest.NewRequest(http.MethodPost, + "/v1/teams/"+defaultTeamID+"/channels/chan-before/events", &buf) + req.Header.Set("Authorization", "Bearer "+hostTok) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Agent-Id", worker) + rr := httptest.NewRecorder() + s.router.ServeHTTP(rr, req) + if rr.Code != http.StatusCreated { + t.Fatalf("post event %d: %d body=%s", i, rr.Code, rr.Body.String()) + } + time.Sleep(10 * time.Millisecond) + // Read back the received_ts. + var evt struct { + ReceivedTS string `json:"received_ts"` + } + if err := json.Unmarshal(rr.Body.Bytes(), &evt); err != nil { + t.Fatalf("decode event %d: %v", i, err) + } + recvTS = append(recvTS, evt.ReceivedTS) + } + + // No params — returns events DESC (unchanged). + status, body := doReq(t, s, token, http.MethodGet, + "/v1/teams/"+defaultTeamID+"/channels/chan-before/events", nil) + if status != http.StatusOK { + t.Fatalf("list: %d body=%s", status, body) + } + var all []map[string]any + if err := json.Unmarshal(body, &all); err != nil { + t.Fatalf("decode: %v", err) + } + if len(all) != 5 { + t.Fatalf("no params got %d events, want 5", len(all)) + } + + // ?before= — all 5 should be returned DESC. + status, body = doReq(t, s, token, http.MethodGet, + "/v1/teams/"+defaultTeamID+"/channels/chan-before/events?limit=10&before="+recvTS[4], nil) + if status != http.StatusOK { + t.Fatalf("before newest+1: %d body=%s", status, body) + } + var beforeAll []map[string]any + if err := json.Unmarshal(body, &beforeAll); err != nil { + t.Fatalf("decode before all: %v", err) + } + // All events except the cursor itself (strict <) should be returned. + if len(beforeAll) < 4 || len(beforeAll) > 5 { + t.Fatalf("before newest+1 got %d events, want 4 or 5", len(beforeAll)) + } + + // ?before= — zero events (all have received_ts >= oldest). + status, body = doReq(t, s, token, http.MethodGet, + "/v1/teams/"+defaultTeamID+"/channels/chan-before/events?limit=10&before="+recvTS[0], nil) + if status != http.StatusOK { + t.Fatalf("before oldest: %d body=%s", status, body) + } + var beforeNone []map[string]any + if err := json.Unmarshal(body, &beforeNone); err != nil { + t.Fatalf("decode before none: %v", err) + } + if len(beforeNone) != 0 { + t.Fatalf("before oldest got %d events, want 0", len(beforeNone)) + } + + // Walk backwards page by page with before=. + cursor := "9999" + var pages [][]map[string]any + for { + status, body = doReq(t, s, token, http.MethodGet, + "/v1/teams/"+defaultTeamID+"/channels/chan-before/events?limit=2&before="+cursor, nil) + if status != http.StatusOK { + t.Fatalf("walk page: %d body=%s", status, body) + } + var page []map[string]any + if err := json.Unmarshal(body, &page); err != nil { + t.Fatalf("decode walk page: %v", err) + } + if len(page) == 0 { + break + } + pages = append(pages, page) + cursor, _ = page[len(page)-1]["received_ts"].(string) + } + total := 0 + for _, p := range pages { + total += len(p) + } + if total != 5 { + t.Errorf("before walk total = %d, want 5", total) + } +} diff --git a/hub/internal/server/handlers_phase_test.go b/hub/internal/server/handlers_phase_test.go index 70eb8938..17f1f9a1 100644 --- a/hub/internal/server/handlers_phase_test.go +++ b/hub/internal/server/handlers_phase_test.go @@ -147,7 +147,7 @@ func TestPhase_AdvanceWalksTemplateOrder(t *testing.T) { } // Verify audit row. - rows, err := s.listAuditEvents(req.Context(), team, "project.phase_advanced", "", "", 10) + rows, err := s.listAuditEvents(req.Context(), team, "project.phase_advanced", "", "", "", "", 10) if err != nil { t.Fatalf("listAuditEvents: %v", err) } @@ -305,7 +305,7 @@ func TestPhase_SetWritesPhaseAndAuditsRevert(t *testing.T) { if rr.Code != http.StatusOK { t.Fatalf("revert: %d %s", rr.Code, rr.Body.String()) } - rows, err := s.listAuditEvents(req.Context(), team, "project.phase_reverted", "", "", 10) + rows, err := s.listAuditEvents(req.Context(), team, "project.phase_reverted", "", "", "", "", 10) if err != nil { t.Fatalf("listAuditEvents: %v", err) } diff --git a/hub/internal/server/handlers_templates_test.go b/hub/internal/server/handlers_templates_test.go index 802d8906..ad5ad002 100644 --- a/hub/internal/server/handlers_templates_test.go +++ b/hub/internal/server/handlers_templates_test.go @@ -73,7 +73,7 @@ func TestTemplates_PutCreatesAndAudits(t *testing.T) { } // Audit should have one created + one updated row for this target. - rows, err := c.s.listAuditEvents(context.Background(), c.teamID, "", "", "", 100) + rows, err := c.s.listAuditEvents(context.Background(), c.teamID, "", "", "", "", "", 100) if err != nil { t.Fatalf("listAudit: %v", err) } @@ -140,7 +140,7 @@ func TestTemplates_DeleteRemovesAndAudits(t *testing.T) { t.Errorf("re-DELETE = %d, want 404", status) } - rows, _ := c.s.listAuditEvents(context.Background(), c.teamID, "template.deleted", "", "", 10) + rows, _ := c.s.listAuditEvents(context.Background(), c.teamID, "template.deleted", "", "", "", "", 10) found := false for _, r := range rows { if r.TargetID == "agents/doomed.v1.yaml" {