diff --git a/cmd/engine/main.go b/cmd/engine/main.go index 6be4ee1..04f21bd 100644 --- a/cmd/engine/main.go +++ b/cmd/engine/main.go @@ -151,6 +151,22 @@ func run() error { } } + // Replay store: Phase 3.1. On by default; operators opt out via + // retrieval.replay.enabled=false (or VLE_RETRIEVAL_REPLAY_ENABLED=false). + // In-memory only — Phase 3.2 will swap this for a durable store + // behind the same retrieval.ReplayStore interface. + var replayStore retrieval.ReplayStore + if cfg.Retrieval.Replay.Enabled { + replayStore = retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{ + MaxEntries: cfg.Retrieval.Replay.MaxEntries, + TTL: time.Duration(cfg.Retrieval.Replay.TTLSeconds) * time.Second, + }) + logger.Info("retrieval: replay store enabled", + "max_entries", cfg.Retrieval.Replay.MaxEntries, + "ttl_seconds", cfg.Retrieval.Replay.TTLSeconds, + ) + } + pipeline := ingest.NewPipeline(ingest.Pipeline{ DB: pool, Storage: store, @@ -181,6 +197,7 @@ func run() error { Planning: cfg.Retrieval.Planning, ReRanker: reRanker, ReRank: cfg.Retrieval.ReRank, + Replay: replayStore, } srv := &http.Server{ diff --git a/config.example.yaml b/config.example.yaml index 53b5a79..235bb63 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -182,6 +182,28 @@ retrieval: # re-rank pass to do the final selection. top_k: 0 + # replay: Phase 3.1 reproducibility store. Every /v1/query and + # /v1/answer response carries a deterministic `trace_token`; the + # response body is stored in an in-memory LRU under that token so + # POST /v1/replay can return the byte-identical response on demand. + # + # OPT-OUT. Default enabled — replay is a moat versus stateless + # vector RAG and should ship on by default. Disable to free the + # memory budget when audit/replay isn't part of the operator's + # flow. When disabled, responses still carry `trace_token` but + # are not stored, and /v1/replay returns 501. + # + # The store is in-memory and not durable across process restarts. 
+ # Phase 3.2 will swap this for a persistent store + per-document + # versioning behind the same interface. + replay: + enabled: true + # LRU capacity. Least-recently-used entries are evicted once full. + max_entries: 1024 + # How long an entry remains valid. 86400 = 24 hours. Long + # audit flows may bump this; tight memory budgets shrink it. + ttl_seconds: 86400 + ingest: # The summarize and HyDE stages run concurrently. This caps the total # number of LLM calls in flight across both stages combined, so the diff --git a/internal/api/replay_test.go b/internal/api/replay_test.go new file mode 100644 index 0000000..e90a552 --- /dev/null +++ b/internal/api/replay_test.go @@ -0,0 +1,336 @@ +package api + +import ( + "bytes" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/go-chi/chi/v5" + + "github.com/hallelx2/vectorless-engine/pkg/retrieval" + "github.com/hallelx2/vectorless-engine/pkg/tree" +) + +// newReplayRouter wires only the routes /v1/replay actually touches. +// This avoids spinning up DB / Storage / Queue / Strategy just to +// exercise the replay endpoint contract. +func newReplayRouter(d Deps) http.Handler { + r := chi.NewRouter() + r.Route("/v1", func(r chi.Router) { + r.Post("/replay", d.handleReplay) + }) + return r +} + +// TestReplayByteExact: the central invariant of Phase 3.1. +// Put a response into the store, replay it, assert the bytes +// returned by the handler match what was stored — character for +// character. 
+func TestReplayByteExact(t *testing.T) { + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{MaxEntries: 10}) + want := []byte(`{"answer":"hello","strategy":"chunked-tree","trace_token":"abc123"}` + "\n") + store.Put("token-1", retrieval.ReplayEntry{ + DocumentID: "doc_x", + Query: "what is x?", + ResponseJSON: want, + }) + + d := Deps{Replay: store} + srv := httptest.NewServer(newReplayRouter(d)) + defer srv.Close() + + body, _ := json.Marshal(map[string]any{ + "trace_token": "token-1", + "query": "what is x?", + "document_id": "doc_x", + }) + resp, err := http.Post(srv.URL+"/v1/replay", "application/json", bytes.NewReader(body)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Fatalf("status = %d, want 200", resp.StatusCode) + } + got, _ := io.ReadAll(resp.Body) + if !bytes.Equal(got, want) { + t.Errorf("replay bytes differ:\n got %q\n want %q", got, want) + } +} + +// TestReplayUnknownToken: 404 with a clear error message. +func TestReplayUnknownToken(t *testing.T) { + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{MaxEntries: 10}) + d := Deps{Replay: store} + srv := httptest.NewServer(newReplayRouter(d)) + defer srv.Close() + + body, _ := json.Marshal(map[string]any{ + "trace_token": "never-stored", + "query": "q", + "document_id": "doc_x", + }) + resp, err := http.Post(srv.URL+"/v1/replay", "application/json", bytes.NewReader(body)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusNotFound { + t.Errorf("status = %d, want 404", resp.StatusCode) + } +} + +// TestReplayDocumentIDMismatch: 409 with details=document_id differs. 
+func TestReplayDocumentIDMismatch(t *testing.T) { + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{MaxEntries: 10}) + store.Put("t", retrieval.ReplayEntry{ + DocumentID: "doc_real", + Query: "q", + ResponseJSON: []byte(`{"x":1}` + "\n"), + }) + d := Deps{Replay: store} + srv := httptest.NewServer(newReplayRouter(d)) + defer srv.Close() + + body, _ := json.Marshal(map[string]any{ + "trace_token": "t", + "query": "q", + "document_id": "doc_fake", + }) + resp, err := http.Post(srv.URL+"/v1/replay", "application/json", bytes.NewReader(body)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusConflict { + t.Fatalf("status = %d, want 409", resp.StatusCode) + } + var errBody map[string]string + _ = json.NewDecoder(resp.Body).Decode(&errBody) + if errBody["error"] != "input mismatch" { + t.Errorf("error = %q, want input mismatch", errBody["error"]) + } + if !strings.Contains(errBody["details"], "document_id") { + t.Errorf("details should mention document_id, got %q", errBody["details"]) + } +} + +// TestReplayQueryMismatch: 409 with details=query differs. 
+func TestReplayQueryMismatch(t *testing.T) { + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{MaxEntries: 10}) + store.Put("t", retrieval.ReplayEntry{ + DocumentID: "doc_x", + Query: "real query", + ResponseJSON: []byte(`{"x":1}` + "\n"), + }) + d := Deps{Replay: store} + srv := httptest.NewServer(newReplayRouter(d)) + defer srv.Close() + + body, _ := json.Marshal(map[string]any{ + "trace_token": "t", + "query": "tampered query", + "document_id": "doc_x", + }) + resp, err := http.Post(srv.URL+"/v1/replay", "application/json", bytes.NewReader(body)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusConflict { + t.Fatalf("status = %d, want 409", resp.StatusCode) + } + var errBody map[string]string + _ = json.NewDecoder(resp.Body).Decode(&errBody) + if !strings.Contains(errBody["details"], "query") { + t.Errorf("details should mention query, got %q", errBody["details"]) + } +} + +// TestReplayDisabled: when Deps.Replay is nil the endpoint returns +// 501 Not Implemented. This is the opt-out path documented in the +// config block. +func TestReplayDisabled(t *testing.T) { + d := Deps{Replay: nil} + srv := httptest.NewServer(newReplayRouter(d)) + defer srv.Close() + + body, _ := json.Marshal(map[string]any{ + "trace_token": "anything", + "query": "q", + "document_id": "doc_x", + }) + resp, err := http.Post(srv.URL+"/v1/replay", "application/json", bytes.NewReader(body)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusNotImplemented { + t.Errorf("status = %d, want 501", resp.StatusCode) + } +} + +// TestReplayRequiresFields: every field in the body is required. +// A missing field is a client error, not a 404 (which would +// otherwise be confusing — "the token isn't found" when really +// "you didn't send a token"). 
+func TestReplayRequiresFields(t *testing.T) { + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{MaxEntries: 10}) + d := Deps{Replay: store} + srv := httptest.NewServer(newReplayRouter(d)) + defer srv.Close() + + cases := []map[string]any{ + {"query": "q", "document_id": "doc_x"}, // missing trace_token + {"trace_token": "t", "document_id": "doc_x"}, // missing query + {"trace_token": "t", "query": "q"}, // missing document_id + {"trace_token": "", "query": "q", "document_id": "doc"}, // empty trace_token + } + for i, body := range cases { + raw, _ := json.Marshal(body) + resp, err := http.Post(srv.URL+"/v1/replay", "application/json", bytes.NewReader(raw)) + if err != nil { + t.Fatalf("case %d: %v", i, err) + } + resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + t.Errorf("case %d: status = %d, want 400", i, resp.StatusCode) + } + } +} + +// TestReplayBadJSON: malformed JSON request body → 400. +func TestReplayBadJSON(t *testing.T) { + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{MaxEntries: 10}) + d := Deps{Replay: store} + srv := httptest.NewServer(newReplayRouter(d)) + defer srv.Close() + + resp, err := http.Post(srv.URL+"/v1/replay", "application/json", strings.NewReader("{not json")) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusBadRequest { + t.Errorf("status = %d, want 400", resp.StatusCode) + } +} + +// TestReplayEndToEndByteExact simulates the production flow: the +// server marshals a /v1/query response via marshalJSONForReplay, +// stores it under the same trace token it surfaced to the client, +// and the replay endpoint hands the bytes back verbatim. This is +// the end-to-end byte-exactness invariant the Phase 3.1 spec +// demands. +// +// The test uses Go's encoding/json directly (the same package the +// handler uses) so any drift between "serialised on write" and +// "served on replay" surfaces here. 
+func TestReplayEndToEndByteExact(t *testing.T) { + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{MaxEntries: 10}) + d := Deps{Replay: store} + + // Build a representative response that exercises Go map JSON + // emission (lexicographic key sort) and a varied payload shape. + traceToken := retrieval.ComputeTraceToken("doc_x", "1", "claude-sonnet-4-5", + []tree.SectionID{"sec_a", "sec_b"}) + resp := map[string]any{ + "document_id": "doc_x", + "query": "what does the report say?", + "strategy": "chunked-tree", + "model": "claude-sonnet-4-5", + "sections": []map[string]any{ + {"id": "sec_a", "title": "Setup"}, + {"id": "sec_b", "title": "Usage"}, + }, + "elapsed_ms": 42, + "trace_token": traceToken, + } + + raw, err := marshalJSONForReplay(resp) + if err != nil { + t.Fatalf("marshal: %v", err) + } + + // Simulate the handler writing the response to wire AND storing. + d.Replay.Put(traceToken, retrieval.ReplayEntry{ + DocumentID: "doc_x", + Query: "what does the report say?", + Model: "claude-sonnet-4-5", + ResponseJSON: raw, + }) + + // Re-marshal the same Go value: encoding/json sorts map keys + // lexicographically, so the bytes must be identical. This is + // the property that makes byte-exact replay viable even when + // the response is built from a map[string]any rather than a + // struct with a fixed field order. + raw2, err := marshalJSONForReplay(resp) + if err != nil { + t.Fatalf("remarshal: %v", err) + } + if !bytes.Equal(raw, raw2) { + t.Errorf("encoding/json is non-deterministic on map[string]any:\n first %q\n second %q", raw, raw2) + } + + // Replay over HTTP. Bytes must equal what we stored. 
+ srv := httptest.NewServer(newReplayRouter(d)) + defer srv.Close() + body, _ := json.Marshal(map[string]any{ + "trace_token": traceToken, + "query": "what does the report say?", + "document_id": "doc_x", + }) + got, err := http.Post(srv.URL+"/v1/replay", "application/json", bytes.NewReader(body)) + if err != nil { + t.Fatal(err) + } + defer got.Body.Close() + if got.StatusCode != http.StatusOK { + t.Fatalf("status = %d", got.StatusCode) + } + gotBytes, _ := io.ReadAll(got.Body) + if !bytes.Equal(gotBytes, raw) { + t.Errorf("end-to-end byte drift:\n stored %q\n got %q", raw, gotBytes) + } +} + +// TestReplayPreservesUnicodeAndWhitespace replays a payload chosen +// to expose any normalisation in the storage path: unicode, mixed +// whitespace, embedded newlines. The byte sequence must come back +// identical. +func TestReplayPreservesUnicodeAndWhitespace(t *testing.T) { + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{MaxEntries: 10}) + // Hand-crafted bytes — deliberately not pretty-printed JSON, and + // includes content that round-tripping through encoding/json + // would re-escape. 
+ want := []byte("{\"text\":\"héllo\\nworld 🌍\",\"k\": 42}\n") + store.Put("u", retrieval.ReplayEntry{ + DocumentID: tree.DocumentID("doc_u"), + Query: "q", + ResponseJSON: want, + }) + + d := Deps{Replay: store} + srv := httptest.NewServer(newReplayRouter(d)) + defer srv.Close() + + body, _ := json.Marshal(map[string]any{ + "trace_token": "u", + "query": "q", + "document_id": "doc_u", + }) + resp, err := http.Post(srv.URL+"/v1/replay", "application/json", bytes.NewReader(body)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + got, _ := io.ReadAll(resp.Body) + if !bytes.Equal(got, want) { + t.Errorf("byte drift:\n got %q\n want %q", got, want) + } +} diff --git a/internal/api/server.go b/internal/api/server.go index 2a4f557..088db8e 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -83,6 +83,14 @@ type Deps struct { // `enable_rerank` field on /v1/query and /v1/answer overrides // ReRank.Enabled. TopK truncates the post-rerank candidate list. ReRank config.ReRankBlock + + // Replay is the Phase 3.1 in-memory replay-trace store. Every + // /v1/query and /v1/answer response is stamped with a + // deterministic trace_token and its body bytes are persisted + // here so /v1/replay can return them verbatim. Nil disables + // /v1/replay (the endpoint returns 501) and skips the per- + // response store write. + Replay retrieval.ReplayStore } // Router builds and returns the chi router wired with v1 routes. 
@@ -109,6 +117,7 @@ func Router(d Deps) http.Handler { r.Post("/query", d.handleQuery) r.Post("/query/multi", d.handleQueryMulti) r.Post("/answer", d.handleAnswer) + r.Post("/replay", d.handleReplay) }) r.Post("/internal/jobs/{kind}", d.handleQueueWebhook) @@ -503,10 +512,21 @@ func (d Deps) handleQuery(w http.ResponseWriter, r *http.Request) { } sections := make([]map[string]any, 0, len(enriched)) + finalIDs := make([]tree.SectionID, 0, len(enriched)) for _, e := range enriched { sections = append(sections, sectionWithContentToMap(e)) + finalIDs = append(finalIDs, e.sec.ID) } + // Trace token is computed over the FINAL IDs that ship in the + // response (after max_sections + ReRank truncation). Recomputing + // rather than reusing Result.TraceToken keeps the response and + // the replay log in sync even when post-processing reshapes the + // IDs the strategy originally picked. The model is the request's + // model (the same value the strategy used to stamp its result; + // trace_token is order-invariant so a sorted compare suffices). + traceToken := retrieval.ComputeTraceToken(body.DocumentID, "1", body.Model, finalIDs) + resp := map[string]any{ "document_id": body.DocumentID, "query": body.Query, @@ -514,11 +534,26 @@ func (d Deps) handleQuery(w http.ResponseWriter, r *http.Request) { "model": body.Model, "sections": sections, "elapsed_ms": time.Since(started).Milliseconds(), + "trace_token": traceToken, } if plan != nil { resp["plan"] = plan } - writeJSON(w, http.StatusOK, resp) + + raw, err := marshalJSONForReplay(resp) + if err != nil { + // Marshal failures here are unexpected (no custom MarshalJSON + // in the response tree); fall back to the encoder path so we + // still serve a response, just without replay capture. 
+ writeJSON(w, http.StatusOK, resp) + return + } + d.writeJSONWithReplay(w, http.StatusOK, raw, traceToken, retrieval.ReplayEntry{ + DocumentID: body.DocumentID, + Query: body.Query, + Model: body.Model, + SelectedIDs: finalIDs, + }) } // sectionWithContent bundles a tree section with its loaded content @@ -776,7 +811,9 @@ func (d Deps) handleAnswer(w http.ResponseWriter, r *http.Request) { totalUsage.Add(synthUsage) citations := make([]map[string]any, 0, len(enriched)) + finalIDs := make([]tree.SectionID, 0, len(enriched)) for _, e := range enriched { + finalIDs = append(finalIDs, e.sec.ID) c := map[string]any{ "section_id": e.sec.ID, "title": e.sec.Title, @@ -800,6 +837,12 @@ func (d Deps) handleAnswer(w http.ResponseWriter, r *http.Request) { citations = append(citations, c) } + // Trace token mirrors handleQuery: hash over the final IDs that + // ground the answer + synthModel (the LLM that actually wrote + // the answer). Different synth models for the same retrieval set + // produce different responses and therefore different tokens. 
+ traceToken := retrieval.ComputeTraceToken(body.DocumentID, "1", synthModel, finalIDs) + resp := map[string]any{ "document_id": body.DocumentID, "query": body.Query, @@ -814,12 +857,24 @@ func (d Deps) handleAnswer(w http.ResponseWriter, r *http.Request) { "cost_usd": totalUsage.CostUSD, "llm_calls": totalUsage.LLMCalls, }, - "elapsed_ms": time.Since(started).Milliseconds(), + "elapsed_ms": time.Since(started).Milliseconds(), + "trace_token": traceToken, } if plan != nil { resp["plan"] = plan } - writeJSON(w, http.StatusOK, resp) + + raw, err := marshalJSONForReplay(resp) + if err != nil { + writeJSON(w, http.StatusOK, resp) + return + } + d.writeJSONWithReplay(w, http.StatusOK, raw, traceToken, retrieval.ReplayEntry{ + DocumentID: body.DocumentID, + Query: body.Query, + Model: synthModel, + SelectedIDs: finalIDs, + }) } // synthesiseAnswer runs one LLM call producing the final answer from
@@ -1336,6 +1391,118 @@ func writeErr(w http.ResponseWriter, status int, msg string) {
 	writeJSON(w, status, map[string]string{"error": msg})
 }
 
+// marshalJSONForReplay marshals v to JSON exactly as it would be sent
+// on the wire so the bytes can be both stored in the replay log AND
+// returned to the caller in lock-step. Returns the bytes plus any
+// marshal error; on error the caller falls back to writeJSON (which
+// loses replay capture for that request but still serves the response).
+//
+// Why json.Marshal and not json.Encoder.Encode: Encoder appends a
+// trailing newline; Marshal does not. The replay handler returns the
+// stored bytes verbatim, so the byte representation must match the
+// wire representation exactly. We append the newline ourselves here
+// to match Encoder's behaviour and avoid a behavioural change for
+// existing clients.
+func marshalJSONForReplay(v any) ([]byte, error) {
+	raw, err := json.Marshal(v)
+	if err != nil {
+		return nil, err
+	}
+	// Match encoding/json.Encoder.Encode: it always emits a trailing
+	// newline. Adding it here keeps the wire format identical to the
+	// pre-3.1 behaviour and keeps replay byte-exact.
+	raw = append(raw, '\n')
+	return raw, nil
+}
+
+// writeJSONWithReplay writes pre-marshalled JSON bytes verbatim and
+// stores them in the replay log under the given token. Both writes
+// MUST see the same bytes; the function is the single point where
+// that invariant is enforced.
+//
+// When token is empty (no strategy ran, or the request opted out)
+// the replay store is skipped silently — replay simply isn't
+// available for that response.
+func (d Deps) writeJSONWithReplay(w http.ResponseWriter, status int, raw []byte, token string, entry retrieval.ReplayEntry) {
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(status)
+	_, _ = w.Write(raw)
+	if d.Replay != nil && token != "" {
+		entry.ResponseJSON = raw
+		entry.CreatedAt = time.Now()
+		d.Replay.Put(token, entry)
+	}
+}
+
+// handleReplay returns a byte-identical response previously stored
+// against a trace_token. The body must echo the original query and
+// document_id; mismatches return 409 with a specific `details` field
+// so the caller knows which input drifted. Unknown tokens return 404.
+//
+// A nil replay store yields 501; a malformed, oversized, or
+// incomplete request body yields 400.
+//
+// This is the endpoint that turns the whitepaper's "every answer is
+// reproducible" claim into a working surface: an auditor can replay
+// any /v1/query or /v1/answer response by retaining only the trace
+// token, the query, and the document_id.
+func (d Deps) handleReplay(w http.ResponseWriter, r *http.Request) {
+	if d.Replay == nil {
+		writeErr(w, http.StatusNotImplemented, "replay store not configured")
+		return
+	}
+
+	// Cap the request body before decoding: a replay request is three
+	// short strings, so anything past 1 MiB fails the decode below (400).
+	r.Body = http.MaxBytesReader(w, r.Body, 1<<20)
+
+	var body struct {
+		TraceToken string          `json:"trace_token"`
+		Query      string          `json:"query"`
+		DocumentID tree.DocumentID `json:"document_id"`
+	}
+	if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
+		writeErr(w, http.StatusBadRequest, "invalid json: "+err.Error())
+		return
+	}
+	if body.TraceToken == "" || body.Query == "" || body.DocumentID == "" {
+		writeErr(w, http.StatusBadRequest, "trace_token, query, and document_id are required")
+		return
+	}
+
+	entry, ok := d.Replay.Get(body.TraceToken)
+	if !ok {
+		writeErr(w, http.StatusNotFound, "trace_token not found")
+		return
+	}
+
+	// Strict input check. Order matters: we want the operator's
+	// diagnostic output to surface the FIRST drifting field rather
+	// than a vague "input mismatch". document_id is checked first
+	// because it's the highest-cardinality identifier; a wrong doc
+	// is the easiest way to misuse this endpoint.
+	if entry.DocumentID != body.DocumentID {
+		writeJSON(w, http.StatusConflict, map[string]string{
+			"error":   "input mismatch",
+			"details": "document_id differs from original",
+		})
+		return
+	}
+	if entry.Query != body.Query {
+		writeJSON(w, http.StatusConflict, map[string]string{
+			"error":   "input mismatch",
+			"details": "query differs from original",
+		})
+		return
+	}
+
+	// Replay the original bytes verbatim. Content-Type matches the
+	// original response (always JSON for /v1/query and /v1/answer).
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(http.StatusOK)
+	_, _ = w.Write(entry.ResponseJSON)
+}
+
 func guessContentType(filename string) string {
 	name := strings.ToLower(filename)
 	switch {
diff --git a/openapi.yaml b/openapi.yaml index 8b35548..a4c82ef 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -279,6 +279,82 @@ paths: "501": description: Endpoint not available — no LLM client configured + /v1/replay: + post: + tags: [Query] + summary: Replay a prior response byte-for-byte + operationId: replay + description: | + Returns the exact bytes of a previously-served /v1/query or + /v1/answer response keyed by its `trace_token`. The trace + token is surfaced on every retrieval response and is a + sha256 over (document_id, doc_version, model, sorted + selected section IDs, system prompt version). The body must + echo the original `query` and `document_id` so the engine + can refuse replay attempts under drifting inputs. + + Replay is the operational expression of the engine's + reproducibility guarantee: with only the trace token plus + the original inputs, an auditor can reconstruct the answer + an end-user actually saw, byte-for-byte, without storing + the full response themselves. + + Opt out at the server level by setting + `retrieval.replay.enabled=false` (or + `VLE_RETRIEVAL_REPLAY_ENABLED=false`); the endpoint then + returns 501. + requestBody: + required: true + content: + application/json: + schema: + $ref: "#/components/schemas/ReplayRequest" + responses: + "200": + description: | + Byte-identical original response. The body shape is whatever + /v1/query or /v1/answer originally returned. + content: + application/json: + schema: + oneOf: + - $ref: "#/components/schemas/QueryResponse" + - $ref: "#/components/schemas/AnswerResponse" + "400": + description: Missing or invalid request body. + "404": + description: | + Unknown trace_token — entry not found in the replay store + (never stored, evicted from the LRU, or past its TTL). 
+ content: + application/json: + schema: + type: object + properties: + error: + type: string + example: trace_token not found + "409": + description: | + Input mismatch — the request's query and/or document_id + do not match the stored entry. The `details` field + identifies the specific drifting field so an auditor can + tell whether the query was tampered with vs. pointed at + the wrong document. + content: + application/json: + schema: + type: object + properties: + error: + type: string + example: input mismatch + details: + type: string + example: document_id differs from original + "501": + description: Replay store disabled on this server. + /v1/query/stream: post: tags: [Query] @@ -496,6 +572,16 @@ components: $ref: "#/components/schemas/Plan" elapsed_ms: type: integer + trace_token: + type: string + description: | + Deterministic 64-char hex sha256 over the inputs that + determine this response (document_id, document version, + model, sorted final section IDs, system prompt version). + Pass this token to /v1/replay along with the original + `query` and `document_id` to retrieve the byte-identical + response. Still emitted when the server has + `retrieval.replay.enabled=false`, but /v1/replay returns 501. QuerySection: type: object @@ -620,6 +706,16 @@ components: $ref: "#/components/schemas/Plan" elapsed_ms: type: integer + trace_token: + type: string + description: | + Deterministic 64-char hex sha256 token over the inputs + that determine this answer (document_id, document + version, synthesis model, sorted citation section IDs, + system prompt version). Pass to /v1/replay with the + original `query` and `document_id` to fetch the + byte-identical response. Still emitted when the server has + `retrieval.replay.enabled=false`, but /v1/replay returns 501. Plan: type: object @@ -700,3 +796,26 @@ components: Re-rank relevance score on a 0-100 scale. Omitted when no re-rank ran. Higher means the section is more directly relevant to the query. + + ReplayRequest: + type: object + description: | + Request body for POST /v1/replay. 
All three fields are + required: the trace_token identifies the stored response, + the query + document_id are validated against the stored + inputs so the engine rejects replay attempts under drifting + inputs (returns 409 with a `details` field identifying the + first drifting field). + required: [trace_token, query, document_id] + properties: + trace_token: + type: string + description: | + 64-char hex sha256 trace token surfaced on the original + /v1/query or /v1/answer response. + query: + type: string + description: The user query the original request carried. + document_id: + type: string + description: The document the original request targeted. diff --git a/pkg/config/config.go b/pkg/config/config.go index abfb8ce..43cd976 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -205,6 +205,31 @@ type RetrievalConfig struct { Answer AnswerBlock `yaml:"answer"` Planning PlanningBlock `yaml:"planning"` ReRank ReRankBlock `yaml:"rerank"` + Replay ReplayBlock `yaml:"replay"` +} + +// ReplayBlock configures the Phase 3.1 replay-trace store. +// +// When enabled, every /v1/query and /v1/answer response is stamped +// with a deterministic trace_token and the response body is stored +// in an in-memory LRU. Callers can POST /v1/replay with the token +// (plus the original query + document_id) to retrieve the byte- +// identical response. +// +// The store is opt-out — replay is a moat versus stateless vector +// RAG and should ship on by default. Disable to free the memory +// budget when audit/replay isn't part of the operator's flow. +type ReplayBlock struct { + // Enabled turns the replay store on. Default: true. + Enabled bool `yaml:"enabled"` + + // MaxEntries bounds the in-memory LRU. Default: 1024. + MaxEntries int `yaml:"max_entries"` + + // TTLSeconds is how long a replay entry remains valid. Default: + // 86400 (24h). Long-running audit flows may want to bump this; + // short-TTL deployments save memory. 
+ TTLSeconds int `yaml:"ttl_seconds"` } // ReRankBlock configures the Phase 2.3 content-aware re-rank pass. @@ -411,6 +436,11 @@ func Default() Config { MaxContentChars: 2000, TopK: 0, }, + Replay: ReplayBlock{ + Enabled: true, + MaxEntries: 1024, + TTLSeconds: 86400, + }, }, Ingest: IngestConfig{ GlobalLLMConcurrency: 12, @@ -620,6 +650,24 @@ func applyEnvOverrides(c *Config) { c.Retrieval.ReRank.TopK = n } } + if v := os.Getenv("VLE_RETRIEVAL_REPLAY_ENABLED"); v != "" { + switch strings.ToLower(strings.TrimSpace(v)) { + case "1", "true", "yes", "on": + c.Retrieval.Replay.Enabled = true + case "0", "false", "no", "off": + c.Retrieval.Replay.Enabled = false + } + } + if v := os.Getenv("VLE_RETRIEVAL_REPLAY_MAX_ENTRIES"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + c.Retrieval.Replay.MaxEntries = n + } + } + if v := os.Getenv("VLE_RETRIEVAL_REPLAY_TTL_SECONDS"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + c.Retrieval.Replay.TTLSeconds = n + } + } } // Validate checks that required fields for the selected drivers are set. 
@@ -713,5 +761,12 @@ func (c Config) Validate() error { return fmt.Errorf("retrieval.rerank.top_k must be >= 0, got %d", c.Retrieval.ReRank.TopK) } + if c.Retrieval.Replay.MaxEntries < 0 { + return fmt.Errorf("retrieval.replay.max_entries must be >= 0, got %d", c.Retrieval.Replay.MaxEntries) + } + if c.Retrieval.Replay.TTLSeconds < 0 { + return fmt.Errorf("retrieval.replay.ttl_seconds must be >= 0, got %d", c.Retrieval.Replay.TTLSeconds) + } + return nil }
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 64ba7d3..a9d9fab 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -55,11 +55,94 @@ func TestDefaultValues(t *testing.T) {
 	if cfg.Retrieval.ReRank.TopK != 0 {
 		t.Errorf("retrieval.rerank.top_k = %d, want 0 (keep all)", cfg.Retrieval.ReRank.TopK)
 	}
+	if !cfg.Retrieval.Replay.Enabled {
+		t.Error("retrieval.replay.enabled should default to true (opt-out)")
+	}
+	if cfg.Retrieval.Replay.MaxEntries != 1024 {
+		t.Errorf("retrieval.replay.max_entries = %d, want 1024", cfg.Retrieval.Replay.MaxEntries)
+	}
+	if cfg.Retrieval.Replay.TTLSeconds != 86400 {
+		t.Errorf("retrieval.replay.ttl_seconds = %d, want 86400 (24h)", cfg.Retrieval.Replay.TTLSeconds)
+	}
 	if cfg.Log.Level != "info" {
 		t.Errorf("log.level = %q, want info", cfg.Log.Level)
 	}
 }
 
+func TestReplayEnvOverride(t *testing.T) {
+	// Not parallel — t.Setenv mutates the process environment and
+	// restores the prior state (including "unset") when the test ends.
+	t.Setenv("VLE_RETRIEVAL_REPLAY_ENABLED", "false")
+	t.Setenv("VLE_RETRIEVAL_REPLAY_MAX_ENTRIES", "256")
+	t.Setenv("VLE_RETRIEVAL_REPLAY_TTL_SECONDS", "3600")
+
+	cfg := Default()
+	applyEnvOverrides(&cfg)
+
+	if cfg.Retrieval.Replay.Enabled {
+		t.Error("VLE_RETRIEVAL_REPLAY_ENABLED=false should disable replay")
+	}
+	if cfg.Retrieval.Replay.MaxEntries != 256 {
+		t.Errorf("replay max_entries = %d, want 256", cfg.Retrieval.Replay.MaxEntries)
+	}
+	if cfg.Retrieval.Replay.TTLSeconds != 3600 {
+		t.Errorf("replay ttl_seconds = %d, want 3600", cfg.Retrieval.Replay.TTLSeconds)
+	}
+}
+
+func TestReplayEnvOverrideEnable(t *testing.T) {
+	// Toggle on via env from an explicitly-disabled starting state.
+	t.Setenv("VLE_RETRIEVAL_REPLAY_ENABLED", "true")
+
+	cfg := Default()
+	cfg.Retrieval.Replay.Enabled = false
+	applyEnvOverrides(&cfg)
+	if !cfg.Retrieval.Replay.Enabled {
+		t.Error("VLE_RETRIEVAL_REPLAY_ENABLED=true should enable replay even when disabled in YAML")
+	}
+}
+
+func TestReplayEnvOverrideRejectsBad(t *testing.T) {
+	// Unparseable numeric overrides must leave the defaults untouched.
+	t.Setenv("VLE_RETRIEVAL_REPLAY_MAX_ENTRIES", "not-a-number")
+	t.Setenv("VLE_RETRIEVAL_REPLAY_TTL_SECONDS", "wat")
+
+	cfg := Default()
+	applyEnvOverrides(&cfg)
+	if cfg.Retrieval.Replay.MaxEntries != 1024 {
+		t.Errorf("bad max_entries env should preserve default, got %d", cfg.Retrieval.Replay.MaxEntries)
+	}
+	if cfg.Retrieval.Replay.TTLSeconds != 86400 {
+		t.Errorf("bad ttl_seconds env should preserve default, got %d", cfg.Retrieval.Replay.TTLSeconds)
+	}
+}
+
+func TestValidateReplayNegatives(t *testing.T) {
+	t.Parallel()
+
+	cfg := Default()
+	cfg.Database.URL = "postgres://localhost/test"
+	cfg.Retrieval.Replay.MaxEntries = -1
+	if err := cfg.Validate(); err == nil {
+		t.Error("negative replay max_entries should fail validation")
+	}
+
+	cfg2 := Default()
+	cfg2.Database.URL = "postgres://localhost/test"
+	cfg2.Retrieval.Replay.TTLSeconds = -1
+	if err := cfg2.Validate(); err == nil {
+		t.Error("negative replay ttl_seconds should fail validation")
+	}
+
+	cfg3 := Default()
+	cfg3.Database.URL = "postgres://localhost/test"
+	cfg3.Retrieval.Replay.MaxEntries = 0
+	cfg3.Retrieval.Replay.TTLSeconds = 0
+	if err := cfg3.Validate(); err != nil {
+		t.Errorf("zero replay values should pass validation (use defaults at runtime): %v", err)
+	}
+}
+
 func TestReRankEnvOverride(t 
*testing.T) { // Not parallel — mutates env. Restore on exit. prevEnabled := os.Getenv("VLE_RETRIEVAL_RERANK_ENABLED") diff --git a/pkg/retrieval/agentic.go b/pkg/retrieval/agentic.go index 5166e32..41c49dd 100644 --- a/pkg/retrieval/agentic.go +++ b/pkg/retrieval/agentic.go @@ -184,6 +184,7 @@ func (a *AgenticStrategy) SelectWithCost(ctx context.Context, t *tree.Tree, quer ModelUsed: model, Usage: totalUsage, HopsTaken: hopsTaken, + TraceToken: ComputeTraceToken(t.DocumentID, traceDocVersionV1, model, finalIDs), }, nil case actionOutline: @@ -243,6 +244,7 @@ func (a *AgenticStrategy) SelectWithCost(ctx context.Context, t *tree.Tree, quer ModelUsed: model, Usage: totalUsage, HopsTaken: hopsTaken, + TraceToken: ComputeTraceToken(t.DocumentID, traceDocVersionV1, model, finalIDs), }, nil } diff --git a/pkg/retrieval/cached.go b/pkg/retrieval/cached.go index e1f2d3d..1e951b1 100644 --- a/pkg/retrieval/cached.go +++ b/pkg/retrieval/cached.go @@ -77,15 +77,23 @@ func (c *Cached) Select(ctx context.Context, t *tree.Tree, query string, budget // SelectWithCost checks the cache first. On a hit it returns zero usage // (no LLM call was made). On a miss it delegates to the inner strategy's // SelectWithCost if available, otherwise falls back to Select. +// +// The replay trace token is preserved across cache hits: because the +// token is a pure function of (document_id, doc_version, model, sorted +// selected_ids) and the cache key already varies with document + model, +// re-deriving the token from the cached slice produces the same value +// the original strategy would have stamped. 
func (c *Cached) SelectWithCost(ctx context.Context, t *tree.Tree, query string, budget ContextBudget) (*Result, error) { key := cache.Key(string(t.DocumentID), query, c.inner.Name(), budget.ModelName) if v, ok := c.cache.Get(key); ok { + ids := v.([]tree.SectionID) return &Result{ - SelectedIDs: v.([]tree.SectionID), + SelectedIDs: ids, Reasoning: "cached", ModelUsed: budget.ModelName, Usage: Usage{}, // zero — no LLM call + TraceToken: ComputeTraceToken(t.DocumentID, traceDocVersionV1, budget.ModelName, ids), }, nil } @@ -101,7 +109,10 @@ func (c *Cached) SelectWithCost(ctx context.Context, t *tree.Tree, query string, if err != nil { return nil, err } - result = &Result{SelectedIDs: ids} + result = &Result{ + SelectedIDs: ids, + TraceToken: ComputeTraceToken(t.DocumentID, traceDocVersionV1, budget.ModelName, ids), + } } c.cache.Set(key, result.SelectedIDs, c.ttl) diff --git a/pkg/retrieval/chunked_tree.go b/pkg/retrieval/chunked_tree.go index 716e47a..e7f9f09 100644 --- a/pkg/retrieval/chunked_tree.go +++ b/pkg/retrieval/chunked_tree.go @@ -55,6 +55,9 @@ func (c *ChunkedTree) Select(ctx context.Context, t *tree.Tree, query string, bu // SelectWithCost implements CostStrategy. 
func (c *ChunkedTree) SelectWithCost(ctx context.Context, t *tree.Tree, query string, budget ContextBudget) (*Result, error) { + if t == nil || t.Root == nil { + return &Result{}, nil + } tok := LLMTokenizer{C: c.LLM} slices, err := c.Splitter.Split(ctx, t, budget, tok) if err != nil { @@ -109,10 +112,12 @@ func (c *ChunkedTree) SelectWithCost(ctx context.Context, t *tree.Tree, query st totalUsage.Add(r.usage) } + selected := c.Merge.Merge(allIDs) return &Result{ - SelectedIDs: c.Merge.Merge(allIDs), + SelectedIDs: selected, Usage: totalUsage, HopsTaken: 1, + TraceToken: ComputeTraceToken(t.DocumentID, traceDocVersionV1, budget.ModelName, selected), }, nil } diff --git a/pkg/retrieval/replay.go b/pkg/retrieval/replay.go new file mode 100644 index 0000000..27381f8 --- /dev/null +++ b/pkg/retrieval/replay.go @@ -0,0 +1,152 @@ +package retrieval + +import ( + "sync" + "time" + + "github.com/hallelx2/vectorless-engine/pkg/cache" + "github.com/hallelx2/vectorless-engine/pkg/tree" +) + +// ReplayStore is the persistence interface for replay-trace entries. +// Implementations MUST be safe for concurrent use; the HTTP layer +// writes from /v1/query and /v1/answer handlers and reads from +// /v1/replay handlers at the same time. +type ReplayStore interface { + // Get retrieves a stored replay entry by trace token. Returns + // (zero, false) when the token is unknown or the entry has been + // evicted (LRU pressure or TTL). + Get(token string) (ReplayEntry, bool) + + // Put stores a replay entry under the given trace token. If the + // token is already present, the entry replaces it. + Put(token string, entry ReplayEntry) +} + +// ReplayEntry is what /v1/replay needs to reproduce a prior response +// byte-for-byte. ResponseJSON is the literal bytes of the original +// HTTP response — storing the marshalled response (not the underlying +// Go struct) is what guarantees byte-exact replay. 
Go's +// encoding/json sorts map keys lexicographically, so re-encoding the +// same map would already be deterministic, but persisting raw bytes +// removes any future doubt: it doesn't matter how the response is +// constructed, only what was actually returned. +// +// The remaining fields exist so the replay handler can validate the +// caller's claim — body.query and body.document_id must match +// entry.Query and entry.DocumentID, otherwise the engine returns 409 +// with a specific reason. +type ReplayEntry struct { + // DocumentID is the document the original request targeted. + DocumentID tree.DocumentID + + // Query is the user query the original request carried. + Query string + + // Model is the resolved LLM model name (after defaults). Stored + // so future versions can validate model claims; today the trace + // token already encodes the model, so a mismatch surfaces as + // "trace_token not found" rather than a 409. + Model string + + // SelectedIDs is the set of section IDs the strategy picked. + // Stored for debugging and observability; not used by the replay + // handler today (the response bytes already contain the IDs). + SelectedIDs []tree.SectionID + + // ResponseJSON is the literal response body sent over the wire. + // The replay handler writes this back verbatim with + // Content-Type: application/json. + ResponseJSON []byte + + // CreatedAt is when this entry was stored. Surfaced in logs. + CreatedAt time.Time +} + +// LRUReplayStore is an in-memory, TTL-aware, size-bounded replay +// store. It is a thin facade over pkg/cache.LRU: that cache is +// general-purpose (stores any) and concurrency-safe, so wrapping it +// avoids a duplicate LRU implementation. The facade enforces the +// ReplayEntry value type so callers cannot accidentally store the +// wrong shape. +// +// Capacity and TTL are configured at construction. 
Default capacity +// is 1024 entries, default TTL is 24h — sufficient for any realistic +// replay flow (audit, regression, debugging) while preventing +// unbounded memory growth. +// +// LRUReplayStore is NOT durable across process restarts. This is an +// intentional v1 limitation — the Phase 3.2 plan calls out persistent +// storage (Postgres-backed replay log + document versioning) as the +// next step. +type LRUReplayStore struct { + // mu guards ttl; the underlying cache is already lock-free for + // callers because pkg/cache.LRU has its own internal mutex. + mu sync.RWMutex + ttl time.Duration + + c *cache.LRU +} + +// Compile-time interface check. +var _ ReplayStore = (*LRUReplayStore)(nil) + +// LRUReplayConfig is the configuration for an LRUReplayStore. +type LRUReplayConfig struct { + // MaxEntries bounds the number of stored replay entries. Zero + // defaults to 1024. + MaxEntries int + + // TTL is how long an entry remains valid. Zero defaults to 24 + // hours. + TTL time.Duration +} + +// NewLRUReplayStore constructs an in-memory replay store with the +// given capacity and TTL. Zero values fall back to defaults. +func NewLRUReplayStore(cfg LRUReplayConfig) *LRUReplayStore { + ttl := cfg.TTL + if ttl <= 0 { + ttl = 24 * time.Hour + } + return &LRUReplayStore{ + ttl: ttl, + c: cache.NewLRU(cfg.MaxEntries), + } +} + +// Get implements ReplayStore. +func (s *LRUReplayStore) Get(token string) (ReplayEntry, bool) { + if token == "" { + return ReplayEntry{}, false + } + v, ok := s.c.Get(token) + if !ok { + return ReplayEntry{}, false + } + entry, ok := v.(ReplayEntry) + if !ok { + // Defensive: should never happen because Put only stores + // ReplayEntry, but a corrupt entry shouldn't panic the + // handler. Treat as a miss. + return ReplayEntry{}, false + } + return entry, true +} + +// Put implements ReplayStore. 
+func (s *LRUReplayStore) Put(token string, entry ReplayEntry) { + if token == "" { + return + } + s.mu.RLock() + ttl := s.ttl + s.mu.RUnlock() + s.c.Set(token, entry, ttl) +} + +// Len reports the current number of entries (including expired-but- +// not-yet-evicted entries). Primarily for tests and metrics. +func (s *LRUReplayStore) Len() int { + return s.c.Len() +} diff --git a/pkg/retrieval/replay_test.go b/pkg/retrieval/replay_test.go new file mode 100644 index 0000000..f4301a1 --- /dev/null +++ b/pkg/retrieval/replay_test.go @@ -0,0 +1,255 @@ +package retrieval_test + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/hallelx2/vectorless-engine/pkg/retrieval" + "github.com/hallelx2/vectorless-engine/pkg/tree" +) + +// TestLRUReplayStoreBasicPutGet covers the happy path: put an entry, +// get it back with byte-identical ResponseJSON. +func TestLRUReplayStoreBasicPutGet(t *testing.T) { + t.Parallel() + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{ + MaxEntries: 10, + TTL: time.Minute, + }) + + want := retrieval.ReplayEntry{ + DocumentID: "doc_x", + Query: "what is x?", + Model: "claude-sonnet-4-5", + SelectedIDs: []tree.SectionID{"sec_a", "sec_b"}, + ResponseJSON: []byte(`{"answer":"hello","strategy":"chunked-tree"}`), + CreatedAt: time.Now(), + } + store.Put("token-abc", want) + + got, ok := store.Get("token-abc") + if !ok { + t.Fatal("expected entry for token-abc") + } + if got.DocumentID != want.DocumentID || got.Query != want.Query { + t.Errorf("entry mismatch: got %+v, want %+v", got, want) + } + if string(got.ResponseJSON) != string(want.ResponseJSON) { + t.Errorf("ResponseJSON not preserved verbatim: got %q want %q", + got.ResponseJSON, want.ResponseJSON) + } +} + +// TestLRUReplayStoreMiss covers the unknown-token path. 
+func TestLRUReplayStoreMiss(t *testing.T) { + t.Parallel() + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{MaxEntries: 10}) + if _, ok := store.Get("nonexistent"); ok { + t.Error("expected miss for unknown token") + } +} + +// TestLRUReplayStoreEmptyToken ensures the store never accepts the +// empty trace token as a key — an empty token means no token was +// stamped, so unrelated responses would overwrite one another under +// it, and Get("") must always miss. +func TestLRUReplayStoreEmptyToken(t *testing.T) { + t.Parallel() + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{MaxEntries: 10}) + store.Put("", retrieval.ReplayEntry{ + ResponseJSON: []byte(`{"x":1}`), + }) + if _, ok := store.Get(""); ok { + t.Error("empty trace token must always miss on Get") + } + if store.Len() != 0 { + t.Errorf("empty trace token must not be stored; Len=%d", store.Len()) + } +} + +// TestLRUReplayStoreEviction asserts that the LRU honours its +// capacity bound: pushing beyond MaxEntries evicts the +// least-recently-used entry. +func TestLRUReplayStoreEviction(t *testing.T) { + t.Parallel() + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{ + MaxEntries: 3, + TTL: time.Hour, + }) + + for i, tok := range []string{"a", "b", "c"} { + store.Put(tok, retrieval.ReplayEntry{ + ResponseJSON: []byte(fmt.Sprintf(`{"i":%d}`, i)), + }) + } + // Adding "d" must evict "a" (LRU). + store.Put("d", retrieval.ReplayEntry{ResponseJSON: []byte(`{"i":3}`)}) + + if _, ok := store.Get("a"); ok { + t.Error("'a' should have been evicted as LRU") + } + if _, ok := store.Get("d"); !ok { + t.Error("'d' should be present after Put") + } + if got := store.Len(); got != 3 { + t.Errorf("Len = %d, want 3 (capacity)", got) + } +} + +// TestLRUReplayStoreTTLExpiry asserts that an entry past its TTL is +// no longer returned by Get.
LRUReplayConfig has no clock hook, +// so we exercise the TTL path by using a very short TTL plus a real +// Sleep — fast enough to keep the test under a second. +func TestLRUReplayStoreTTLExpiry(t *testing.T) { + t.Parallel() + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{ + MaxEntries: 10, + TTL: 50 * time.Millisecond, + }) + + store.Put("short", retrieval.ReplayEntry{ + ResponseJSON: []byte(`{"v":1}`), + }) + + // Immediately readable. + if _, ok := store.Get("short"); !ok { + t.Fatal("expected hit before expiry") + } + + // Wait past TTL. 200ms gives generous slack vs the 50ms TTL. + time.Sleep(200 * time.Millisecond) + + if _, ok := store.Get("short"); ok { + t.Error("expected miss after TTL expiry") + } +} + +// TestLRUReplayStoreUpdateInPlace asserts that Put-ing the same +// token twice replaces (not duplicates) the entry. This matters +// because the API surface re-Puts on a replay (idempotent retry). +func TestLRUReplayStoreUpdateInPlace(t *testing.T) { + t.Parallel() + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{ + MaxEntries: 10, + TTL: time.Hour, + }) + store.Put("k", retrieval.ReplayEntry{ResponseJSON: []byte(`{"v":1}`)}) + store.Put("k", retrieval.ReplayEntry{ResponseJSON: []byte(`{"v":2}`)}) + + got, ok := store.Get("k") + if !ok { + t.Fatal("expected hit for replaced key") + } + if string(got.ResponseJSON) != `{"v":2}` { + t.Errorf("expected updated value v:2, got %q", got.ResponseJSON) + } + if store.Len() != 1 { + t.Errorf("Len = %d, want 1 (single key, no duplicate)", store.Len()) + } +} + +// TestLRUReplayStoreThreadSafety hammers Put/Get from many goroutines +// to surface any race condition under -race. The assertion is that +// every Get either returns a complete ReplayEntry or misses cleanly; +// the store must not corrupt any individual entry or panic.
+func TestLRUReplayStoreThreadSafety(t *testing.T) { + t.Parallel() + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{ + MaxEntries: 256, + TTL: time.Hour, + }) + + const workers = 16 + const opsPerWorker = 200 + + var hits, misses int64 + var wg sync.WaitGroup + for w := 0; w < workers; w++ { + w := w + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < opsPerWorker; i++ { + tok := fmt.Sprintf("w%d-i%d", w, i) + resp := []byte(fmt.Sprintf(`{"w":%d,"i":%d}`, w, i)) + store.Put(tok, retrieval.ReplayEntry{ + DocumentID: tree.DocumentID(fmt.Sprintf("doc_%d", w)), + Query: "q", + ResponseJSON: resp, + }) + got, ok := store.Get(tok) + if !ok { + atomic.AddInt64(&misses, 1) + continue + } + atomic.AddInt64(&hits, 1) + if string(got.ResponseJSON) != string(resp) { + t.Errorf("ResponseJSON corruption: got %q want %q", + got.ResponseJSON, resp) + return + } + } + }() + } + wg.Wait() + + // We don't require every Get to hit (LRU may evict between Put + // and Get under contention) — the test surfaces races, not + // hit-rate guarantees. + if hits+misses != workers*opsPerWorker { + t.Errorf("op count mismatch: hits+misses=%d expected %d", + hits+misses, workers*opsPerWorker) + } +} + +// TestLRUReplayStoreDefaults asserts that NewLRUReplayStore applies +// the documented defaults when zero values are passed. +func TestLRUReplayStoreDefaults(t *testing.T) { + t.Parallel() + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{}) + // Cannot inspect TTL directly without exposing an accessor, so + // exercise it indirectly: a Put followed by an immediate Get + // must hit — TTL must NOT default to zero (which would make + // every entry immediately expired). 
+ store.Put("token", retrieval.ReplayEntry{ + ResponseJSON: []byte(`{"x":1}`), + }) + if _, ok := store.Get("token"); !ok { + t.Error("default TTL must be non-zero; Put then immediate Get missed") + } +} + +// TestLRUReplayStoreByteExactness asserts that the stored ResponseJSON +// is preserved byte-for-byte (no normalisation, no whitespace +// trimming) — replay's whole value prop is "same response down to +// the byte". +func TestLRUReplayStoreByteExactness(t *testing.T) { + t.Parallel() + store := retrieval.NewLRUReplayStore(retrieval.LRUReplayConfig{ + MaxEntries: 10, + TTL: time.Hour, + }) + + // Include unusual whitespace and unicode so any normalisation + // pass would visibly mangle it. + tricky := []byte(`{"answer":"hello\n world", "emoji":"❤", "k": 42}`) + store.Put("t", retrieval.ReplayEntry{ResponseJSON: tricky}) + + got, ok := store.Get("t") + if !ok { + t.Fatal("expected hit") + } + if len(got.ResponseJSON) != len(tricky) { + t.Errorf("length drift: got %d want %d", len(got.ResponseJSON), len(tricky)) + } + for i := range tricky { + if got.ResponseJSON[i] != tricky[i] { + t.Errorf("byte %d differs: got 0x%x want 0x%x", + i, got.ResponseJSON[i], tricky[i]) + break + } + } +} diff --git a/pkg/retrieval/retrieval_test.go b/pkg/retrieval/retrieval_test.go index 347660d..9888c4b 100644 --- a/pkg/retrieval/retrieval_test.go +++ b/pkg/retrieval/retrieval_test.go @@ -316,3 +316,75 @@ func TestDefaultSplitterFastPath(t *testing.T) { t.Errorf("breadcrumb missing doc title: %q", slices[0].Breadcrumb) } } + +// TestSinglePassStampsTraceToken verifies that SelectWithCost +// populates a 64-char hex TraceToken on the returned Result. 
+func TestSinglePassStampsTraceToken(t *testing.T) { + tr := buildTree() + m := &mockLLM{pickIfPresent: []tree.SectionID{"sec_b"}} + s := retrieval.NewSinglePass(m) + + res, err := s.SelectWithCost(context.Background(), tr, "q", + retrieval.ContextBudget{ModelName: "claude-sonnet-4-5", MaxTokens: 1000}) + if err != nil { + t.Fatal(err) + } + if len(res.TraceToken) != 64 { + t.Fatalf("trace_token must be 64 chars, got %d (%q)", len(res.TraceToken), res.TraceToken) + } + for _, r := range res.TraceToken { + if !((r >= '0' && r <= '9') || (r >= 'a' && r <= 'f')) { + t.Fatalf("trace_token must be lowercase hex, got %q", res.TraceToken) + } + } + + // Same inputs → same token. + res2, err := s.SelectWithCost(context.Background(), tr, "q", + retrieval.ContextBudget{ModelName: "claude-sonnet-4-5", MaxTokens: 1000}) + if err != nil { + t.Fatal(err) + } + if res2.TraceToken != res.TraceToken { + t.Errorf("same inputs must produce same trace_token: %q vs %q", + res.TraceToken, res2.TraceToken) + } +} + +// TestChunkedTreeStampsTraceToken verifies that the chunked-tree +// strategy populates the trace token on its returned Result. +func TestChunkedTreeStampsTraceToken(t *testing.T) { + tr := buildTree() + m := &mockLLM{pickIfPresent: []tree.SectionID{"sec_a", "sec_b"}} + s := retrieval.NewChunkedTree(m) + + res, err := s.SelectWithCost(context.Background(), tr, "q", retrieval.ContextBudget{ + ModelName: "claude-sonnet-4-5", MaxTokens: 100000, MaxParallelCalls: 4, + }) + if err != nil { + t.Fatal(err) + } + if len(res.TraceToken) != 64 { + t.Fatalf("chunked-tree trace_token must be 64 chars, got %d", len(res.TraceToken)) + } +} + +// TestTraceTokenMatchesExternalComputation ties the strategy output to +// the canonical ComputeTraceToken helper, so any drift between the +// helper and the per-strategy plumbing is caught at test time. 
+func TestTraceTokenMatchesExternalComputation(t *testing.T) { + tr := buildTree() + m := &mockLLM{pickIfPresent: []tree.SectionID{"sec_a"}} + s := retrieval.NewSinglePass(m) + model := "claude-sonnet-4-5" + + res, err := s.SelectWithCost(context.Background(), tr, "q", + retrieval.ContextBudget{ModelName: model, MaxTokens: 1000}) + if err != nil { + t.Fatal(err) + } + want := retrieval.ComputeTraceToken(tr.DocumentID, "1", model, res.SelectedIDs) + if res.TraceToken != want { + t.Errorf("strategy trace_token %q does not match ComputeTraceToken %q", + res.TraceToken, want) + } +} diff --git a/pkg/retrieval/single_pass.go b/pkg/retrieval/single_pass.go index cb9a43e..95ab61c 100644 --- a/pkg/retrieval/single_pass.go +++ b/pkg/retrieval/single_pass.go @@ -65,14 +65,21 @@ func (s *SinglePass) SelectWithCost(ctx context.Context, t *tree.Tree, query str return nil, fmt.Errorf("single-pass llm call: %w", err) } + selected := FilterKnownIDs(ids, view.Sections) return &Result{ - SelectedIDs: FilterKnownIDs(ids, view.Sections), + SelectedIDs: selected, ModelUsed: model, Usage: usage, HopsTaken: 1, + TraceToken: ComputeTraceToken(t.DocumentID, traceDocVersionV1, model, selected), }, nil } +// traceDocVersionV1 is the placeholder document version used by every +// strategy until Phase 3.2 wires real per-document versioning. Defined +// once so the bump is a one-line change. +const traceDocVersionV1 = "1" + // defaultSelectionRetries is the number of EXTRA attempts (on top of the first) // the selection LLM call gets when its response fails to parse as JSON. Gemini's // JSON mode occasionally returns plain text ("The most relevant section is..."); diff --git a/pkg/retrieval/strategy.go b/pkg/retrieval/strategy.go index 29dd0dc..3edae3c 100644 --- a/pkg/retrieval/strategy.go +++ b/pkg/retrieval/strategy.go @@ -70,6 +70,14 @@ type Result struct { // strategies (e.g. agentic) set it to the number of tool-using turns // actually consumed, including the terminal "done" turn. 
HopsTaken int `json:"hops_taken,omitempty"` + + // TraceToken is the replay token computed by ComputeTraceToken over + // the inputs that determine selection (document ID + version, + // retrieval model, system prompt version, sorted selected IDs). + // Two retrieval runs with identical inputs produce the same token, + // regardless of reasoning path. Empty when the strategy did not + // populate it (e.g. tests, fallback paths). + TraceToken string `json:"trace_token,omitempty"` } // Usage is the aggregated token + cost accounting across all LLM calls diff --git a/pkg/retrieval/trace.go b/pkg/retrieval/trace.go new file mode 100644 index 0000000..9b3c05c --- /dev/null +++ b/pkg/retrieval/trace.go @@ -0,0 +1,65 @@ +package retrieval + +import ( + "crypto/sha256" + "encoding/hex" + "sort" + + "github.com/hallelx2/vectorless-engine/pkg/tree" +) + +// SystemPromptVersion is the build-time version stamp folded into every +// trace token. Bump this whenever selectionSystemPrompt (or any other +// retrieval system prompt whose change should invalidate replay) is +// edited so that previously-cached replay entries are no longer +// considered byte-equivalent. +// +// The version is a free-form string; "v1", "v2", … is the established +// convention. Replay clients should treat it as opaque. +const SystemPromptVersion = "v1" + +// ComputeTraceToken returns the canonical replay trace token for a +// retrieval result. +// +// The token is sha256(doc_id || \0 || doc_version || \0 || +// retrieval_model || \0 || system_prompt_version || \0 || +// sorted(selected_ids).joined("\0")), hex-encoded lowercase. The output +// is 64 hex characters. +// +// Sorting the IDs lexicographically makes the token order-invariant: +// two strategies that select the same set of sections — even via +// different reasoning paths — produce the same token. The NUL separator +// prevents pathological IDs containing the chosen delimiter from +// colliding (e.g. "a,b" + "c" vs "a" + "b,c"). 
+// +// The doc_version parameter is a caller-controlled string so the engine +// can layer document versioning on top in a future phase without +// changing the function signature; today callers pass "1". +func ComputeTraceToken(docID tree.DocumentID, docVersion, model string, ids []tree.SectionID) string { + // Defensive copy: callers reasonably expect ComputeTraceToken not to + // mutate the slice they pass in. Sorting in place would be a subtle + // foot-gun the next time someone reads selected_ids after computing + // the token. + sorted := make([]string, len(ids)) + for i, id := range ids { + sorted[i] = string(id) + } + sort.Strings(sorted) + + h := sha256.New() + h.Write([]byte(string(docID))) + h.Write([]byte{0}) + h.Write([]byte(docVersion)) + h.Write([]byte{0}) + h.Write([]byte(model)) + h.Write([]byte{0}) + h.Write([]byte(SystemPromptVersion)) + h.Write([]byte{0}) + for i, id := range sorted { + if i > 0 { + h.Write([]byte{0}) + } + h.Write([]byte(id)) + } + return hex.EncodeToString(h.Sum(nil)) +} diff --git a/pkg/retrieval/trace_test.go b/pkg/retrieval/trace_test.go new file mode 100644 index 0000000..a24ab77 --- /dev/null +++ b/pkg/retrieval/trace_test.go @@ -0,0 +1,148 @@ +package retrieval_test + +import ( + "regexp" + "testing" + + "github.com/hallelx2/vectorless-engine/pkg/retrieval" + "github.com/hallelx2/vectorless-engine/pkg/tree" +) + +// hexToken matches a lower-case sha256 hex digest exactly 64 chars long. +var hexToken = regexp.MustCompile(`^[0-9a-f]{64}$`) + +func TestComputeTraceTokenShape(t *testing.T) { + t.Parallel() + + tok := retrieval.ComputeTraceToken( + "doc_x", "1", "claude-sonnet-4-5", + []tree.SectionID{"sec_a", "sec_b"}, + ) + if !hexToken.MatchString(tok) { + t.Fatalf("trace token must be 64-char lowercase hex, got %q", tok) + } +} + +// Determinism: same inputs → same token, always. 
+func TestComputeTraceTokenDeterministic(t *testing.T) { + t.Parallel() + + ids := []tree.SectionID{"sec_a", "sec_b", "sec_c"} + a := retrieval.ComputeTraceToken("doc_x", "1", "model", ids) + b := retrieval.ComputeTraceToken("doc_x", "1", "model", ids) + if a != b { + t.Fatalf("same inputs must produce same token: %q vs %q", a, b) + } +} + +// Sort invariance: permuting the IDs must not change the token. This is +// the contract the replay endpoint relies on — two strategies that pick +// the same SET of sections produce byte-identical replay tokens +// regardless of reasoning order. +func TestComputeTraceTokenSortInvariant(t *testing.T) { + t.Parallel() + + a := retrieval.ComputeTraceToken("doc_x", "1", "model", + []tree.SectionID{"sec_a", "sec_b", "sec_c"}) + b := retrieval.ComputeTraceToken("doc_x", "1", "model", + []tree.SectionID{"sec_c", "sec_a", "sec_b"}) + c := retrieval.ComputeTraceToken("doc_x", "1", "model", + []tree.SectionID{"sec_b", "sec_c", "sec_a"}) + if a != b || b != c { + t.Fatalf("permuted IDs must produce same token: a=%q b=%q c=%q", a, b, c) + } +} + +// Sort invariance must not mutate the caller's slice. Computing the +// token over a deliberately unsorted slice must leave it identical to +// a copy taken before the call. +func TestComputeTraceTokenDoesNotMutateInput(t *testing.T) { + t.Parallel() + + ids := []tree.SectionID{"sec_c", "sec_a", "sec_b"} + before := make([]tree.SectionID, len(ids)) + copy(before, ids) + + _ = retrieval.ComputeTraceToken("doc_x", "1", "model", ids) + for i, id := range ids { + if id != before[i] { + t.Fatalf("input mutated at idx %d: before=%q after=%q", i, before[i], id) + } + } +} + +// Input sensitivity: changing any single component must change the +// token. Without this, replay would accept queries against a different +// document or model and return a stale response.
+func TestComputeTraceTokenInputSensitivity(t *testing.T) { + t.Parallel() + + base := retrieval.ComputeTraceToken("doc_x", "1", "model", + []tree.SectionID{"sec_a", "sec_b"}) + + cases := []struct { + name string + tok string + }{ + {"different doc", retrieval.ComputeTraceToken("doc_y", "1", "model", + []tree.SectionID{"sec_a", "sec_b"})}, + {"different version", retrieval.ComputeTraceToken("doc_x", "2", "model", + []tree.SectionID{"sec_a", "sec_b"})}, + {"different model", retrieval.ComputeTraceToken("doc_x", "1", "other", + []tree.SectionID{"sec_a", "sec_b"})}, + {"different ids", retrieval.ComputeTraceToken("doc_x", "1", "model", + []tree.SectionID{"sec_a", "sec_c"})}, + {"superset ids", retrieval.ComputeTraceToken("doc_x", "1", "model", + []tree.SectionID{"sec_a", "sec_b", "sec_c"})}, + {"empty ids", retrieval.ComputeTraceToken("doc_x", "1", "model", nil)}, + } + for _, c := range cases { + if c.tok == base { + t.Errorf("%s: token did not change (still %q)", c.name, c.tok) + } + } +} + +// Pathological IDs containing commas (or other delimiter-ish bytes) +// must not collide via the joining strategy. Two distinct ID sets that +// would collide under naive comma-join must produce distinct tokens. +func TestComputeTraceTokenAvoidsDelimiterCollision(t *testing.T) { + t.Parallel() + + // Naive "a,b"+","+"c" join would equal "a"+","+"b,c" — using a NUL + // separator inside the hash input avoids the collision because IDs + // can't contain NULs in practice (an ID that did contain a NUL could + // still collide, because the hash input carries no length prefixes).
+ a := retrieval.ComputeTraceToken("doc_x", "1", "model", + []tree.SectionID{"a,b", "c"}) + b := retrieval.ComputeTraceToken("doc_x", "1", "model", + []tree.SectionID{"a", "b,c"}) + if a == b { + t.Fatalf("comma-delimited IDs must not collide: both produced %q", a) + } +} + +// Empty selection still produces a valid 64-char token — replay must +// work for "we read the document and nothing was relevant" outcomes, +// which are legitimate retrieval results. +func TestComputeTraceTokenEmptySelection(t *testing.T) { + t.Parallel() + + tok := retrieval.ComputeTraceToken("doc_x", "1", "model", nil) + if !hexToken.MatchString(tok) { + t.Fatalf("empty selection should still produce valid hex token, got %q", tok) + } +} + +// SystemPromptVersion is folded into the token so an edit to the +// retrieval system prompt invalidates previously-cached replay +// entries. This test pins the constant so a change to its value is a +// deliberate decision (the assertion fails when SystemPromptVersion is +// bumped, prompting the author to acknowledge replay invalidation). +func TestSystemPromptVersionStable(t *testing.T) { + t.Parallel() + + if retrieval.SystemPromptVersion != "v1" { + t.Errorf("SystemPromptVersion changed to %q; bump replay docs and ensure existing replay entries are intentionally invalidated", retrieval.SystemPromptVersion) + } +}