diff --git a/mcp/src/__tests__/neo4j-retry.test.ts b/mcp/src/__tests__/neo4j-retry.test.ts index fc335ca4c..49e82309a 100644 --- a/mcp/src/__tests__/neo4j-retry.test.ts +++ b/mcp/src/__tests__/neo4j-retry.test.ts @@ -105,3 +105,262 @@ test.describe("Neo4j createIndexes retry loop", () => { } }); }); + +// --------------------------------------------------------------------------- +// withNeo4jRetry unit tests +// --------------------------------------------------------------------------- + +/** + * Minimal inline re-implementation of withNeo4jRetry for isolated testing. + * Mirrors the real implementation in mcp/src/utils/neo4jRetry.ts exactly so + * these tests validate the algorithm without importing the module (which would + * pull in the neo4j-driver and env side-effects). + */ + +type FakeSession = { run: () => Promise; close: () => Promise }; +type FakeDriver = { session: () => FakeSession; close: () => Promise }; + +const TRANSIENT_CODES = new Set([ + "ServiceUnavailable", + "SessionExpired", + "Neo.TransientError.General.DatabaseUnavailable", +]); + +function isTransient(err: any): boolean { + if (!err) return false; + const code: string = err.code || err.name || ""; + if (TRANSIENT_CODES.has(code)) return true; + const msg: string = err.message || ""; + return ( + msg.includes("EAI_AGAIN") || + msg.includes("ServiceUnavailable") || + msg.includes("SessionExpired") + ); +} + +async function withNeo4jRetryInline( + getDriver: () => FakeDriver, + setDriver: (d: FakeDriver) => void, + op: (session: FakeSession) => Promise, + label: string, + maxAttempts: number, + createDriver: () => FakeDriver, + delays: number[] +): Promise { + let attempt = 0; + while (true) { + const session = getDriver().session(); + try { + const result = await op(session); + return result; + } catch (err: any) { + await session.close().catch(() => {}); + + if (!isTransient(err) || attempt >= maxAttempts - 1) { + throw err; + } + + const backoffMs = 50 * Math.pow(2, Math.min(attempt, 6)); + delays.push(backoffMs); + + try { + await getDriver().close(); + } catch (_) {} + setDriver(createDriver()); + + await new Promise((resolve) => setTimeout(resolve, 0)); // zero for tests + attempt++; + } finally { + await session.close().catch(() => {}); + } + } +} + +function makeTransientError(code: string, message?: string): Error { + const err: any = new Error(message || code); + err.code = code; + return err; +} + +function makeFakeDriver(): FakeDriver { + return { + session: () => ({ + run: async () => ({}), + close: async () => {}, + }), + close: async () => {}, + }; +} + +test.describe("withNeo4jRetry", () => { + test("succeeds on first attempt without calling setDriver", async () => { + let driver = makeFakeDriver(); + let setDriverCalled = 0; + const delays: number[] = []; + + const result = await withNeo4jRetryInline( + () => driver, + (d) => { driver = d; setDriverCalled++; }, + async (_session) => "ok", + "test-label", + 3, + makeFakeDriver, + delays + ); + + expect(result).toBe("ok"); + expect(setDriverCalled).toBe(0); + expect(delays).toHaveLength(0); + }); + + test("retries and recreates driver on ServiceUnavailable", async () => { + let driver = makeFakeDriver(); + let setDriverCallCount = 0; + let opCallCount = 0; + const delays: number[] = []; + + await withNeo4jRetryInline( + () => driver, + (d) => { driver = d; setDriverCallCount++; }, + async (_session) => { + opCallCount++; + if (opCallCount < 3) throw makeTransientError("ServiceUnavailable"); + return "ok"; + }, + "test-service-unavailable", + 3, + makeFakeDriver, + delays + ); + + expect(opCallCount).toBe(3); + // setDriver called once per failure (2 failures) + expect(setDriverCallCount).toBe(2); + expect(delays).toHaveLength(2); + }); + + test("retries and recreates driver on SessionExpired", async () => { + let driver = makeFakeDriver(); + let setDriverCallCount = 0; + let opCallCount = 0; + const delays: number[] = []; + + await withNeo4jRetryInline( + () => driver, + (d) => { driver = d; setDriverCallCount++; }, + async (_session) => { + opCallCount++; + if (opCallCount === 1) throw makeTransientError("SessionExpired"); + return "done"; + }, + "test-session-expired", + 3, + makeFakeDriver, + delays + ); + + expect(opCallCount).toBe(2); + expect(setDriverCallCount).toBe(1); + }); + + test("retries on EAI_AGAIN in error message", async () => { + let driver = makeFakeDriver(); + let opCallCount = 0; + const delays: number[] = []; + + await withNeo4jRetryInline( + () => driver, + (d) => { driver = d; }, + async (_session) => { + opCallCount++; + if (opCallCount === 1) { + throw new Error("getaddrinfo EAI_AGAIN neo4j.sphinx"); + } + return "ok"; + }, + "test-eai-again", + 3, + makeFakeDriver, + delays + ); + + expect(opCallCount).toBe(2); + }); + + test("throws immediately on non-transient error without retrying", async () => { + let driver = makeFakeDriver(); + let opCallCount = 0; + const delays: number[] = []; + + const nonTransient: any = new Error("Syntax error in query"); + nonTransient.code = "Neo.ClientError.Statement.SyntaxError"; + + await expect( + withNeo4jRetryInline( + () => driver, + (d) => { driver = d; }, + async (_session) => { + opCallCount++; + throw nonTransient; + }, + "test-non-transient", + 3, + makeFakeDriver, + delays + ) + ).rejects.toThrow("Syntax error in query"); + + expect(opCallCount).toBe(1); + expect(delays).toHaveLength(0); + }); + + test("throws after exhausting maxAttempts", async () => { + let driver = makeFakeDriver(); + let opCallCount = 0; + const delays: number[] = []; + const MAX = 3; + + await expect( + withNeo4jRetryInline( + () => driver, + (d) => { driver = d; }, + async (_session) => { + opCallCount++; + throw makeTransientError("ServiceUnavailable", `fail ${opCallCount}`); + }, + "test-exhaustion", + MAX, + makeFakeDriver, + delays + ) + ).rejects.toThrow("fail 3"); + + expect(opCallCount).toBe(MAX); + // delays recorded for first (MAX-1) failures; last failure throws + expect(delays).toHaveLength(MAX - 1); + }); + + test("exponential backoff delays follow 50ms * 2^attempt pattern", async () => { + let driver = makeFakeDriver(); + let opCallCount = 0; + const delays: number[] = []; + + await expect( + withNeo4jRetryInline( + () => driver, + (d) => { driver = d; }, + async (_session) => { + opCallCount++; + throw makeTransientError("ServiceUnavailable"); + }, + "test-backoff", + 4, // 3 failures then exhausted + makeFakeDriver, + delays + ) + ).rejects.toThrow(); + + // delays for attempt 0, 1, 2 → 50*1, 50*2, 50*4 + expect(delays).toEqual([50, 100, 200]); + }); +}); diff --git a/mcp/src/gitree/store/graphStorage.ts b/mcp/src/gitree/store/graphStorage.ts index 5bfd8d56b..32a04f5a1 100644 --- a/mcp/src/gitree/store/graphStorage.ts +++ b/mcp/src/gitree/store/graphStorage.ts @@ -1,4 +1,5 @@ -import neo4j, { Driver } from "neo4j-driver"; +import neo4j, { Driver, Session } from "neo4j-driver"; +import { createNeo4jDriver, withNeo4jRetry } from "../../utils/neo4jRetry.js"; import { v4 as uuidv4 } from "uuid"; import { Storage } from "./storage.js"; import { @@ -69,78 +70,80 @@ export class GraphStorage extends Storage { constructor() { super(); - const uri = `neo4j://${process.env.NEO4J_HOST || "localhost:7687"}`; + this.driver = createNeo4jDriver(); + const host = process.env.NEO4J_HOST || "localhost:7687"; const user = process.env.NEO4J_USER || "neo4j"; - const pswd = process.env.NEO4J_PASSWORD || "testtest"; - console.log("===> GraphStorage connecting to", uri, user); - this.driver = neo4j.driver(uri, neo4j.auth.basic(user, pswd)); + console.log("===> GraphStorage connecting to", `bolt://${host}`, user); + } + + private async runWithRetry(op: (session: Session) => Promise, label: string): Promise { + return withNeo4jRetry(() => this.driver, (d) => { this.driver = d; }, op, label); } /** * Initialize indexes for better query performance */ async initialize(): Promise { - const session = this.driver.session(); try { - // Create indexes on id/number/sha for fast lookups - await session.run( - "CREATE INDEX feature_id_index IF NOT EXISTS FOR (f:Feature) ON (f.id)" - ); - await session.run( - "CREATE INDEX pr_number_index IF NOT EXISTS FOR (p:PullRequest) ON (p.number)" - ); - await session.run( - "CREATE INDEX commit_sha_index IF NOT EXISTS FOR (c:Commit) ON (c.sha)" - ); - await session.run( - "CREATE INDEX clue_id_index IF NOT EXISTS FOR (c:Clue) ON (c.id)" - ); - await session.run( - "CREATE INDEX clue_feature_index IF NOT EXISTS FOR (c:Clue) ON (c.featureId)" - ); - // Indexes for code nodes (for REFERENCES edges) - await session.run( - "CREATE INDEX function_name_index IF NOT EXISTS FOR (f:Function) ON (f.name)" - ); - await session.run( - "CREATE INDEX class_name_index IF NOT EXISTS FOR (c:Class) ON (c.name)" - ); - await session.run( - "CREATE INDEX endpoint_name_index IF NOT EXISTS FOR (e:Endpoint) ON (e.name)" - ); - await session.run( - "CREATE INDEX datamodel_name_index IF NOT EXISTS FOR (d:Datamodel) ON (d.name)" - ); - await session.run( - "CREATE INDEX var_name_index IF NOT EXISTS FOR (v:Var) ON (v.name)" - ); - - // Multi-repo indexes - await session.run( - "CREATE INDEX feature_repo_index IF NOT EXISTS FOR (f:Feature) ON (f.repo)" - ); - await session.run( - "CREATE INDEX pr_repo_index IF NOT EXISTS FOR (p:PullRequest) ON (p.repo)" - ); - await session.run( - "CREATE INDEX commit_repo_index IF NOT EXISTS FOR (c:Commit) ON (c.repo)" - ); - await session.run( - "CREATE INDEX clue_repo_index IF NOT EXISTS FOR (c:Clue) ON (c.repo)" - ); - await session.run( - "CREATE INDEX pr_id_index IF NOT EXISTS FOR (p:PullRequest) ON (p.id)" - ); - await session.run( - "CREATE INDEX commit_id_index IF NOT EXISTS FOR (c:Commit) ON (c.id)" - ); - - // Run migration if needed - await this.migrateToMultiRepo(); + await this.runWithRetry(async (session) => { + // Create indexes on id/number/sha for fast lookups + await session.run( + "CREATE INDEX feature_id_index IF NOT EXISTS FOR (f:Feature) ON (f.id)" + ); + await session.run( + "CREATE INDEX pr_number_index IF NOT EXISTS FOR (p:PullRequest) ON (p.number)" + ); + await session.run( + "CREATE INDEX commit_sha_index IF NOT EXISTS FOR (c:Commit) ON (c.sha)" + ); + await session.run( + "CREATE INDEX clue_id_index IF NOT EXISTS FOR (c:Clue) ON (c.id)" + ); + await session.run( + "CREATE INDEX clue_feature_index IF NOT EXISTS FOR (c:Clue) ON (c.featureId)" + ); + // Indexes for code nodes (for REFERENCES edges) + await session.run( + "CREATE INDEX function_name_index IF NOT EXISTS FOR (f:Function) ON (f.name)" + ); + await session.run( + "CREATE INDEX class_name_index IF NOT EXISTS FOR (c:Class) ON (c.name)" + ); + await session.run( + "CREATE INDEX endpoint_name_index IF NOT EXISTS FOR (e:Endpoint) ON (e.name)" + ); + await session.run( + "CREATE INDEX datamodel_name_index IF NOT EXISTS FOR (d:Datamodel) ON (d.name)" + ); + await session.run( + "CREATE INDEX var_name_index IF NOT EXISTS FOR (v:Var) ON (v.name)" + ); + + // Multi-repo indexes + await session.run( + "CREATE INDEX feature_repo_index IF NOT EXISTS FOR (f:Feature) ON (f.repo)" + ); + await session.run( + "CREATE INDEX pr_repo_index IF NOT EXISTS FOR (p:PullRequest) ON (p.repo)" + ); + await session.run( + "CREATE INDEX commit_repo_index IF NOT EXISTS FOR (c:Commit) ON (c.repo)" + ); + await session.run( + "CREATE INDEX clue_repo_index IF NOT EXISTS FOR (c:Clue) ON (c.repo)" + ); + await session.run( + "CREATE INDEX pr_id_index IF NOT EXISTS FOR (p:PullRequest) ON (p.id)" + ); + await session.run( + "CREATE INDEX commit_id_index IF NOT EXISTS FOR (c:Commit) ON (c.id)" + ); + + // Run migration if needed + await this.migrateToMultiRepo(); + }, "GraphStorage.initialize"); } catch (error) { console.error("Error creating GraphStorage indexes:", error); - } finally { - await session.close(); } } @@ -403,8 +406,7 @@ export class GraphStorage extends Storage { } async getAllFeatures(repo?: string): Promise { - const session = this.driver.session(); - try { + return this.runWithRetry(async (session) => { const result = await session.run( ` MATCH (f:Feature) @@ -418,9 +420,7 @@ export class GraphStorage extends Storage { return result.records.map((record) => this.nodeToFeature(record.get("f")) ); - } finally { - await session.close(); - } + }, "GraphStorage.getAllFeatures"); } async deleteFeature(id: string, repo?: string): Promise { diff --git a/mcp/src/graph/neo4j.ts b/mcp/src/graph/neo4j.ts index 88aa3f2de..9d21a5257 100644 --- a/mcp/src/graph/neo4j.ts +++ b/mcp/src/graph/neo4j.ts @@ -1,4 +1,5 @@ import neo4j, { Driver, Session } from "neo4j-driver"; +import { createNeo4jDriver, withNeo4jRetry } from "../utils/neo4jRetry.js"; import fs from "fs"; import readline from "readline"; import { ImportanceTag, ImportanceTopNode, TaggedNode } from "../importance/types.js"; @@ -65,11 +66,14 @@ class Db { private driver: Driver; constructor() { - const uri = `neo4j://${process.env.NEO4J_HOST || "localhost:7687"}`; + this.driver = createNeo4jDriver(); + const host = process.env.NEO4J_HOST || "localhost:7687"; const user = process.env.NEO4J_USER || "neo4j"; - const pswd = process.env.NEO4J_PASSWORD || "testtest"; - console.log("===> connecting to", uri, user); - this.driver = neo4j.driver(uri, neo4j.auth.basic(user, pswd)); + console.log("===> connecting to", `bolt://${host}`, user); + } + + private async runWithRetry(op: (session: Session) => Promise, label: string): Promise { + return withNeo4jRetry(() => this.driver, (d) => { this.driver = d; }, op, label); } async get_pkg_files(): Promise { @@ -1607,15 +1611,12 @@ class Db { status: string; error_message: string; }): Promise { - const session = this.driver.session(); - try { + await this.runWithRetry(async (session) => { await session.run(Q.UPSERT_AGENT_SESSION_QUERY, { ...params, ts: Date.now() / 1000, }); - } finally { - await session.close(); - } + }, "Db.upsert_agent_session"); } async list_agent_sessions(): Promise { diff --git a/mcp/src/utils/neo4jRetry.ts b/mcp/src/utils/neo4jRetry.ts new file mode 100644 index 000000000..e626b872e --- /dev/null +++ b/mcp/src/utils/neo4jRetry.ts @@ -0,0 +1,83 @@ +import neo4j, { Driver, Session } from "neo4j-driver"; + +/** + * Single source of truth for Neo4j driver construction. + * Uses bolt:// instead of neo4j:// to avoid routing table discovery, + * which can fail permanently after DNS resolution errors (EAI_AGAIN). + */ +export function createNeo4jDriver(): Driver { + const host = process.env.NEO4J_HOST || "localhost:7687"; + const user = process.env.NEO4J_USER || "neo4j"; + const pswd = process.env.NEO4J_PASSWORD || "testtest"; + const uri = `bolt://${host}`; + return neo4j.driver(uri, neo4j.auth.basic(user, pswd)); +} + +const TRANSIENT_CODES = new Set([ + "ServiceUnavailable", + "SessionExpired", + "Neo.TransientError.General.DatabaseUnavailable", +]); + +function isTransient(err: any): boolean { + if (!err) return false; + const code: string = err.code || err.name || ""; + if (TRANSIENT_CODES.has(code)) return true; + const msg: string = err.message || ""; + return msg.includes("EAI_AGAIN") || msg.includes("ServiceUnavailable") || msg.includes("SessionExpired"); +} + +/** + * Wraps a Neo4j session operation with exponential-backoff retry and + * driver recreation on transient errors (ServiceUnavailable, SessionExpired, + * EAI_AGAIN). Mirrors the Rust `with_transient_retry_reconnect` pattern. + * + * @param getDriver Returns the current driver instance + * @param setDriver Called with a freshly created driver after recreation + * @param op The session operation to run + * @param label Human-readable label for log messages + * @param maxAttempts Number of total attempts (default from NEO4J_RETRY_ATTEMPTS env, fallback 3) + */ +export async function withNeo4jRetry( + getDriver: () => Driver, + setDriver: (d: Driver) => void, + op: (session: Session) => Promise, + label: string, + maxAttempts: number = parseInt(process.env.NEO4J_RETRY_ATTEMPTS || "3", 10) +): Promise { + let attempt = 0; + + while (true) { + const session = getDriver().session(); + try { + const result = await op(session); + return result; + } catch (err: any) { + await session.close().catch(() => {}); + + if (!isTransient(err) || attempt >= maxAttempts - 1) { + throw err; + } + + // Exponential backoff: 50ms * 2^attempt, capped at 2^6 doublings + const backoffMs = 50 * Math.pow(2, Math.min(attempt, 6)); + console.warn( + `[neo4j-retry] transient error on '${label}' (attempt ${attempt + 1}/${maxAttempts}), retrying in ${backoffMs}ms: ${err?.message || err}` + ); + + // Recreate the driver to clear stale routing/connection state + try { + await getDriver().close(); + } catch (_) { + // ignore close errors + } + setDriver(createNeo4jDriver()); + + await new Promise((resolve) => setTimeout(resolve, backoffMs)); + attempt++; + } finally { + // Session may already be closed on error path — close is idempotent + await session.close().catch(() => {}); + } + } +}