Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 251 additions & 0 deletions docs/todos/2026-06-19-issue-261-parallelize-kv/plan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
# Issue 261 Parallelize KV Operations Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Reduce timeout risk in semantic consolidation and reflection by batching independent KV operations while preserving public behavior.

**Architecture:** Keep provider calls, schemas, MCP/REST surfaces, and result counters unchanged. Parallelize only independent KV reads and writes, using bounded local batches for write-heavy paths and ordered in-memory replay where duplicate insight fingerprints can affect behavior.

**Tech Stack:** TypeScript ESM, iii-sdk function registration, StateKV, Vitest.

---

## Sprint Contract

Goal: Fix issue #261 for `memory_consolidate` semantic and `memory_reflect` timeout-prone sequential KV operations.

Scope:
- Modify `src/functions/consolidation-pipeline.ts`.
- Modify `src/functions/reflect.ts`.
- Add focused tests in `test/consolidation-pipeline.test.ts`.
- Add focused tests in `test/reflect.test.ts`.

Non-goals:
- Do not change persisted schemas, MCP tools, REST endpoints, provider prompts, auth, remotes, dependencies, or package metadata.
- Do not parallelize LLM/provider calls.
- Do not work on any issue other than #261.

Acceptance criteria:
- Semantic consolidation starts independent summary/semantic reads concurrently.
- Semantic consolidation persists multiple parsed semantic facts without per-fact serial awaits.
- Decay persists eligible semantic/procedural rows without per-row serial awaits.
- Reflection batches independent insight lookups and final writes per provider response while preserving duplicate-fingerprint behavior.
- Existing tests for consolidation and reflection still pass.

Intended verification:
- Red: targeted new tests fail on current implementation.
- Green: targeted new and existing tests pass after implementation.
- Broader: lint, build, full non-integration test suite where available.

Known boundaries:
- Branch is `issue/261-parallelize-kv-operations`.
- Remote writes, fetch/pull, push, PR creation, and issue closure require separate explicit current-turn approval.

## Feature / Verification Matrix

| Change | Verification method | Status | Evidence |
| --- | --- | --- | --- |
| Parallel semantic initial reads | New vitest in `test/consolidation-pipeline.test.ts` tracks overlapping `kv.list` calls | Pending | Test not added |
| Parallel semantic fact writes | New vitest in `test/consolidation-pipeline.test.ts` tracks overlapping `kv.set` calls and stored facts | Pending | Test not added |
| Parallel decay writes | New vitest in `test/consolidation-pipeline.test.ts` tracks overlapping decay `kv.set` calls and result counts | Pending | Test not added |
| Parallel reflect get/set batches | New vitest in `test/reflect.test.ts` tracks overlapping `kv.get`/`kv.set` calls | Pending | Test not added |
| Preserve duplicate reflect semantics | New vitest in `test/reflect.test.ts` with duplicate insight content | Pending | Test not added |

## Files

- Modify: `test/consolidation-pipeline.test.ts`
- Modify: `test/reflect.test.ts`
- Modify: `src/functions/consolidation-pipeline.ts`
- Modify: `src/functions/reflect.ts`
- Update: `docs/todos/2026-06-19-issue-261-parallelize-kv/todo.md`

## Task 1: Add failing consolidation pipeline concurrency tests

**Files:**
- Modify: `test/consolidation-pipeline.test.ts`

- [ ] **Step 1: Add a local async gate helper near existing test helpers**

```ts
function deferred<T = void>() {
let resolve!: (value: T) => void;
const promise = new Promise<T>((res) => {
resolve = res;
});
return { promise, resolve };
}
```

- [ ] **Step 2: Add semantic initial-read concurrency test**

Add a test that wraps `kv.list` for `mem:summaries` and `mem:semantic`, tracks current/max in-flight calls, seeds six summaries, runs semantic consolidation, and asserts `maxInFlightLists > 1`.

- [ ] **Step 3: Add semantic write concurrency test**

Add a test where the provider returns three distinct `<fact>` entries, wraps `kv.set` for `mem:semantic`, tracks current/max in-flight semantic writes, runs semantic consolidation, and asserts `maxInFlightSets > 1`, `newFacts === 3`, and three facts are stored.

- [ ] **Step 4: Add decay write concurrency test**

Seed multiple semantic and procedural rows old enough for decay. Wrap `kv.set` for `mem:semantic` and `mem:procedural`, run `tier: "decay"`, assert `maxInFlightSets > 1`, and assert result counts remain correct.

- [ ] **Step 5: Run red consolidation tests**

Run:

```bash
corepack pnpm exec vitest run --exclude test/integration.test.ts test/consolidation-pipeline.test.ts
```

Expected: the new concurrency assertions fail on current code with max in-flight counts of `1`.

## Task 2: Add failing reflect concurrency and duplicate tests

**Files:**
- Modify: `test/reflect.test.ts`

- [ ] **Step 1: Add reflect insight get/set concurrency test**

Use one graph cluster with enough supporting semantic facts. Have the provider return at least three distinct insights. Wrap `kv.get` and `kv.set` for `mem:insights`, track max in-flight gets and sets, run `mem::reflect`, and assert both maxima are greater than `1` while `newInsights` matches the provider response.

- [ ] **Step 2: Add duplicate insight regression test**

Have the provider return the same insight content twice under the same cluster. Run `mem::reflect` and assert `newInsights === 1`, `reinforced === 1`, one insight is stored, and its `reinforcements === 1`.

- [ ] **Step 3: Run red reflect tests**

Run:

```bash
corepack pnpm exec vitest run --exclude test/integration.test.ts test/reflect.test.ts
```

Expected: the concurrency test fails on current code with max in-flight counts of `1`; the duplicate regression may pass on current code and must remain passing after implementation.

## Task 3: Implement bounded KV batching in consolidation pipeline

**Files:**
- Modify: `src/functions/consolidation-pipeline.ts`

- [ ] **Step 1: Add a small local batch helper**

Use a local constant and helper in `consolidation-pipeline.ts`:

```ts
const KV_BATCH_SIZE = 16;

async function runKvBatches<T>(
items: T[],
run: (item: T) => Promise<unknown>,
): Promise<void> {
for (let i = 0; i < items.length; i += KV_BATCH_SIZE) {
await Promise.all(items.slice(i, i + KV_BATCH_SIZE).map(run));
}
}
```

- [ ] **Step 2: Parallelize semantic initial reads**

Replace sequential summary/semantic reads with:

```ts
const [summaries, existingSemantic] = await Promise.all([
kv.list<SessionSummary>(KV.summaries),
kv.list<SemanticMemory>(KV.semantic),
]);
```

- [ ] **Step 3: Batch semantic fact writes**

Inside the semantic fact parse loop, keep parsing/counter logic ordered, collect touched semantic rows in an array or map, and flush with `runKvBatches`. Preserve existing new-fact behavior.

- [ ] **Step 4: Batch decay reads and writes**

Fetch semantic and procedural lists with `Promise.all`, apply decay exactly as before, then write each scope with `runKvBatches`.

- [ ] **Step 5: Run consolidation tests**

Run:

```bash
corepack pnpm exec vitest run --exclude test/integration.test.ts test/consolidation-pipeline.test.ts
```

Expected: all consolidation pipeline tests pass.

## Task 4: Implement ordered batched insight processing in reflect

**Files:**
- Modify: `src/functions/reflect.ts`

- [ ] **Step 1: Add local parsed insight type and batch helper**

Add a local `ParsedInsightCandidate` interface and `runKvBatches` helper near `ConceptCluster`.

- [ ] **Step 2: Parse provider response before KV lookups**

For each provider response, parse up to `maxInsightsPerCluster` and remaining `maxTotal` into candidates with `fp`, `title`, `content`, and `confidence`. Keep the existing empty-content skip behavior.

- [ ] **Step 3: Batch unique fingerprint lookups**

Fetch unique candidate fingerprints in bounded parallel batches and store them in a `Map<string, Insight | null>`.

- [ ] **Step 4: Replay candidates in original order**

For each candidate, use a pending map first, then the fetched existing map. Reinforce non-deleted existing/pending insights, otherwise create the new insight. Increment counters exactly where the current loop does.

- [ ] **Step 5: Batch final dirty insight writes**

Flush dirty insights with `runKvBatches` after candidate replay. Keep provider calls and cluster loop sequential.

- [ ] **Step 6: Run reflect tests**

Run:

```bash
corepack pnpm exec vitest run --exclude test/integration.test.ts test/reflect.test.ts
```

Expected: all reflect tests pass.

## Task 5: Simplification pass and verification

**Files:**
- Modify: touched files only if simplification is warranted.
- Update: `docs/todos/2026-06-19-issue-261-parallelize-kv/todo.md`

- [ ] **Step 1: Focused simple-code pass**

Inspect the active diff for redundant helpers, duplicated logic, unclear names, broad abstractions, or comments that restate code. Preserve APIs, schemas, provider behavior, persistence semantics, and result counters.

- [ ] **Step 2: Run targeted combined tests**

Run:

```bash
corepack pnpm exec vitest run --exclude test/integration.test.ts test/consolidation-pipeline.test.ts test/reflect.test.ts
```

Expected: both files pass.

- [ ] **Step 3: Run broader checks**

Run:

```bash
corepack pnpm run lint
corepack pnpm run build
corepack pnpm test
```

Expected: all pass, or blockers are recorded with the closest targeted evidence.

- [ ] **Step 4: Update task record**

Record final verification evidence, caveats, Sprint Contract status, Feature / Verification Matrix status, and any residual risk in `docs/todos/2026-06-19-issue-261-parallelize-kv/todo.md`.

## Self-Review

- Spec coverage: issue validity, semantic consolidation, reflect, decay, tests, task state, and verification are covered.
- Placeholder scan: no unresolved placeholders or broad "handle edge cases" steps remain.
- Type consistency: plan uses existing `SessionSummary`, `SemanticMemory`, `ProceduralMemory`, `Insight`, `KV`, and `StateKV` concepts.
- Boundary check: no dependency, schema, MCP/REST, auth, provider prompt, or remote changes are planned.
Loading
Loading