diff --git a/README.md b/README.md index 035f29b..792a4cd 100644 --- a/README.md +++ b/README.md @@ -75,8 +75,8 @@ codememory serve codememory search "where is the auth logic?" # Git graph (rollout build) -codememory git-init --repo /absolute/path/to/repo --mode local --full-history -codememory git-sync --repo /absolute/path/to/repo --incremental +codememory git-init --repo /absolute/path/to/repo +codememory git-sync --repo /absolute/path/to/repo --full codememory git-status --repo /absolute/path/to/repo --json ``` @@ -136,6 +136,14 @@ Full workflow and options: [docs/TOOL_USE_ANNOTATION.md](docs/TOOL_USE_ANNOTATIO | `get_file_dependencies(file_path, domain="code")` | Returns imports and dependents for a file | | `identify_impact(file_path, max_depth=3, domain="code")` | Blast radius analysis for changes | | `get_file_info(file_path, domain="code")` | File structure overview (classes, functions) | +| `create_memory_entities(entities)` | Create or update agent-authored memory nodes in Neo4j | +| `create_memory_relations(relations)` | Create typed relationships between memory nodes | +| `add_memory_observations(observations)` | Append observation strings to existing memory nodes | +| `delete_memory_entities(entity_names)` | Delete memory nodes by name | +| `delete_memory_relations(relations)` | Delete typed relationships between memory nodes | +| `delete_memory_observations(observations)` | Remove observation strings from memory nodes | +| `search_memory_nodes(query, limit=5)` | Search memory nodes by name, type, and observations | +| `read_memory_graph()` | Read a summary of the current memory graph | | `get_git_file_history(file_path, limit=20, domain="git")` | File-level commit history and ownership signals (git rollout) | | `get_commit_context(sha, include_diff_stats=true)` | Commit metadata and change statistics (git rollout) | | `find_recent_risky_changes(path_or_symbol, window_days, domain="hybrid")` | Recent high-risk changes using hybrid signals (git 
rollout) | diff --git a/docs/API.md b/docs/API.md index 6a21627..7776961 100644 --- a/docs/API.md +++ b/docs/API.md @@ -253,7 +253,7 @@ $ codememory serve **Server behavior:** - Runs until interrupted (Ctrl+C) -- Exposes 4 MCP tools (see [MCP Tools](#mcp-tools)) +- Exposes MCP tools for code graph queries, git graph queries, and agent-authored memory writes (see [MCP Tools](#mcp-tools)) - Uses local config or environment variables - Graceful shutdown on SIGTERM/SIGINT @@ -846,30 +846,137 @@ print(f"Cost: ${metrics['cost_usd']:.4f}") ##### `semantic_search()` -Perform vector similarity search. +Perform vector similarity search with optional multi-repo filtering. ```python -def semantic_search(self, query: str, limit: int = 5) -> List[Dict] +def semantic_search( + self, + query: str, + limit: int = 5, + repo_id: Optional[str] = None +) -> List[Dict] ``` +**Parameters:** +| Parameter | Type | Required | Default | Description | +|-----------|------|----------|---------|-------------| +| `query` | str | Yes | - | Natural language search query | +| `limit` | int | No | 5 | Maximum results to return | +| `repo_id` | Optional[str] | No | None | Restrict results to a specific repo. Falls back to `self.repo_id` if set. | + +**Behavior when `repo_id` is active:** +- Over-fetches `limit × 3` candidates from the vector index +- Adds a `WHERE entity.repo_id = $repo_id` filter after the `DESCRIBES` hop +- Calls `_rerank_results()` to score and trim to `limit` + **Returns:** ```python [ { "name": "authenticate", "sig": "src/auth.py:authenticate", - "score": 0.92, + "score": 0.92, # raw vector similarity (0–1) + "final_score": 0.94, # 0.9×vector_score + structural_bonus "text": "def authenticate(username, password):..." }, ... ] ``` +- `final_score` is always present when `repo_id` filtering is active (via `_rerank_results()`). 
+ **Example:** ```python -results = builder.semantic_search("JWT validation", limit=3) +results = builder.semantic_search("JWT validation", limit=3, repo_id="my-service") for r in results: - print(f"{r['name']} - Score: {r['score']:.2f}") + print(f"{r['name']} - Score: {r['score']:.2f} Final: {r['final_score']:.2f}") +``` + +--- + +##### `_rerank_results()` + +Private method. Re-scores a candidate list by combining vector similarity with graph connectivity bonuses, then trims to `limit`. + +```python +def _rerank_results(self, results: List[Dict], limit: int) -> List[Dict] +``` + +**Parameters:** +| Parameter | Type | Description | +|-----------|------|-------------| +| `results` | List[Dict] | Candidate results (over-fetched, each with a `score` field) | +| `limit` | int | Final number of results to return | + +**Scoring formula:** +``` +final_score = 0.9 × vector_score + structural_bonus +``` + +**Connectivity bonuses (structural_bonus):** +| Relation | Bonus | +|----------|-------| +| `calls_out` | +0.05 | +| `called_by` | +0.05 | +| `methods` | +0.03 | + +**Behavior:** +- Sorts descending by `final_score` +- Trims list to `limit` +- Adds `final_score` key to each result dict + +**GDS upgrade path:** Replace heuristic bonuses with `entity.pagerank` from `gds.pageRank.write()` once GDS is available. + +**Note:** This is a private method — do not call it directly. Call `semantic_search()` instead, which invokes `_rerank_results()` internally. + +--- + +##### `search_memory_nodes()` + +Search the graph for memory nodes (agent-authored notes and observations) with optional repo filtering. Returns both outgoing and incoming relations for each result. 
+ +```python +def search_memory_nodes( + self, + query: str, + limit: int = 5, + repo_id: Optional[str] = None +) -> List[Dict] +``` + +**Parameters:** +| Parameter | Type | Required | Default | Description | +|-----------|------|----------|---------|-------------| +| `query` | str | Yes | - | Natural language search query | +| `limit` | int | No | 5 | Maximum results to return | +| `repo_id` | Optional[str] | No | None | Restrict results to a specific repo. Falls back to `self.repo_id` if set. | + +**Returns:** +```python +[ + { + "name": "note_about_auth", + "sig": "memory:note_about_auth", + "score": 0.88, + "text": "Authentication flow requires...", + "outgoing_relations": [ + {"target": "src/auth.py:authenticate", "relation_type": "REFERENCES"} + ], + "incoming_relations": [ + {"source": "src/api/routes/auth.py", "relation_type": "DOCUMENTED_BY"} + ] + }, + ... +] +``` + +**`incoming_relations` format:** `[{"source": str, "relation_type": str}, ...]` + +**Example:** +```python +nodes = builder.search_memory_nodes("auth flow notes", limit=5, repo_id="my-service") +for n in nodes: + print(f"{n['name']} ({len(n['incoming_relations'])} incoming)") ``` --- @@ -1129,7 +1236,8 @@ def get_indexing_config(self) -> Dict[str, Any] { "name": str, # Entity name "sig": str, # Entity signature - "score": float, # Similarity (0-1) + "score": float, # Raw vector similarity (0–1) + "final_score": float, # Reranked score: 0.9×score + structural_bonus (present when repo_id filtering is active) "text": str # Code snippet } ``` @@ -1146,5 +1254,5 @@ def get_indexing_config(self) -> Dict[str, Any] --- -**API Version:** 1.0.0 -**Last Updated:** 2025-02-09 +**API Version:** 1.1.0 +**Last Updated:** 2026-04-05 diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index a3abe50..733e447 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -235,6 +235,35 @@ FOR (n:Function|Class|File) ON EACH [n.name, n.docstring, n.path] --- +## Multi-Repo Partitioning (repo_id) + 
+CodeMemory supports multiple repositories in a single Neo4j database using `repo_id` partitioning. + +### Identity Model + +| Node | Old identity | New identity | +|------|-------------|--------------| +| File | `path` (global) | `(repo_id, path)` (composite) | +| Function | `signature` (global) | `(repo_id, signature)` (composite) | +| Class | `qualified_name` (global) | `(repo_id, qualified_name)` (composite) | +| Memory | `name` (global) | `(repo_id, name)` (composite) | + +A `Repository` anchor node (`{repo_id, root_path}`) is also created per repo. + +### Backward Compatibility + +When `CODEMEMORY_REPO` is not set, `repo_id` is `None` and all queries omit the repo filter — identical to the pre-partitioning behavior. + +### Retrieval Model + +When `repo_id` is active, `semantic_search()` over-fetches by 3x, filters by `entity.repo_id`, then applies structural reranking (`_rerank_results()`) before returning the final result set. This prevents worktree pollution (multiple indexed copies of the same function appearing in results). + +### GDS Upgrade Path + +When Aura API credentials are available (`gds.aura.api.credentials(clientId, clientSecret)`), replace the heuristic structural bonus in `_rerank_results()` with GDS-computed `entity.pagerank`. See comments in `graph.py` near `_rerank_results()`. + +--- + ## 4-Pass Ingestion Pipeline The ingestion pipeline processes code in 4 sequential passes to build the complete graph. 
diff --git a/docs/FIELD_TEST_TEMPLATE.md b/docs/FIELD_TEST_TEMPLATE.md index c60d0cf..85acef3 100644 --- a/docs/FIELD_TEST_TEMPLATE.md +++ b/docs/FIELD_TEST_TEMPLATE.md @@ -28,8 +28,8 @@ codememory index codememory status --json # 2) Git graph setup + sync -codememory git-init --repo /absolute/path/to/repo --mode local --full-history -codememory git-sync --repo /absolute/path/to/repo --incremental +codememory git-init --repo /absolute/path/to/repo +codememory git-sync --repo /absolute/path/to/repo --full codememory git-status --repo /absolute/path/to/repo --json # 3) Optional MCP checks (domain routing) @@ -62,7 +62,7 @@ Record exact values from command output. ### Performance - `codememory index` elapsed time: -- `codememory git-sync --incremental` elapsed time: +- `codememory git-sync` elapsed time: - Embedding calls: - Token usage: - Estimated cost: @@ -71,7 +71,7 @@ Record exact values from command output. - [ ] PASS / FAIL: `git-init` succeeds with expected repo metadata. - [ ] PASS / FAIL: first `git-sync` ingests history and sets checkpoint. -- [ ] PASS / FAIL: second `git-sync --incremental` with no new commits reports zero new commits. +- [ ] PASS / FAIL: second `git-sync` with no new commits reports zero new commits. - [ ] PASS / FAIL: `git-status --json` returns stable envelope (`ok`, `error`, `data`, `metrics`). - [ ] PASS / FAIL: code graph queries still work with git graph enabled. - [ ] PASS / FAIL: `domain="code"` queries return expected code entities. diff --git a/docs/GIT_GRAPH.md b/docs/GIT_GRAPH.md index a7befa8..aa1272e 100644 --- a/docs/GIT_GRAPH.md +++ b/docs/GIT_GRAPH.md @@ -52,17 +52,12 @@ Use explicit domain routing in MCP tool calls: Initialize git graph metadata and checkpoint state for a repository. 
```bash -codememory git-init \ - --repo /absolute/path/to/repo \ - --mode local \ - --full-history +codememory git-init --repo /absolute/path/to/repo ``` Common options: - `--repo PATH` -- `--mode local|local+github` -- `--full-history` -- `--since ` +- `--json` Expected output (human-readable): @@ -78,14 +73,17 @@ Checkpoint: Sync commits from git history into the git graph. ```bash -codememory git-sync --repo /absolute/path/to/repo --incremental +# Initial full backfill +codememory git-sync --repo /absolute/path/to/repo --full + +# Later incremental updates +codememory git-sync --repo /absolute/path/to/repo ``` Common options: - `--repo PATH` -- `--incremental` - `--full` -- `--from-ref ` +- `--json` Expected output (human-readable): @@ -153,8 +151,8 @@ Expected JSON envelope: Quick validation sequence: ```bash -codememory git-init --repo /absolute/path/to/repo --mode local --full-history -codememory git-sync --repo /absolute/path/to/repo --incremental +codememory git-init --repo /absolute/path/to/repo +codememory git-sync --repo /absolute/path/to/repo --full codememory git-status --repo /absolute/path/to/repo --json ``` diff --git a/docs/MCP_INTEGRATION.md b/docs/MCP_INTEGRATION.md index 4fe93bf..06bc359 100644 --- a/docs/MCP_INTEGRATION.md +++ b/docs/MCP_INTEGRATION.md @@ -776,7 +776,7 @@ Before refactoring: codememory search "function_name" codememory impact path/to/file.py # Optional git graph sync (git-enabled builds) -codememory git-sync --repo /absolute/path/to/repo --incremental +codememory git-sync --repo /absolute/path/to/repo ``` ### 5. Keep Index Updated diff --git a/docs/MULTI_CODEBASE_RESEARCH.md b/docs/MULTI_CODEBASE_RESEARCH.md new file mode 100644 index 0000000..17c9a0f --- /dev/null +++ b/docs/MULTI_CODEBASE_RESEARCH.md @@ -0,0 +1,565 @@ +# Multi-Codebase Research Notes for CodeMemory + +Prepared: 2026-04-03 +Scope: current CodeMemory behavior, multi-codebase design options, and relevant production patterns from vector/graph systems. 
+ +--- + +## 1. Executive Summary + +- CodeMemory is still effectively a single-codebase system for both the main code graph and the new memory graph. +- The main blockers are identity and retrieval: + - identities such as `File.path` and `Memory.name` are currently global + - code and memory search do not currently scope by repository +- There is already an internal precedent for repo scoping: + - the git graph uses `repo_id` + - it already uses composite uniqueness constraints such as `(repo_id, sha)` +- There are two serious architecture directions: + - separate Neo4j database per repo + - one shared Neo4j database with `repo_id` partitioning everywhere +- If the product goal is hard isolation first, separate databases are the cleanest model. +- If the product goal is one Aura instance plus future cross-repo intelligence, shared database plus `repo_id` is the better long-term model. +- If CodeMemory stays shared-database, retrieval should not stop at plain vector search. The graph adds the most value when retrieval combines: + - seed retrieval + - repo scoping + - local graph expansion + - reranking + +--- + +## 2. Current State in This Repo + +### 2.1 Code Graph + +- Main implementation lives in: + - `src/codememory/ingestion/graph.py` + - `src/codememory/server/app.py` + - `src/codememory/server/tools.py` +- Current code search index: + - vector index `code_embeddings` +- Current code search behavior in `KnowledgeGraphBuilder.semantic_search()`: + 1. embed query + 2. vector search `db.index.vector.queryNodes('code_embeddings', ...)` + 3. hop from `Chunk` to described entity via `[:DESCRIBES]` + 4. 
return `name`, `sig`, `text` +- Important limitation: + - the default code search path does not return `CALLS`, `IMPORTS`, or broader graph neighborhoods in the same result set + - graph relationships are exposed through separate tools like dependency and impact analysis + +### 2.2 Memory Graph + +- Current memory support is in the same Neo4j database under label `Memory`. +- Current memory schema setup in `KnowledgeGraphBuilder.setup_memory_schema()` creates: + - uniqueness constraint on `Memory.name` + - vector index `memory_embeddings` + - fulltext index `memory_search` +- Current memory write behavior: + - `create_memory_entities()` creates or updates `(:Memory {name, type, entity_type, observations, observation_text, metadata_json})` + - a normalized secondary label is applied from the entity type + - `create_memory_relations()` creates real typed relationships such as `CONTAINS` or `BUILDS_ON` + - `add_memory_observations()` and `delete_memory_observations()` refresh the node embedding when OpenAI is configured + - `backfill_memory_embeddings()` exists for historical nodes +- Current memory search behavior in `KnowledgeGraphBuilder.search_memory_nodes()`: + - without OpenAI: fulltext only + - with OpenAI: vector search on `memory_embeddings` unioned with fulltext search + - after candidate selection, outgoing `(:Memory)-[r]->(:Memory)` relationships are returned as `outgoing_relations` +- Important limitation: + - memory relationships are returned after the node search + - relationship embeddings are not indexed today + +### 2.3 Identity and Multi-Repo Gaps + +- Main code graph currently has no usable repo partition field. +- `repo_id` does not appear on `File`, `Function`, `Class`, `Chunk`, or `Memory`. 
+- Current single-repo assumptions: + - `File.path` is globally unique + - `Function.signature` is globally unique + - `Memory.name` is globally unique +- Result: + - two repos with the same relative path would collide + - two repos with the same memory node name would collide + - search results are not repo-scoped today + +### 2.4 Existing Internal Precedent: Git Graph + +- Git graph implementation lives in: + - `src/codememory/ingestion/git_graph.py` +- It already uses: + - `repo_id = str(repo_root)` + - `GitRepo {repo_id}` + - composite constraints like: + - `(repo_id, sha)` on `GitCommit` + - `(repo_id, email_norm)` on `GitAuthor` + - `(repo_id, sha, path)` on `GitFileVersion` +- This matters because it proves CodeMemory already has one working repo-scoped design inside the same Neo4j instance. + +--- + +## 3. Live Aura Snapshot + +Observed on 2026-04-03 via read-only inspection against the configured Aura instance: + +- Databases currently online: + - `neo4j` + - `system` +- Neo4j component report: + - `Neo4j Kernel 5.27-aura` + - `edition: enterprise` +- Important interpretation: + - multiple standard databases are supported in Neo4j Enterprise in principle + - this specific Aura deployment currently exposes only one standard database: `neo4j` + - that is a deployment fact, not a general Neo4j limitation +- Existing live memory shape is already aligned with: + - base label `Memory` + - `name`, `type`, `observations` + - typed relationships between memory nodes + +--- + +## 4. Key Concepts + +### 4.1 Composite Uniqueness and Key Constraints + +- A single-property uniqueness constraint says: + - "`path` must be unique everywhere" +- A composite uniqueness constraint says: + - "the pair `(repo_id, path)` must be unique" +- That is the right model when identity is repo-scoped instead of global. 
+- Examples that fit CodeMemory: + - `File(repo_id, path)` + - `Function(repo_id, signature)` + - `Chunk(repo_id, chunk_id)` or another repo-scoped chunk identity + - `Memory(repo_id, name)` +- Use uniqueness constraints when existence of all fields is not guaranteed. +- Use key constraints when the properties must both: + - always exist + - be unique as a combination + +Why this matters: + +- without repo in the identity, nodes from multiple repos will overwrite each other or collide +- with repo in the identity, identical paths or memory names can safely exist in different repos + +### 4.2 Vector Index Scope + +- Historically, Neo4j vector indexes are schema objects over a label/type and a property. +- Current Neo4j docs say: + - a vector index can index nodes or relationships + - as of Neo4j 2026.01, vector indexes can also be multi-label, multi-relationship-type, or include additional properties for filtering +- Current CodeMemory code still uses the older procedural query style: + - `db.index.vector.queryNodes(...)` +- Practical implication: + - today, CodeMemory should still be thought of as using shared node-type indexes + - repo-aware filtering is not implemented in the current code path + - newer Neo4j filtering features may improve the shared-database design later, but they should be treated as version-sensitive and unverified for this deployment + +### 4.3 Query-Time Filtering + +- Query-time filtering does not mean traversing the whole graph before the search. +- The usual shape is: + 1. vector search finds candidate nodes + 2. candidates are filtered or reranked for the active repo + 3. graph expansion happens only around the surviving candidates if needed +- The real performance risk is not that graph traversal runs first. 
+- The real performance risk is: + - a shared index may return too many wrong-repo candidates in the top `k` + - after filtering, too few useful results remain +- Common fix: + - over-fetch candidates + - then rerank after repo scoping + +### 4.4 Graph-Aware Retrieval + +- If retrieval stops at plain ANN top-k, the graph is underused. +- A more graph-native retrieval pipeline looks like: + 1. retrieve semantic seed nodes + 2. scope to the active repo + 3. expand nearby graph structure + 4. rerank using semantic and structural signals + 5. return graph-aware context to the model +- For CodeMemory, structural signals could include: + - same file + - same class + - direct `CALLS` + - direct `IMPORTS` + - memory relation overlap + - git co-change or provenance links later + +--- + +## 5. Architecture Options + +### Option A: Separate Neo4j Database Per Repo + +### Summary + +- One repo gets one standard Neo4j database. +- Code graph and memory graph for that repo live together inside that database. + +### Strengths + +- strongest isolation boundary +- simplest correctness story +- no cross-repo collisions by construction +- each repo gets its own constraints and indexes +- retrieval never needs repo filtering + +### Weaknesses + +- operationally heavier +- requires database lifecycle management + - create + - delete + - backup + - connect to the right database +- harder to support cross-repo search later +- migration and tooling become more DB-aware + +### Best Fit + +- enterprise or regulated environments +- few to moderate number of repos +- strong tenant isolation requirement +- little or no need for cross-repo queries + +### Relevance to Current Aura + +- Neo4j Enterprise supports multiple standard databases in principle. +- This specific Aura instance currently shows only one standard database online. +- Before choosing this path, verify whether the plan and permissions actually allow creating additional databases in this deployment. 
+ +--- + +### Option B: Shared Neo4j Database With `repo_id` Partitioning + +### Summary + +- Keep one database, typically `neo4j` +- Put all repos into that graph +- Add `repo_id` to all code and memory nodes +- Make identities composite instead of global + +### Required Schema Changes + +- Add `repo_id` to: + - `File` + - `Function` + - `Class` + - `Chunk` + - `Memory` +- Add or reuse a repository anchor node such as: + - `(:Repository {repo_id, name, root_path})` +- Update constraints to repo-scoped forms such as: + - `FOR (f:File) REQUIRE (f.repo_id, f.path) IS UNIQUE` + - `FOR (m:Memory) REQUIRE (m.repo_id, m.name) IS UNIQUE` + +### Strengths + +- one Aura instance +- one MCP server +- easier future cross-repo search +- operationally simpler than many databases +- matches the repo-scoped git graph precedent already in the codebase + +### Weaknesses + +- retrieval becomes more subtle +- every write and read path must carry repo context +- migrations are more invasive +- naive top-k vector search plus blind filtering is not enough + +### Best Fit + +- many repos +- one shared platform +- desire for optional future cross-repo intelligence +- willingness to implement repo-aware retrieval properly + +### Recommended Retrieval Shape If This Option Is Chosen + +1. Retrieve more candidates than final output requires. +2. Scope or filter to the active repo. +3. Expand local graph structure. +4. Rerank using semantic plus structural features. +5. Return only active-repo results by default. 
+ +--- + +### Option C: Shared Database With One Vector Index Per Repo + +### Summary + +- Keep one database +- create repo-specific vector indexes such as: + - `code_embeddings_repo_a` + - `code_embeddings_repo_b` + - `memory_embeddings_repo_a` + +### Why This Is Usually Not The Right Default + +- every new repo creates new index lifecycle work +- index count scales with repo count +- index build time and storage overhead scale with repo count +- code has to route every query to the correct dynamic index +- cross-repo search becomes a fan-out problem +- Neo4j does not give CodeMemory a namespace-first multitenancy abstraction similar to Pinecone namespaces or Weaviate tenants + +### When It Might Be Justified + +- very small number of repos +- extremely strict performance targets on shared indexes +- no expectation that repo count will grow much + +### Overall Assessment + +- possible +- not impossible +- but usually a poor default for a growing multi-repo system inside one Neo4j database + +--- + +### Option D: Hybrid Promotion Model + +### Summary + +- Start small repos in a shared database +- promote very large or high-isolation repos to dedicated databases later + +### Why It Is Interesting + +- matches real-world tenant skew +- avoids overcommitting early +- lets the platform support both: + - cheap shared mode + - premium isolated mode + +### Tradeoff + +- most flexible +- most operationally complex + +### Real-World Analogy + +- Qdrant’s tiered multitenancy is conceptually similar: + - small tenants share infrastructure + - large tenants can be promoted into more isolated placement + +--- + +## 6. 
Real-World Patterns and Implementations + +| System | Common tenant model | What they recommend | Why it matters for CodeMemory | +|---|---|---|---| +| Pinecone | One index per workload, one namespace per tenant | Writes and queries always target a namespace | Strong argument that namespace-style partitioning is the normal vector-native pattern, not one ANN index per tenant | +| Qdrant | Usually one collection per embedding model, tenant partitioning inside it | Prefer payload-based partitioning for most cases; add dedicated shards for large tenants | Strong precedent for shared infrastructure first, then promotion when size skews | +| Weaviate | Multi-tenancy inside a collection | Separate tenant shards with lower overhead than separate collections | Strong precedent for shared schema plus tenant-aware partitions | +| Milvus | Multiple levels: database, collection, partition, partition key | Partition-key-based multitenancy for scale; DB-level for strongest isolation | Useful comparison because it explicitly documents several tradeoff tiers | +| Neo4j | Multiple standard databases in Enterprise; vector indexes on node/relationship schemas | Good for graph-native workloads, but multitenancy needs explicit schema or DB design | Most relevant for CodeMemory because we need graph structure plus embeddings, not embeddings alone | + +### 6.1 Pinecone + +- Official pattern: + - one namespace per tenant inside an index + - all data plane operations target one namespace +- Pinecone frames this as: + - tenant isolation + - faster queries + - simpler offboarding +- Important takeaway for CodeMemory: + - vector-native systems usually provide a first-class tenant boundary inside the index + - Neo4j does not currently give CodeMemory that same ergonomic abstraction in the current code path + +### 6.2 Qdrant + +- Official guidance says: + - do not create huge numbers of collections by default + - in most cases, use a single collection per embedding model with tenant partitioning 
+- Qdrant also documents tiered multitenancy: + - shared fallback shard for small tenants + - dedicated shards for large tenants + - tenant promotion as they grow +- Important takeaway for CodeMemory: + - this is a strong precedent for a hybrid future if repo sizes become highly uneven + +### 6.3 Weaviate + +- Official guidance says: + - multi-tenancy isolates data for different tenants within a single Weaviate instance + - each tenant is stored on a separate shard + - multi-tenancy has lower overhead than one collection per tenant +- Important takeaway for CodeMemory: + - shared schema plus tenant-scoped physical placement is a proven pattern + - this is closer to shared DB plus `repo_id` than to dynamic index-per-repo + +### 6.4 Milvus + +- Milvus explicitly documents several multi-tenancy levels: + - database + - collection + - partition + - partition key +- Milvus recommends partition key when scale matters because it narrows the search scope and avoids scanning irrelevant partitions. +- Important takeaway for CodeMemory: + - the product decision is really about placement level + - stronger isolation and better scaling are traded off against flexibility and operational complexity + +### 6.5 Neo4j-Specific Takeaway + +- Neo4j can support multiple standard databases in Enterprise. +- Neo4j vector indexes now also support more advanced schema options in the latest docs. +- However, CodeMemory today is not using Neo4j like a tenant-native vector store. +- CodeMemory today is a graph system with vector-assisted retrieval. +- That means the important product decision is not only index placement. +- It is also: + - identity model + - repo context propagation + - graph-aware retrieval behavior + +--- + +## 7. What This Means for CodeMemory + +### If The Goal Is Hard Isolation First + +- Choose separate database per repo. +- Keep code and memory together inside each repo database. +- Simplify retrieval because repo scoping becomes implicit. 
+ +### If The Goal Is One Aura Instance Plus Future Cross-Repo Intelligence + +- Choose shared database plus `repo_id`. +- Migrate code graph and memory graph to repo-scoped identities. +- Rework retrieval so it is repo-aware and graph-aware. + +### What Should Not Ship + +- shared database with global uniqueness on `File.path` +- shared database with global uniqueness on `Memory.name` +- shared database with no repo context in MCP tools +- naive vector top-k followed by blind filtering and no reranking +- dynamic per-repo index creation as the default scaling model + +--- + +## 8. Concrete Shared-Database Design Sketch + +### 8.1 Identity + +- Add `repo_id` everywhere. +- Add a repository node: + - `(:Repository {repo_id, root_path, remote_url, default_branch})` + +### 8.2 Constraints + +- `FOR (f:File) REQUIRE (f.repo_id, f.path) IS UNIQUE` +- `FOR (fn:Function) REQUIRE (fn.repo_id, fn.signature) IS UNIQUE` +- `FOR (c:Class) REQUIRE (c.repo_id, c.name, c.file_path) IS UNIQUE` or another stable repo-scoped class identity +- `FOR (m:Memory) REQUIRE (m.repo_id, m.name) IS UNIQUE` + +### 8.3 Query Model + +- MCP tools receive repo context explicitly or derive it from configured repo root. +- Search defaults to active repo only. +- Cross-repo search must be opt-in. + +### 8.4 Retrieval Model + +1. vector retrieve seed candidates +2. keep active repo candidates +3. expand local graph neighborhood +4. rerank +5. return final context + +### 8.5 Migration Shape + +1. define stable repo identity +2. add `repo_id` to all newly written nodes +3. backfill historical nodes +4. create repo-scoped constraints +5. update search and write tools +6. remove legacy global assumptions + +--- + +## 9. Open Questions Worth Researching Next + +- Does the current Aura plan permit creating and managing multiple standard databases? +- How many repos should one Aura deployment realistically support? +- Do users need cross-repo search, or only strict per-repo isolation? 
+- Should repo context be implicit from `CODEMEMORY_REPO`, explicit in every MCP tool, or both? +- Do we need a concept of global/shared memory in addition to repo-scoped memory? +- Should memory retrieval stay node-level, or do some memory entries need chunking later? +- Should reranking happen in Neo4j, in application code, or via a separate reranker model? +- Are Neo4j’s latest vector filtering features available on this exact Aura deployment and query path? + +--- + +## 10. Provisional Recommendation + +- Short term: + - choose between hard-isolation and shared-platform goals first + - do not make index-level decisions before that product choice +- If forced to choose today for CodeMemory specifically: + - pick shared database plus `repo_id` + - because the repo already has a `repo_id` precedent in the git graph + - and because this preserves future cross-repo capabilities +- But only choose shared database if retrieval is upgraded beyond naive vector top-k. + +--- + +## 11. Sources + +### Official Neo4j + +- Neo4j database administration: + - [https://neo4j.com/docs/operations-manual/current/database-administration/](https://neo4j.com/docs/operations-manual/current/database-administration/) +- Neo4j vector indexes: + - [https://neo4j.com/docs/cypher-manual/current/indexes/semantic-indexes/vector-indexes/](https://neo4j.com/docs/cypher-manual/current/indexes/semantic-indexes/vector-indexes/) +- Neo4j constraints overview: + - [https://neo4j.com/docs/cypher-manual/current/schema/constraints/](https://neo4j.com/docs/cypher-manual/current/schema/constraints/) +- Neo4j create constraints: + - [https://neo4j.com/docs/cypher-manual/current/schema/constraints/create-constraints/](https://neo4j.com/docs/cypher-manual/current/schema/constraints/create-constraints/) + +### Pinecone + +- Pinecone multitenancy: + - [https://docs.pinecone.io/guides/index-data/implement-multitenancy](https://docs.pinecone.io/guides/index-data/implement-multitenancy) +- Pinecone namespaces: + 
- [https://docs.pinecone.io/guides/indexes/use-namespaces](https://docs.pinecone.io/guides/indexes/use-namespaces) + +### Qdrant + +- Qdrant multitenancy: + - [https://qdrant.tech/documentation/manage-data/multitenancy/](https://qdrant.tech/documentation/manage-data/multitenancy/) +- Qdrant multitenancy with LlamaIndex: + - [https://qdrant.tech/documentation/examples/llama-index-multitenancy/](https://qdrant.tech/documentation/examples/llama-index-multitenancy/) + +### Weaviate + +- Weaviate multi-tenancy operations: + - [https://docs.weaviate.io/weaviate/manage-collections/multi-tenancy](https://docs.weaviate.io/weaviate/manage-collections/multi-tenancy) +- Weaviate collection definition and multi-tenancy config: + - [https://docs.weaviate.io/weaviate/config-refs/schema](https://docs.weaviate.io/weaviate/config-refs/schema) +- Weaviate data concepts: + - [https://docs.weaviate.io/weaviate/concepts/data](https://docs.weaviate.io/weaviate/concepts/data) +- Weaviate best practices: + - [https://docs.weaviate.io/weaviate/best-practices](https://docs.weaviate.io/weaviate/best-practices) + +### Milvus + +- Milvus partition key: + - [https://milvus.io/docs/v2.4.x/use-partition-key.md](https://milvus.io/docs/v2.4.x/use-partition-key.md) +- Milvus multi-tenancy strategies: + - [https://milvus.io/docs/v2.2.x/multi_tenancy.md](https://milvus.io/docs/v2.2.x/multi_tenancy.md) + +--- + +## 12. 
Repo Evidence + +- Main code graph and memory graph: + - `D:\code\codememory\src\codememory\ingestion\graph.py` +- MCP layer: + - `D:\code\codememory\src\codememory\server\app.py` + - `D:\code\codememory\src\codememory\server\tools.py` +- Existing repo-scoped precedent: + - `D:\code\codememory\src\codememory\ingestion\git_graph.py` diff --git a/docs/MULTI_CODEBASE_RESEARCH_REPORT.md b/docs/MULTI_CODEBASE_RESEARCH_REPORT.md new file mode 100644 index 0000000..b718b2a --- /dev/null +++ b/docs/MULTI_CODEBASE_RESEARCH_REPORT.md @@ -0,0 +1,182 @@ +# Multi-Codebase GraphRAG Research Report + +## Executive Summary + +This report distills findings from an exported research session focused on multi-codebase GraphRAG architecture for CodeMemory. + +Top conclusion: start with a **single Neo4j graph plus `repo_id` partitioning** (Pattern A), then add a migration path toward stronger isolation if scale or tenancy requirements demand it. + +A critical practical finding from live probing: retrieval quality is currently harmed more by **duplicate indexed worktree paths** than by multi-repo architecture limits. + +## Scope and Method + +The source session synthesized: + +- 22 sources across academia, vendor documentation, and large-scale industry implementations +- comparative architecture analysis for multi-tenancy and multi-corpus GraphRAG +- live Aura capability probing on the active environment +- feasibility testing of graph-enriched retrieval in pure Cypher + +## Research Findings + +### 1. Microsoft GraphRAG Baseline + +- Microsoft GraphRAG core paper (arXiv:2404.16130) describes hierarchical community summaries, primarily for a single corpus. +- Open discussions indicate no official merged multi-corpus indexing solution; per-corpus pipelines remain common. +- Example discussion: https://github.com/microsoft/graphrag/discussions/1635 + +**Takeaway:** per-corpus indexing + query-time federation remains the de facto pattern. + +### 2. 
Academic Consensus + +- Recent surveys and system papers characterize GraphRAG as a pipeline of graph indexing, graph retrieval, and graph-informed generation. +- Multi-tenant/multi-corpus handling is acknowledged as important but not standardized. + +**Takeaway:** there is no dominant canonical architecture for multi-corpus GraphRAG yet. + +### 3. Enterprise Knowledge Graph Patterns + +- Enterprise systems commonly keep a **shared graph substrate** with domain/type-based partitioning. +- Federation layers are added for cross-domain querying and governance at scale. + +**Takeaway:** large organizations often favor one graph with logical partitioning before physical separation. + +### 4. Neo4j Multi-Tenancy Patterns + +- Official patterns support federation/sharding with constraints. +- Cross-database relationships are limited in federated setups. +- Third-party guidance highlights tradeoffs: + - label/property partitioning: flexible but leakage/complexity risk + - separate instances: strongest isolation, highest operational overhead + - multi-database: practical middle ground for moderate tenant counts + +**Takeaway:** architecture should keep an explicit escalation path from logical to physical isolation. + +### 5. Vector-DB Maturity vs Graph-Layer Gap + +- Qdrant, Weaviate, Pinecone have mature tenant partitioning constructs. +- PropertyGraphIndex ecosystems usually leave multi-tenancy policy to application/graph design. + +**Takeaway:** vector partitioning is mostly solved; graph partitioning still requires custom design decisions. + +### 6. Code Intelligence Signals + +- Google/Kythe and Sourcegraph/SCIP patterns validate graph-driven code retrieval at very large scale. +- A practical reference model is **per-repo indexing with stable cross-repo identifiers**. + +**Takeaway:** cross-repo symbol identity should be first-class even if storage starts unified. + +### 7. 
GraphRAG Algorithmic Variants + +- HippoRAG-style Personalized PageRank (PPR) offers a principled way to localize retrieval in mixed graphs while preserving cross-repo traversal when structurally justified. + +**Takeaway:** PPR is a strong medium-term upgrade path for relevance and dilution control. + +## Architecture Options for CodeMemory + +| Pattern | Description | Best Fit | Tradeoffs | +|---|---|---|---| +| A. Single Graph + `repo_id` | One Neo4j database with logical partitioning | Small to medium repo counts | Easiest to implement; requires strict filtering and governance | +| B. Database per Repo + Federation | Separate DB per repo with federated queries | Larger installations with stronger isolation requirements | Better isolation; added complexity and cross-DB relationship limits | +| C. Hybrid Escalation | Start A, promote heavy repos to dedicated DBs | Mixed workloads over time | Operational flexibility; migration complexity | + +**Recommended starting point:** Pattern A with explicit migration hooks toward Pattern C/B. + +## Dilution and Relevance Strategy + +A layered strategy emerged from the session: + +1. **Pre-filter at retrieval time** when available. +2. **Over-fetch + post-filter** as fallback (`k * multiplier` then filter). +3. **Graph-structural re-ranking** using topology signals. +4. **PPR-based ranking** where Graph Data Science is available. +5. **Cross-encoder re-ranking** only for a small final candidate set when high precision is needed. 
+ +Example composite score: + +`final_score = α*vector_sim + β*structural_proximity + γ*co_change + δ*recency` + +## Aura Capability Probe Results (from session) + +### Confirmed Available + +- Neo4j Aura Enterprise: `5.27-aura` +- Cypher versions: default Cypher 5, Cypher 25 available +- Vector indexes present (`code_embeddings`, `memory_embeddings`) +- Post-`YIELD` filtering in vector queries works +- `vector.similarity.cosine()` available +- `db.index.vector.queryRelationships` available +- `genai.vector.encodeBatch` available (config-dependent) + +### Available but Blocked by Configuration + +- GDS algorithms (PageRank/Leiden/ArticleRank) discovered but gated by Aura API credentials for projection workflows. + +### Not Available in Current Environment + +- Composite database/fabric workflows +- Additional tenant databases (only `neo4j` and `system` observed) + +## Critical Issues Identified + +1. **Worktree pollution in indexed paths** +- Duplicate code from worktree directories appears in top retrieval hits. +- This currently causes major relevance degradation, independent of multi-repo design. + +2. **Semantic search under-enriched by default** +- Current response shape is mostly vector hit metadata and snippet. +- Missing default structural context (file, callers/callees, sibling symbols, imports). + +3. **Memory retrieval asymmetry** +- Outgoing relationship enrichment exists; incoming/contextual edges are limited. + +4. **CALLS edge property sparsity** +- No useful relationship properties currently available for weighting (e.g., frequency/strength). 
+ +## Feasible Immediate Upgrade (No GDS Required) + +The session validated a pure-Cypher enriched retrieval pattern: + +- vector seed (`db.index.vector.queryNodes`) +- `Chunk -> DESCRIBES -> entity` +- recover containing file via `DEFINES`/`CONTAINS` +- collect `CALLS` (outgoing and incoming) +- include class methods (`HAS_METHOD`) +- include file dependencies (`IMPORTS`) +- include sibling functions via file-level `DEFINES` + +This supports out-of-the-box graph-enriched answers without additional infrastructure. + +## Prioritized Recommendations + +### Immediate (before multi-repo expansion) + +1. Exclude transient worktree paths from ingestion/indexing. +2. Make semantic search graph-enriched by default. +3. Expand memory search to include incoming and richer neighborhood context. + +### Multi-Repo Enablement + +4. Add `repo_id` to all relevant node/edge types. +5. Enforce repo-scoped retrieval via post-`YIELD` filtering and over-fetch fallback. +6. Add structural re-ranking features (same-file, call distance, import path, co-change, recency). + +### Advanced Phase + +7. Configure Aura API credentials for GDS projection functions. +8. Evaluate Leiden/PPR-based retrieval for module-aware ranking. +9. Consider optional cross-encoder final-stage reranking for high-precision workflows. + +## Decision + +Proceed with: + +- **Pattern A** (single graph + `repo_id`) as the default architecture +- mandatory ingestion hygiene (remove worktree duplication) +- graph-enriched retrieval as the default response mode +- an explicit migration path toward hybrid/physical isolation when scale requires it + +## Notes + +- This report is a transformation of the exported session log in `RESEARCH-SESSION.txt`. +- It intentionally preserves conclusions and validated capability findings while removing chat/tool noise. 
diff --git a/docs/TROUBLESHOOTING.md b/docs/TROUBLESHOOTING.md index cf8cfb6..5196b2f 100644 --- a/docs/TROUBLESHOOTING.md +++ b/docs/TROUBLESHOOTING.md @@ -275,7 +275,9 @@ If missing, upgrade to a git graph-enabled release/build. **Solution:** ```bash -codememory git-init --repo /absolute/path/to/repo --mode local --full-history +codememory git-init --repo /absolute/path/to/repo +# For the initial backfill: +codememory git-sync --repo /absolute/path/to/repo --full # Confirm /absolute/path/to/repo contains a .git directory ``` diff --git a/docs/plans/ultraplan-4.5.26.md b/docs/plans/ultraplan-4.5.26.md new file mode 100644 index 0000000..658d3a7 --- /dev/null +++ b/docs/plans/ultraplan-4.5.26.md @@ -0,0 +1,383 @@ +# Ultraplan 4.5.26: Multi-Repo Collision Fix via repo_id Partitioning + +## Context + +**Immediate motivation:** Worktree paths (`.kilo/worktrees/...`, `.claude/worktrees/...`) are polluting search results right now — confirmed by the Obsidian doc (4 copies of 2 functions in top-8 results). This is a multi-repo collision problem that exists in production today. + +**Architecture decision:** Multiple databases are unavailable on this Aura instance (only neo4j + system). Shared database with repo_id partitioning is the only viable path — which also matches the git graph's existing pattern in `git_graph.py`. + +## What's Already Done + +Tier 1 graph enrichment is implemented — `semantic_search()` already returns `file_path`, `calls_out`, `called_by`, `methods`, `file_imports`, `siblings` (see `graph.py:1110-1141`). The plan does NOT need to re-implement this. 
+ +## Hardened Completion Bar + +- [x] `repo_id` on all code/memory identities written by ingestion and memory CRUD +- [x] Composite uniqueness constraints replacing global ones +- [x] Query-time filtering by `repo_id` in code search and memory search +- [x] `search_memory_nodes()` incoming relations +- [x] Application-level reranking in `semantic_search()` +- [x] All ingestion passes scoped when `repo_root` / `repo_id` is active +- [x] All code-domain reads scoped when `repo_root` / `repo_id` is active +- [x] Memory read/search/backfill paths scoped when `repo_root` / `repo_id` is active +- [x] CLI and MCP both propagate repo context consistently +- [ ] GDS/Leiden path (future work only; not part of migration completion) + +--- + +## Current vs Target Architecture + +| Aspect | Current | Target | +|--------|---------|--------| +| **File** | `{path}` ← global unique | `{repo_id, path}` ← composite unique | +| **Function** | `{sig}` ← global unique | `{repo_id, sig}` | +| **Class** | `{qual}` ← global unique | `{repo_id, qual}` | +| **Memory** | `{name}` ← global unique | `{repo_id, name}` | +| **Chunk** | (no id) | (filter via `entity.repo_id`) | +| **init_graph()** | no `repo_root` passed | `repo_root=repo_root` passed | +| **Schema** | `self.repo_id = None` | `self.repo_id = str(repo_root.resolve())` | +| **semantic_search()** | ANN top-k (no filter) | ANN top-(limit×3) → WHERE repo_id → rerank → return top-limit | +| **search_memory_nodes()** | outgoing relations only | outgoing + incoming relations; filter by repo_id | + +--- + +## Critical Files + +| File | Purpose | +|------|---------| +| `src/codememory/ingestion/graph.py` | All schema, ingestion, and search changes | +| `src/codememory/server/app.py` | `init_graph()` at line 136 (pass `repo_root`) | +| `src/codememory/ingestion/git_graph.py` | Reference model for `repo_id = str(repo_root)` | + +--- + +## Step-by-Step Implementation + +### Step 1 — Wire repo_id into KnowledgeGraphBuilder + +**File:** 
`graph.py:128-190` (`__init__`) + +In `__init__`, after `self.repo_root = repo_root`, add: + +```python +self.repo_id: Optional[str] = str(repo_root.resolve()) if repo_root else None +``` + +This mirrors the git graph pattern exactly (`git_graph.py:98`). + +--- + +### Step 2 — Pass repo_root from init_graph() + +**File:** `app.py:171-176` (`init_graph`) + +Change the `KnowledgeGraphBuilder` instantiation to: + +```python +graph = KnowledgeGraphBuilder( + uri=uri, + user=user, + password=password, + openai_key=openai_key, + repo_root=repo_root, # ← add this +) +``` + +`repo_root` is already resolved by lines 144-149 in the same function. + +--- + +### Step 3 — Schema: composite constraints + Repository anchor + +**File:** `graph.py:275-309` (`setup_database`) + +Add a migration sub-step that: + +- Drops old global constraints (with `IF EXISTS`) +- Creates composite ones (with `IF NOT EXISTS`) + +**New constraint set** (replace the current 3 in queries): + +```cypher +DROP CONSTRAINT file_path_unique IF EXISTS +DROP CONSTRAINT function_sig_unique IF EXISTS +DROP CONSTRAINT class_name_unique IF EXISTS + +CREATE CONSTRAINT file_repo_path_unique IF NOT EXISTS + FOR (f:File) REQUIRE (f.repo_id, f.path) IS UNIQUE + +CREATE CONSTRAINT function_repo_sig_unique IF NOT EXISTS + FOR (fn:Function) REQUIRE (fn.repo_id, fn.signature) IS UNIQUE + +CREATE CONSTRAINT class_repo_qual_unique IF NOT EXISTS + FOR (c:Class) REQUIRE (c.repo_id, c.qualified_name) IS UNIQUE + +CREATE CONSTRAINT repo_id_unique IF NOT EXISTS + FOR (r:Repository) REQUIRE r.repo_id IS UNIQUE +``` + +Also add a Repository anchor node upsert at the end of `setup_database()`: + +```cypher +MERGE (r:Repository {repo_id: $repo_id}) +SET r.root_path = $root_path, r.updated_at = datetime() +``` + +Only run this if `self.repo_id` is set. + +> **Note:** The composite constraints won't prevent MERGE from working — they just enforce uniqueness at the `(repo_id, property)` level. 
Existing nodes without `repo_id` must be backfilled before dropping the old constraints. Add a `_backfill_repo_id()` method (see Step 4). + +--- + +### Step 4 — Backfill existing nodes + migration method + +**File:** `graph.py` — new method `_backfill_repo_id()` on `KnowledgeGraphBuilder` + +Called automatically from `setup_database()` when `self.repo_id` is not `None`, before dropping old constraints: + +```python +def _backfill_repo_id(self, session): + """Set repo_id on all File/Function/Class/Memory nodes that lack it.""" + if not self.repo_id: + return + for label in ("File", "Function", "Class", "Memory"): + session.run( + f"MATCH (n:{label}) WHERE n.repo_id IS NULL SET n.repo_id = $repo_id", + repo_id=self.repo_id, + ) +``` + +**Call order inside `setup_database()`:** + +1. `_backfill_repo_id(session)` +2. Drop old constraints +3. Create composite constraints +4. Upsert Repository node + +--- + +### Step 5 — Ingestion passes: stamp repo_id on new nodes + +**File:** `graph.py` + +All `MERGE` statements that create `File`, `Function`, and `Class` nodes need `repo_id` added to both the identity key and the `SET` clause. + +**Pass 1** (`pass_1_structure_scan`, ~line 451): + +```cypher +// Change identity from path alone to (repo_id, path) +MERGE (f:File {repo_id: $repo_id, path: $path}) +SET f.name = $name, f.ohash = $ohash, f.last_updated = datetime() +``` + +Also update the lookup `MATCH (f:File {path: $path})` at line 443 and `_delete_file_subgraph` at line 242 to include `repo_id` in the match if `self.repo_id` is set. + +**Pass 2** (`pass_2_entity_definition`, ~line 563): + +```cypher +// Class: +MERGE (c:Class {repo_id: $repo_id, qualified_name: $sig}) +SET c.name = $name, c.code = $code + +// Function: +MERGE (fn:Function {repo_id: $repo_id, signature: $sig}) +SET fn.name = $name, fn.code = $code +``` + +**Pass 3** (`pass_3_imports`, check file lines ~817+): + +Update `MATCH (f:File {path: ...})` lookups to include `repo_id` when available. 
+ +**Pass 4** (`pass_4_call_graph`): + +Same — update `MATCH (fn:Function {signature: ...})` to scope by `repo_id`. + +**Chunk nodes:** No change needed — Chunk identity uses `randomUUID()`. Filtering goes via `entity.repo_id` after the `[:DESCRIBES]` hop. + +--- + +### Step 6 — Memory schema: composite constraint + +**File:** `graph.py:311-340` (`setup_memory_schema`) + +Replace: + +```python +"CREATE CONSTRAINT memory_name_unique IF NOT EXISTS " +f"FOR (m:{self.MEMORY_LABEL}) REQUIRE m.name IS UNIQUE" +``` + +With: + +```python +"DROP CONSTRAINT memory_name_unique IF EXISTS", +"CREATE CONSTRAINT memory_repo_name_unique IF NOT EXISTS " +f"FOR (m:{self.MEMORY_LABEL}) REQUIRE (m.repo_id, m.name) IS UNIQUE", +``` + +Also backfill Memory nodes the same way in `_backfill_repo_id`. + +--- + +### Step 7 — create_memory_entities(): stamp repo_id + +**File:** `graph.py:1366-1426` + +Change the `MERGE` identity from `{name: $name}` to `{repo_id: $repo_id, name: $name}` and add `repo_id` to the `ON CREATE SET` block: + +```cypher +MERGE (m:Memory {repo_id: $repo_id, name: $name}) +ON CREATE SET + m.repo_id = $repo_id, + ... +``` + +The `repo_id` param will be `self.repo_id` (passed through `_execute_create()`). + +Similar change needed in `delete_memory_entities()`, `create_memory_relations()`, `add_memory_observations()`, `delete_memory_observations()` — all `MATCH on Memory.name` need to scope by `repo_id`. 
+ +--- + +### Step 8 — semantic_search(): repo filtering + over-fetch + +**File:** `graph.py:1040` — add `repo_id: Optional[str] = None` parameter + +```python +def semantic_search(self, query: str, limit: int = 5, repo_id: Optional[str] = None) -> List[Dict]: + active_repo = repo_id or self.repo_id + overfetch = limit * 3 # safe for ≤5 repos; scale up for more +``` + +**Vector path** — insert post-yield WHERE after `YIELD node as chunk, score`: + +```cypher +CALL db.index.vector.queryNodes('code_embeddings', $overfetch, $vec) +YIELD node as chunk, score +MATCH (chunk)-[:DESCRIBES]->(entity) +WHERE entity.repo_id = $repo_id // post-yield filter (confirmed working on this Aura) +...existing graph expansions... +ORDER BY score DESC +LIMIT $limit +``` + +When `active_repo` is `None`, omit the `WHERE` clause entirely (backward-compatible for single-repo installs without `repo_id` stamped yet). + +**Fulltext fallback** — same pattern: add `WHERE node.repo_id = $repo_id` after the fulltext `YIELD` if `active_repo` is set. + +--- + +### Step 9 — search_memory_nodes(): repo filtering + incoming relations + +**File:** `graph.py:1695` + +Add `repo_id: Optional[str] = None` parameter. + +Current query only fetches outgoing: + +```cypher +OPTIONAL MATCH (node:Memory)-[r]->(target:Memory) +... +collect(outgoing) as outgoing_relations +``` + +Add incoming: + +```cypher +OPTIONAL MATCH (node:Memory)-[r_out]->(target:Memory) +OPTIONAL MATCH (source:Memory)-[r_in]->(node:Memory) +... +collect(DISTINCT CASE WHEN target IS NULL THEN NULL ELSE {target: target.name, relation_type: type(r_out)} END) as outgoing_relations, +collect(DISTINCT CASE WHEN source IS NULL THEN NULL ELSE {source: source.name, relation_type: type(r_in)} END) as incoming_relations +``` + +Add repo filter: `WHERE node.repo_id = $repo_id` in the subquery (or as a post-filter in Python) when `active_repo` is set. + +Update return value to include `incoming_relations` in the row dict. 
+ +Update `_format_memory_entity_results()` in `app.py:360` to display incoming relations. + +--- + +### Step 10 — Application-level reranking + +**File:** `graph.py` — new private method `_rerank_results()` + +```python +def _rerank_results(self, results: List[Dict], limit: int) -> List[Dict]: + """ + Structural reranking after repo-filtered vector retrieval. + Structural bonuses from graph connectivity (CALLS properties are empty, + so use edge existence only). + """ + for r in results: + structural = 0.0 + if r.get("calls_out"): + structural += 0.05 # has outgoing calls + if r.get("called_by"): + structural += 0.05 # is called by others (more central) + if r.get("methods"): + structural += 0.03 # is a class with methods + r["final_score"] = r.get("score", 0.0) * 0.9 + structural + results.sort(key=lambda x: x["final_score"], reverse=True) + return results[:limit] +``` + +Call `_rerank_results()` at the end of `_execute_search()` in `semantic_search()`, after the repo-filtered results come back and before returning. + +Add `final_score` to the formatted output in `tools.py` if it differs meaningfully from `score`. + +--- + +### Step 11 — GDS/Leiden: what to do when Aura creds are available + +**No code changes now.** Document in a code comment in `graph.py` near `semantic_search()`: + +#### To unlock when `gds.graph.project` is available: + +- **PageRank reranking** — replace the structural bonus heuristic in `_rerank_results()` with a GDS-computed pagerank property on Function/Class nodes. Run `gds.pageRank.write()` periodically (or post-ingestion) per repo, write to `entity.pagerank`. Then: `final_score = 0.7*vector + 0.2*pagerank + 0.1*structural`. + +- **Leiden clustering** — run `gds.leiden.write()` per repo, write `community_id` to nodes. Use community membership as an additional grouping/filtering signal in search results (e.g., "found 3 results from cluster auth, 2 from cluster api"). 
+ +- **Activation:** `gds.aura.api.credentials(clientId, clientSecret)` — one-time Aura console config step, then `gds.graph.project` becomes available. + +#### Available now without GDS: + +- `vector.similarity.cosine()` — pairwise reranking in Cypher +- BFS expansion: manually hop 1-2 levels on `CALLS` from seed results, collect neighbors as additional context + +--- + +## Migration Notes + +The constraint change is the trickiest part. **Execution order matters:** + +1. `_backfill_repo_id()` — set `repo_id` on all existing nodes +2. DROP old constraints — free the global uniqueness +3. CREATE composite ones — new identity model +4. UPSERT Repository node — anchor + +This is safe for a single-repo installation: all existing nodes get the same `repo_id` and the composite uniqueness is equivalent to the old global uniqueness for that one repo. + +For the watcher (`watcher.py`): no direct changes needed — it calls `run_pipeline()` which calls the passes that will now stamp `repo_id`. + +--- + +## Verification + +- **Worktree pollution fix:** run `semantic_search("metrics.py websocket")`, confirm results show only `realtime_api/websocket/metrics.py` paths, not `.kilo/worktrees/...` paths +- **Composite constraints:** `SHOW CONSTRAINTS` in Neo4j Browser — should show `(repo_id, path)` pair, no global `f.path` alone +- **Incoming memory relations:** run `search_memory_nodes("...")` on a node with known incoming edges — confirm `incoming_relations` field is populated +- **Reranking:** confirm `final_score` field appears in results and functions that are called by others (non-empty `called_by`) score higher than isolated utilities with the same vector score +- **Backward compat:** single-repo install where `CODEMEMORY_REPO` is not set should still work — `repo_id=None` path skips `WHERE` filter + +--- +## Completion Status + +Hardened on: 2026-04-07 + +- Multi-repo `repo_id` migration is considered complete only when: + - ingestion passes are repo-scoped + - code-domain reads are repo-scoped + 
- memory CRUD/search/read/backfill paths are repo-scoped + - both CLI and MCP propagate repo context consistently +- GDS/Leiden remains future work and is not required for repo isolation completion. diff --git a/src/codememory/cli.py b/src/codememory/cli.py index 465d235..7ed5499 100644 --- a/src/codememory/cli.py +++ b/src/codememory/cli.py @@ -373,18 +373,45 @@ def cmd_status(args): with builder.driver.session() as session: # Get stats - files = session.run("MATCH (f:File) RETURN count(f) as count").single()["count"] - functions = session.run("MATCH (fn:Function) RETURN count(fn) as count").single()[ - "count" - ] - classes = session.run("MATCH (c:Class) RETURN count(c) as count").single()["count"] - chunks = session.run("MATCH (ch:Chunk) RETURN count(ch) as count").single()["count"] + repo_params = {"repo_id": builder.repo_id} if builder.repo_id else {} + files = session.run( + "MATCH (f:File) " + + ("WHERE f.repo_id = $repo_id " if builder.repo_id else "") + + "RETURN count(f) as count", + **repo_params, + ).single()["count"] + functions = session.run( + "MATCH (fn:Function) " + + ("WHERE fn.repo_id = $repo_id " if builder.repo_id else "") + + "RETURN count(fn) as count", + **repo_params, + ).single()["count"] + classes = session.run( + "MATCH (c:Class) " + + ("WHERE c.repo_id = $repo_id " if builder.repo_id else "") + + "RETURN count(c) as count", + **repo_params, + ).single()["count"] + chunks = session.run( + """ + MATCH (ch:Chunk)-[:DESCRIBES]->(entity) + """ + + ("WHERE entity.repo_id = $repo_id " if builder.repo_id else "") + + "RETURN count(DISTINCT ch) as count", + **repo_params, + ).single()["count"] # Get last update - last_update = session.run(""" + last_update = session.run( + """ MATCH (f:File) + """ + + ("WHERE f.repo_id = $repo_id " if builder.repo_id else "") + + """ RETURN max(f.last_updated) as last_updated - """).single()["last_updated"] + """, + **repo_params, + ).single()["last_updated"] stats = { "files": files, @@ -555,6 +582,7 @@ def 
cmd_search(args): user=neo4j_cfg["user"], password=neo4j_cfg["password"], openai_key=openai_key, + repo_root=repo_root, ) try: @@ -604,6 +632,7 @@ def cmd_deps(args): user=neo4j_cfg["user"], password=neo4j_cfg["password"], openai_key=openai_key, + repo_root=repo_root, ) try: @@ -663,6 +692,7 @@ def cmd_impact(args): user=neo4j_cfg["user"], password=neo4j_cfg["password"], openai_key=openai_key, + repo_root=repo_root, ) try: diff --git a/src/codememory/ingestion/graph.py b/src/codememory/ingestion/graph.py index 8ad4bcf..a173a25 100644 --- a/src/codememory/ingestion/graph.py +++ b/src/codememory/ingestion/graph.py @@ -13,6 +13,7 @@ import math import posixpath import re +import json from pathlib import Path from typing import Any, List, Dict, Optional, Tuple, Set from functools import wraps @@ -122,6 +123,7 @@ class KnowledgeGraphBuilder: EMBEDDING_MODEL = "text-embedding-3-large" COST_PER_1M_TOKENS = 0.13 # USD VECTOR_DIMENSIONS = 3072 + MEMORY_LABEL = "Memory" def __init__( self, @@ -165,6 +167,7 @@ def __init__( self.openai_client = OpenAI(api_key=openai_key) if openai_key else None self.parsers = self._init_parsers() self.repo_root = repo_root + self.repo_id: Optional[str] = str(repo_root.resolve()) if repo_root else None self.token_usage = { "embedding_tokens": 0, "embedding_calls": 0, @@ -267,21 +270,24 @@ def _should_prune_file( def _delete_file_subgraph(self, session: neo4j.Session, rel_path: str): """Delete one File node and all derived entities/chunks.""" session.run( - """ - MATCH (chunk:Chunk)-[:DESCRIBES]->(f:File {path: $path}) + f""" + MATCH (chunk:Chunk)-[:DESCRIBES]->(f:File {self._file_key}) DETACH DELETE chunk """, - path=rel_path, + **self._with_repo(path=rel_path), ) session.run( - """ - MATCH (f:File {path: $path})-[:DEFINES]->(entity) + f""" + MATCH (f:File {self._file_key})-[:DEFINES]->(entity) OPTIONAL MATCH (chunk:Chunk)-[:DESCRIBES]->(entity) DETACH DELETE chunk, entity """, - path=rel_path, + **self._with_repo(path=rel_path), + ) + 
session.run( + f"MATCH (f:File {self._file_key}) DETACH DELETE f", + **self._with_repo(path=rel_path), ) - session.run("MATCH (f:File {path: $path}) DETACH DELETE f", path=rel_path) def _create_document_chunk( self, @@ -294,11 +300,11 @@ def _create_document_chunk( ) -> None: """Create semantic chunks attached directly to a File node.""" session.run( - """ - MATCH (chunk:Chunk)-[:DESCRIBES]->(f:File {path: $path}) + f""" + MATCH (chunk:Chunk)-[:DESCRIBES]->(f:File {self._file_key}) DETACH DELETE chunk """, - path=rel_path, + **self._with_repo(path=rel_path), ) if extension == ".md": @@ -310,29 +316,26 @@ def _create_document_chunk( return session.run( - """ - MATCH (f:File {path: $path}) + f""" + MATCH (f:File {self._file_key}) SET f.name = $name """, - path=rel_path, - name=file_name, + **self._with_repo(path=rel_path, name=file_name), ) for label, chunk_text in chunks: enriched_text = f"Context: File {rel_path} > {label}\n\n{chunk_text}" embedding = self.get_embedding(enriched_text) session.run( - """ - MATCH (f:File {path: $path}) - CREATE (ch:Chunk {id: randomUUID()}) + f""" + MATCH (f:File {self._file_key}) + CREATE (ch:Chunk {{id: randomUUID()}}) SET ch.text = $text, ch.embedding = $embedding, ch.created_at = datetime() MERGE (ch)-[:DESCRIBES]->(f) """, - path=rel_path, - text=chunk_text, - embedding=embedding, + **self._with_repo(path=rel_path, text=chunk_text, embedding=embedding), ) def _split_markdown_document( @@ -408,41 +411,155 @@ def close(self): # DATABASE SETUP # ========================================================================= + def _backfill_repo_id(self, session) -> None: + """Stamp repo_id on all existing nodes that lack it before migrating constraints.""" + if not self.repo_id: + return + for label in ("File", "Function", "Class", "Memory"): + session.run( + f"MATCH (n:{label}) WHERE n.repo_id IS NULL SET n.repo_id = $repo_id", + repo_id=self.repo_id, + ) + logger.info(f"✅ Backfilled repo_id='{self.repo_id}' on existing nodes.") + + def 
_with_repo(self, **kwargs) -> dict: + """Return kwargs with repo_id added when self.repo_id is set.""" + if self.repo_id: + kwargs["repo_id"] = self.repo_id + return kwargs + + @property + def _file_key(self) -> str: + """Cypher property map for File identity — scoped by repo_id when set.""" + return "{repo_id: $repo_id, path: $path}" if self.repo_id else "{path: $path}" + + @property + def _function_key(self) -> str: + return "{repo_id: $repo_id, signature: $sig}" if self.repo_id else "{signature: $sig}" + + @property + def _class_key(self) -> str: + return "{repo_id: $repo_id, qualified_name: $sig}" if self.repo_id else "{qualified_name: $sig}" + + @property + def _memory_key(self) -> str: + return "{repo_id: $repo_id, name: $name}" if self.repo_id else "{name: $name}" + def setup_database(self): """ Pass 0: Pre-flight Configuration. Creates constraints and vector indexes to optimize ingestion and retrieval. + When repo_id is set, migrates to composite (repo_id, property) constraints + for multi-repo support. 
""" logger.info("🚀 [Pass 0] Configuring Database Constraints & Indexes...") + with self.driver.session() as session: + # --- Migration: run before any constraint changes --- + if self.repo_id: + self._backfill_repo_id(session) + + base_queries = [ + # Vector Index for Hybrid Search + f""" + CREATE VECTOR INDEX code_embeddings IF NOT EXISTS + FOR (c:Chunk) ON (c.embedding) + OPTIONS {{indexConfig: {{ + `vector.dimensions`: {self.VECTOR_DIMENSIONS}, + `vector.similarity_function`: 'cosine' + }} }} + """, + # Fulltext Index for Keyword Search + """ + CREATE FULLTEXT INDEX entity_text_search IF NOT EXISTS + FOR (n:Function|Class|File) ON EACH [n.name, n.docstring, n.path] + """, + ] + + if self.repo_id: + # Composite constraints for multi-repo isolation + constraint_queries = [ + # Drop old global constraints (safe: IF EXISTS) + "DROP CONSTRAINT file_path_unique IF EXISTS", + "DROP CONSTRAINT function_sig_unique IF EXISTS", + "DROP CONSTRAINT class_name_unique IF EXISTS", + # Create composite (repo_id, property) constraints + "CREATE CONSTRAINT file_repo_path_unique IF NOT EXISTS " + "FOR (f:File) REQUIRE (f.repo_id, f.path) IS UNIQUE", + "CREATE CONSTRAINT function_repo_sig_unique IF NOT EXISTS " + "FOR (fn:Function) REQUIRE (fn.repo_id, fn.signature) IS UNIQUE", + "CREATE CONSTRAINT class_repo_qual_unique IF NOT EXISTS " + "FOR (c:Class) REQUIRE (c.repo_id, c.qualified_name) IS UNIQUE", + # Repository anchor node constraint + "CREATE CONSTRAINT repo_id_unique IF NOT EXISTS " + "FOR (r:Repository) REQUIRE r.repo_id IS UNIQUE", + ] + else: + # Legacy single-repo constraints (no repo_id) + constraint_queries = [ + "CREATE CONSTRAINT file_path_unique IF NOT EXISTS FOR (f:File) REQUIRE f.path IS UNIQUE", + "CREATE CONSTRAINT function_sig_unique IF NOT EXISTS FOR (f:Function) REQUIRE f.signature IS UNIQUE", + "CREATE CONSTRAINT class_name_unique IF NOT EXISTS FOR (c:Class) REQUIRE c.qualified_name IS UNIQUE", + ] + + for q in constraint_queries + base_queries: + try: + 
session.run(q) + except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: + logger.warning(f"Constraint/Index check: {e}") + + # Upsert Repository anchor node + if self.repo_id: + try: + session.run( + "MERGE (r:Repository {repo_id: $repo_id}) " + "SET r.root_path = $root_path, r.updated_at = datetime()", + repo_id=self.repo_id, + root_path=str(self.repo_root) if self.repo_root else self.repo_id, + ) + except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: + logger.warning(f"Repository anchor upsert: {e}") + + logger.info("✅ Database configured.") + + def setup_memory_schema(self): + """Create constraints and indexes for agent-authored memory entities.""" + if self.repo_id: + name_constraint = ( + "DROP CONSTRAINT memory_name_unique IF EXISTS", + "CREATE CONSTRAINT memory_repo_name_unique IF NOT EXISTS " + f"FOR (m:{self.MEMORY_LABEL}) REQUIRE (m.repo_id, m.name) IS UNIQUE", + ) + else: + name_constraint = ( + "CREATE CONSTRAINT memory_name_unique IF NOT EXISTS " + f"FOR (m:{self.MEMORY_LABEL}) REQUIRE m.name IS UNIQUE", + ) queries = [ - # 1. Uniqueness Constraints (Critical for Merge performance) - "CREATE CONSTRAINT file_path_unique IF NOT EXISTS FOR (f:File) REQUIRE f.path IS UNIQUE", - "CREATE CONSTRAINT function_sig_unique IF NOT EXISTS FOR (f:Function) REQUIRE f.signature IS UNIQUE", - "CREATE CONSTRAINT class_name_unique IF NOT EXISTS FOR (c:Class) REQUIRE c.qualified_name IS UNIQUE", - # 2. Vector Index for Hybrid Search + *name_constraint, f""" - CREATE VECTOR INDEX code_embeddings IF NOT EXISTS - FOR (c:Chunk) ON (c.embedding) + CREATE VECTOR INDEX memory_embeddings IF NOT EXISTS + FOR (m:{self.MEMORY_LABEL}) ON (m.embedding) OPTIONS {{indexConfig: {{ `vector.dimensions`: {self.VECTOR_DIMENSIONS}, `vector.similarity_function`: 'cosine' }} }} """, - # 3. 
Fulltext Index for Keyword Search - """ - CREATE FULLTEXT INDEX entity_text_search IF NOT EXISTS - FOR (n:Function|Class|File) ON EACH [n.name, n.docstring, n.path] - """, + ( + "CREATE FULLTEXT INDEX memory_search IF NOT EXISTS " + f"FOR (m:{self.MEMORY_LABEL}) ON EACH [m.name, m.entity_type, m.observation_text]" + ), ] - with self.driver.session() as session: - for q in queries: - try: - session.run(q) - except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: - logger.warning(f"Constraint/Index check: {e}") - logger.info("✅ Database configured.") + def _execute_setup() -> None: + with self.driver.session() as session: + for query in queries: + try: + session.run(query) + except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: + logger.warning(f"Memory schema check: {e}") + + self.circuit_breaker.call(_execute_setup) # ========================================================================= # EMBEDDING GENERATION @@ -545,7 +662,8 @@ def pass_1_structure_scan( # Check if file exists and hash matches (Change Detection) result = session.run( - "MATCH (f:File {path: $path}) RETURN f.ohash as hash", path=rel_path + f"MATCH (f:File {self._file_key}) RETURN f.ohash as hash", + **self._with_repo(path=rel_path), ).single() if result and result["hash"] == current_ohash: @@ -554,22 +672,26 @@ def pass_1_structure_scan( # Create/Update File Node session.run( - """ - MERGE (f:File {path: $path}) + f""" + MERGE (f:File {self._file_key}) SET f.name = $name, f.ohash = $ohash, f.last_updated = datetime() - """, - path=rel_path, - name=file_name, - ohash=current_ohash, + """, + **self._with_repo(path=rel_path, name=file_name, ohash=current_ohash), ) count += 1 # Prune File nodes that are no longer indexable under current rules. 
+ prune_query = ( + "MATCH (f:File) WHERE f.repo_id = $repo_id RETURN f.path as path" + if self.repo_id + else "MATCH (f:File) RETURN f.path as path" + ) + prune_params = {"repo_id": self.repo_id} if self.repo_id else {} existing_paths = [ record["path"] - for record in session.run("MATCH (f:File) RETURN f.path as path") + for record in session.run(prune_query, **prune_params) ] for rel_path in existing_paths: if self._should_prune_file(rel_path, repo_path, supported_extensions): @@ -601,7 +723,13 @@ def pass_2_entity_definition(self, repo_path: Optional[Path] = None): with self.driver.session() as session: # Fetch all files that need indexing - result = session.run("MATCH (f:File) RETURN f.path as path") + if self.repo_id: + result = session.run( + "MATCH (f:File) WHERE f.repo_id = $repo_id RETURN f.path as path", + repo_id=self.repo_id, + ) + else: + result = session.run("MATCH (f:File) RETURN f.path as path") files_to_process = [record["path"] for record in result] for i, rel_path in enumerate(files_to_process): @@ -675,27 +803,24 @@ def pass_2_entity_definition(self, repo_path: Optional[Path] = None): if tag == "class": # 1. Create Class Node session.run( - """ - MATCH (f:File {path: $path}) - MERGE (c:Class {qualified_name: $sig}) + f""" + MATCH (f:File {self._file_key}) + MERGE (c:Class {self._class_key}) SET c.name = $name, c.code = $code MERGE (f)-[:DEFINES]->(c) - """, - path=rel_path, - sig=signature, - name=name, - code=node_text, + """, + **self._with_repo(path=rel_path, sig=signature, name=name, code=node_text), ) # 2. 
Hybrid Chunking: Class Context # Skip if chunk already exists (avoid re-embedding) existing = session.run( - """ - MATCH (c:Class {qualified_name: $sig}) + f""" + MATCH (c:Class {self._class_key}) OPTIONAL MATCH (ch:Chunk)-[:DESCRIBES]->(c) RETURN ch.id as chunk_id LIMIT 1 - """, - sig=signature, + """, + **self._with_repo(sig=signature), ).single() if not existing or not existing["chunk_id"]: @@ -704,17 +829,15 @@ def pass_2_entity_definition(self, repo_path: Optional[Path] = None): embedding = self.get_embedding(enriched_text) session.run( - """ - MATCH (c:Class {qualified_name: $sig}) - CREATE (ch:Chunk {id: randomUUID()}) + f""" + MATCH (c:Class {self._class_key}) + CREATE (ch:Chunk {{id: randomUUID()}}) SET ch.text = $text, ch.embedding = $embedding, ch.created_at = datetime() MERGE (ch)-[:DESCRIBES]->(c) - """, - sig=signature, - text=node_text, - embedding=embedding, + """, + **self._with_repo(sig=signature, text=node_text, embedding=embedding), ) elif tag == "function": @@ -736,40 +859,45 @@ def pass_2_entity_definition(self, repo_path: Optional[Path] = None): # 1. 
Create Function Node session.run( - """ - MATCH (f:File {path: $path}) - MERGE (fn:Function {signature: $sig}) + f""" + MATCH (f:File {self._file_key}) + MERGE (fn:Function {self._function_key}) SET fn.name = $name, fn.code = $code MERGE (f)-[:DEFINES]->(fn) - """, - path=rel_path, - sig=full_sig, - name=name, - code=node_text, + """, + **self._with_repo(path=rel_path, sig=full_sig, name=name, code=node_text), ) # Link to parent class if exists if parent_class: class_sig = f"{rel_path}:{parent_class}" - session.run( + has_method_query = ( + """ + MATCH (c:Class {qualified_name: $csig, repo_id: $repo_id}) + MATCH (fn:Function {signature: $fsig, repo_id: $repo_id}) + MERGE (c)-[:HAS_METHOD]->(fn) """ + if self.repo_id + else """ MATCH (c:Class {qualified_name: $csig}) MATCH (fn:Function {signature: $fsig}) MERGE (c)-[:HAS_METHOD]->(fn) - """, - csig=class_sig, - fsig=full_sig, + """ + ) + session.run( + has_method_query, + **self._with_repo(csig=class_sig, fsig=full_sig), ) # 2. Hybrid Chunking: Function Context # Skip if chunk already exists (avoid re-embedding) existing = session.run( - """ - MATCH (fn:Function {signature: $sig}) + f""" + MATCH (fn:Function {self._function_key}) OPTIONAL MATCH (ch:Chunk)-[:DESCRIBES]->(fn) RETURN ch.id as chunk_id LIMIT 1 - """, - sig=full_sig, + """, + **self._with_repo(sig=full_sig), ).single() if not existing or not existing["chunk_id"]: @@ -784,17 +912,15 @@ def pass_2_entity_definition(self, repo_path: Optional[Path] = None): embedding = self.get_embedding(enriched_text) session.run( - """ - MATCH (fn:Function {signature: $sig}) - CREATE (ch:Chunk {id: randomUUID()}) + f""" + MATCH (fn:Function {self._function_key}) + CREATE (ch:Chunk {{id: randomUUID()}}) SET ch.text = $text, ch.embedding = $embedding, ch.created_at = datetime() MERGE (ch)-[:DESCRIBES]->(fn) - """, - sig=full_sig, - text=node_text, - embedding=embedding, + """, + **self._with_repo(sig=full_sig, text=node_text, embedding=embedding), ) logger.info("✅ [Pass 2] 
Entities and Semantic Chunks created.") @@ -938,7 +1064,13 @@ def pass_3_imports(self, repo_path: Optional[Path] = None): supported_exts = {".py", ".js", ".jsx", ".ts", ".tsx"} with self.driver.session() as session: - result = session.run("MATCH (f:File) RETURN f.path as path") + if self.repo_id: + result = session.run( + "MATCH (f:File) WHERE f.repo_id = $repo_id RETURN f.path as path", + repo_id=self.repo_id, + ) + else: + result = session.run("MATCH (f:File) RETURN f.path as path") all_paths = [r["path"] for r in result] path_set = set(all_paths) files = [path for path in all_paths if Path(path).suffix in supported_exts] @@ -951,7 +1083,10 @@ def pass_3_imports(self, repo_path: Optional[Path] = None): logger.warning( f"⚠️ File found in graph but missing on disk (Stale): {rel_path}. Deleting node." ) - session.run("MATCH (f:File {path: $path}) DETACH DELETE f", path=rel_path) + session.run( + f"MATCH (f:File {self._file_key}) DETACH DELETE f", + **self._with_repo(path=rel_path), + ) continue code = full_path.read_text(errors="ignore") @@ -961,13 +1096,23 @@ def pass_3_imports(self, repo_path: Optional[Path] = None): modules = self._extract_js_ts_import_modules(code) # Rebuild imports for this source file to avoid stale edges. 
- session.run( - """ - MATCH (source:File {path: $src})-[r:IMPORTS]->() - DELETE r - """, - src=rel_path, - ) + if self.repo_id: + session.run( + """ + MATCH (source:File {repo_id: $repo_id, path: $src})-[r:IMPORTS]->() + DELETE r + """, + src=rel_path, + repo_id=self.repo_id, + ) + else: + session.run( + """ + MATCH (source:File {path: $src})-[r:IMPORTS]->() + DELETE r + """, + src=rel_path, + ) exact_targets: Set[str] = set() fuzzy_parts: Set[str] = set() @@ -983,28 +1128,54 @@ def pass_3_imports(self, repo_path: Optional[Path] = None): fuzzy_parts.add(fuzzy_part) if exact_targets: - session.run( - """ - MATCH (source:File {path: $src}) - UNWIND $targets as target_path - MATCH (target:File {path: target_path}) - MERGE (source)-[:IMPORTS]->(target) - """, - src=rel_path, - targets=sorted(exact_targets), - ) + if self.repo_id: + session.run( + """ + MATCH (source:File {repo_id: $repo_id, path: $src}) + UNWIND $targets as target_path + MATCH (target:File {repo_id: $repo_id, path: target_path}) + MERGE (source)-[:IMPORTS]->(target) + """, + src=rel_path, + targets=sorted(exact_targets), + repo_id=self.repo_id, + ) + else: + session.run( + """ + MATCH (source:File {path: $src}) + UNWIND $targets as target_path + MATCH (target:File {path: target_path}) + MERGE (source)-[:IMPORTS]->(target) + """, + src=rel_path, + targets=sorted(exact_targets), + ) for mod_part in sorted(fuzzy_parts): - session.run( - """ - MATCH (source:File {path: $src}) - MATCH (target:File) - WHERE target.path CONTAINS $mod_part - MERGE (source)-[:IMPORTS]->(target) - """, - src=rel_path, - mod_part=mod_part, - ) + if self.repo_id: + session.run( + """ + MATCH (source:File {repo_id: $repo_id, path: $src}) + MATCH (target:File {repo_id: $repo_id}) + WHERE target.path CONTAINS $mod_part + MERGE (source)-[:IMPORTS]->(target) + """, + src=rel_path, + mod_part=mod_part, + repo_id=self.repo_id, + ) + else: + session.run( + """ + MATCH (source:File {path: $src}) + MATCH (target:File) + WHERE target.path 
CONTAINS $mod_part + MERGE (source)-[:IMPORTS]->(target) + """, + src=rel_path, + mod_part=mod_part, + ) logger.info("✅ [Pass 3] Import graph built.") @@ -1030,12 +1201,22 @@ def pass_4_call_graph(self, repo_path: Optional[Path] = None): with self.driver.session() as session: # Get all function definitions ordered by file - result = session.run( - """ - MATCH (f:File)-[:DEFINES]->(fn:Function) - RETURN f.path as path, collect({name: fn.name, sig: fn.signature}) as funcs - """ - ) + if self.repo_id: + result = session.run( + """ + MATCH (f:File)-[:DEFINES]->(fn:Function) + WHERE f.repo_id = $repo_id + RETURN f.path as path, collect({name: fn.name, sig: fn.signature}) as funcs + """, + repo_id=self.repo_id, + ) + else: + result = session.run( + """ + MATCH (f:File)-[:DEFINES]->(fn:Function) + RETURN f.path as path, collect({name: fn.name, sig: fn.signature}) as funcs + """ + ) file_records = list(result) total_files = len(file_records) @@ -1074,17 +1255,31 @@ def pass_4_call_graph(self, repo_path: Optional[Path] = None): caller_sig = func["sig"] # Create relationships for found calls - session.run( - """ - UNWIND $calls as called_name - MATCH (caller:Function {signature: $caller_sig}) - MATCH (callee:Function {name: called_name}) - WHERE caller <> callee - MERGE (caller)-[:CALLS]->(callee) - """, - caller_sig=caller_sig, - calls=calls_in_file, - ) + if self.repo_id: + session.run( + """ + UNWIND $calls as called_name + MATCH (caller:Function {signature: $caller_sig, repo_id: $repo_id}) + MATCH (callee:Function {name: called_name, repo_id: $repo_id}) + WHERE caller <> callee + MERGE (caller)-[:CALLS]->(callee) + """, + caller_sig=caller_sig, + calls=calls_in_file, + repo_id=self.repo_id, + ) + else: + session.run( + """ + UNWIND $calls as called_name + MATCH (caller:Function {signature: $caller_sig}) + MATCH (callee:Function {name: called_name}) + WHERE caller <> callee + MERGE (caller)-[:CALLS]->(callee) + """, + caller_sig=caller_sig, + calls=calls_in_file, + ) 
except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: logger.warning(f"⚠️ Failed to process calls in {rel_path}: {e}") @@ -1151,17 +1346,27 @@ def run_pipeline( # SEMANTIC SEARCH (for MCP Server) # ========================================================================= - def semantic_search(self, query: str, limit: int = 5) -> List[Dict]: + def semantic_search(self, query: str, limit: int = 5, repo_id: Optional[str] = None) -> List[Dict]: """ - Hybrid Search for the Agent using vector similarity. + Graph-enriched hybrid search for the Agent. + + Vector search finds semantically relevant Chunk nodes; graph traversal + then expands each hit to return structural context (file path, call + relationships, sibling functions, file imports) in the same query. + When repo_id is active, over-fetches by 3x then filters and reranks. Args: query: Natural language query limit: Maximum number of results to return + repo_id: Override repo scope (defaults to self.repo_id) Returns: - List of dicts with name, signature, score, and text + List of dicts with: name, sig, score, final_score, text, file_path, + calls_out, called_by, methods, file_imports, siblings """ + active_repo = repo_id or self.repo_id + overfetch = limit * 3 if active_repo else limit + def _is_valid_vector(vec: List[float]) -> bool: if not vec: return False @@ -1173,20 +1378,43 @@ def _is_valid_vector(vec: List[float]) -> bool: return math.isfinite(norm_sq) and norm_sq > 0.0 def _fallback_fulltext_search() -> List[Dict]: - cypher = """ + repo_filter = "AND node.repo_id = $repo_id" if active_repo else "" + cypher = f""" CALL db.index.fulltext.queryNodes('entity_text_search', $query) YIELD node, score + WHERE 1=1 {repo_filter} OPTIONAL MATCH (ch:Chunk)-[:DESCRIBES]->(node) + OPTIONAL MATCH (node)<-[:DEFINES]-(file:File) + OPTIONAL MATCH (node)-[:CALLS]->(callee:Function) + OPTIONAL MATCH (caller:Function)-[:CALLS]->(node) + OPTIONAL MATCH (node)-[:HAS_METHOD]->(method:Function) + OPTIONAL MATCH 
(file)-[:IMPORTS]->(imported_file:File) + OPTIONAL MATCH (file)-[:DEFINES]->(sibling) WHERE sibling <> node + WITH node, score, ch, file, + collect(DISTINCT callee.name)[..5] as calls_out, + collect(DISTINCT caller.name)[..5] as called_by, + collect(DISTINCT method.name)[..5] as methods, + collect(DISTINCT imported_file.path)[..3] as file_imports, + collect(DISTINCT sibling.name)[..8] as siblings RETURN coalesce(node.name, node.path, 'Unknown') as name, coalesce(node.signature, node.qualified_name, '') as sig, score, - coalesce(ch.text, node.docstring, node.path, '') as text + coalesce(ch.text, node.docstring, node.path, '') as text, + file.path as file_path, + calls_out, + called_by, + methods, + file_imports, + siblings ORDER BY score DESC LIMIT $limit """ + params: Dict = {"query": query, "limit": overfetch} + if active_repo: + params["repo_id"] = active_repo with self.driver.session() as session: - res = session.run(cypher, query=query, limit=limit) + res = session.run(cypher, **params) return [dict(r) for r in res] def _execute_search(): @@ -1196,21 +1424,69 @@ def _execute_search(): "Semantic query vector invalid (likely missing OpenAI key or zero-vector); " "falling back to full-text search." 
) - return _fallback_fulltext_search() + results = _fallback_fulltext_search() + return self._rerank_results(results, limit) - cypher = """ - CALL db.index.vector.queryNodes('code_embeddings', $limit, $vec) - YIELD node, score - MATCH (node)-[:DESCRIBES]->(target) - RETURN target.name as name, target.signature as sig, score, node.text as text + repo_filter = "WHERE entity.repo_id = $repo_id" if active_repo else "" + cypher = f""" + CALL db.index.vector.queryNodes('code_embeddings', $overfetch, $vec) + YIELD node as chunk, score + MATCH (chunk)-[:DESCRIBES]->(entity) + {repo_filter} + OPTIONAL MATCH (entity)<-[:DEFINES]-(file:File) + OPTIONAL MATCH (entity)-[:CALLS]->(callee:Function) + OPTIONAL MATCH (caller:Function)-[:CALLS]->(entity) + OPTIONAL MATCH (entity)-[:HAS_METHOD]->(method:Function) + OPTIONAL MATCH (file)-[:IMPORTS]->(imported_file:File) + OPTIONAL MATCH (file)-[:DEFINES]->(sibling) WHERE sibling <> entity + WITH entity, chunk, score, file, + collect(DISTINCT callee.name)[..5] as calls_out, + collect(DISTINCT caller.name)[..5] as called_by, + collect(DISTINCT method.name)[..5] as methods, + collect(DISTINCT imported_file.path)[..3] as file_imports, + collect(DISTINCT sibling.name)[..8] as siblings + RETURN + coalesce(entity.name, entity.path, 'Unknown') as name, + coalesce(entity.signature, entity.qualified_name, '') as sig, + score, + chunk.text as text, + file.path as file_path, + calls_out, + called_by, + methods, + file_imports, + siblings ORDER BY score DESC """ + params: Dict = {"overfetch": overfetch, "vec": vector} + if active_repo: + params["repo_id"] = active_repo with self.driver.session() as session: - res = session.run(cypher, limit=limit, vec=vector) - return [dict(r) for r in res] - + res = session.run(cypher, **params) + results = [dict(r) for r in res] + return self._rerank_results(results, limit) + return self.circuit_breaker.call(_execute_search) + def _rerank_results(self, results: List[Dict], limit: int) -> List[Dict]: + """ + 
Structural reranking after repo-filtered retrieval. + Combines vector score (90%) with graph connectivity bonus (10%). + # GDS/PageRank upgrade path: when gds.aura.api.credentials are configured, + # replace the structural bonus with entity.pagerank from gds.pageRank.write(). + """ + for r in results: + structural = 0.0 + if r.get("calls_out"): + structural += 0.05 # has outgoing calls + if r.get("called_by"): + structural += 0.05 # is called by others — more central + if r.get("methods"): + structural += 0.03 # is a class with methods + r["final_score"] = r.get("score", 0.0) * 0.9 + structural + results.sort(key=lambda x: x["final_score"], reverse=True) + return results[:limit] + # ========================================================================= # DEPENDENCY ANALYSIS (for MCP Server) # ========================================================================= @@ -1225,8 +1501,9 @@ def get_file_dependencies(self, file_path: str) -> Dict[str, List[str]]: Returns: Dict with 'imports' and 'imported_by' lists """ - cypher = """ - MATCH (f:File {path: $path}) + repo_clause = ", repo_id: $repo_id" if self.repo_id else "" + cypher = f""" + MATCH (f:File {{path: $path{repo_clause}}}) OPTIONAL MATCH (f)-[:IMPORTS]->(imported) OPTIONAL MATCH (dependent)-[:IMPORTS]->(f) RETURN @@ -1234,7 +1511,7 @@ def get_file_dependencies(self, file_path: str) -> Dict[str, List[str]]: collect(DISTINCT dependent.path) as imported_by """ with self.driver.session() as session: - result = session.run(cypher, path=file_path).single() + result = session.run(cypher, **self._with_repo(path=file_path)).single() if result: return { "imports": result["imports"] or [], @@ -1258,8 +1535,9 @@ def identify_impact( """ def _execute_impact_analysis(): depth = max(1, int(max_depth)) + repo_clause = ", repo_id: $repo_id" if self.repo_id else "" cypher = f""" - MATCH path = (f:File {{path: $path}})<-[:IMPORTS*1..{depth}]-(dependent) + MATCH path = (f:File {{path: 
$path{repo_clause}}})<-[:IMPORTS*1..{depth}]-(dependent) RETURN DISTINCT dependent.path as path, length(path) as depth, @@ -1267,7 +1545,7 @@ def _execute_impact_analysis(): ORDER BY depth, path """ with self.driver.session() as session: - result = session.run(cypher, path=file_path) + result = session.run(cypher, **self._with_repo(path=file_path)) affected_files = [ {"path": r["path"], "depth": r["depth"], "impact_type": r["impact_type"]} for r in result @@ -1276,6 +1554,692 @@ def _execute_impact_analysis(): return self.circuit_breaker.call(_execute_impact_analysis) + # ========================================================================= + # MEMORY GRAPH QUERIES (for MCP Server) + # ========================================================================= + + @staticmethod + def _serialize_memory_observations(observations: List[str]) -> str: + """Join observations for search-friendly storage.""" + return "\n".join( + obs.strip() for obs in observations if isinstance(obs, str) and obs.strip() + ) + + @staticmethod + def _normalize_memory_label(value: str) -> str: + """Normalize a user-provided memory type into a safe Neo4j label.""" + cleaned = re.sub(r"[^0-9A-Za-z_]", "_", str(value or "").strip()) + cleaned = re.sub(r"_+", "_", cleaned).strip("_") + if not cleaned: + return "concept" + if cleaned[0].isdigit(): + cleaned = f"Type_{cleaned}" + return cleaned + + @staticmethod + def _normalize_memory_relation_type(value: str) -> str: + """Normalize a relation type into a safe Neo4j relationship type.""" + cleaned = re.sub(r"[^0-9A-Za-z_]", "_", str(value or "").strip().upper()) + cleaned = re.sub(r"_+", "_", cleaned).strip("_") + if not cleaned: + raise ValueError("Relation type cannot be empty.") + if cleaned[0].isdigit(): + cleaned = f"REL_{cleaned}" + return cleaned + + @staticmethod + def _build_memory_embedding_text( + name: str, entity_type: str, observations: List[str] + ) -> str: + """Build the canonical text used for memory embeddings.""" + lines = 
[f"Name: {name}", f"Type: {entity_type}"] + if observations: + lines.append("Observations:") + lines.extend(f"- {observation}" for observation in observations) + return "\n".join(lines) + + def _get_memory_embedding_or_none(self, text: str) -> Optional[List[float]]: + """Return a memory embedding when OpenAI is configured, otherwise None.""" + if self.openai_client is None: + return None + return self.get_embedding(text) + + @staticmethod + def _normalize_memory_entity(entity: Dict[str, Any]) -> Dict[str, Any]: + """Validate and normalize one memory entity payload.""" + if not isinstance(entity, dict): + raise ValueError("Each entity must be an object.") + + name = str(entity.get("name", "")).strip() + if not name: + raise ValueError("Each entity requires a non-empty 'name'.") + + entity_type = str( + entity.get("entityType") or entity.get("entity_type") or "concept" + ).strip() + observations = entity.get("observations") or [] + if not isinstance(observations, list): + raise ValueError(f"Entity '{name}' observations must be a list of strings.") + + normalized_observations = [] + for observation in observations: + text = str(observation).strip() + if text: + normalized_observations.append(text) + + metadata = entity.get("metadata") or {} + if metadata and not isinstance(metadata, dict): + raise ValueError(f"Entity '{name}' metadata must be an object.") + + return { + "name": name, + "entity_type": entity_type or "concept", + "entity_label": KnowledgeGraphBuilder._normalize_memory_label(entity_type or "concept"), + "observations": normalized_observations, + "observation_text": KnowledgeGraphBuilder._serialize_memory_observations( + normalized_observations + ), + "embedding_text": KnowledgeGraphBuilder._build_memory_embedding_text( + name, entity_type or "concept", normalized_observations + ), + "metadata_json": json.dumps(metadata, sort_keys=True) if metadata else "{}", + } + + @staticmethod + def _normalize_memory_relation(relation: Dict[str, Any]) -> Dict[str, 
str]: + """Validate and normalize one memory relation payload.""" + if not isinstance(relation, dict): + raise ValueError("Each relation must be an object.") + + source = str( + relation.get("from") + or relation.get("from_entity") + or relation.get("source") + or "" + ).strip() + target = str( + relation.get("to") + or relation.get("to_entity") + or relation.get("target") + or "" + ).strip() + relation_type = str( + relation.get("relationType") + or relation.get("relation_type") + or relation.get("type") + or "" + ).strip() + + if not source or not target or not relation_type: + raise ValueError("Each relation requires 'from', 'to', and 'relationType'.") + + return { + "from": source, + "to": target, + "relation_type": KnowledgeGraphBuilder._normalize_memory_relation_type(relation_type), + } + + @staticmethod + def _normalize_memory_observation_update(item: Dict[str, Any]) -> Dict[str, Any]: + """Validate and normalize one observation update payload.""" + if not isinstance(item, dict): + raise ValueError("Each observation update must be an object.") + + entity_name = str( + item.get("entityName") or item.get("entity_name") or item.get("name") or "" + ).strip() + if not entity_name: + raise ValueError("Each observation update requires 'entityName'.") + + contents = item.get("contents") or item.get("observations") or item.get("content") or [] + if isinstance(contents, str): + contents = [contents] + if not isinstance(contents, list): + raise ValueError( + f"Observation update for '{entity_name}' must provide a list of strings." + ) + + normalized_contents = [str(content).strip() for content in contents if str(content).strip()] + if not normalized_contents: + raise ValueError( + f"Observation update for '{entity_name}' must include at least one string." 
+ ) + + return { + "entity_name": entity_name, + "contents": normalized_contents, + } + + def create_memory_entities(self, entities: List[Dict[str, Any]]) -> Dict[str, Any]: + """Create or update memory entities.""" + normalized_entities = [self._normalize_memory_entity(entity) for entity in entities] + if not normalized_entities: + raise ValueError("At least one entity is required.") + + self.setup_memory_schema() + + def _execute_create() -> Dict[str, Any]: + entity_names: List[str] = [] + with self.driver.session() as session: + for entity in normalized_entities: + embedding = self._get_memory_embedding_or_none(entity["embedding_text"]) + params = {**entity, **({"repo_id": self.repo_id} if self.repo_id else {})} + session.run( + f""" + MERGE (m:{self.MEMORY_LABEL} {self._memory_key}) + ON CREATE SET + m.type = $entity_type, + m.entity_type = $entity_type, + m.observations = $observations, + m.observation_text = $observation_text, + m.metadata_json = $metadata_json, + m.created_at = datetime(), + m.updated_at = datetime() + ON MATCH SET + m.type = $entity_type, + m.entity_type = $entity_type, + m.observations = CASE + WHEN size($observations) = 0 THEN coalesce(m.observations, []) + ELSE $observations + END, + m.observation_text = CASE + WHEN size($observations) = 0 THEN coalesce(m.observation_text, '') + ELSE $observation_text + END, + m.metadata_json = $metadata_json, + m.updated_at = datetime() + """, + **params, + ) + if embedding is not None: + session.run( + f""" + MATCH (m:{self.MEMORY_LABEL} {self._memory_key}) + SET m.embedding = $embedding + """, + **self._with_repo(name=entity["name"], embedding=embedding), + ) + session.run( + f""" + MATCH (m:{self.MEMORY_LABEL} {self._memory_key}) + SET m:`{entity['entity_label']}` + """, + **self._with_repo(name=entity["name"]), + ) + entity_names.append(entity["name"]) + + return {"count": len(entity_names), "entity_names": entity_names} + + return self.circuit_breaker.call(_execute_create) + + def 
delete_memory_entities(self, names: List[str]) -> Dict[str, Any]: + """Delete memory entities by name.""" + normalized_names = [str(name).strip() for name in names if str(name).strip()] + if not normalized_names: + raise ValueError("At least one entity name is required.") + + def _execute_delete() -> Dict[str, Any]: + with self.driver.session() as session: + repo_clause = "AND m.repo_id = $repo_id" if self.repo_id else "" + result = session.run( + f""" + MATCH (m:{self.MEMORY_LABEL}) + WHERE m.name IN $names {repo_clause} + WITH collect(m.name) as matched_names, collect(m) as matched_nodes + FOREACH (node IN matched_nodes | DETACH DELETE node) + RETURN matched_names as matched_names + """, + **self._with_repo(names=normalized_names), + ).single() + + deleted_names = result["matched_names"] if result and result["matched_names"] else [] + missing_names = [name for name in normalized_names if name not in deleted_names] + return { + "count": len(deleted_names), + "deleted_names": deleted_names, + "missing_names": missing_names, + } + + return self.circuit_breaker.call(_execute_delete) + + def create_memory_relations(self, relations: List[Dict[str, Any]]) -> Dict[str, Any]: + """Create typed relations between memory entities.""" + normalized_relations = [self._normalize_memory_relation(relation) for relation in relations] + if not normalized_relations: + raise ValueError("At least one relation is required.") + + self.setup_memory_schema() + + def _execute_create_relations() -> Dict[str, Any]: + created = [] + missing = [] + with self.driver.session() as session: + for relation in normalized_relations: + repo_clause = ", repo_id: $repo_id" if self.repo_id else "" + result = session.run( + f""" + MATCH (source:{self.MEMORY_LABEL} {{name: $source{repo_clause}}}) + MATCH (target:{self.MEMORY_LABEL} {{name: $target{repo_clause}}}) + MERGE (source)-[r:`{relation['relation_type']}`]->(target) + ON CREATE SET r.created_at = datetime() + ON MATCH SET r.updated_at = datetime() + 
RETURN source.name as source, target.name as target, type(r) as relation_type + """, + **self._with_repo(source=relation["from"], target=relation["to"]), + ).single() + + if result: + created.append(dict(result)) + else: + missing.append(relation) + + return {"count": len(created), "relations": created, "missing": missing} + + return self.circuit_breaker.call(_execute_create_relations) + + def delete_memory_relations(self, relations: List[Dict[str, Any]]) -> Dict[str, Any]: + """Delete typed relations between memory entities.""" + normalized_relations = [self._normalize_memory_relation(relation) for relation in relations] + if not normalized_relations: + raise ValueError("At least one relation is required.") + + def _execute_delete_relations() -> Dict[str, Any]: + deleted = [] + missing = [] + with self.driver.session() as session: + for relation in normalized_relations: + repo_clause = ", repo_id: $repo_id" if self.repo_id else "" + result = session.run( + f""" + MATCH (source:{self.MEMORY_LABEL} {{name: $source{repo_clause}}}) + MATCH (target:{self.MEMORY_LABEL} {{name: $target{repo_clause}}}) + OPTIONAL MATCH (source)-[r:`{relation['relation_type']}`]->(target) + WITH source, target, r + FOREACH (_ IN CASE WHEN r IS NULL THEN [] ELSE [1] END | DELETE r) + RETURN source.name as source, target.name as target, '{relation['relation_type']}' as relation_type, r IS NOT NULL as deleted + """, + **self._with_repo(source=relation["from"], target=relation["to"]), + ).single() + + if result and result["deleted"]: + deleted.append( + { + "from": result["source"], + "to": result["target"], + "relation_type": result["relation_type"], + } + ) + else: + missing.append(relation) + + return {"count": len(deleted), "relations": deleted, "missing": missing} + + return self.circuit_breaker.call(_execute_delete_relations) + + def add_memory_observations(self, items: List[Dict[str, Any]]) -> Dict[str, Any]: + """Append observations to memory entities.""" + normalized_items = 
[self._normalize_memory_observation_update(item) for item in items] + if not normalized_items: + raise ValueError("At least one observation update is required.") + + self.setup_memory_schema() + + def _execute_add_observations() -> Dict[str, Any]: + updated = [] + missing = [] + with self.driver.session() as session: + for item in normalized_items: + repo_clause = ", repo_id: $repo_id" if self.repo_id else "" + result = session.run( + f""" + MATCH (m:{self.MEMORY_LABEL} {{name: $name{repo_clause}}}) + WITH m, [obs IN $contents WHERE NOT obs IN coalesce(m.observations, [])] as new_obs + SET m.observations = coalesce(m.observations, []) + new_obs, + m.observation_text = reduce(acc = '', obs IN (coalesce(m.observations, []) + new_obs) | + CASE WHEN acc = '' THEN obs ELSE acc + '\n' + obs END), + m.updated_at = datetime() + RETURN m.name as name, coalesce(m.type, m.entity_type, 'concept') as entity_type, size(new_obs) as added_count, m.observations as observations + """, + **self._with_repo(name=item["entity_name"], contents=item["contents"]), + ).single() + + if result: + row = dict(result) + embedding = self._get_memory_embedding_or_none( + self._build_memory_embedding_text( + row["name"], row["entity_type"], row["observations"] or [] + ) + ) + if embedding is not None: + session.run( + f""" + MATCH (m:{self.MEMORY_LABEL} {{name: $name{repo_clause}}}) + SET m.embedding = $embedding + """, + **self._with_repo(name=row["name"], embedding=embedding), + ) + updated.append(row) + else: + missing.append(item["entity_name"]) + + return {"count": len(updated), "entities": updated, "missing_names": missing} + + return self.circuit_breaker.call(_execute_add_observations) + + def delete_memory_observations(self, items: List[Dict[str, Any]]) -> Dict[str, Any]: + """Delete observations from memory entities.""" + normalized_items = [self._normalize_memory_observation_update(item) for item in items] + if not normalized_items: + raise ValueError("At least one observation delete 
request is required.") + + def _execute_delete_observations() -> Dict[str, Any]: + updated = [] + missing = [] + with self.driver.session() as session: + for item in normalized_items: + repo_clause = ", repo_id: $repo_id" if self.repo_id else "" + result = session.run( + f""" + MATCH (m:{self.MEMORY_LABEL} {{name: $name{repo_clause}}}) + WITH m, [obs IN coalesce(m.observations, []) WHERE NOT obs IN $contents] as kept + SET m.observations = kept, + m.observation_text = reduce(acc = '', obs IN kept | + CASE WHEN acc = '' THEN obs ELSE acc + '\n' + obs END), + m.updated_at = datetime() + RETURN m.name as name, coalesce(m.type, m.entity_type, 'concept') as entity_type, size(kept) as remaining_count, m.observations as observations + """, + **self._with_repo(name=item["entity_name"], contents=item["contents"]), + ).single() + + if result: + row = dict(result) + embedding = self._get_memory_embedding_or_none( + self._build_memory_embedding_text( + row["name"], row["entity_type"], row["observations"] or [] + ) + ) + if embedding is not None: + session.run( + f""" + MATCH (m:{self.MEMORY_LABEL} {{name: $name{repo_clause}}}) + SET m.embedding = $embedding + """, + **self._with_repo(name=row["name"], embedding=embedding), + ) + updated.append(row) + else: + missing.append(item["entity_name"]) + + return {"count": len(updated), "entities": updated, "missing_names": missing} + + return self.circuit_breaker.call(_execute_delete_observations) + + def backfill_memory_embeddings(self, limit: int = 100, only_missing: bool = True) -> Dict[str, Any]: + """Backfill embeddings for existing Memory nodes.""" + self.setup_memory_schema() + safe_limit = max(1, int(limit)) + if self.openai_client is None: + raise ValueError("OPENAI_API_KEY is required to backfill memory embeddings.") + + def _execute_backfill() -> Dict[str, Any]: + with self.driver.session() as session: + filter_parts = [] + if only_missing: + filter_parts.append("m.embedding IS NULL") + if self.repo_id: + 
filter_parts.append("m.repo_id = $repo_id") + filter_clause = f"WHERE {' AND '.join(filter_parts)}" if filter_parts else "" + rows = list( + session.run( + f""" + MATCH (m:{self.MEMORY_LABEL}) + {filter_clause} + RETURN m.name as name, + coalesce(m.type, m.entity_type, 'concept') as entity_type, + coalesce(m.observations, []) as observations + ORDER BY m.name + LIMIT $limit + """, + **self._with_repo(limit=safe_limit), + ) + ) + + updated_names = [] + for row in rows: + payload = dict(row) + embedding = self.get_embedding( + self._build_memory_embedding_text( + payload["name"], + payload["entity_type"], + payload["observations"] or [], + ) + ) + session.run( + f""" + MATCH (m:{self.MEMORY_LABEL} {{name: $name{', repo_id: $repo_id' if self.repo_id else ''}}}) + SET m.embedding = $embedding, + m.updated_at = datetime() + """, + **self._with_repo(name=payload["name"], embedding=embedding), + ) + updated_names.append(payload["name"]) + + remaining_where = ["m.embedding IS NULL"] + if self.repo_id: + remaining_where.append("m.repo_id = $repo_id") + remaining_result = session.run( + f""" + MATCH (m:{self.MEMORY_LABEL}) + WHERE {' AND '.join(remaining_where)} + RETURN count(m) as remaining + """, + **self._with_repo(), + ).single() + + return { + "count": len(updated_names), + "entity_names": updated_names, + "remaining_without_embeddings": ( + remaining_result["remaining"] if remaining_result else 0 + ), + } + + return self.circuit_breaker.call(_execute_backfill) + + def search_memory_nodes( + self, query: str, limit: int = 5, repo_id: Optional[str] = None + ) -> List[Dict[str, Any]]: + """Search memory entities with vector search and fulltext fallback. + Returns outgoing and incoming relations. 
Filters by repo_id when active.""" + normalized_query = str(query).strip() + safe_limit = max(1, int(limit)) + if not normalized_query: + return [] + + active_repo = repo_id or self.repo_id + self.setup_memory_schema() + + def _execute_search() -> List[Dict[str, Any]]: + repo_filter = "AND node.repo_id = $repo_id" if active_repo else "" + + def _is_valid_vector(vec: List[float]) -> bool: + if not vec: + return False + norm_sq = 0.0 + for value in vec: + if not isinstance(value, (int, float)) or not math.isfinite(value): + return False + norm_sq += float(value) * float(value) + return math.isfinite(norm_sq) and norm_sq > 0.0 + + if self.openai_client is None: + cypher = f""" + CALL db.index.fulltext.queryNodes('memory_search', $query_text) + YIELD node, score + WHERE 1=1 {repo_filter} + OPTIONAL MATCH (node:{self.MEMORY_LABEL})-[r_out]->(target:{self.MEMORY_LABEL}) + OPTIONAL MATCH (source:{self.MEMORY_LABEL})-[r_in]->(node:{self.MEMORY_LABEL}) + RETURN + node.name as name, + coalesce(node.type, node.entity_type, 'concept') as entity_type, + coalesce(node.observations, []) as observations, + coalesce(node.metadata_json, '{{}}') as metadata_json, + score, + ['fulltext'] as sources, + collect(DISTINCT CASE + WHEN target IS NULL THEN NULL + ELSE {{target: target.name, relation_type: type(r_out)}} + END) as outgoing_relations, + collect(DISTINCT CASE + WHEN source IS NULL THEN NULL + ELSE {{source: source.name, relation_type: type(r_in)}} + END) as incoming_relations + ORDER BY score DESC + LIMIT $limit + """ + params: Dict = {"query_text": normalized_query, "limit": safe_limit} + else: + vector = self.get_embedding(normalized_query) + if not _is_valid_vector(vector): + logger.warning( + "Memory query vector invalid (likely missing OpenAI key or zero-vector); " + "falling back to full-text search." 
+ ) + cypher = f""" + CALL db.index.fulltext.queryNodes('memory_search', $query_text) + YIELD node, score + WHERE 1=1 {repo_filter} + OPTIONAL MATCH (node:{self.MEMORY_LABEL})-[r_out]->(target:{self.MEMORY_LABEL}) + OPTIONAL MATCH (source:{self.MEMORY_LABEL})-[r_in]->(node:{self.MEMORY_LABEL}) + RETURN + node.name as name, + coalesce(node.type, node.entity_type, 'concept') as entity_type, + coalesce(node.observations, []) as observations, + coalesce(node.metadata_json, '{{}}') as metadata_json, + score, + ['fulltext'] as sources, + collect(DISTINCT CASE + WHEN target IS NULL THEN NULL + ELSE {{target: target.name, relation_type: type(r_out)}} + END) as outgoing_relations, + collect(DISTINCT CASE + WHEN source IS NULL THEN NULL + ELSE {{source: source.name, relation_type: type(r_in)}} + END) as incoming_relations + ORDER BY score DESC + LIMIT $limit + """ + params = {"query_text": normalized_query, "limit": safe_limit} + else: + cypher = f""" + CALL {{ + CALL db.index.vector.queryNodes('memory_embeddings', $limit, $vector) + YIELD node, score + RETURN node, score, 'vector' as source + UNION + CALL db.index.fulltext.queryNodes('memory_search', $query_text) + YIELD node, score + RETURN node, score, 'fulltext' as source + }} + WITH node, max(score) as score, collect(DISTINCT source) as sources + WHERE 1=1 {repo_filter} + OPTIONAL MATCH (node:{self.MEMORY_LABEL})-[r_out]->(target:{self.MEMORY_LABEL}) + OPTIONAL MATCH (source:{self.MEMORY_LABEL})-[r_in]->(node:{self.MEMORY_LABEL}) + RETURN + node.name as name, + coalesce(node.type, node.entity_type, 'concept') as entity_type, + coalesce(node.observations, []) as observations, + coalesce(node.metadata_json, '{{}}') as metadata_json, + score, + sources, + collect(DISTINCT CASE + WHEN target IS NULL THEN NULL + ELSE {{target: target.name, relation_type: type(r_out)}} + END) as outgoing_relations, + collect(DISTINCT CASE + WHEN source IS NULL THEN NULL + ELSE {{source: source.name, relation_type: type(r_in)}} + END) as 
incoming_relations + ORDER BY score DESC + LIMIT $limit + """ + params = { + "query_text": normalized_query, + "vector": vector, + "limit": safe_limit, + } + if active_repo: + params["repo_id"] = active_repo + with self.driver.session() as session: + result = session.run(cypher, **params) + rows = [] + for record in result: + row = dict(record) + row["outgoing_relations"] = [ + rel for rel in row.get("outgoing_relations", []) if rel + ] + row["incoming_relations"] = [ + rel for rel in row.get("incoming_relations", []) if rel + ] + rows.append(row) + return rows + + return self.circuit_breaker.call(_execute_search) + + def read_memory_graph(self) -> Dict[str, Any]: + """Return a summarized view of the current memory graph.""" + self.setup_memory_schema() + + def _execute_read() -> Dict[str, Any]: + with self.driver.session() as session: + repo_filter = "WHERE m.repo_id = $repo_id" if self.repo_id else "" + nodes_result = session.run( + f""" + MATCH (m:{self.MEMORY_LABEL}) + {repo_filter} + OPTIONAL MATCH (m)-[r]->(target:{self.MEMORY_LABEL}) + RETURN + m.name as name, + coalesce(m.type, m.entity_type, 'concept') as entity_type, + coalesce(m.observations, []) as observations, + collect(DISTINCT CASE + WHEN target IS NULL THEN NULL + ELSE {{ + target: target.name, + relation_type: type(r) + }} + END) as outgoing_relations + ORDER BY m.name + """, + **self._with_repo(), + ) + entities = [] + for record in nodes_result: + row = dict(record) + row["outgoing_relations"] = [ + rel for rel in row.get("outgoing_relations", []) if rel + ] + entities.append(row) + + relation_where = "WHERE source.repo_id = $repo_id AND target.repo_id = $repo_id" if self.repo_id else "" + relation_count_result = session.run( + f""" + MATCH (source:{self.MEMORY_LABEL})-[r]->(target:{self.MEMORY_LABEL}) + {relation_where} + RETURN count(r) as count + """, + **self._with_repo(), + ).single() + relation_count = relation_count_result["count"] if relation_count_result else 0 + + return { + 
"entity_count": len(entities), + "relation_count": relation_count, + "entities": entities, + } + + return self.circuit_breaker.call(_execute_read) + # ========================================================================= # GIT GRAPH QUERIES (for MCP Server) # ========================================================================= diff --git a/src/codememory/server/app.py b/src/codememory/server/app.py index 01d1162..249836a 100644 --- a/src/codememory/server/app.py +++ b/src/codememory/server/app.py @@ -173,7 +173,9 @@ def init_graph(): user=user, password=password, openai_key=openai_key, + repo_root=repo_root, ) + graph.setup_memory_schema() logger.info(f"✅ Connected to Neo4j at {uri}") return graph @@ -356,6 +358,115 @@ def _format_commit_context_output(context: Dict[str, Any], include_diff_stats: b return output.strip() +def _format_memory_entity_results(results: List[Dict[str, Any]]) -> str: + """Format memory search results for MCP responses.""" + output = f"Found {len(results)} relevant memory node(s):\n\n" + for index, result in enumerate(results, 1): + name = result.get("name", "Unknown") + entity_type = result.get("entity_type", "concept") + score = float(result.get("score", 0.0) or 0.0) + observations = result.get("observations", []) or [] + out_relations = result.get("outgoing_relations", []) or [] + in_relations = result.get("incoming_relations", []) or [] + + output += f"{index}. 
**{name}** [{entity_type}] (Score: {score:.2f})\n" + if observations: + preview = "; ".join(observations[:3]) + output += f" - Observations: {preview}\n" + if out_relations: + rel_preview = ", ".join( + f"{rel.get('relation_type')} -> {rel.get('target')}" for rel in out_relations[:5] + ) + output += f" - Outgoing: {rel_preview}\n" + if in_relations: + rel_preview = ", ".join( + f"{rel.get('source')} -> {rel.get('relation_type')}" for rel in in_relations[:5] + ) + output += f" - Incoming: {rel_preview}\n" + output += "\n" + + return output.strip() + + +def _format_memory_graph_output(graph_snapshot: Dict[str, Any]) -> str: + """Format a memory graph snapshot for MCP responses.""" + entities = graph_snapshot.get("entities", []) or [] + entity_count = graph_snapshot.get("entity_count", len(entities)) + relation_count = graph_snapshot.get("relation_count", 0) + + output = "## Memory Graph\n\n" + output += f"Entities: {entity_count}\n" + output += f"Relations: {relation_count}\n\n" + + for entity in entities: + output += f"- **{entity.get('name', 'Unknown')}** [{entity.get('entity_type', 'concept')}]\n" + observations = entity.get("observations", []) or [] + if observations: + output += f" Observations: {'; '.join(observations[:3])}\n" + relations = entity.get("outgoing_relations", []) or [] + if relations: + relation_text = ", ".join( + f"{rel.get('relation_type')} -> {rel.get('target')}" for rel in relations + ) + output += f" Relations: {relation_text}\n" + output += "\n" + + return output.strip() + + +def _format_memory_write_result(action: str, result: Dict[str, Any]) -> str: + """Format mutation results for memory MCP tools.""" + output = f"✅ {action}\n\n" + + if "count" in result: + output += f"Affected: {result['count']}\n" + + if result.get("entity_names"): + output += "Entities:\n" + for name in result["entity_names"]: + output += f"- {name}\n" + + if result.get("deleted_names"): + output += "Deleted entities:\n" + for name in result["deleted_names"]: + output 
+= f"- {name}\n" + + if result.get("relations"): + output += "Relations:\n" + for relation in result["relations"]: + source = relation.get("from") or relation.get("source") + target = relation.get("to") or relation.get("target") + relation_type = relation.get("relation_type") or relation.get("relationType") + output += f"- {source} -[{relation_type}]-> {target}\n" + + if result.get("entities"): + output += "Entities updated:\n" + for entity in result["entities"]: + name = entity.get("name", "Unknown") + if "added_count" in entity: + output += f"- {name}: added {entity['added_count']} observation(s)\n" + elif "remaining_count" in entity: + output += f"- {name}: {entity['remaining_count']} observation(s) remain\n" + else: + output += f"- {name}\n" + + missing = result.get("missing") or result.get("missing_names") or [] + if missing: + output += "Missing:\n" + for item in missing: + if isinstance(item, dict): + output += ( + f"- {item.get('from')} -[{item.get('relation_type')}]-> {item.get('to')}\n" + ) + else: + output += f"- {item}\n" + + if "remaining_without_embeddings" in result: + output += f"Remaining without embeddings: {result['remaining_without_embeddings']}\n" + + return output.strip() + + @mcp.tool() @rate_limit @log_tool_call @@ -500,6 +611,202 @@ def get_file_dependencies(file_path: str) -> str: return f"❌ Failed to get dependencies: {str(e)}" +@mcp.tool() +@rate_limit +@log_tool_call +def create_memory_entities(entities: List[Dict[str, Any]]) -> str: + """Create or update agent-authored memory entities.""" + current_graph = get_graph() + if not current_graph: + return "❌ Graph not initialized. Check Neo4j connection." 
+ + try: + result = current_graph.create_memory_entities(entities) + return validate_tool_output(_format_memory_write_result("Memory entities stored.", result)) + except ValueError as e: + return f"❌ Invalid memory entity payload: {str(e)}" + except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: + logger.error(f"Create memory entities error: {e}") + return f"❌ Failed to create memory entities: {str(e)}" + except Exception as e: + logger.error(f"Unexpected create memory entities error: {e}") + return f"❌ Failed to create memory entities: {str(e)}" + + +@mcp.tool() +@rate_limit +@log_tool_call +def create_memory_relations(relations: List[Dict[str, Any]]) -> str: + """Create typed relations between memory entities.""" + current_graph = get_graph() + if not current_graph: + return "❌ Graph not initialized. Check Neo4j connection." + + try: + result = current_graph.create_memory_relations(relations) + return validate_tool_output(_format_memory_write_result("Memory relations stored.", result)) + except ValueError as e: + return f"❌ Invalid memory relation payload: {str(e)}" + except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: + logger.error(f"Create memory relations error: {e}") + return f"❌ Failed to create memory relations: {str(e)}" + except Exception as e: + logger.error(f"Unexpected create memory relations error: {e}") + return f"❌ Failed to create memory relations: {str(e)}" + + +@mcp.tool() +@rate_limit +@log_tool_call +def add_memory_observations(observations: List[Dict[str, Any]]) -> str: + """Append observations to existing memory entities.""" + current_graph = get_graph() + if not current_graph: + return "❌ Graph not initialized. Check Neo4j connection." 
+ + try: + result = current_graph.add_memory_observations(observations) + return validate_tool_output(_format_memory_write_result("Memory observations added.", result)) + except ValueError as e: + return f"❌ Invalid memory observation payload: {str(e)}" + except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: + logger.error(f"Add memory observations error: {e}") + return f"❌ Failed to add memory observations: {str(e)}" + except Exception as e: + logger.error(f"Unexpected add memory observations error: {e}") + return f"❌ Failed to add memory observations: {str(e)}" + + +@mcp.tool() +@rate_limit +@log_tool_call +def delete_memory_entities(entity_names: List[str]) -> str: + """Delete memory entities by name.""" + current_graph = get_graph() + if not current_graph: + return "❌ Graph not initialized. Check Neo4j connection." + + try: + result = current_graph.delete_memory_entities(entity_names) + return validate_tool_output(_format_memory_write_result("Memory entities deleted.", result)) + except ValueError as e: + return f"❌ Invalid memory delete payload: {str(e)}" + except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: + logger.error(f"Delete memory entities error: {e}") + return f"❌ Failed to delete memory entities: {str(e)}" + except Exception as e: + logger.error(f"Unexpected delete memory entities error: {e}") + return f"❌ Failed to delete memory entities: {str(e)}" + + +@mcp.tool() +@rate_limit +@log_tool_call +def delete_memory_relations(relations: List[Dict[str, Any]]) -> str: + """Delete typed relations between memory entities.""" + current_graph = get_graph() + if not current_graph: + return "❌ Graph not initialized. Check Neo4j connection." 
+ + try: + result = current_graph.delete_memory_relations(relations) + return validate_tool_output(_format_memory_write_result("Memory relations deleted.", result)) + except ValueError as e: + return f"❌ Invalid memory relation delete payload: {str(e)}" + except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: + logger.error(f"Delete memory relations error: {e}") + return f"❌ Failed to delete memory relations: {str(e)}" + except Exception as e: + logger.error(f"Unexpected delete memory relations error: {e}") + return f"❌ Failed to delete memory relations: {str(e)}" + + +@mcp.tool() +@rate_limit +@log_tool_call +def delete_memory_observations(observations: List[Dict[str, Any]]) -> str: + """Delete observations from memory entities.""" + current_graph = get_graph() + if not current_graph: + return "❌ Graph not initialized. Check Neo4j connection." + + try: + result = current_graph.delete_memory_observations(observations) + return validate_tool_output(_format_memory_write_result("Memory observations deleted.", result)) + except ValueError as e: + return f"❌ Invalid memory observation delete payload: {str(e)}" + except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: + logger.error(f"Delete memory observations error: {e}") + return f"❌ Failed to delete memory observations: {str(e)}" + except Exception as e: + logger.error(f"Unexpected delete memory observations error: {e}") + return f"❌ Failed to delete memory observations: {str(e)}" + + +@mcp.tool() +@rate_limit +@log_tool_call +def search_memory_nodes(query: str, limit: int = 5) -> str: + """Search agent-authored memory entities.""" + current_graph = get_graph() + if not current_graph: + return "❌ Graph not initialized. Check Neo4j connection." + + try: + results = current_graph.search_memory_nodes(query, limit=limit) + if not results: + return "No relevant memory nodes found." 
+ return validate_tool_output(_format_memory_entity_results(results)) + except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: + logger.error(f"Search memory nodes error: {e}") + return f"❌ Failed to search memory nodes: {str(e)}" + except Exception as e: + logger.error(f"Unexpected search memory nodes error: {e}") + return f"❌ Failed to search memory nodes: {str(e)}" + + +@mcp.tool() +@rate_limit +@log_tool_call +def read_memory_graph() -> str: + """Return a summary of the current memory graph.""" + current_graph = get_graph() + if not current_graph: + return "❌ Graph not initialized. Check Neo4j connection." + + try: + graph_snapshot = current_graph.read_memory_graph() + return validate_tool_output(_format_memory_graph_output(graph_snapshot)) + except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: + logger.error(f"Read memory graph error: {e}") + return f"❌ Failed to read memory graph: {str(e)}" + except Exception as e: + logger.error(f"Unexpected read memory graph error: {e}") + return f"❌ Failed to read memory graph: {str(e)}" + + +@mcp.tool() +@rate_limit +@log_tool_call +def backfill_memory_embeddings(limit: int = 100, only_missing: bool = True) -> str: + """Backfill vector embeddings for existing Memory nodes.""" + current_graph = get_graph() + if not current_graph: + return "❌ Graph not initialized. Check Neo4j connection." 
+ + try: + result = current_graph.backfill_memory_embeddings(limit=limit, only_missing=only_missing) + return validate_tool_output(_format_memory_write_result("Memory embeddings backfilled.", result)) + except ValueError as e: + return f"❌ Invalid memory embedding backfill request: {str(e)}" + except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: + logger.error(f"Backfill memory embeddings error: {e}") + return f"❌ Failed to backfill memory embeddings: {str(e)}" + except Exception as e: + logger.error(f"Unexpected backfill memory embeddings error: {e}") + return f"❌ Failed to backfill memory embeddings: {str(e)}" + + @mcp.tool() @rate_limit @log_tool_call diff --git a/src/codememory/server/tools.py b/src/codememory/server/tools.py index 4d26b04..d6ac345 100644 --- a/src/codememory/server/tools.py +++ b/src/codememory/server/tools.py @@ -15,18 +15,37 @@ def __init__(self, graph: KnowledgeGraphBuilder): def semantic_search(self, query: str, limit: int = 5) -> str: """ - Performs hybrid search and formats the result as a readable string for the Agent. + Graph-enriched hybrid search formatted for the Agent. + + Each result includes the vector-matched code plus graph context: + file location, call relationships, sibling functions, and file imports. """ try: results = self.graph.semantic_search(query, limit) if not results: return "No relevant code found in the graph." 
- # Format for LLM consumption (Markdown) - report = f"### Found {len(results)} relevant code snippets for '{query}':\n\n" + report = f"### Found {len(results)} relevant code results for '{query}':\n\n" for r in results: - report += f"#### 📄 {r['name']} (Score: {r['score']:.2f})\n" - report += f"**Signature:** `{r['sig']}`\n" + score_display = r.get('final_score', r.get('score', 0.0)) + report += f"#### {r['name']} (Score: {score_display:.2f})\n" + if r.get('sig'): + report += f"**Signature:** `{r['sig']}`\n" + if r.get('file_path'): + report += f"**File:** `{r['file_path']}`\n" + if r.get('calls_out'): + report += f"**Calls:** {', '.join(r['calls_out'])}\n" + if r.get('called_by'): + report += f"**Called by:** {', '.join(r['called_by'])}\n" + if r.get('methods'): + report += f"**Methods:** {', '.join(r['methods'])}\n" + if r.get('siblings'): + report += f"**Siblings in file:** {', '.join(r['siblings'])}\n" + if r.get('file_imports'): + report += f"**File imports:** {', '.join(r['file_imports'])}\n" + if r.get('text'): + report += f"```\n{r['text']}\n```\n" + report += "\n" return report except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: logger.error(f"search failed:{e}") @@ -102,3 +121,41 @@ def get_commit_context(self, sha: str, include_diff_stats: bool = True) -> str: return report except (neo4j.exceptions.DatabaseError, neo4j.exceptions.ClientError) as e: return f"Error getting commit context: {str(e)}" + + def create_memory_entities(self, entities: List[Dict[str, Any]]) -> Dict[str, Any]: + """Create or update memory entities.""" + return self.graph.create_memory_entities(entities) + + def create_memory_relations(self, relations: List[Dict[str, Any]]) -> Dict[str, Any]: + """Create typed memory relations.""" + return self.graph.create_memory_relations(relations) + + def add_memory_observations(self, observations: List[Dict[str, Any]]) -> Dict[str, Any]: + """Append observations to memory entities.""" + return 
self.graph.add_memory_observations(observations) + + def delete_memory_entities(self, names: List[str]) -> Dict[str, Any]: + """Delete memory entities.""" + return self.graph.delete_memory_entities(names) + + def delete_memory_relations(self, relations: List[Dict[str, Any]]) -> Dict[str, Any]: + """Delete typed memory relations.""" + return self.graph.delete_memory_relations(relations) + + def delete_memory_observations(self, observations: List[Dict[str, Any]]) -> Dict[str, Any]: + """Delete observations from memory entities.""" + return self.graph.delete_memory_observations(observations) + + def search_memory_nodes(self, query: str, limit: int = 5) -> List[Dict[str, Any]]: + """Search memory entities.""" + return self.graph.search_memory_nodes(query, limit=limit) + + def read_memory_graph(self) -> Dict[str, Any]: + """Read the current memory graph snapshot.""" + return self.graph.read_memory_graph() + + def backfill_memory_embeddings( + self, limit: int = 100, only_missing: bool = True + ) -> Dict[str, Any]: + """Backfill embeddings for existing memory entities.""" + return self.graph.backfill_memory_embeddings(limit=limit, only_missing=only_missing) diff --git a/tests/test_cli.py b/tests/test_cli.py index 2e4ef6f..c10ef3f 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -71,6 +71,7 @@ def test_status_json_success_envelope(monkeypatch, capsys, tmp_path): mock_cfg = _mock_config(exists=True) mock_builder = Mock() + mock_builder.repo_id = str(repo_root.resolve()) session = Mock() session_context = Mock() session_context.__enter__ = Mock(return_value=session) @@ -102,6 +103,10 @@ def test_status_json_success_envelope(monkeypatch, capsys, tmp_path): "chunks": 11, "last_sync": "2026-02-01T00:00:00Z", } + queries = [call.args[0] for call in session.run.call_args_list] + kwargs = [call.kwargs for call in session.run.call_args_list] + assert all("repo_id = $repo_id" in query for query in queries) + assert all(item.get("repo_id") == str(repo_root.resolve()) for 
item in kwargs) def test_status_json_missing_config_exits_nonzero(monkeypatch, capsys, tmp_path): @@ -297,6 +302,7 @@ def test_search_loads_openai_key_from_repo_dotenv(monkeypatch, tmp_path): user="neo4j", password="password", openai_key="from-search-dotenv", + repo_root=repo_root, ) mock_builder.semantic_search.assert_called_once_with("auth", limit=5) @@ -350,6 +356,13 @@ def test_deps_json_success_uses_graph_method(monkeypatch, capsys, tmp_path): "imports_count": 2, "imported_by_count": 1, } + cli.KnowledgeGraphBuilder.assert_called_once_with( + uri="bolt://localhost:7687", + user="neo4j", + password="password", + openai_key="test-openai-key", + repo_root=repo_root, + ) mock_builder.get_file_dependencies.assert_called_once_with("src/main.py") @@ -377,6 +390,13 @@ def test_impact_json_success_uses_graph_method(monkeypatch, capsys, tmp_path): assert payload["data"]["path"] == "src/main.py" assert payload["data"]["affected_files"][0]["path"] == "src/caller.py" assert payload["metrics"] == {"total_count": 1, "max_depth": 3} + cli.KnowledgeGraphBuilder.assert_called_once_with( + uri="bolt://localhost:7687", + user="neo4j", + password="password", + openai_key="test-openai-key", + repo_root=repo_root, + ) mock_builder.identify_impact.assert_called_once_with("src/main.py", max_depth=3) diff --git a/tests/test_graph.py b/tests/test_graph.py index 1a3320f..0a245f2 100644 --- a/tests/test_graph.py +++ b/tests/test_graph.py @@ -1,8 +1,10 @@ """Tests for the KnowledgeGraphBuilder module.""" -import pytest +from pathlib import Path from unittest.mock import Mock, patch +import pytest + # Skip if neo4j is not available pytestmark = [ pytest.mark.unit, @@ -40,6 +42,28 @@ def builder(self, mock_driver): builder.driver = driver return builder + @pytest.fixture + def repo_builder(self, mock_driver, tmp_path): + """Create a repo-scoped KnowledgeGraphBuilder with mocked dependencies.""" + from codememory.ingestion.graph import KnowledgeGraphBuilder + + driver, session = mock_driver + 
repo_root = tmp_path / "repo" + repo_root.mkdir() + with patch('neo4j.GraphDatabase.driver', return_value=driver), \ + patch.object(KnowledgeGraphBuilder, '_init_parsers'), \ + patch('codememory.ingestion.graph.OpenAI'): + + builder = KnowledgeGraphBuilder( + uri="bolt://localhost:7687", + user="neo4j", + password="test", + openai_key="sk-test", + repo_root=repo_root, + ) + builder.driver = driver + return builder + def test_initialization(self, builder): """Test that builder initializes correctly.""" assert builder.EMBEDDING_MODEL == "text-embedding-3-large" @@ -136,6 +160,228 @@ def test_split_markdown_document_by_heading(self, builder): ("Section B", "## Section B\nBeta"), ] + def test_setup_memory_schema(self, builder, mock_driver): + """Test memory schema setup issues expected DDL queries.""" + driver, session = mock_driver + + builder.setup_memory_schema() + + queries = [call.args[0] for call in session.run.call_args_list] + assert any("memory_name_unique" in query for query in queries) + assert any("memory_search" in query for query in queries) + + def test_create_memory_entities_runs_merge(self, builder, mock_driver): + """Test memory entity creation runs a MERGE query per entity.""" + driver, session = mock_driver + + with patch.object(builder, "get_embedding", return_value=[0.1] * builder.VECTOR_DIMENSIONS): + builder.create_memory_entities( + [{"name": "auth-flow", "entityType": "concept", "observations": ["Used in login"]}] + ) + + queries = [call.args[0] for call in session.run.call_args_list] + assert any("MERGE (m:Memory {name: $name})" in query for query in queries) + assert any("SET m:`concept`" in query for query in queries) + + def test_search_memory_nodes_formats_rows(self, builder, mock_driver): + """Test memory search returns row dicts from Neo4j results.""" + driver, session = mock_driver + session.run.side_effect = [ + Mock(), + Mock(), + Mock(), + [ + { + "name": "auth-flow", + "entity_type": "concept", + "observations": ["Uses refresh 
token"], + "metadata_json": "{}", + "score": 0.9, + "sources": ["vector", "fulltext"], + "outgoing_relations": [{"target": "login-page", "relation_type": "IMPLEMENTS"}], + } + ], + ] + + with patch.object(builder, "get_embedding", return_value=[0.1] * builder.VECTOR_DIMENSIONS): + results = builder.search_memory_nodes("auth", limit=5) + + assert len(results) == 1 + assert results[0]["name"] == "auth-flow" + + def test_read_memory_graph_returns_counts(self, builder, mock_driver): + """Test memory graph snapshot combines node list and relation count.""" + driver, session = mock_driver + session.run.side_effect = [ + Mock(), + Mock(), + Mock(), + [ + { + "name": "auth-flow", + "entity_type": "concept", + "observations": ["Uses refresh token"], + "outgoing_relations": [], + } + ], + Mock(single=Mock(return_value={"count": 0})), + ] + + snapshot = builder.read_memory_graph() + + assert snapshot["entity_count"] == 1 + assert snapshot["relation_count"] == 0 + assert snapshot["entities"][0]["name"] == "auth-flow" + + def test_backfill_memory_embeddings_updates_missing_nodes(self, builder, mock_driver): + """Test memory embedding backfill writes embeddings and reports remainder.""" + driver, session = mock_driver + session.run.side_effect = [ + Mock(), + Mock(), + Mock(), + [ + { + "name": "auth-flow", + "entity_type": "project", + "observations": ["Used in login"], + } + ], + Mock(), + Mock(single=Mock(return_value={"remaining": 0})), + ] + + with patch.object(builder, "get_embedding", return_value=[0.1] * builder.VECTOR_DIMENSIONS): + result = builder.backfill_memory_embeddings(limit=10, only_missing=True) + + assert result["count"] == 1 + assert result["remaining_without_embeddings"] == 0 + + def test_pass_2_entity_definition_fetches_files_for_active_repo(self, repo_builder, mock_driver): + """Pass 2 should only seed files from the active repo when repo_id is set.""" + driver, session = mock_driver + session.run.return_value = [] + + 
repo_builder.pass_2_entity_definition(repo_builder.repo_root) + + first_query = session.run.call_args_list[0] + assert "WHERE f.repo_id = $repo_id" in first_query.args[0] + assert first_query.kwargs["repo_id"] == repo_builder.repo_id + + def test_get_file_dependencies_scopes_by_repo(self, repo_builder, mock_driver): + """Dependency lookup should match the source File by (repo_id, path).""" + driver, session = mock_driver + session.run.return_value.single.return_value = {"imports": ["a.py"], "imported_by": ["b.py"]} + + result = repo_builder.get_file_dependencies("src/main.py") + + assert result["imports"] == ["a.py"] + query = session.run.call_args.args[0] + kwargs = session.run.call_args.kwargs + assert "repo_id: $repo_id" in query + assert kwargs["repo_id"] == repo_builder.repo_id + assert kwargs["path"] == "src/main.py" + + def test_identify_impact_scopes_by_repo(self, repo_builder, mock_driver): + """Impact analysis should anchor traversal to the active repo only.""" + driver, session = mock_driver + session.run.return_value = [ + {"path": "src/caller.py", "depth": 1, "impact_type": "dependents"} + ] + + result = repo_builder.identify_impact("src/main.py", max_depth=2) + + assert result["total_count"] == 1 + query = session.run.call_args.args[0] + kwargs = session.run.call_args.kwargs + assert "repo_id: $repo_id" in query + assert kwargs["repo_id"] == repo_builder.repo_id + + def test_read_memory_graph_scopes_to_active_repo(self, repo_builder, mock_driver): + """Memory graph reads should filter entities and relation counts by repo_id.""" + driver, session = mock_driver + session.run.side_effect = [ + Mock(), + Mock(), + Mock(), + Mock(), + [ + { + "name": "auth-flow", + "entity_type": "concept", + "observations": ["Uses refresh token"], + "outgoing_relations": [], + } + ], + Mock(single=Mock(return_value={"count": 0})), + ] + + snapshot = repo_builder.read_memory_graph() + + assert snapshot["entity_count"] == 1 + query_args = session.run.call_args_list + assert 
"WHERE m.repo_id = $repo_id" in query_args[4].args[0] + assert query_args[4].kwargs["repo_id"] == repo_builder.repo_id + assert "source.repo_id = $repo_id AND target.repo_id = $repo_id" in query_args[5].args[0] + + def test_backfill_memory_embeddings_scopes_to_active_repo(self, repo_builder, mock_driver): + """Memory backfill should only select and count nodes for the active repo.""" + driver, session = mock_driver + session.run.side_effect = [ + Mock(), + Mock(), + Mock(), + Mock(), + [ + { + "name": "auth-flow", + "entity_type": "project", + "observations": ["Used in login"], + } + ], + Mock(), + Mock(single=Mock(return_value={"remaining": 0})), + ] + + with patch.object(repo_builder, "get_embedding", return_value=[0.1] * repo_builder.VECTOR_DIMENSIONS): + result = repo_builder.backfill_memory_embeddings(limit=10, only_missing=True) + + assert result["count"] == 1 + query_args = session.run.call_args_list + assert "m.embedding IS NULL AND m.repo_id = $repo_id" in query_args[4].args[0] + assert query_args[4].kwargs["repo_id"] == repo_builder.repo_id + assert "{name: $name, repo_id: $repo_id}" in query_args[5].args[0] + assert "m.embedding IS NULL AND m.repo_id = $repo_id" in query_args[6].args[0] + + def test_search_memory_nodes_falls_back_when_query_vector_invalid(self, builder, mock_driver): + """Memory search should fall back to fulltext when embedding generation fails.""" + driver, session = mock_driver + session.run.side_effect = [ + Mock(), + Mock(), + Mock(), + [ + { + "name": "auth-flow", + "entity_type": "concept", + "observations": ["Uses refresh token"], + "metadata_json": "{}", + "score": 0.5, + "sources": ["fulltext"], + "outgoing_relations": [], + "incoming_relations": [], + } + ], + ] + + with patch.object(builder, "get_embedding", return_value=[0.0] * builder.VECTOR_DIMENSIONS): + results = builder.search_memory_nodes("auth", limit=5) + + assert len(results) == 1 + query = session.run.call_args_list[3].args[0] + assert 
"db.index.fulltext.queryNodes('memory_search'" in query + assert "db.index.vector.queryNodes('memory_embeddings'" not in query + class TestCypherQueries: """Test Cypher query generation and execution.""" diff --git a/tests/test_server.py b/tests/test_server.py index 64748b9..619e641 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -21,13 +21,19 @@ def toolkit(self, mock_graph): return Toolkit(graph=mock_graph) def test_semantic_search(self, toolkit, mock_graph): - """Test semantic search returns formatted results.""" + """Test semantic search returns graph-enriched formatted results.""" mock_results = [ { "text": "def test(): pass", "score": 0.95, "name": "test", "sig": "test.py:test", + "file_path": "test.py", + "calls_out": ["helper"], + "called_by": ["main"], + "methods": [], + "file_imports": ["utils.py"], + "siblings": ["setup", "teardown"], } ] mock_graph.semantic_search.return_value = mock_results @@ -36,6 +42,10 @@ def test_semantic_search(self, toolkit, mock_graph): assert "test" in result assert "0.95" in result + assert "test.py" in result + assert "helper" in result + assert "main" in result + assert "utils.py" in result mock_graph.semantic_search.assert_called_once_with("test function", 5) def test_semantic_search_empty_results(self, toolkit, mock_graph): @@ -168,7 +178,11 @@ def test_search_codebase_success(self): """Test successful search.""" mock_graph = Mock() mock_graph.semantic_search.return_value = [ - {"name": "fn", "score": 0.9, "text": "def fn(): pass", "sig": "a.py:fn"} + { + "name": "fn", "score": 0.9, "text": "def fn(): pass", "sig": "a.py:fn", + "file_path": "a.py", "calls_out": [], "called_by": [], + "methods": [], "file_imports": [], "siblings": [], + } ] with patch("codememory.server.app.graph", mock_graph): @@ -350,3 +364,154 @@ def test_get_commit_context_invalid_sha(self): assert "invalid commit sha" in result.lower() mock_graph.get_commit_context.assert_not_called() + + +class TestMemoryMCPTools: + """Test 
memory-specific MCP tools.""" + + def test_create_memory_entities_success(self): + mock_graph = Mock() + mock_graph.create_memory_entities.return_value = { + "count": 1, + "entity_names": ["auth-flow"], + } + + with patch("codememory.server.app.graph", mock_graph): + from codememory.server.app import create_memory_entities + + result = create_memory_entities( + [{"name": "auth-flow", "entityType": "concept", "observations": ["Used in login"]}] + ) + + assert "memory entities stored" in result.lower() + assert "auth-flow" in result + mock_graph.create_memory_entities.assert_called_once() + + def test_create_memory_entities_validation_error(self): + mock_graph = Mock() + mock_graph.create_memory_entities.side_effect = ValueError( + "Each entity requires a non-empty 'name'." + ) + + with patch("codememory.server.app.graph", mock_graph): + from codememory.server.app import create_memory_entities + + result = create_memory_entities([{}]) + + assert "invalid memory entity payload" in result.lower() + + def test_create_memory_relations_success(self): + mock_graph = Mock() + mock_graph.create_memory_relations.return_value = { + "count": 1, + "relations": [ + {"source": "auth-flow", "target": "login-page", "relation_type": "IMPLEMENTS"} + ], + } + + with patch("codememory.server.app.graph", mock_graph): + from codememory.server.app import create_memory_relations + + result = create_memory_relations( + [{"from": "auth-flow", "to": "login-page", "relationType": "IMPLEMENTS"}] + ) + + assert "memory relations stored" in result.lower() + assert "IMPLEMENTS" in result + + def test_add_memory_observations_success(self): + mock_graph = Mock() + mock_graph.add_memory_observations.return_value = { + "count": 1, + "entities": [{"name": "auth-flow", "added_count": 2}], + } + + with patch("codememory.server.app.graph", mock_graph): + from codememory.server.app import add_memory_observations + + result = add_memory_observations( + [{"entityName": "auth-flow", "contents": ["Uses refresh 
token", "Owned by api team"]}] + ) + + assert "memory observations added" in result.lower() + assert "added 2 observation" in result.lower() + + def test_search_memory_nodes_success(self): + mock_graph = Mock() + mock_graph.search_memory_nodes.return_value = [ + { + "name": "auth-flow", + "entity_type": "concept", + "score": 0.91, + "observations": ["Uses refresh token"], + "outgoing_relations": [{"target": "login-page", "relation_type": "IMPLEMENTS"}], + } + ] + + with patch("codememory.server.app.graph", mock_graph): + from codememory.server.app import search_memory_nodes + + result = search_memory_nodes("auth") + + assert "relevant memory node" in result.lower() + assert "auth-flow" in result + assert "IMPLEMENTS" in result + + def test_read_memory_graph_success(self): + mock_graph = Mock() + mock_graph.read_memory_graph.return_value = { + "entity_count": 1, + "relation_count": 1, + "entities": [ + { + "name": "auth-flow", + "entity_type": "concept", + "observations": ["Uses refresh token"], + "outgoing_relations": [{"target": "login-page", "relation_type": "IMPLEMENTS"}], + } + ], + } + + with patch("codememory.server.app.graph", mock_graph): + from codememory.server.app import read_memory_graph + + result = read_memory_graph() + + assert "memory graph" in result.lower() + assert "entities: 1" in result.lower() + assert "relations: 1" in result.lower() + + def test_delete_memory_entities_success(self): + mock_graph = Mock() + mock_graph.delete_memory_entities.return_value = { + "count": 1, + "deleted_names": ["auth-flow"], + "missing_names": ["missing-node"], + } + + with patch("codememory.server.app.graph", mock_graph): + from codememory.server.app import delete_memory_entities + + result = delete_memory_entities(["auth-flow", "missing-node"]) + + assert "memory entities deleted" in result.lower() + assert "missing-node" in result + + def test_backfill_memory_embeddings_success(self): + mock_graph = Mock() + mock_graph.backfill_memory_embeddings.return_value = 
{ + "count": 2, + "entity_names": ["auth-flow", "realtime_api"], + "remaining_without_embeddings": 0, + } + + with patch("codememory.server.app.graph", mock_graph): + from codememory.server.app import backfill_memory_embeddings + + result = backfill_memory_embeddings(limit=25, only_missing=True) + + assert "memory embeddings backfilled" in result.lower() + assert "remaining without embeddings: 0" in result.lower() + mock_graph.backfill_memory_embeddings.assert_called_once_with( + limit=25, only_missing=True + )