Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 16 additions & 3 deletions hub/internal/server/audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,13 @@ func actorFromContext(ctx context.Context) (tokenID, kind, handle string) {
// whose meta_json carries a `project_id` matching it (covers agent.spawn,
// run.create, document.create, review.request, artifact.create, etc.).
// Project-scoped W2 Activity feed depends on this.
//
// Keyset pagination: before/after are ISO-8601 timestamps on the ts column
// (the natural monotonic sort key). Modeled on the agent_events (ts, seq)
// keyset pattern — ts alone sorts lexicographically correct for ISO-8601.
func (s *Server) listAuditEvents(
ctx context.Context,
teamID, action, since, projectID string,
teamID, action, since, projectID, before, after string,
limit int,
) ([]AuditRow, error) {
if limit <= 0 {
Expand All @@ -116,8 +120,17 @@ func (s *Server) listAuditEvents(
OR json_extract(meta_json, '$.project_id') = ?)`
args = append(args, projectID, projectID)
}
q += ` ORDER BY ts DESC LIMIT ?`
args = append(args, limit)
switch {
case before != "":
q += ` AND ts < ? ORDER BY ts DESC LIMIT ?`
args = append(args, before, limit)
case after != "":
q += ` AND ts > ? ORDER BY ts ASC LIMIT ?`
args = append(args, after, limit)
default:
q += ` ORDER BY ts DESC LIMIT ?`
args = append(args, limit)
}

rows, err := s.db.QueryContext(ctx, q, args...)
if err != nil {
Expand Down
17 changes: 16 additions & 1 deletion hub/internal/server/handlers_a2a.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"encoding/json"
"net/http"
"strconv"
"strings"

"github.com/go-chi/chi/v5"
Expand Down Expand Up @@ -117,7 +118,8 @@ func (s *Server) handlePutHostA2ACards(w http.ResponseWriter, r *http.Request) {
}

// handleListTeamA2ACards returns cards across all hosts in the team.
// Supports ?handle=<handle> to filter (steward calls this to find workers).
// Supports ?handle=<handle> to filter (steward calls this to find workers)
// and ?limit=N to cap rows (absent = unlimited, current behaviour).
func (s *Server) handleListTeamA2ACards(w http.ResponseWriter, r *http.Request) {
team := chi.URLParam(r, "team")
handle := r.URL.Query().Get("handle")
Expand All @@ -130,6 +132,19 @@ func (s *Server) handleListTeamA2ACards(w http.ResponseWriter, r *http.Request)
args = append(args, handle)
}
query += ` ORDER BY host_id, agent_id`
limit := 0
if v := r.URL.Query().Get("limit"); v != "" {
if n, err := strconv.Atoi(v); err == nil && n > 0 {
limit = n
}
}
if limit > 1000 {
limit = 1000
}
if limit > 0 {
query += ` LIMIT ?`
args = append(args, limit)
}

rows, err := s.db.QueryContext(r.Context(), query, args...)
if err != nil {
Expand Down
65 changes: 65 additions & 0 deletions hub/internal/server/handlers_a2a_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,68 @@ func TestListTeamA2ACards_RewritesURLToRelay(t *testing.T) {
t.Errorf("name field lost: %+v", card)
}
}

func TestListTeamA2ACards_Limit(t *testing.T) {
s, token := newA2ATestServer(t)
seedTestHost(t, s, defaultTeamID, "host-gpu", "gpu-1")

// PUT 5 cards on one host.
cards := make([]map[string]any, 5)
for i := 0; i < 5; i++ {
cards[i] = map[string]any{
"agent_id": "a" + itos(i),
"handle": "w" + itos(i),
"card": map[string]any{"name": "w" + itos(i)},
}
}
status, body := doReq(t, s, token, http.MethodPut,
"/v1/teams/"+defaultTeamID+"/hosts/host-gpu/a2a/cards",
map[string]any{"cards": cards})
if status != http.StatusOK {
t.Fatalf("put cards: %d %s", status, body)
}

// No params — returns all cards (unchanged).
_, body = doReq(t, s, token, http.MethodGet,
"/v1/teams/"+defaultTeamID+"/a2a/cards", nil)
var all []a2aCardOut
if err := json.Unmarshal(body, &all); err != nil {
t.Fatalf("decode: %v", err)
}
if len(all) != 5 {
t.Fatalf("no-params got %d cards, want 5", len(all))
}

// ?limit=2 caps to 2.
_, body = doReq(t, s, token, http.MethodGet,
"/v1/teams/"+defaultTeamID+"/a2a/cards?limit=2", nil)
var limited []a2aCardOut
if err := json.Unmarshal(body, &limited); err != nil {
t.Fatalf("decode limited: %v", err)
}
if len(limited) != 2 {
t.Fatalf("limit=2 got %d cards, want 2", len(limited))
}

// ?limit=0 is ignored (no cap, returns all).
_, body = doReq(t, s, token, http.MethodGet,
"/v1/teams/"+defaultTeamID+"/a2a/cards?limit=0", nil)
var unlimited []a2aCardOut
if err := json.Unmarshal(body, &unlimited); err != nil {
t.Fatalf("decode limit=0: %v", err)
}
if len(unlimited) != 5 {
t.Fatalf("limit=0 got %d cards, want 5 (no cap)", len(unlimited))
}

// ?limit=9999 is clamped to 1000, returns all 5.
_, body = doReq(t, s, token, http.MethodGet,
"/v1/teams/"+defaultTeamID+"/a2a/cards?limit=9999", nil)
var capped []a2aCardOut
if err := json.Unmarshal(body, &capped); err != nil {
t.Fatalf("decode limit=9999: %v", err)
}
if len(capped) != 5 {
t.Fatalf("limit=9999 got %d cards, want 5 (clamped but enough)", len(capped))
}
}
2 changes: 1 addition & 1 deletion hub/internal/server/handlers_agent_families_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func TestAgentFamilies_HotReload_PutThenSpawnSeesIt(t *testing.T) {
}

// Audit row should reflect the create.
rows, err := c.s.listAuditEvents(context.Background(), c.teamID, "", "", "", 50)
rows, err := c.s.listAuditEvents(context.Background(), c.teamID, "", "", "", "", "", 50)
if err != nil {
t.Fatalf("listAudit: %v", err)
}
Expand Down
12 changes: 10 additions & 2 deletions hub/internal/server/handlers_audit.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,22 @@ import (
// target_kind='project' rows and any row whose meta_json
// carries a project_id field equal to this value
// - limit — max rows, clamped to 500 (default 100)
// - before — ISO-8601 UTC timestamp; keyset cursor for backward
// pagination (ts < before ORDER BY ts DESC)
// - after — ISO-8601 UTC timestamp; keyset cursor for forward
// pagination (ts > after ORDER BY ts ASC)
//
// Rows are ordered ts DESC so the newest actions appear first.
// Rows are ordered ts DESC so the newest actions appear first. When
// before/after are absent the response is byte-identical to the prior
// release — purely additive pagination.
func (s *Server) handleListAudit(w http.ResponseWriter, r *http.Request) {
team := chi.URLParam(r, "team")
q := r.URL.Query()
action := q.Get("action")
since := q.Get("since")
projectID := q.Get("project_id")
before := q.Get("before")
after := q.Get("after")
limit := 100
if v := q.Get("limit"); v != "" {
if n, err := strconv.Atoi(v); err == nil {
Expand All @@ -34,7 +42,7 @@ func (s *Server) handleListAudit(w http.ResponseWriter, r *http.Request) {
if limit <= 0 {
limit = 100
}
rows, err := s.listAuditEvents(r.Context(), team, action, since, projectID, limit)
rows, err := s.listAuditEvents(r.Context(), team, action, since, projectID, before, after, limit)
if err != nil {
s.writeDBErr(w, err)
return
Expand Down
122 changes: 119 additions & 3 deletions hub/internal/server/handlers_audit_coverage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"net/http"
"testing"
"time"
)

// countAuditActions returns a map of action → count of audit rows with that
Expand Down Expand Up @@ -188,7 +189,7 @@ func TestListAudit_ActionFilterAndTeamScope(t *testing.T) {
}

// No filter — the scoped team's rows only, never the "other" team's.
rows, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "", 100)
rows, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "", "", "", 100)
if err != nil {
t.Fatalf("list: %v", err)
}
Expand All @@ -197,7 +198,7 @@ func TestListAudit_ActionFilterAndTeamScope(t *testing.T) {
}

// action=run.create — filters down to the one matching row.
rows, err = s.listAuditEvents(ctx, defaultTeamID, "run.create", "", "", 100)
rows, err = s.listAuditEvents(ctx, defaultTeamID, "run.create", "", "", "", "", 100)
if err != nil {
t.Fatalf("list filtered: %v", err)
}
Expand Down Expand Up @@ -234,7 +235,7 @@ func TestListAudit_FiltersByProjectID(t *testing.T) {
// Row with neither target=project nor project_id meta — must NOT match.
mustExec("template.created", "template", "t1", "{}")

rows, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "p1", 100)
rows, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "p1", "", "", 100)
if err != nil {
t.Fatalf("list: %v", err)
}
Expand All @@ -254,3 +255,118 @@ func TestListAudit_FiltersByProjectID(t *testing.T) {
}
}

// TestListAudit_KeysetPagination verifies the before=/after= keyset cursors
// on audit events. Seeds rows with distinct timestamps, walks both directions,
// and confirms no-gap/no-overlap page boundaries.
func TestListAudit_KeysetPagination(t *testing.T) {
s, _ := newA2ATestServer(t)
ctx := context.Background()

// Seed N audit rows with distinct timestamps so keyset boundaries are clean.
const N = 7
for i := 0; i < N; i++ {
ts := NowUTC()
if _, err := s.db.ExecContext(ctx, `
INSERT INTO audit_events (
id, team_id, ts, actor_kind, action, summary, meta_json
) VALUES (?, ?, ?, 'system', 'test.ping', ?, '{}')`,
NewID(), defaultTeamID, ts, "ping "+itos(i)); err != nil {
t.Fatalf("seed audit row %d: %v", i, err)
}
time.Sleep(10 * time.Millisecond) // ensure distinct ts
}

// No params — all rows returned DESC (unchanged behaviour).
all, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "", "", "", 100)
if err != nil {
t.Fatalf("full list: %v", err)
}
if len(all) != N {
t.Fatalf("full list got %d rows, want %d", len(all), N)
}

// ?limit=3 caps to 3 rows.
limited, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "", "", "", 3)
if err != nil {
t.Fatalf("limit 3: %v", err)
}
if len(limited) != 3 {
t.Fatalf("limit 3 got %d rows, want 3", len(limited))
}

// Walk backward with before= from newest to oldest.
cursor := "9999" // far future — all rows are before this
var backPages [][]AuditRow
for {
page, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "", cursor, "", 3)
if err != nil {
t.Fatalf("before page: %v", err)
}
if len(page) == 0 {
break
}
backPages = append(backPages, page)
cursor = page[len(page)-1].TS // oldest in this page = next cursor
}
if len(backPages) < 2 {
t.Fatalf("before walk got %d pages, want >=2 from %d rows with limit 3", len(backPages), N)
}
// Every page must be DESC.
for i, p := range backPages {
for j := 1; j < len(p); j++ {
if p[j].TS > p[j-1].TS {
t.Errorf("before page %d not DESC at index %d: %s > %s", i, j, p[j].TS, p[j-1].TS)
}
}
}
// No overlap between adjacent backward pages.
for i := 1; i < len(backPages); i++ {
prevNewest := backPages[i-1][0].TS
for _, r := range backPages[i] {
if r.TS >= prevNewest {
t.Errorf("backward page %d contains row ts=%s overlapping prior page newest=%s",
i, r.TS, prevNewest)
}
}
}
backTotal := 0
for _, p := range backPages {
backTotal += len(p)
}
if backTotal != N {
t.Errorf("backward walk total = %d, want %d", backTotal, N)
}

// Walk forward with after= from oldest to newest.
cursor = "0000" // far past
var fwdPages [][]AuditRow
for {
page, err := s.listAuditEvents(ctx, defaultTeamID, "", "", "", "", cursor, 3)
if err != nil {
t.Fatalf("after page: %v", err)
}
if len(page) == 0 {
break
}
fwdPages = append(fwdPages, page)
cursor = page[len(page)-1].TS // newest in this page = next cursor
}
// Every page must be ASC.
for i, p := range fwdPages {
for j := 1; j < len(p); j++ {
if p[j].TS < p[j-1].TS {
t.Errorf("after page %d not ASC at index %d: %s < %s", i, j, p[j].TS, p[j-1].TS)
}
}
}
fwdTotal := 0
for _, p := range fwdPages {
fwdTotal += len(p)
}
if fwdTotal != N {
t.Errorf("forward walk total = %d, want %d", fwdTotal, N)
}
}

// itos converts int to string for seed summaries (tiny helper, avoids strconv import).
func itos(n int) string { return string(rune('0' + n)) }
15 changes: 13 additions & 2 deletions hub/internal/server/handlers_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ func (s *Server) handleListEvents(w http.ResponseWriter, r *http.Request) {
return
}
since := r.URL.Query().Get("since") // received_ts exclusive
before := r.URL.Query().Get("before")
limit := 100
if q := r.URL.Query().Get("limit"); q != "" {
if n, err := strconv.Atoi(q); err == nil && n > 0 && n <= 1000 {
Expand All @@ -196,7 +197,17 @@ func (s *Server) handleListEvents(w http.ResponseWriter, r *http.Request) {
rows *sql.Rows
err error
)
if since != "" {
switch {
case before != "":
rows, err = s.db.QueryContext(r.Context(), `
SELECT id, schema_version, ts, received_ts, channel_id, type,
COALESCE(from_id, ''), to_ids_json, parts_json,
task_id, correlation_id,
pane_ref_json, usage_tokens_json, metadata_json
FROM events
WHERE channel_id = ? AND received_ts < ?
ORDER BY received_ts DESC LIMIT ?`, ch, before, limit)
case since != "":
rows, err = s.db.QueryContext(r.Context(), `
SELECT id, schema_version, ts, received_ts, channel_id, type,
COALESCE(from_id, ''), to_ids_json, parts_json,
Expand All @@ -205,7 +216,7 @@ func (s *Server) handleListEvents(w http.ResponseWriter, r *http.Request) {
FROM events
WHERE channel_id = ? AND received_ts > ?
ORDER BY received_ts ASC LIMIT ?`, ch, since, limit)
} else {
default:
rows, err = s.db.QueryContext(r.Context(), `
SELECT id, schema_version, ts, received_ts, channel_id, type,
COALESCE(from_id, ''), to_ids_json, parts_json,
Expand Down
Loading
Loading