From c10b9d4f0f673f2e16420f8a81059682b9c56416 Mon Sep 17 00:00:00 2001 From: Abhinav Mahajan Date: Mon, 15 Jun 2026 18:11:16 +0530 Subject: [PATCH 1/4] feat(graph): persist sessionId on GraphNode at extraction time --- src/functions/graph.ts | 35 ++++++++++++++++++++++++++++++++- src/functions/temporal-graph.ts | 31 ++++++++++++++++++++++++++++- src/types.ts | 6 ++++++ 3 files changed, 70 insertions(+), 2 deletions(-) diff --git a/src/functions/graph.ts b/src/functions/graph.ts index bb0d72722..5f3dd52c2 100644 --- a/src/functions/graph.ts +++ b/src/functions/graph.ts @@ -287,6 +287,12 @@ function mergeNode( ]), ], properties: { ...existing.properties, ...incoming.properties }, + // Refresh to the newest source's session (#656). The incoming node + // is the more recent extract; prefer its sessionId when present so a + // node re-observed in a different session points at the live one. + // Falls back to the existing value (which may itself be undefined for + // pre-#656 nodes — retrieval handles that case). + sessionId: incoming.sessionId ?? existing.sessionId, updatedAt: capturedAt, }; } @@ -378,6 +384,11 @@ function parseAttrs(raw: string): Record { function parseGraphXml( xml: string, observationIds: string[], + // obsId -> sessionId, so each extracted node can record the session + // namespace of its source observations (#656). Optional: when omitted + // (or an obsId is missing), the node's sessionId is left undefined and + // retrieval falls back to a cross-session scan. + sessionByObsId?: Map, ): { nodes: GraphNode[]; edges: GraphEdge[]; @@ -405,12 +416,27 @@ function parseGraphXml( while ((propMatch = propRegex.exec(propsBlock)) !== null) { properties[propMatch[1]] = propMatch[2]; } + // Resolve the node's sessionId from its source observations. All + // observationIds in one extract call share the batch, so the first + // resolvable session is representative; merges later refresh it to + // the newest source (see mergeNode). + let sessionId: string | undefined; + if (sessionByObsId) { + for (const obsId of observationIds) { + const sid = sessionByObsId.get(obsId); + if (sid) { + sessionId = sid; + break; + } + } + } nodes.push({ id: generateId("gn"), type, name, properties, sourceObservationIds: observationIds, + ...(sessionId !== undefined && { sessionId }), createdAt: now, }); }; @@ -478,7 +504,14 @@ export function registerGraphFunction( ); const obsIds = data.observations.map((o) => o.id); - const { nodes, edges } = parseGraphXml(response, obsIds); + // Map each source observation to its session so extracted nodes + // can be resolved back to KV.observations(sessionId) at retrieval + // time (#656). Skip blanks defensively. + const sessionByObsId = new Map(); + for (const o of data.observations) { + if (o.sessionId) sessionByObsId.set(o.id, o.sessionId); + } + const { nodes, edges } = parseGraphXml(response, obsIds, sessionByObsId); // #814 v2: targeted name-index lookups replace the O(n) scan // over `kv.list(KV.graphNodes)`. At 75K nodes the diff --git a/src/functions/temporal-graph.ts b/src/functions/temporal-graph.ts index 1e8d1df10..61f1f4722 100644 --- a/src/functions/temporal-graph.ts +++ b/src/functions/temporal-graph.ts @@ -48,6 +48,10 @@ Rules: function parseTemporalGraphXml( xml: string, observationIds: string[], + // obsId -> sessionId, so temporal nodes record the session namespace of + // their source observations (#656). Optional; absent obsIds leave the + // node's sessionId undefined and retrieval falls back to a scan. + sessionByObsId?: Map, ): { nodes: GraphNode[]; edges: GraphEdge[] } { const nodes: GraphNode[] = []; const edges: GraphEdge[] = []; @@ -74,12 +78,23 @@ function parseTemporalGraphXml( aliases.push(propMatch[1]); } + let sessionId: string | undefined; + if (sessionByObsId) { + for (const obsId of observationIds) { + const sid = sessionByObsId.get(obsId); + if (sid) { + sessionId = sid; + break; + } + } + } nodes.push({ id: generateId("gn"), type, name, properties, sourceObservationIds: observationIds, + ...(sessionId !== undefined && { sessionId }), createdAt: now, aliases: aliases.length > 0 ? aliases : undefined, }); @@ -158,6 +173,7 @@ export function registerTemporalGraphFunctions( async (data: { observations: Array<{ id: string; + sessionId?: string; title: string; narrative: string; concepts: string[]; @@ -184,7 +200,17 @@ export function registerTemporalGraphFunctions( ); const obsIds = data.observations.map((o) => o.id); - const { nodes, edges } = parseTemporalGraphXml(response, obsIds); + // Map source observations to their sessions so temporal nodes + // resolve back to KV.observations(sessionId) at retrieval (#656). + const sessionByObsId = new Map(); + for (const o of data.observations) { + if (o.sessionId) sessionByObsId.set(o.id, o.sessionId); + } + const { nodes, edges } = parseTemporalGraphXml( + response, + obsIds, + sessionByObsId, + ); const existingNodes = await kv.list(KV.graphNodes); const existingEdges = await kv.list(KV.graphEdges); @@ -206,6 +232,9 @@ export function registerTemporalGraphFunctions( ]), ], properties: { ...existing.properties, ...node.properties }, + // Refresh to the newest source's session (#656); keep the + // existing value when this extract couldn't resolve one. + sessionId: node.sessionId ?? existing.sessionId, updatedAt: new Date().toISOString(), aliases: [ ...new Set([ diff --git a/src/types.ts b/src/types.ts index 6797dfaf9..fdc1fa990 100644 --- a/src/types.ts +++ b/src/types.ts @@ -384,6 +384,12 @@ export interface GraphNode { name: string; properties: Record; sourceObservationIds: string[]; + // sessionId of the most recent source observation. Carried so graph + // retrieval can resolve a node's observations back to their KV + // namespace (KV.observations is keyed by sessionId). Optional for + // backward compatibility with nodes written before #656; retrieval + // falls back to a cross-session obsId scan when absent. + sessionId?: string; createdAt: string; updatedAt?: string; aliases?: string[]; From e12a054d156ecc00f092b89051e40a32dc10664a Mon Sep 17 00:00:00 2001 From: Abhinav Mahajan Date: Mon, 15 Jun 2026 18:11:38 +0530 Subject: [PATCH 2/4] fix(graph): resolve real sessionId for graph search results --- src/functions/graph-retrieval.ts | 81 ++++++++++++++++++++++++++++++-- 1 file changed, 76 insertions(+), 5 deletions(-) diff --git a/src/functions/graph-retrieval.ts b/src/functions/graph-retrieval.ts index 10fd51a76..63385a9b1 100644 --- a/src/functions/graph-retrieval.ts +++ b/src/functions/graph-retrieval.ts @@ -1,6 +1,8 @@ import type { GraphNode, GraphEdge, + Session, + CompressedObservation, } from "../types.js"; import { KV } from "../state/schema.js"; import type { StateKV } from "../state/kv.js"; @@ -41,6 +43,69 @@ function buildGraphContext( export class GraphRetrieval { constructor(private kv: StateKV) {} + /** + * Resolve each result to the session whose KV namespace actually holds + * its observation, so HybridSearch.enrichResults can load it via + * KV.observations(sessionId) (#656). + * + * The node-provided `sessionId` is only a hint: a node accumulates + * sourceObservationIds across many extracts/sessions but stores a single + * (most-recent) session, and legacy pre-#656 nodes store none. So for + * every result we VERIFY the obsId actually lives in the hinted session; + * on a miss (wrong hint, empty hint, multi-session node) we fall back to + * scanning known sessions. This makes the resolution authoritative + * rather than trusting a hint that can be stale or shared. + * + * Cost: the common single-session node confirms in one lookup. Results + * sharing an obsId are cached, so repeats are free. Only genuine misses + * pay the per-session scan. + */ + private async resolveSessionIds( + results: GraphRetrievalResult[], + ): Promise { + if (results.length === 0) return; + + // obsId -> owning sessionId, or null when no known session holds it + // (so an unresolved obsId is scanned at most once, not per result). + const resolved = new Map(); + let sessions: Session[] | null = null; // lazily listed on first miss + + const probe = async (sessionId: string, obsId: string): Promise => { + const obs = await this.kv + .get(KV.observations(sessionId), obsId) + .catch(() => null); + return obs !== null; + }; + + for (const r of results) { + if (resolved.has(r.obsId)) { + r.sessionId = resolved.get(r.obsId) ?? ""; + continue; + } + + // 1) Trust-but-verify the node's hint first (cheapest path). + if (r.sessionId && (await probe(r.sessionId, r.obsId))) { + resolved.set(r.obsId, r.sessionId); + continue; + } + + // 2) Hint was empty or wrong — scan known sessions for the obs. + if (sessions === null) { + sessions = await this.kv.list(KV.sessions); + } + let found: string | null = null; + for (const session of sessions) { + if (session.id === r.sessionId) continue; // already probed above + if (await probe(session.id, r.obsId)) { + found = session.id; + break; + } + } + resolved.set(r.obsId, found); + r.sessionId = found ?? ""; + } + } + async searchByEntities( entityNames: string[], maxDepth = 2, @@ -89,7 +154,7 @@ export class GraphRetrieval { results.push({ obsId, - sessionId: "", + sessionId: lastNode.sessionId ?? "", score, graphContext: buildGraphContext(path), pathLength, @@ -102,7 +167,7 @@ export class GraphRetrieval { visitedObs.add(obsId); results.push({ obsId, - sessionId: "", + sessionId: startNode.sessionId ?? "", score: 1.0, graphContext: `[${startNode.type}] ${startNode.name}`, pathLength: 0, @@ -111,7 +176,11 @@ export class GraphRetrieval { } results.sort((a, b) => b.score - a.score); - return results.slice(0, maxResults); + const top = results.slice(0, maxResults); + // Resolve sessions only for the surviving top-K so verification/scan + // work never runs for results that get trimmed away. + await this.resolveSessionIds(top); + return top; } async expandFromChunks( @@ -142,7 +211,7 @@ export class GraphRetrieval { results.push({ obsId, - sessionId: "", + sessionId: lastNode.sessionId ?? "", score, graphContext: buildGraphContext(path), pathLength, @@ -152,7 +221,9 @@ export class GraphRetrieval { } results.sort((a, b) => b.score - a.score); - return results.slice(0, maxResults); + const top = results.slice(0, maxResults); + await this.resolveSessionIds(top); + return top; } async temporalQuery( From bf998d0712122c8be9d3bce311d8bf552ede4f2d Mon Sep 17 00:00:00 2001 From: Abhinav Mahajan Date: Mon, 15 Jun 2026 18:11:48 +0530 Subject: [PATCH 3/4] test(graph): cover sessionId resolution for graph retrieval --- test/graph-retrieval.test.ts | 170 +++++++++++++++++++++++++++++++++++ test/graph.test.ts | 12 +++ 2 files changed, 182 insertions(+) diff --git a/test/graph-retrieval.test.ts b/test/graph-retrieval.test.ts index 2eb3f923b..35a62132c 100644 --- a/test/graph-retrieval.test.ts +++ b/test/graph-retrieval.test.ts @@ -296,3 +296,173 @@ describe("GraphRetrieval", () => { expect(results.find((r) => r.obsId === "obs_4")).toBeUndefined(); }); }); + +// Regression coverage for #656: graph results carried sessionId: "" which +// made HybridSearch.enrichResults look up KV.observations("") and silently +// drop every graph-retrieved observation. Results must now resolve to the +// session whose KV namespace actually holds the observation. The resolver +// trusts-but-verifies the node's sessionId hint and falls back to a scan +// when the hint is empty (legacy node) or wrong (multi-session node). +describe("GraphRetrieval — sessionId resolution (#656)", () => { + // Mock that serves graph nodes/edges, the sessions list, AND per-session + // observation rows, so the resolver's verify + scan paths have real data. + function mockKVWithSessions( + nodes: GraphNode[], + edges: GraphEdge[], + sessions: Array<{ id: string }>, + obsBySession: Record, + ) { + const store = new Map>(); + const nodesMap = new Map(); + for (const n of nodes) nodesMap.set(n.id, n); + store.set("mem:graph:nodes", nodesMap); + const edgesMap = new Map(); + for (const e of edges) edgesMap.set(e.id, e); + store.set("mem:graph:edges", edgesMap); + + const sessionsMap = new Map(); + for (const s of sessions) sessionsMap.set(s.id, s); + store.set("mem:sessions", sessionsMap); + + for (const [sid, obsIds] of Object.entries(obsBySession)) { + const obsMap = new Map(); + for (const oid of obsIds) obsMap.set(oid, { id: oid, sessionId: sid }); + store.set(`mem:obs:${sid}`, obsMap); + } + + return { + get: async (scope: string, key: string): Promise => + (store.get(scope)?.get(key) as T) ?? null, + set: async (scope: string, key: string, data: T): Promise => { + if (!store.has(scope)) store.set(scope, new Map()); + store.get(scope)!.set(key, data); + return data; + }, + delete: async (scope: string, key: string): Promise => { + store.get(scope)?.delete(key); + }, + list: async (scope: string): Promise => { + const entries = store.get(scope); + return entries ? (Array.from(entries.values()) as T[]) : []; + }, + }; + } + + it("resolves a node's sessionId when the hint is verified in KV (start-node path)", async () => { + const node: GraphNode = { + ...makeNode("n1", "React", "library", ["obs_1"]), + sessionId: "sess_abc", + }; + const kv = mockKVWithSessions([node], [], [{ id: "sess_abc" }], { + sess_abc: ["obs_1"], + }); + const retrieval = new GraphRetrieval(kv as never); + + const results = await retrieval.searchByEntities(["React"]); + expect(results.length).toBeGreaterThan(0); + expect(results[0].obsId).toBe("obs_1"); + // Before the fix this was always "" — the bug. + expect(results[0].sessionId).toBe("sess_abc"); + }); + + it("resolves sessionId across a traversed path (last-node path)", async () => { + const nodes: GraphNode[] = [ + { ...makeNode("n1", "React", "library", ["obs_1"]), sessionId: "sess_a" }, + { ...makeNode("n2", "Hook", "concept", ["obs_2"]), sessionId: "sess_b" }, + ]; + const edges = [makeEdge("e1", "n1", "n2", "uses")]; + const kv = mockKVWithSessions( + nodes, + edges, + [{ id: "sess_a" }, { id: "sess_b" }], + { sess_a: ["obs_1"], sess_b: ["obs_2"] }, + ); + const retrieval = new GraphRetrieval(kv as never); + + const results = await retrieval.searchByEntities(["React"], 2); + const hookResult = results.find((r) => r.obsId === "obs_2"); + expect(hookResult).toBeDefined(); + // obs_2 belongs to n2, whose session is sess_b. + expect(hookResult!.sessionId).toBe("sess_b"); + }); + + it("resolves sessionId in expandFromChunks", async () => { + const nodes: GraphNode[] = [ + { ...makeNode("n1", "auth.ts", "file", ["obs_1"]), sessionId: "sess_a" }, + { ...makeNode("n2", "jwt", "concept", ["obs_2"]), sessionId: "sess_b" }, + ]; + const edges = [makeEdge("e1", "n1", "n2", "uses")]; + const kv = mockKVWithSessions( + nodes, + edges, + [{ id: "sess_a" }, { id: "sess_b" }], + { sess_a: ["obs_1"], sess_b: ["obs_2"] }, + ); + const retrieval = new GraphRetrieval(kv as never); + + const results = await retrieval.expandFromChunks(["obs_1"]); + const jwtResult = results.find((r) => r.obsId === "obs_2"); + expect(jwtResult).toBeDefined(); + expect(jwtResult!.sessionId).toBe("sess_b"); + }); + + it("backfills sessionId for legacy nodes (no sessionId field) via session scan", async () => { + // Legacy node written before #656 — no sessionId. The obs lives in + // sess_legacy; the scan fallback must locate it. + const node = makeNode("n1", "React", "library", ["obs_legacy"]); + expect(node.sessionId).toBeUndefined(); + const kv = mockKVWithSessions( + [node], + [], + [{ id: "sess_other" }, { id: "sess_legacy" }], + { sess_other: ["obs_unrelated"], sess_legacy: ["obs_legacy"] }, + ); + const retrieval = new GraphRetrieval(kv as never); + + const results = await retrieval.searchByEntities(["React"]); + const r = results.find((x) => x.obsId === "obs_legacy"); + expect(r).toBeDefined(); + expect(r!.sessionId).toBe("sess_legacy"); + }); + + it("corrects a stale hint for a multi-session node (verify-then-scan)", async () => { + // A node re-observed across two sessions accumulates both obsIds but + // stores only the most-recent session (sess_b). obs_a actually lives in + // sess_a — trusting the hint blindly would mis-namespace it and drop it + // at enrichment. The resolver must verify and re-locate obs_a to sess_a. + const node: GraphNode = { + ...makeNode("n1", "React", "library", ["obs_a", "obs_b"]), + sessionId: "sess_b", + }; + const kv = mockKVWithSessions( + [node], + [], + [{ id: "sess_a" }, { id: "sess_b" }], + { sess_a: ["obs_a"], sess_b: ["obs_b"] }, + ); + const retrieval = new GraphRetrieval(kv as never); + + const results = await retrieval.searchByEntities(["React"]); + const ra = results.find((x) => x.obsId === "obs_a"); + const rb = results.find((x) => x.obsId === "obs_b"); + expect(ra).toBeDefined(); + expect(rb).toBeDefined(); + // obs_b matches the hint; obs_a is corrected to its true owning session. + expect(rb!.sessionId).toBe("sess_b"); + expect(ra!.sessionId).toBe("sess_a"); + }); + + it("leaves sessionId empty when an obs exists in no session", async () => { + const node = makeNode("n1", "React", "library", ["obs_orphan"]); + const kv = mockKVWithSessions([node], [], [{ id: "sess_x" }], { + sess_x: ["obs_unrelated"], + }); + const retrieval = new GraphRetrieval(kv as never); + + const results = await retrieval.searchByEntities(["React"]); + const r = results.find((x) => x.obsId === "obs_orphan"); + expect(r).toBeDefined(); + // No session owns it — graceful empty, not a crash. + expect(r!.sessionId).toBe(""); + }); +}); diff --git a/test/graph.test.ts b/test/graph.test.ts index da8b26651..22ed48186 100644 --- a/test/graph.test.ts +++ b/test/graph.test.ts @@ -106,6 +106,18 @@ describe("Graph Functions", () => { expect(edges[0].type).toBe("uses"); }); + it("graph-extract stamps nodes with the source observation's sessionId (#656)", async () => { + await sdk.trigger("mem::graph-extract", { observations: [testObs] }); + + const nodes = await kv.list("mem:graph:nodes"); + expect(nodes.length).toBeGreaterThan(0); + // Every node built from testObs must carry its session so retrieval + // can resolve KV.observations(sessionId) instead of the empty namespace. + for (const n of nodes) { + expect(n.sessionId).toBe("ses_1"); + } + }); + it("graph-extract accepts self-closing entity tags", async () => { mockProvider.compress.mockResolvedValueOnce(` From c6d3942d3fc2fd33d356845f1631ec3c2fe1b404 Mon Sep 17 00:00:00 2001 From: Abhinav Mahajan Date: Mon, 15 Jun 2026 18:24:55 +0530 Subject: [PATCH 4/4] fix(graph): ensure graceful degradation when session list retrieval fails --- src/functions/graph-retrieval.ts | 7 ++++++- test/graph-retrieval.test.ts | 22 ++++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/src/functions/graph-retrieval.ts b/src/functions/graph-retrieval.ts index 63385a9b1..3c8eb2a9f 100644 --- a/src/functions/graph-retrieval.ts +++ b/src/functions/graph-retrieval.ts @@ -90,8 +90,13 @@ export class GraphRetrieval { } // 2) Hint was empty or wrong — scan known sessions for the obs. + // Guard the list like probe() guards its get(): a KV failure here + // should degrade to an unresolved sessionId ("") for this obs, not + // abort the whole retrieval. Empty array also caches the miss below. if (sessions === null) { - sessions = await this.kv.list(KV.sessions); + sessions = await this.kv + .list(KV.sessions) + .catch(() => [] as Session[]); } let found: string | null = null; for (const session of sessions) { diff --git a/test/graph-retrieval.test.ts b/test/graph-retrieval.test.ts index 35a62132c..532cbf990 100644 --- a/test/graph-retrieval.test.ts +++ b/test/graph-retrieval.test.ts @@ -465,4 +465,26 @@ describe("GraphRetrieval — sessionId resolution (#656)", () => { // No session owns it — graceful empty, not a crash. expect(r!.sessionId).toBe(""); }); + + it("degrades gracefully when the sessions list throws (no abort)", async () => { + // Legacy node (no sessionId hint) so resolution must hit the scan path. + const node = makeNode("n1", "React", "library", ["obs_1"]); + const kv = mockKVWithSessions([node], [], [{ id: "sess_a" }], { + sess_a: ["obs_1"], + }); + // Fail only the sessions list; node/edge lists still work so retrieval + // produces results. Resolution must fall back to an empty sessionId + // rather than rejecting the whole retrieval. + const originalList = kv.list; + kv.list = (async (scope: string) => { + if (scope === "mem:sessions") throw new Error("kv down"); + return originalList(scope); + }) as never; + const retrieval = new GraphRetrieval(kv as never); + + const results = await retrieval.searchByEntities(["React"]); + const r = results.find((x) => x.obsId === "obs_1"); + expect(r).toBeDefined(); + expect(r!.sessionId).toBe(""); + }); });