Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 81 additions & 5 deletions src/functions/graph-retrieval.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type {
GraphNode,
GraphEdge,
Session,
CompressedObservation,
} from "../types.js";
import { KV } from "../state/schema.js";
import type { StateKV } from "../state/kv.js";
Expand Down Expand Up @@ -41,6 +43,74 @@ function buildGraphContext(
export class GraphRetrieval {
constructor(private kv: StateKV) {}

/**
 * Resolve each result to the session whose KV namespace actually holds
 * its observation, so HybridSearch.enrichResults can load it via
 * KV.observations(sessionId) (#656).
 *
 * The `sessionId` already present on a result is only a hint: a node
 * accumulates sourceObservationIds across many extracts/sessions but stores
 * a single (most-recent) session, and legacy pre-#656 nodes store none. So
 * for every result the hint is VERIFIED against the KV store; on a miss
 * (wrong hint, empty hint, multi-session node) the known sessions are
 * scanned in list order and the first one holding the observation wins.
 *
 * Results are mutated in place: `sessionId` is left as the verified hint,
 * overwritten with the session found by the scan, or set to "" when no
 * known session holds the observation.
 *
 * Cost: a correct hint confirms in one lookup. Outcomes are cached per
 * obsId (including misses), so repeated obsIds are resolved without further
 * KV reads. Only genuine misses pay the per-session scan, and the session
 * list itself is fetched at most once per call.
 *
 * KV failures never propagate: a failing lookup counts as "not found" and a
 * failing session listing counts as "no known sessions".
 *
 * @param results Retrieval results to resolve; mutated in place.
 */
private async resolveSessionIds(
  results: GraphRetrievalResult[],
): Promise<void> {
  if (results.length === 0) return;

  // obsId -> owning sessionId, or null when no known session holds it
  // (so an unresolved obsId is scanned at most once, not per result).
  const resolved = new Map<string, string | null>();
  let sessions: Session[] | null = null; // lazily listed on first miss

  // True only when the observation exists under the given session.
  // `!= null` treats both null and undefined as a miss, so a KV that
  // resolves `undefined` for an absent key is not mistaken for a hit.
  // try/catch (rather than a chained .catch) also covers a synchronous
  // throw while building the scope or issuing the get.
  const probe = async (sessionId: string, obsId: string): Promise<boolean> => {
    try {
      const obs = await this.kv.get<CompressedObservation>(
        KV.observations(sessionId),
        obsId,
      );
      return obs != null;
    } catch {
      return false;
    }
  };

  for (const r of results) {
    // Cached outcome (hit or miss) from an earlier result with this obsId.
    // Map values are never `undefined`, so `undefined` means "not cached".
    const cached = resolved.get(r.obsId);
    if (cached !== undefined) {
      r.sessionId = cached ?? "";
      continue;
    }

    // 1) Trust-but-verify the node's hint first (cheapest path).
    if (r.sessionId && (await probe(r.sessionId, r.obsId))) {
      resolved.set(r.obsId, r.sessionId);
      continue;
    }

    // 2) Hint was empty or wrong — scan known sessions for the obs.
    //    Guard the listing the same way probe() guards its get(): a KV
    //    failure here should degrade to an unresolved sessionId ("") for
    //    this obs, not abort the whole retrieval. The empty array is kept,
    //    so the listing is not retried for later results in this call.
    if (sessions === null) {
      try {
        sessions = await this.kv.list<Session>(KV.sessions);
      } catch {
        sessions = [];
      }
    }

    let found: string | null = null;
    for (const session of sessions) {
      if (session.id === r.sessionId) continue; // already probed above
      if (await probe(session.id, r.obsId)) {
        found = session.id;
        break;
      }
    }

    resolved.set(r.obsId, found);
    r.sessionId = found ?? "";
  }
}

async searchByEntities(
entityNames: string[],
maxDepth = 2,
Expand Down Expand Up @@ -89,7 +159,7 @@ export class GraphRetrieval {

results.push({
obsId,
sessionId: "",
sessionId: lastNode.sessionId ?? "",
score,
graphContext: buildGraphContext(path),
pathLength,
Expand All @@ -102,7 +172,7 @@ export class GraphRetrieval {
visitedObs.add(obsId);
results.push({
obsId,
sessionId: "",
sessionId: startNode.sessionId ?? "",
score: 1.0,
graphContext: `[${startNode.type}] ${startNode.name}`,
pathLength: 0,
Expand All @@ -111,7 +181,11 @@ export class GraphRetrieval {
}

results.sort((a, b) => b.score - a.score);
return results.slice(0, maxResults);
const top = results.slice(0, maxResults);
// Resolve sessions only for the surviving top-K so verification/scan
// work never runs for results that get trimmed away.
await this.resolveSessionIds(top);
return top;
}

async expandFromChunks(
Expand Down Expand Up @@ -142,7 +216,7 @@ export class GraphRetrieval {

results.push({
obsId,
sessionId: "",
sessionId: lastNode.sessionId ?? "",
score,
graphContext: buildGraphContext(path),
pathLength,
Expand All @@ -152,7 +226,9 @@ export class GraphRetrieval {
}

results.sort((a, b) => b.score - a.score);
return results.slice(0, maxResults);
const top = results.slice(0, maxResults);
await this.resolveSessionIds(top);
return top;
}

async temporalQuery(
Expand Down
35 changes: 34 additions & 1 deletion src/functions/graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@ function mergeNode(
]),
],
properties: { ...existing.properties, ...incoming.properties },
// Refresh to the newest source's session (#656). The incoming node
// is the more recent extract; prefer its sessionId when present so a
// node re-observed in a different session points at the live one.
// Falls back to the existing value (which may itself be undefined for
// pre-#656 nodes — retrieval handles that case).
sessionId: incoming.sessionId ?? existing.sessionId,
updatedAt: capturedAt,
};
}
Expand Down Expand Up @@ -378,6 +384,11 @@ function parseAttrs(raw: string): Record<string, string> {
function parseGraphXml(
xml: string,
observationIds: string[],
// obsId -> sessionId, so each extracted node can record the session
// namespace of its source observations (#656). Optional: when omitted
// (or an obsId is missing), the node's sessionId is left undefined and
// retrieval falls back to a cross-session scan.
sessionByObsId?: Map<string, string>,
): {
nodes: GraphNode[];
edges: GraphEdge[];
Expand Down Expand Up @@ -405,12 +416,27 @@ function parseGraphXml(
while ((propMatch = propRegex.exec(propsBlock)) !== null) {
properties[propMatch[1]] = propMatch[2];
}
// Resolve the node's sessionId from its source observations. All
// observationIds in one extract call share the batch, so the first
// resolvable session is representative; merges later refresh it to
// the newest source (see mergeNode).
let sessionId: string | undefined;
if (sessionByObsId) {
for (const obsId of observationIds) {
const sid = sessionByObsId.get(obsId);
if (sid) {
sessionId = sid;
break;
}
}
}
nodes.push({
id: generateId("gn"),
type,
name,
properties,
sourceObservationIds: observationIds,
...(sessionId !== undefined && { sessionId }),
createdAt: now,
});
};
Expand Down Expand Up @@ -478,7 +504,14 @@ export function registerGraphFunction(
);

const obsIds = data.observations.map((o) => o.id);
const { nodes, edges } = parseGraphXml(response, obsIds);
// Map each source observation to its session so extracted nodes
// can be resolved back to KV.observations(sessionId) at retrieval
// time (#656). Skip blanks defensively.
const sessionByObsId = new Map<string, string>();
for (const o of data.observations) {
if (o.sessionId) sessionByObsId.set(o.id, o.sessionId);
}
const { nodes, edges } = parseGraphXml(response, obsIds, sessionByObsId);

// #814 v2: targeted name-index lookups replace the O(n) scan
// over `kv.list<GraphNode>(KV.graphNodes)`. At 75K nodes the
Expand Down
31 changes: 30 additions & 1 deletion src/functions/temporal-graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ Rules:
function parseTemporalGraphXml(
xml: string,
observationIds: string[],
// obsId -> sessionId, so temporal nodes record the session namespace of
// their source observations (#656). Optional; absent obsIds leave the
// node's sessionId undefined and retrieval falls back to a scan.
sessionByObsId?: Map<string, string>,
): { nodes: GraphNode[]; edges: GraphEdge[] } {
const nodes: GraphNode[] = [];
const edges: GraphEdge[] = [];
Expand All @@ -74,12 +78,23 @@ function parseTemporalGraphXml(
aliases.push(propMatch[1]);
}

let sessionId: string | undefined;
if (sessionByObsId) {
for (const obsId of observationIds) {
const sid = sessionByObsId.get(obsId);
if (sid) {
sessionId = sid;
break;
}
}
}
nodes.push({
id: generateId("gn"),
type,
name,
properties,
sourceObservationIds: observationIds,
...(sessionId !== undefined && { sessionId }),
createdAt: now,
aliases: aliases.length > 0 ? aliases : undefined,
});
Expand Down Expand Up @@ -158,6 +173,7 @@ export function registerTemporalGraphFunctions(
async (data: {
observations: Array<{
id: string;
sessionId?: string;
title: string;
narrative: string;
concepts: string[];
Expand All @@ -184,7 +200,17 @@ export function registerTemporalGraphFunctions(
);

const obsIds = data.observations.map((o) => o.id);
const { nodes, edges } = parseTemporalGraphXml(response, obsIds);
// Map source observations to their sessions so temporal nodes
// resolve back to KV.observations(sessionId) at retrieval (#656).
const sessionByObsId = new Map<string, string>();
for (const o of data.observations) {
if (o.sessionId) sessionByObsId.set(o.id, o.sessionId);
}
const { nodes, edges } = parseTemporalGraphXml(
response,
obsIds,
sessionByObsId,
);

const existingNodes = await kv.list<GraphNode>(KV.graphNodes);
const existingEdges = await kv.list<GraphEdge>(KV.graphEdges);
Expand All @@ -206,6 +232,9 @@ export function registerTemporalGraphFunctions(
]),
],
properties: { ...existing.properties, ...node.properties },
// Refresh to the newest source's session (#656); keep the
// existing value when this extract couldn't resolve one.
sessionId: node.sessionId ?? existing.sessionId,
updatedAt: new Date().toISOString(),
aliases: [
...new Set([
Expand Down
6 changes: 6 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,12 @@ export interface GraphNode {
name: string;
properties: Record<string, unknown>;
sourceObservationIds: string[];
// sessionId of the most recent source observation. Carried so graph
// retrieval can resolve a node's observations back to their KV
// namespace (KV.observations is keyed by sessionId). Optional for
// backward compatibility with nodes written before #656; retrieval
// falls back to a cross-session obsId scan when absent.
sessionId?: string;
createdAt: string;
updatedAt?: string;
aliases?: string[];
Expand Down
Loading