diff --git a/docs/todos/2026-06-19-issue-261-parallelize-kv/plan.md b/docs/todos/2026-06-19-issue-261-parallelize-kv/plan.md new file mode 100644 index 00000000..d3f10d2f --- /dev/null +++ b/docs/todos/2026-06-19-issue-261-parallelize-kv/plan.md @@ -0,0 +1,251 @@ +# Issue 261 Parallelize KV Operations Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Reduce timeout risk in semantic consolidation and reflection by batching independent KV operations while preserving public behavior. + +**Architecture:** Keep provider calls, schemas, MCP/REST surfaces, and result counters unchanged. Parallelize only independent KV reads and writes, using bounded local batches for write-heavy paths and ordered in-memory replay where duplicate insight fingerprints can affect behavior. + +**Tech Stack:** TypeScript ESM, iii-sdk function registration, StateKV, Vitest. + +--- + +## Sprint Contract + +Goal: Fix issue #261 for `memory_consolidate` semantic and `memory_reflect` timeout-prone sequential KV operations. + +Scope: +- Modify `src/functions/consolidation-pipeline.ts`. +- Modify `src/functions/reflect.ts`. +- Add focused tests in `test/consolidation-pipeline.test.ts`. +- Add focused tests in `test/reflect.test.ts`. + +Non-goals: +- Do not change persisted schemas, MCP tools, REST endpoints, provider prompts, auth, remotes, dependencies, or package metadata. +- Do not parallelize LLM/provider calls. +- Do not work on any issue other than #261. + +Acceptance criteria: +- Semantic consolidation starts independent summary/semantic reads concurrently. +- Semantic consolidation persists multiple parsed semantic facts without per-fact serial awaits. +- Decay persists eligible semantic/procedural rows without per-row serial awaits. 
+- Reflection batches independent insight lookups and final writes per provider response while preserving duplicate-fingerprint behavior. +- Existing tests for consolidation and reflection still pass. + +Intended verification: +- Red: targeted new tests fail on current implementation. +- Green: targeted new and existing tests pass after implementation. +- Broader: lint, build, full non-integration test suite where available. + +Known boundaries: +- Branch is `issue/261-parallelize-kv-operations`. +- Remote writes, fetch/pull, push, PR creation, and issue closure require separate explicit current-turn approval. + +## Feature / Verification Matrix + +| Change | Verification method | Status | Evidence | +| --- | --- | --- | --- | +| Parallel semantic initial reads | New vitest in `test/consolidation-pipeline.test.ts` tracks overlapping `kv.list` calls | Pending | Test not added | +| Parallel semantic fact writes | New vitest in `test/consolidation-pipeline.test.ts` tracks overlapping `kv.set` calls and stored facts | Pending | Test not added | +| Parallel decay writes | New vitest in `test/consolidation-pipeline.test.ts` tracks overlapping decay `kv.set` calls and result counts | Pending | Test not added | +| Parallel reflect get/set batches | New vitest in `test/reflect.test.ts` tracks overlapping `kv.get`/`kv.set` calls | Pending | Test not added | +| Preserve duplicate reflect semantics | New vitest in `test/reflect.test.ts` with duplicate insight content | Pending | Test not added | + +## Files + +- Modify: `test/consolidation-pipeline.test.ts` +- Modify: `test/reflect.test.ts` +- Modify: `src/functions/consolidation-pipeline.ts` +- Modify: `src/functions/reflect.ts` +- Update: `docs/todos/2026-06-19-issue-261-parallelize-kv/todo.md` + +## Task 1: Add failing consolidation pipeline concurrency tests + +**Files:** +- Modify: `test/consolidation-pipeline.test.ts` + +- [ ] **Step 1: Add a local async gate helper near existing test helpers** + +```ts +function 
deferred<T>() { + let resolve!: (value: T) => void; + const promise = new Promise<T>((res) => { + resolve = res; + }); + return { promise, resolve }; +} +``` + +- [ ] **Step 2: Add semantic initial-read concurrency test** + +Add a test that wraps `kv.list` for `mem:summaries` and `mem:semantic`, tracks current/max in-flight calls, seeds six summaries, runs semantic consolidation, and asserts `maxInFlightLists > 1`. + +- [ ] **Step 3: Add semantic write concurrency test** + +Add a test where the provider returns three distinct `<fact>` entries, wraps `kv.set` for `mem:semantic`, tracks current/max in-flight semantic writes, runs semantic consolidation, and asserts `maxInFlightSets > 1`, `newFacts === 3`, and three facts are stored. + +- [ ] **Step 4: Add decay write concurrency test** + +Seed multiple semantic and procedural rows old enough for decay. Wrap `kv.set` for `mem:semantic` and `mem:procedural`, run `tier: "decay"`, assert `maxInFlightSets > 1`, and assert result counts remain correct. + +- [ ] **Step 5: Run red consolidation tests** + +Run: + +```bash +corepack pnpm exec vitest run --exclude test/integration.test.ts test/consolidation-pipeline.test.ts +``` + +Expected: the new concurrency assertions fail on current code with max in-flight counts of `1`. + +## Task 2: Add failing reflect concurrency and duplicate tests + +**Files:** +- Modify: `test/reflect.test.ts` + +- [ ] **Step 1: Add reflect insight get/set concurrency test** + +Use one graph cluster with enough supporting semantic facts. Have the provider return at least three distinct insights. Wrap `kv.get` and `kv.set` for `mem:insights`, track max in-flight gets and sets, run `mem::reflect`, and assert both maxima are greater than `1` while `newInsights` matches the provider response. + +- [ ] **Step 2: Add duplicate insight regression test** + +Have the provider return the same insight content twice under the same cluster. 
Run `mem::reflect` and assert `newInsights === 1`, `reinforced === 1`, one insight is stored, and its `reinforcements === 1`. + +- [ ] **Step 3: Run red reflect tests** + +Run: + +```bash +corepack pnpm exec vitest run --exclude test/integration.test.ts test/reflect.test.ts +``` + +Expected: the concurrency test fails on current code with max in-flight counts of `1`; the duplicate regression may pass on current code and must remain passing after implementation. + +## Task 3: Implement bounded KV batching in consolidation pipeline + +**Files:** +- Modify: `src/functions/consolidation-pipeline.ts` + +- [ ] **Step 1: Add a small local batch helper** + +Use a local constant and helper in `consolidation-pipeline.ts`: + +```ts +const KV_BATCH_SIZE = 16; + +async function runKvBatches<T>( + items: T[], + run: (item: T) => Promise<unknown>, +): Promise<void> { + for (let i = 0; i < items.length; i += KV_BATCH_SIZE) { + await Promise.all(items.slice(i, i + KV_BATCH_SIZE).map(run)); + } +} +``` + +- [ ] **Step 2: Parallelize semantic initial reads** + +Replace sequential summary/semantic reads with: + +```ts +const [summaries, existingSemantic] = await Promise.all([ + kv.list(KV.summaries), + kv.list(KV.semantic), +]); +``` + +- [ ] **Step 3: Batch semantic fact writes** + +Inside the semantic fact parse loop, keep parsing/counter logic ordered, collect touched semantic rows in an array or map, and flush with `runKvBatches`. Preserve existing new-fact behavior. + +- [ ] **Step 4: Batch decay reads and writes** + +Fetch semantic and procedural lists with `Promise.all`, apply decay exactly as before, then write each scope with `runKvBatches`. + +- [ ] **Step 5: Run consolidation tests** + +Run: + +```bash +corepack pnpm exec vitest run --exclude test/integration.test.ts test/consolidation-pipeline.test.ts +``` + +Expected: all consolidation pipeline tests pass. 
+ +## Task 4: Implement ordered batched insight processing in reflect + +**Files:** +- Modify: `src/functions/reflect.ts` + +- [ ] **Step 1: Add local parsed insight type and batch helper** + +Add a local `ParsedInsightCandidate` interface and `runKvBatches` helper near `ConceptCluster`. + +- [ ] **Step 2: Parse provider response before KV lookups** + +For each provider response, parse up to `maxInsightsPerCluster` and remaining `maxTotal` into candidates with `fp`, `title`, `content`, and `confidence`. Keep the existing empty-content skip behavior. + +- [ ] **Step 3: Batch unique fingerprint lookups** + +Fetch unique candidate fingerprints in bounded parallel batches and store them in a `Map`. + +- [ ] **Step 4: Replay candidates in original order** + +For each candidate, use a pending map first, then the fetched existing map. Reinforce non-deleted existing/pending insights, otherwise create the new insight. Increment counters exactly where the current loop does. + +- [ ] **Step 5: Batch final dirty insight writes** + +Flush dirty insights with `runKvBatches` after candidate replay. Keep provider calls and cluster loop sequential. + +- [ ] **Step 6: Run reflect tests** + +Run: + +```bash +corepack pnpm exec vitest run --exclude test/integration.test.ts test/reflect.test.ts +``` + +Expected: all reflect tests pass. + +## Task 5: Simplification pass and verification + +**Files:** +- Modify: touched files only if simplification is warranted. +- Update: `docs/todos/2026-06-19-issue-261-parallelize-kv/todo.md` + +- [ ] **Step 1: Focused simple-code pass** + +Inspect the active diff for redundant helpers, duplicated logic, unclear names, broad abstractions, or comments that restate code. Preserve APIs, schemas, provider behavior, persistence semantics, and result counters. 
+ +- [ ] **Step 2: Run targeted combined tests** + +Run: + +```bash +corepack pnpm exec vitest run --exclude test/integration.test.ts test/consolidation-pipeline.test.ts test/reflect.test.ts +``` + +Expected: both files pass. + +- [ ] **Step 3: Run broader checks** + +Run: + +```bash +corepack pnpm run lint +corepack pnpm run build +corepack pnpm test +``` + +Expected: all pass, or blockers are recorded with the closest targeted evidence. + +- [ ] **Step 4: Update task record** + +Record final verification evidence, caveats, Sprint Contract status, Feature / Verification Matrix status, and any residual risk in `docs/todos/2026-06-19-issue-261-parallelize-kv/todo.md`. + +## Self-Review + +- Spec coverage: issue validity, semantic consolidation, reflect, decay, tests, task state, and verification are covered. +- Placeholder scan: no unresolved placeholders or broad "handle edge cases" steps remain. +- Type consistency: plan uses existing `SessionSummary`, `SemanticMemory`, `ProceduralMemory`, `Insight`, `KV`, and `StateKV` concepts. +- Boundary check: no dependency, schema, MCP/REST, auth, provider prompt, or remote changes are planned. diff --git a/docs/todos/2026-06-19-issue-261-parallelize-kv/todo.md b/docs/todos/2026-06-19-issue-261-parallelize-kv/todo.md new file mode 100644 index 00000000..4d78c0f9 --- /dev/null +++ b/docs/todos/2026-06-19-issue-261-parallelize-kv/todo.md @@ -0,0 +1,122 @@ +# Issue 261 Parallelize KV Operations + +Scope: GitHub issue #261 in branch `issue/261-parallelize-kv-operations`. + +## Validity Evidence + +- Worktree: `/Users/A1538552/.codex/worktrees/e4ab/agentmemory`. +- Start state: detached `HEAD` at `eacce17e`; switched to new branch `issue/261-parallelize-kv-operations`. +- Remote boundary: use only `origin` (`https://github.com/wbugitlab1/agentmemory.git`); do not target `upstream` (`https://github.com/rohitg00/agentmemory.git`). +- `memory_consolidate` routes to `mem::consolidate-pipeline` through `src/mcp/server.ts`. 
+- Semantic consolidation reads `KV.summaries` and `KV.semantic` sequentially, then persists each parsed fact with awaited `kv.set` inside the loop in `src/functions/consolidation-pipeline.ts`. +- Decay writes semantic and procedural rows sequentially in `src/functions/consolidation-pipeline.ts`. +- `mem::reflect` parallelizes initial list reads, but each generated insight still performs awaited `kv.get` and `kv.set` inside the parse loop in `src/functions/reflect.ts`. +- Issue is locally valid. GitHub issue body was not read because local source evidence validates the stated failure mode and credentialed GitHub reads require a separate approval boundary if needed. + +## Sprint Contract + +Goal: Reduce avoidable timeout risk in semantic consolidation and reflection by batching independent KV reads and writes while preserving behavior. + +Scope: +- `src/functions/consolidation-pipeline.ts` +- `src/functions/reflect.ts` +- Focused tests in `test/consolidation-pipeline.test.ts` and `test/reflect.test.ts` + +Non-goals: +- No changes to MCP tool counts, REST endpoints, persisted schemas, export/import versions, auth, remotes, dependencies, or provider behavior. +- No work on issues other than #261. +- No push, PR creation, issue closure, or remote state changes without explicit current-turn approval. + +Acceptance criteria: +- Semantic consolidation fetches independent KV collections in parallel and persists parsed semantic fact updates/creates without per-fact serial awaits. +- Reflection preserves per-cluster provider call sequencing and result limits, but batches independent insight existence checks and persistence per response. +- Decay persists semantic and procedural updates in parallel where behavior is unchanged. +- Tests prove the new concurrency behavior with delayed/mock KV operations and continue to prove existing result semantics. + +Intended verification: +- Red tests first for the concurrency expectations. 
+- Targeted vitest runs for `test/consolidation-pipeline.test.ts` and `test/reflect.test.ts`. +- `corepack pnpm run lint`, `corepack pnpm run build`, and `corepack pnpm test` if dependency state permits. +- Required security gates before commit or handoff if commits are prepared: staged Gitleaks; OSV/Semgrep only if the final touched surface triggers those gates under repo policy. + +Known boundaries: +- Initial branch was created locally from existing HEAD; no fetch/pull has run. +- `upstream` remote exists but is out of scope. +- Existing issue 821-830 worktrees were inspected only through `git worktree list --porcelain` and will not be touched. + +Stop conditions: +- Branch creation failure or pre-existing branch evidence. +- A fix requires schema, API, auth, routing, dependency, provider contract, or remote state changes. +- Verification is blocked by missing dependencies or package-manager hardening and no targeted alternative can cover the changed surface. + +## Feature / Verification Matrix + +| Change | Verification method | Status | Evidence | +| --- | --- | --- | --- | +| Parallel semantic consolidation reads and writes | Failing then passing tests in `test/consolidation-pipeline.test.ts` | Done | Red run failed with max in-flight count `1`; green run passed targeted tests | +| Parallel reflect insight lookups and writes per provider response | Failing then passing tests in `test/reflect.test.ts` | Done | Red run failed with max in-flight count `1`; green run passed targeted tests | +| Parallel decay writes where behavior is unchanged | Failing then passing test in `test/consolidation-pipeline.test.ts` | Done | Red run failed with max in-flight count `1`; green run passed targeted tests | +| Preserve existing consolidation and reflect behavior | Targeted existing tests and broader repo checks | Done | `corepack pnpm test` passed 202 files / 2803 tests after final fix | +| Batch failure semantics settle started writes before return | Regression tests in 
`test/reflect.test.ts` and `test/consolidation-pipeline.test.ts` | Done | Red reflect test showed one of two writes settled before return; consolidation regression covers its duplicated helper; final targeted suite passed 35 tests | + +## Subagent Ledger + +| Workstream | Scope | Edits allowed | Expected output | Result | Residual risk | +| --- | --- | --- | --- | --- | --- | +| Arena candidate A | Implementation design for #261 | No main-worktree edits | Candidate patch strategy and rationale | Candidate recommended batching independent KV operations and direct overlap tests | Superseded by Candidate B bounded-batch base | +| Arena candidate B | Implementation design for #261 | No main-worktree edits | Candidate patch strategy and rationale | Candidate recommends bounded KV batches, same-id coalescing, ordered reflect replay, and duplicate regression tests | Same-key/failure behavior must be checked | +| Arena candidate C | Implementation design for #261 | No main-worktree edits | Candidate patch strategy and rationale | Candidate converges on batching and clearly calls out pending-map duplicate reflect handling | Direct unbounded `Promise.all` should be avoided for larger decay sets | +| Arena judge | Read-only comparison of candidates | No | Rubric scores and base recommendation | Recommended Candidate B as base, graft Candidate C pending-map language and A/C explicit overlap tests | Parent must verify red/green behavior | +| Final security review | Current working-tree diff | No | `ACCEPT` or actionable findings | `ACCEPT` | None reported | +| Final test coverage review | Current working-tree diff | No | `ACCEPT` or actionable findings | Found missing failure-mode coverage for batched writes; fixed with reflect regression, consolidation regression, and `allSettled` helper | Second re-review found consolidation-specific gap; fixed | +| Final maintainability review | Current working-tree diff | No | `ACCEPT` or actionable findings | Found pre-flush reflect 
counter drift and stale task record; fixed counters and updated record; re-review returned `ACCEPT` | None reported | + +## Arena Frame + +Artifact: one synthesized implementation approach for issue #261, to be turned into TDD tests and a surgical patch in this worktree. + +Rubric: +- Preserves current external behavior, storage schema, tool/API surfaces, provider calls, and per-run limits. +- Parallelizes only independent KV operations; does not introduce unbounded concurrency or reorder operations that affect counters/result semantics. +- Uses TDD with focused tests that fail on the current serial implementation and pass after the patch. +- Keeps the touched surface limited to consolidation-pipeline, reflect, and adjacent tests. +- Produces code that matches existing TypeScript style and remains easy to review. + +## Progress + +- Read repo instructions and relevant skills. +- Confirmed git state and remotes. +- Created branch `issue/261-parallelize-kv-operations`. +- Validated issue locally from source evidence. +- Created this task record before implementation edits. +- Ran arena: + - Candidate A: batch independent KV operations, preserve provider sequencing, add overlap tests. + - Candidate B: bounded batch helper, coalesced same-id semantic writes, ordered reflect replay, duplicate regression test. + - Candidate C: direct batching shape with explicit pending-map duplicate reflect guard. + - Judge scored B highest and recommended it as base. +- Synthesis: use Candidate B as base, graft Candidate C's explicit pending-map duplicate handling and A/C's direct read/write overlap tests. +- Red test evidence: `corepack pnpm exec vitest run --exclude test/integration.test.ts test/consolidation-pipeline.test.ts test/reflect.test.ts` failed as expected with 4 concurrency assertions reporting max in-flight count `1`; 29 existing/guard tests passed. +- Green test evidence after initial implementation: targeted run passed 2 files / 33 tests. 
+- Final review fixes: + - Changed batched write helpers to use `Promise.allSettled` per batch and rethrow after all started writes settle. + - Moved reflect `newInsights`, `reinforced`, and `totalInsights` counter updates after dirty insight writes succeed. + - Added reflect regression coverage for failed batched writes. +- Final verification evidence: + - `corepack pnpm exec vitest run --exclude test/integration.test.ts test/reflect.test.ts` passed 1 file / 19 tests. + - `corepack pnpm exec vitest run --exclude test/integration.test.ts test/consolidation-pipeline.test.ts` passed 1 file / 16 tests after adding the consolidation failure-mode regression. + - `corepack pnpm exec vitest run --exclude test/integration.test.ts test/consolidation-pipeline.test.ts test/reflect.test.ts` passed 2 files / 35 tests. + - `corepack pnpm run lint` passed. + - `semgrep scan --config p/default --error --metrics=off .` passed with 0 findings. + - `corepack pnpm run build` passed with existing tsdown plugin-timing and dynamic-import warnings. + - `corepack pnpm test` passed 202 files / 2804 tests. +- Second review loop: + - Test coverage re-review found the same batch-failure regression was not pinned for consolidation's duplicated helper; added a semantic write failure-mode test. + - Maintainability re-review returned `ACCEPT`. + +## Final Review Notes + +- Sprint Contract status: acceptance criteria met locally. +- No schema, API, auth, dependency, provider prompt, remote, or MCP/REST surface changes were made. +- Residual risk: batched writes still have the same possible partial-persistence class as any multi-write operation if one key fails after another succeeds; the helper now waits for all started writes in each batch to settle before returning, and reflect counters are applied only after the flush succeeds. +- Remote state was not changed. Push, PR creation, PR merge, issue closure, and thread archival remain approval-gated. 
+- Terminal archival contract: if this valid issue proceeds to PR merge, the merge approval request must explicitly bundle PR merge into `origin/main` and archiving this Codex thread after successful merge; after a successful merge, call `set_thread_archived({ archived: true })` before final handoff. Do not archive before that terminal outcome. diff --git a/src/functions/consolidation-pipeline.ts b/src/functions/consolidation-pipeline.ts index b3f272bc..9732f4a5 100644 --- a/src/functions/consolidation-pipeline.ts +++ b/src/functions/consolidation-pipeline.ts @@ -18,6 +18,23 @@ import { recordAudit } from "./audit.js"; import { getConsolidationDecayDays, isConsolidationEnabled } from "../config.js"; import { logger } from "../logger.js"; +const KV_BATCH_SIZE = 16; + +async function runKvBatches( + items: T[], + run: (item: T) => Promise, +): Promise { + for (let i = 0; i < items.length; i += KV_BATCH_SIZE) { + const results = await Promise.allSettled( + items.slice(i, i + KV_BATCH_SIZE).map(run), + ); + const rejected = results.find( + (result): result is PromiseRejectedResult => result.status === "rejected", + ); + if (rejected) throw rejected.reason; + } +} + function applyDecay( items: Array<{ strength: number; @@ -58,8 +75,10 @@ export function registerConsolidationPipelineFunction( const results: Record = {}; if (tier === "all" || tier === "semantic") { - const summaries = await kv.list(KV.summaries); - const existingSemantic = await kv.list(KV.semantic); + const [summaries, existingSemantic] = await Promise.all([ + kv.list(KV.summaries), + kv.list(KV.semantic), + ]); if (summaries.length >= 5) { const recentSummaries = summaries @@ -88,6 +107,7 @@ export function registerConsolidationPipelineFunction( let match; let newFacts = 0; const now = new Date().toISOString(); + const semanticWrites = new Map(); while ((match = factRegex.exec(response)) !== null) { const parsedConf = parseFloat(match[1]); @@ -102,7 +122,7 @@ export function 
registerConsolidationPipelineFunction( existing.lastAccessedAt = now; existing.updatedAt = now; existing.confidence = Math.max(existing.confidence, confidence); - await kv.set(KV.semantic, existing.id, existing); + semanticWrites.set(existing.id, existing); } else { const sem: SemanticMemory = { id: generateId("sem"), @@ -116,10 +136,13 @@ export function registerConsolidationPipelineFunction( createdAt: now, updatedAt: now, }; - await kv.set(KV.semantic, sem.id, sem); + semanticWrites.set(sem.id, sem); newFacts++; } } + await runKvBatches([...semanticWrites.values()], (sem) => + kv.set(KV.semantic, sem.id, sem), + ); results.semantic = { newFacts, totalSummaries: summaries.length }; } catch (err) { const msg = err instanceof Error ? err.message : String(err); @@ -248,17 +271,16 @@ export function registerConsolidationPipelineFunction( } if (tier === "all" || tier === "decay") { - const semantic = await kv.list(KV.semantic); + const [semantic, procedural] = await Promise.all([ + kv.list(KV.semantic), + kv.list(KV.procedural), + ]); + applyDecay(semantic, decayDays); - for (const s of semantic) { - await kv.set(KV.semantic, s.id, s); - } + await runKvBatches(semantic, (s) => kv.set(KV.semantic, s.id, s)); - const procedural = await kv.list(KV.procedural); applyDecay(procedural, decayDays); - for (const p of procedural) { - await kv.set(KV.procedural, p.id, p); - } + await runKvBatches(procedural, (p) => kv.set(KV.procedural, p.id, p)); results.decay = { semantic: semantic.length, diff --git a/src/functions/reflect.ts b/src/functions/reflect.ts index eceb591a..faef4b36 100644 --- a/src/functions/reflect.ts +++ b/src/functions/reflect.ts @@ -24,6 +24,43 @@ interface ConceptCluster { crystalIds: string[]; } +interface ParsedInsightCandidate { + fp: string; + title: string; + content: string; + confidence: number; +} + +const KV_BATCH_SIZE = 16; + +async function runKvBatches( + items: T[], + run: (item: T) => Promise, +): Promise { + for (let i = 0; i < items.length; i 
+= KV_BATCH_SIZE) { + const results = await Promise.allSettled( + items.slice(i, i + KV_BATCH_SIZE).map(run), + ); + const rejected = results.find( + (result): result is PromiseRejectedResult => result.status === "rejected", + ); + if (rejected) throw rejected.reason; + } +} + +async function collectKvBatches( + items: T[], + run: (item: T) => Promise, +): Promise { + const results: R[] = []; + for (let i = 0; i < items.length; i += KV_BATCH_SIZE) { + results.push( + ...(await Promise.all(items.slice(i, i + KV_BATCH_SIZE).map(run))), + ); + } + return results; +} + function reinforceInsight(insight: Insight): void { const now = new Date().toISOString(); insight.reinforcements++; @@ -261,12 +298,12 @@ export function registerReflectFunctions( const insightRegex = /([\s\S]*?)<\/insight>/g; let match; - let clusterCount = 0; + const candidates: ParsedInsightCandidate[] = []; while ( (match = insightRegex.exec(response)) !== null && - clusterCount < maxInsightsPerCluster && - totalInsights < maxTotal + candidates.length < maxInsightsPerCluster && + totalInsights + candidates.length < maxTotal ) { const parsedConf = parseFloat(match[1]); const confidence = Number.isNaN(parsedConf) @@ -278,19 +315,35 @@ export function registerReflectFunctions( if (!content) continue; const fp = fingerprintId("ins", content.trim().toLowerCase()); - const existing = await kv.get(KV.insights, fp); + candidates.push({ fp, title, content, confidence }); + } + + const uniqueFps = [...new Set(candidates.map((candidate) => candidate.fp))]; + const existingEntries = await collectKvBatches(uniqueFps, async (fp) => + [fp, await kv.get(KV.insights, fp)] as const, + ); + const existingByFp = new Map(existingEntries); + const pendingByFp = new Map(); + const dirtyInsights = new Map(); + let clusterNewInsights = 0; + let clusterReinforced = 0; + + for (const candidate of candidates) { + const existing = + pendingByFp.get(candidate.fp) ?? 
existingByFp.get(candidate.fp); if (existing && !existing.deleted) { reinforceInsight(existing); - await kv.set(KV.insights, existing.id, existing); - reinforced++; + pendingByFp.set(candidate.fp, existing); + dirtyInsights.set(existing.id, existing); + clusterReinforced++; } else { const now = new Date().toISOString(); const insight: Insight = { - id: fp, - title, - content, - confidence, + id: candidate.fp, + title: candidate.title, + content: candidate.content, + confidence: candidate.confidence, reinforcements: 0, sourceConceptCluster: conceptNames, sourceMemoryIds: cluster.factIds, @@ -302,13 +355,19 @@ export function registerReflectFunctions( updatedAt: now, decayRate: 0.05, }; - await kv.set(KV.insights, insight.id, insight); - newInsights++; + pendingByFp.set(candidate.fp, insight); + dirtyInsights.set(insight.id, insight); + clusterNewInsights++; } - clusterCount++; - totalInsights++; } + + await runKvBatches([...dirtyInsights.values()], (insight) => + kv.set(KV.insights, insight.id, insight), + ); + newInsights += clusterNewInsights; + reinforced += clusterReinforced; + totalInsights += candidates.length; } catch { continue; } diff --git a/test/consolidation-pipeline.test.ts b/test/consolidation-pipeline.test.ts index 63f580d7..065e7d0a 100644 --- a/test/consolidation-pipeline.test.ts +++ b/test/consolidation-pipeline.test.ts @@ -14,6 +14,10 @@ import { isConsolidationEnabled } from "../src/config.js"; import { logger } from "../src/logger.js"; import type { SessionSummary, Memory, SemanticMemory, ProceduralMemory } from "../src/types.js"; +function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + function mockKV() { const store = new Map>(); return { @@ -168,6 +172,126 @@ describe("Consolidation Pipeline", () => { expect(stored[0].confidence).toBe(0.9); }); + it("starts semantic summary and fact reads concurrently", async () => { + const provider = { + name: "test", + compress: vi.fn(), + summarize: 
vi.fn().mockResolvedValue( + `TypeScript is the primary language`, + ), + }; + + const baseList = kv.list.bind(kv); + let inFlight = 0; + let maxInFlight = 0; + kv.list = async (scope: string): Promise => { + if (scope === "mem:summaries" || scope === "mem:semantic") { + inFlight++; + maxInFlight = Math.max(maxInFlight, inFlight); + try { + await delay(10); + return await baseList(scope); + } finally { + inFlight--; + } + } + return baseList(scope); + }; + + registerConsolidationPipelineFunction(sdk as never, kv as never, provider as never); + + for (let i = 0; i < 6; i++) { + await kv.set("mem:summaries", `ses_${i}`, makeSummary(i)); + } + + await sdk.trigger("mem::consolidate-pipeline", { tier: "semantic" }); + + expect(maxInFlight).toBeGreaterThan(1); + }); + + it("persists parsed semantic facts concurrently", async () => { + const provider = { + name: "test", + compress: vi.fn(), + summarize: vi.fn().mockResolvedValue( + `Fact oneFact twoFact three`, + ), + }; + + const baseSet = kv.set.bind(kv); + let inFlight = 0; + let maxInFlight = 0; + kv.set = async (scope: string, key: string, data: T): Promise => { + if (scope === "mem:semantic") { + inFlight++; + maxInFlight = Math.max(maxInFlight, inFlight); + try { + await delay(10); + return await baseSet(scope, key, data); + } finally { + inFlight--; + } + } + return baseSet(scope, key, data); + }; + + registerConsolidationPipelineFunction(sdk as never, kv as never, provider as never); + + for (let i = 0; i < 6; i++) { + await kv.set("mem:summaries", `ses_${i}`, makeSummary(i)); + } + + const result = (await sdk.trigger("mem::consolidate-pipeline", { + tier: "semantic", + })) as { success: boolean; results: Record }; + + expect(result.results.semantic.newFacts).toBe(3); + expect(await kv.list("mem:semantic")).toHaveLength(3); + expect(maxInFlight).toBeGreaterThan(1); + }); + + it("settles started semantic fact writes before reporting a batch failure", async () => { + const provider = { + name: "test", + compress: 
vi.fn(), + summarize: vi.fn().mockResolvedValue( + `Fact oneFact two`, + ), + }; + + const baseSet = kv.set.bind(kv); + const startedWrites: string[] = []; + const settledWrites: string[] = []; + let shouldFailNextSemanticWrite = true; + kv.set = async (scope: string, key: string, data: T): Promise => { + if (scope === "mem:semantic") { + const shouldFail = shouldFailNextSemanticWrite; + shouldFailNextSemanticWrite = false; + startedWrites.push(key); + await delay(shouldFail ? 5 : 20); + settledWrites.push(key); + if (shouldFail) { + throw new Error("semantic write failed"); + } + } + return baseSet(scope, key, data); + }; + + registerConsolidationPipelineFunction(sdk as never, kv as never, provider as never); + + for (let i = 0; i < 6; i++) { + await kv.set("mem:summaries", `ses_${i}`, makeSummary(i)); + } + + const result = (await sdk.trigger("mem::consolidate-pipeline", { + tier: "semantic", + })) as { success: boolean; results: Record }; + + expect(startedWrites).toHaveLength(2); + expect(settledWrites).toHaveLength(2); + expect(result.results.semantic.error).toBe("semantic write failed"); + }); + it("with enough patterns, creates procedural memories from provider response", async () => { const provider = { name: "test", @@ -340,6 +464,66 @@ describe("Consolidation Pipeline", () => { ); }); + it("persists decay updates concurrently", async () => { + const provider = { + name: "test", + compress: vi.fn(), + summarize: vi.fn(), + }; + + const baseSet = kv.set.bind(kv); + let inFlight = 0; + let maxInFlight = 0; + kv.set = async (scope: string, key: string, data: T): Promise => { + if (scope === "mem:semantic" || scope === "mem:procedural") { + inFlight++; + maxInFlight = Math.max(maxInFlight, inFlight); + try { + await delay(10); + return await baseSet(scope, key, data); + } finally { + inFlight--; + } + } + return baseSet(scope, key, data); + }; + + registerConsolidationPipelineFunction(sdk as never, kv as never, provider as never); + + for (let i = 0; i < 3; 
i++) { + await kv.set("mem:semantic", `sem_${i}`, { + id: `sem_${i}`, + fact: `Fact ${i}`, + confidence: 0.8, + sourceSessionIds: [], + sourceMemoryIds: [], + accessCount: 1, + lastAccessedAt: "2025-01-01T00:00:00Z", + strength: 0.8, + createdAt: "2025-01-01T00:00:00Z", + updatedAt: "2025-01-01T00:00:00Z", + } satisfies SemanticMemory); + await kv.set("mem:procedural", `proc_${i}`, { + id: `proc_${i}`, + name: `Procedure ${i}`, + steps: ["Step"], + triggerCondition: "when testing", + frequency: 1, + sourceSessionIds: [], + strength: 0.8, + createdAt: "2025-01-01T00:00:00Z", + updatedAt: "2025-01-01T00:00:00Z", + } satisfies ProceduralMemory); + } + + const result = (await sdk.trigger("mem::consolidate-pipeline", { + tier: "decay", + })) as { success: boolean; results: Record }; + + expect(result.results.decay).toEqual({ semantic: 3, procedural: 3 }); + expect(maxInFlight).toBeGreaterThan(1); + }); + it("consolidation records an audit entry", async () => { const provider = { name: "test", diff --git a/test/reflect.test.ts b/test/reflect.test.ts index a0463ff3..f191f055 100644 --- a/test/reflect.test.ts +++ b/test/reflect.test.ts @@ -7,6 +7,10 @@ vi.mock("../src/logger.js", () => ({ import { registerReflectFunctions } from "../src/functions/reflect.js"; import type { Insight, GraphNode, GraphEdge, SemanticMemory, Lesson, Crystal } from "../src/types.js"; +function delay(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + function mockKV() { const store = new Map>(); return { @@ -218,6 +222,130 @@ describe("Reflect", () => { expect(after[0].reinforcements).toBe(1); }); + it("looks up and persists generated insights concurrently within a response", async () => { + provider.summarize.mockResolvedValue(` +Security requires layered protection. +Boundary tests catch trust transition bugs. +Validation must happen before persistence. 
+`); + + const baseGet = kv.get.bind(kv); + const baseSet = kv.set.bind(kv); + let inFlightGets = 0; + let maxInFlightGets = 0; + let inFlightSets = 0; + let maxInFlightSets = 0; + + kv.get = async (scope: string, key: string): Promise => { + if (scope === "mem:insights") { + inFlightGets++; + maxInFlightGets = Math.max(maxInFlightGets, inFlightGets); + try { + await delay(10); + return await baseGet(scope, key); + } finally { + inFlightGets--; + } + } + return baseGet(scope, key); + }; + kv.set = async (scope: string, key: string, data: T): Promise => { + if (scope === "mem:insights") { + inFlightSets++; + maxInFlightSets = Math.max(maxInFlightSets, inFlightSets); + try { + await delay(10); + return await baseSet(scope, key, data); + } finally { + inFlightSets--; + } + } + return baseSet(scope, key, data); + }; + + await kv.set("mem:graph:nodes", "node_security", makeConceptNode("security")); + await kv.set("mem:graph:nodes", "node_validation", makeConceptNode("validation")); + await kv.set("mem:graph:edges", "edge_1", makeEdge("security", "validation")); + await kv.set("mem:semantic", "sem_1", makeSemantic("Always validate security inputs")); + await kv.set("mem:semantic", "sem_2", makeSemantic("Testing improves security coverage")); + await kv.set("mem:semantic", "sem_3", makeSemantic("Validation prevents injection")); + + const result = (await sdk.trigger("mem::reflect", {})) as { + newInsights: number; + }; + + expect(result.newInsights).toBe(3); + expect(maxInFlightGets).toBeGreaterThan(1); + expect(maxInFlightSets).toBeGreaterThan(1); + }); + + it("preserves duplicate generated insight reinforcement semantics", async () => { + provider.summarize.mockResolvedValue(` +Security requires layered protection. +Security requires layered protection. 
+`); + + await kv.set("mem:graph:nodes", "node_security", makeConceptNode("security")); + await kv.set("mem:graph:nodes", "node_validation", makeConceptNode("validation")); + await kv.set("mem:graph:edges", "edge_1", makeEdge("security", "validation")); + await kv.set("mem:semantic", "sem_1", makeSemantic("Always validate security inputs")); + await kv.set("mem:semantic", "sem_2", makeSemantic("Testing improves security coverage")); + await kv.set("mem:semantic", "sem_3", makeSemantic("Validation prevents injection")); + + const result = (await sdk.trigger("mem::reflect", {})) as { + newInsights: number; + reinforced: number; + }; + + const insights = await kv.list("mem:insights"); + expect(result.newInsights).toBe(1); + expect(result.reinforced).toBe(1); + expect(insights).toHaveLength(1); + expect(insights[0].reinforcements).toBe(1); + }); + + it("does not count generated insights when the batched write fails", async () => { + provider.summarize.mockResolvedValue(` +Security requires layered protection. +Boundary tests catch trust transition bugs. +`); + + const baseSet = kv.set.bind(kv); + const startedWrites: string[] = []; + const settledWrites: string[] = []; + let shouldFailNextInsightWrite = true; + kv.set = async (scope: string, key: string, data: T): Promise => { + if (scope === "mem:insights") { + const shouldFail = shouldFailNextInsightWrite; + shouldFailNextInsightWrite = false; + startedWrites.push(key); + await delay(shouldFail ? 
5 : 20); + settledWrites.push(key); + if (shouldFail) { + throw new Error("insight write failed"); + } + } + return baseSet(scope, key, data); + }; + + await kv.set("mem:graph:nodes", "node_security", makeConceptNode("security")); + await kv.set("mem:graph:nodes", "node_validation", makeConceptNode("validation")); + await kv.set("mem:graph:edges", "edge_1", makeEdge("security", "validation")); + await kv.set("mem:semantic", "sem_1", makeSemantic("Always validate security inputs")); + await kv.set("mem:semantic", "sem_2", makeSemantic("Testing improves security coverage")); + await kv.set("mem:semantic", "sem_3", makeSemantic("Validation prevents injection")); + + const result = (await sdk.trigger("mem::reflect", {})) as { + newInsights: number; + reinforced: number; + }; + + expect(startedWrites).toHaveLength(2); + expect(settledWrites).toHaveLength(2); + expect(result.newInsights).toBe(0); + expect(result.reinforced).toBe(0); + }); + it("falls back to Jaccard grouping when graph is empty", async () => { await kv.set("mem:semantic", "sem_1", makeSemantic("security validation is important")); await kv.set("mem:semantic", "sem_2", makeSemantic("security testing prevents bugs"));