From 9bbdd32a9ab31a86eb7a4f71f12662eedde3b02e Mon Sep 17 00:00:00 2001 From: Willi Budzinski Date: Sat, 20 Jun 2026 05:08:31 +0200 Subject: [PATCH] fix: checkpoint worker before native stop --- AGENTS.md | 2 +- README.md | 2 +- assets/tags/light/section-api.svg | 2 +- assets/tags/section-api.svg | 2 +- .../arena-synthesis.md | 112 ++++ .../plan.md | 617 ++++++++++++++++++ .../todo.md | 165 +++++ .../skills/agentmemory-rest-api/REFERENCE.md | 3 +- src/cli.ts | 77 ++- src/cli/shutdown-flush.ts | 53 ++ src/cli/stop-processes.ts | 87 ++- src/functions/search.ts | 22 +- src/functions/shutdown-flush.ts | 18 + src/index.ts | 47 +- src/state/index-persistence.ts | 77 ++- src/triggers/api.ts | 22 + test/api-boundary-coverage.test.ts | 25 + test/cli-stop-port-detection.test.ts | 218 ++++++- test/index-persistence.test.ts | 45 +- test/reconnect-registration.test.ts | 20 + test/search.test.ts | 42 +- test/shutdown-flush.test.ts | 71 ++ website/lib/generated-meta.json | 6 +- 23 files changed, 1643 insertions(+), 92 deletions(-) create mode 100644 docs/todos/2026-06-20-issue-338-stop-order-data-loss/arena-synthesis.md create mode 100644 docs/todos/2026-06-20-issue-338-stop-order-data-loss/plan.md create mode 100644 docs/todos/2026-06-20-issue-338-stop-order-data-loss/todo.md create mode 100644 src/cli/shutdown-flush.ts create mode 100644 src/functions/shutdown-flush.ts create mode 100644 test/shutdown-flush.test.ts diff --git a/AGENTS.md b/AGENTS.md index a489d9564..fbfc862c4 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -118,7 +118,7 @@ Hook scripts in `src/hooks/` are standalone Node.js scripts (no iii-sdk import). ## Current Stats (v0.9.28) - 61 MCP tools (8 visible by default, `AGENTMEMORY_TOOLS=all` for all) -- 135 REST endpoints +- 136 REST endpoints - 6 MCP resources, 3 MCP prompts - 12 hooks, 15 skills - 50+ iii functions diff --git a/README.md b/README.md index 6bc018214..f4f9070aa 100644 --- a/README.md +++ b/README.md @@ -1841,7 +1841,7 @@ Create `~/.agentmemory/.env`:

API

-135 endpoints on port `3111`. The REST API binds to `127.0.0.1` by default. Protected endpoints require `Authorization: Bearer ` when `AGENTMEMORY_SECRET` is set, and mesh sync endpoints require `AGENTMEMORY_SECRET` on both peers. +136 endpoints on port `3111`. The REST API binds to `127.0.0.1` by default. Protected endpoints require `Authorization: Bearer ` when `AGENTMEMORY_SECRET` is set, and mesh sync endpoints require `AGENTMEMORY_SECRET` on both peers. ### Health Thresholds diff --git a/assets/tags/light/section-api.svg b/assets/tags/light/section-api.svg index 474be869b..a55fd5a95 100644 --- a/assets/tags/light/section-api.svg +++ b/assets/tags/light/section-api.svg @@ -12,5 +12,5 @@ API - 135 REST endpoints + 136 REST endpoints diff --git a/assets/tags/section-api.svg b/assets/tags/section-api.svg index 64a27d1cf..458597dd0 100644 --- a/assets/tags/section-api.svg +++ b/assets/tags/section-api.svg @@ -12,5 +12,5 @@ API - 135 REST endpoints + 136 REST endpoints diff --git a/docs/todos/2026-06-20-issue-338-stop-order-data-loss/arena-synthesis.md b/docs/todos/2026-06-20-issue-338-stop-order-data-loss/arena-synthesis.md new file mode 100644 index 000000000..331f197c1 --- /dev/null +++ b/docs/todos/2026-06-20-issue-338-stop-order-data-loss/arena-synthesis.md @@ -0,0 +1,112 @@ +# Arena Synthesis: Issue 338 + +## Rubric + +1. Uses current repo evidence for stop order and path handling. +2. Accounts for the Windows comment and distinguishes stale original claims from + residual valid scope. +3. Identifies the protected boundary / human checkpoint correctly. +4. Proposes a narrow, testable fix direction without broad scope expansion. +5. Names concrete files/tests and residual uncertainty. 
+ +## Scores + +| Candidate | Repo evidence | Windows residual | Boundary/checkpoint | Narrow fix | Files/tests/uncertainty | Total | +| --- | --- | --- | --- | --- | --- | --- | +| A | 5 | 5 | 5 | 4 | 5 | 24 | +| B | 5 | 5 | 4 | 5 | 5 | 24 | +| C | 4 | 4 | 4 | 4 | 3 | 19 | + +## Decision + +Base: Candidate B. + +Candidate B is the cleanest base because it keeps the issue shape narrow: +the original broad claims are stale on this branch, while the remaining +plausible bug is native Windows shutdown still depending on a worker `SIGTERM` +even though the flush lives in the worker signal handler. + +Grafts: +- From Candidate A: stronger proof that existing Windows stop tests still model + worker stop as `SIGTERM`-driven, especially + `test/cli-stop-port-detection.test.ts`. +- From Candidate A: fuller path-handling evidence in `src/cli/engine-launch.ts`, + `src/cli/runtime-config.ts`, `src/cli/iii-config.ts`, + `test/engine-launch.test.ts`, `test/cli-iii-config.test.ts`, and + `test/runtime-config.test.ts`. +- From Candidate A: clearer rejection notes that more grace time and another + stop-order tweak do not solve the Windows residual. +- From Candidate C: concise framing that the original POSIX-facing body is + stale while the Windows-specific comment remains live. + +Rejected: +- A REST endpoint is one possible transport for a pre-stop flush, but the arena + does not pick it before human approval because that would change API surface + and endpoint counts. +- Boot reconciliation alone is not the primary fix. It may recover already + written state, but it does not prevent shutdown-time loss when the worker is + hard-terminated before flush. +- A Windows console-control-event solution is higher uncertainty because the + worker is spawned under iii-engine supervision. + +## Validity Finding + +Issue #338 is **stale for the original broad POSIX/data-dir claims** and +**valid for a narrower native Windows shutdown residual**. 
+ +Already fixed on current base: +- Native stop order is worker-first before engine in `src/cli.ts`. +- Unresponsive native `--force` stop is also worker-first in + `src/cli/stop-processes.ts`. +- Worker shutdown grace is 5000 ms on worker-first paths. +- Bundled/runtime iii config rewrites `./data/state_store.db` and + `./data/stream_store` to absolute paths under `~/.agentmemory/data`. + +Still plausible and actionable: +- Worker stop still routes through `process.kill(pid, "SIGTERM")`. +- The critical flush is still inside the worker's `SIGTERM` / `SIGINT` handler: + `indexPersistence.stop()`, `indexPersistence.save()`, and + `shutdownSdkWithTimeout(sdk)`. +- Public issue comments report that Windows treats this `SIGTERM` path as hard + process termination, so the handler and flush do not run. + +## Likely Smallest Fix Family + +Recommended scope for implementation, pending human approval: + +Add a local/private worker flush or checkpoint path that the CLI invokes before +terminating the worker on Windows. Preserve the existing worker-first stop order +and treat OS-level termination as cleanup after the checkpoint succeeds, or as a +forceful fallback after a clear timeout/failure policy. + +Likely touched files: +- `src/cli.ts` +- `src/cli/stop-processes.ts` +- `src/index.ts` +- `src/shutdown.ts` or a new small lifecycle helper +- possibly `src/triggers/api.ts` if the chosen transport is REST +- focused tests under `test/` + +Human checkpoint is required before implementation because this crosses +persistence, shutdown, and iii-engine lifecycle boundaries, and may add a new +callable control surface. + +## Focused Test Direction + +Before production edits: +- RED test proving Windows stop performs a flush/checkpoint before worker + termination, instead of relying only on `SIGTERM`. +- Test that worker flush happens before engine termination. 
+- Test timeout/failure behavior for pre-stop flush: either preserve and fail, or + force only with explicit warning when `--force` semantics apply. +- Preserve existing path tests: `test/engine-launch.test.ts`, + `test/cli-iii-config.test.ts`, and `test/runtime-config.test.ts`. + +## Residual Uncertainty + +- No live Windows repro was run in this environment. +- The exact lost artifact should be named by the failing test or harness: + observation KV rows, memory rows, BM25/vector snapshots, audit rows, or + in-flight hook writes. +- Current branch is behind newer `origin/main`; relevant stop/path files should + be rechecked before final implementation or PR work. diff --git a/docs/todos/2026-06-20-issue-338-stop-order-data-loss/plan.md b/docs/todos/2026-06-20-issue-338-stop-order-data-loss/plan.md new file mode 100644 index 000000000..6eeed6d02 --- /dev/null +++ b/docs/todos/2026-06-20-issue-338-stop-order-data-loss/plan.md @@ -0,0 +1,617 @@ +# Issue 338 Windows Stop Flush Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Prevent native Windows `agentmemory stop` from losing in-memory worker index state by checkpointing the worker before OS-level termination and refusing non-force Windows stop when that checkpoint fails. + +**Architecture:** Keep the existing worker-first stop order. Add a narrow worker-owned `mem::shutdown-flush` function and, pending explicit boundary approval, a REST trigger so the CLI can ask the live worker to flush BM25/vector index state through iii before signaling worker and engine PIDs. 
Preserve existing `SIGTERM` shutdown for POSIX, make Windows non-force stop fail closed when the pre-stop checkpoint cannot be confirmed, and make the flush function unavailable until restored indexes are ready so startup-time stop cannot publish an empty snapshot. + +**Tech Stack:** TypeScript ESM, iii-sdk functions/triggers, Vitest, existing `@clack/prompts` CLI surfaces, repo `corepack pnpm` scripts. + +--- + +## Source Of Truth + +Spec path: none. The source of truth is issue #338, the arena synthesis, the Sprint Contract in `docs/todos/2026-06-20-issue-338-stop-order-data-loss/todo.md`, and the user's approval to implement with `$github-feature-loop`. + +GitHub PR target: `origin/main`. Remote writes remain unapproved. `github-push-prepare` local branch-prep is mandatory later, but push/PR creation still needs separate current-turn approval. + +Security-sensitive surfaces for `$github-push-prepare`: persistence, shutdown lifecycle, REST API surface, local networking, filesystem-backed iii state. + +Boundary approval status: granted by the user for `POST /agentmemory/shutdown/flush`, existing `AGENTMEMORY_SECRET`/plaintext protections, and REST endpoint count 135 -> 136. + +## Sprint Contract + +Goal: Fix the still-valid Windows residual of issue #338 without reopening the already-fixed POSIX stop-order or relative-data-dir claims. + +Scope: +- Add a worker-side flush function for shutdown checkpointing. +- Expose that function through one REST endpoint under `/agentmemory/shutdown/flush` only after explicit boundary approval. +- Make responsive native CLI stop attempt the checkpoint before any native worker or engine signal, even when `worker.pid` is missing. +- Make Windows non-force stop fail closed if the checkpoint is unavailable or fails. +- Preserve/fail closed for Windows orphan-worker non-force stop paths where no checkpoint can be confirmed. +- Keep `--force` able to terminate processes with a clear warning that persistence was not confirmed. 
+- Update required REST endpoint count surfaces from 135 to 136 only if the REST endpoint is approved. + +Non-goals: +- No schema or data migration. +- No standalone SQLite, direct state DB access, or bypass of iii-engine state primitives. +- No broad startup reconciliation. +- No dependency changes. +- No change to MCP tool count. +- No remote fetch, push, PR creation, or issue comment without separate approval. +- No claim to solve every possible iii-engine disk flush artifact. This task targets the explicit skipped worker BM25/vector checkpoint path; non-index SDK shutdown internals remain out of scope unless repo evidence identifies a safe pre-stop primitive. + +Acceptance criteria: +- RED tests first for strict index flush errors, worker flush function, REST endpoint dispatch/auth, and CLI stop ordering/failure policy. +- Existing POSIX worker-first behavior remains intact. +- Windows stop no longer relies solely on `process.kill(pid, "SIGTERM")` for data persistence. +- Endpoint counts and consistency tests reflect exactly one new REST endpoint. +- Shutdown flush cannot publish an empty index before persisted indexes have loaded/restored. +- Targeted tests, full repo-native tests where feasible, Semgrep, and Gitleaks staged gate are run before completion/commit. + +Stop conditions: +- Stop if implementation needs direct iii state database manipulation, a schema migration, dependency changes, auth weakening, or a second externally visible endpoint. +- Stop if tests show the checkpoint cannot verify persistence success. +- Stop before Task 3 if explicit approval for the new REST endpoint and count change is not granted. +- Stop before remote writes. 
+ +## Feature / Verification Matrix + +| Change | Verification method | Status | Evidence | +| --- | --- | --- | --- | +| Strict index checkpoint can fail visibly | `test/index-persistence.test.ts` RED/GREEN for `saveOrThrow()` | Done | RED failed before implementation; GREEN covered by targeted suite and full `corepack pnpm test`. | +| Worker registers shutdown flush function | New `test/shutdown-flush.test.ts` | Done | Covers save call, propagated failure, and pre-ready no-save behavior. | +| REST endpoint dispatches whitelisted shutdown flush | `test/api-boundary-coverage.test.ts` | Done | Covers auth and `sdk.trigger({ function_id: "mem::shutdown-flush", payload: {} })`. | +| Flush function stays in reconnect replay while gated by readiness | `test/reconnect-registration.test.ts`, `test/shutdown-flush.test.ts` | Done | Replayed registration includes shutdown flush; pre-ready flush returns unavailable without saving. | +| CLI pre-stop flush happens before worker/engine signals | `test/cli-stop-port-detection.test.ts` against exported responsive executor and source wiring | Done | Events show `flush` before worker/engine signal; `runStop()` delegates to the tested executor. | +| Responsive stop with missing worker pidfile still checkpoints | CLI stop executor test | Done | Engine-only responsive Windows stop attempts `flush` before engine signal and fails closed on checkpoint failure. | +| Windows non-force fails closed when flush fails | CLI stop executor test | Done | No process signals or pidfile clearing after failed flush on Windows non-force. | +| Windows orphan worker non-force preserves instead of hard-killing | `test/cli-stop-port-detection.test.ts` | Done | No worker signal and no persistence claim when checkpoint cannot run. | +| Force stop warns when flush fails | CLI stop executor plus unresponsive stop tests | Done | Force path may signal after warning; outro does not claim confirmed persistence. 
| +| REST endpoint count consistency | `test/consistency.test.ts` plus generated metadata | Done | README, AGENTS, source log, SVG badges, skill reference, and website metadata updated to 136. | +| Regression path checks remain covered | `test/engine-launch.test.ts`, `test/runtime-config.test.ts`, `test/cli-iii-config.test.ts` | Done | Existing absolute path behavior unchanged. | + +## Task-Owned Files + +Expected modify: +- `src/state/index-persistence.ts` +- `src/index.ts` +- `src/triggers/api.ts` +- `src/cli.ts` +- `src/cli/stop-processes.ts` +- `src/functions/search.ts` +- `test/index-persistence.test.ts` +- `test/api-boundary-coverage.test.ts` +- `test/cli-stop-port-detection.test.ts` or a new focused CLI stop test +- `test/reconnect-registration.test.ts` +- `test/search.test.ts` +- `test/consistency.test.ts` only if the endpoint count test needs no source changes beyond current assertions +- `README.md` +- `AGENTS.md` +- `assets/tags/section-api.svg` +- `assets/tags/light/section-api.svg` +- `plugin/skills/agentmemory-rest-api/REFERENCE.md` +- `website/lib/generated-meta.json` +- `docs/todos/2026-06-20-issue-338-stop-order-data-loss/todo.md` + +Expected create: +- `src/cli/shutdown-flush.ts` +- `src/functions/shutdown-flush.ts` +- `test/shutdown-flush.test.ts` + +## Subagent Ledger + +| Workstream | Scope | Edits allowed | Expected output | Status | +| --- | --- | --- | --- | --- | +| Pre-code plan review | This plan plus `todo.md` and arena synthesis | No | High/Medium findings on architecture, verification, scope, or boundary risk | Done; valid findings patched into this plan. | +| Implementation worker | Task-owned source/test/docs files only, after plan review | Yes | TDD implementation report with changed files and commands | Done; implemented by main agent because the blocking next step required integrating plan-review findings. 
| +| Final security review | Stable diff | No | ACCEPT or findings on persistence/API/auth/local networking | Done; initial readiness/completeness findings fixed, final ACCEPT. | +| Final test coverage review | Stable diff | No | ACCEPT or findings on missing red/green coverage | Done; initial auth/helper/wiring findings fixed, final ACCEPT. | +| Final maintainability review | Stable diff | No | ACCEPT or findings on complexity/scope drift | Done; ACCEPT. | + +## Task 1: RED Tests For Strict Index Checkpoint + +**Files:** +- Modify: `test/index-persistence.test.ts` +- Later modify: `src/state/index-persistence.ts` + +- [ ] **Step 1: Add failing test for strict save errors** + +Add a test near the existing `save() does not throw when kv.set rejects (#204)` case: + +```ts + it("saveOrThrow rejects when a checkpoint write fails", async () => { + const failingKv = { + ...mockKV(), + set: vi.fn(async () => { + throw new Error("TIMEOUT"); + }), + }; + const bm25 = new SearchIndex(); + bm25.add(makeObs({ id: "obs_1", title: "auth handler" })); + const persistence = new IndexPersistence(failingKv as never, bm25, null); + + await expect(persistence.saveOrThrow()).rejects.toThrow("TIMEOUT"); + }); +``` + +- [ ] **Step 2: Run RED test** + +Run: `corepack pnpm exec vitest run test/index-persistence.test.ts -t "saveOrThrow rejects"` + +Expected: FAIL because `saveOrThrow` is not defined. 
+ +- [ ] **Step 3: Implement strict save without changing existing soft save semantics** + +In `src/state/index-persistence.ts`, extract the current write body into `saveOrThrow()` and keep `save()` swallowing/logging failures: + +```ts + async save(): Promise<void> { + try { + await this.saveOrThrow(); + } catch (err) { + this.logFailure(err); + } + } + + async saveOrThrow(): Promise<void> { + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + await this.saveBm25Index(this.bm25.serialize()); + if (this.vector) { + await this.saveVectorIndex(this.vector.serialize()); + } + } +``` + +- [ ] **Step 4: Run GREEN tests for index persistence** + +Run: `corepack pnpm exec vitest run test/index-persistence.test.ts` + +Expected: PASS. Existing `save() does not throw...` remains passing. + +## Task 2: Worker Shutdown Flush Function + +**Files:** +- Create: `src/functions/shutdown-flush.ts` +- Create: `test/shutdown-flush.test.ts` +- Modify later: `src/index.ts` + +- [ ] **Step 1: Add failing function registration tests** + +Create `test/shutdown-flush.test.ts`: + +```ts +import { describe, expect, it, vi } from "vitest"; +import { registerShutdownFlushFunction } from "../src/functions/shutdown-flush.js"; + +describe("registerShutdownFlushFunction", () => { + it("registers mem::shutdown-flush and checkpoints the index", async () => { + const handlers = new Map<string, (payload: unknown) => Promise<unknown>>(); + const sdk = { + registerFunction: vi.fn((id: string, handler: (payload: unknown) => Promise<unknown>) => { + handlers.set(id, handler); + }), + }; + const indexPersistence = { saveOrThrow: vi.fn(async () => {}) }; + + registerShutdownFlushFunction(sdk as never, indexPersistence, { + isReady: () => true, + }); + + const handler = handlers.get("mem::shutdown-flush"); + expect(handler).toBeDefined(); + await expect(handler!({})).resolves.toMatchObject({ success: true }); + expect(indexPersistence.saveOrThrow).toHaveBeenCalledTimes(1); + }); + + it("propagates checkpoint failures so callers can fail closed",
async () => { + const handlers = new Map<string, (payload: unknown) => Promise<unknown>>(); + const sdk = { + registerFunction: vi.fn((id: string, handler: (payload: unknown) => Promise<unknown>) => { + handlers.set(id, handler); + }), + }; + const indexPersistence = { + saveOrThrow: vi.fn(async () => { + throw new Error("state::set failed"); + }), + }; + + registerShutdownFlushFunction(sdk as never, indexPersistence, { + isReady: () => true, + }); + + await expect(handlers.get("mem::shutdown-flush")!({})).rejects.toThrow( + "state::set failed", + ); + }); + + it("does not save before restored indexes are ready", async () => { + const handlers = new Map<string, (payload: unknown) => Promise<unknown>>(); + const sdk = { + registerFunction: vi.fn((id: string, handler: (payload: unknown) => Promise<unknown>) => { + handlers.set(id, handler); + }), + }; + const indexPersistence = { saveOrThrow: vi.fn(async () => {}) }; + + registerShutdownFlushFunction(sdk as never, indexPersistence, { + isReady: () => false, + }); + + await expect(handlers.get("mem::shutdown-flush")!({})).resolves.toEqual({ + success: false, + error: "index_not_ready", + }); + expect(indexPersistence.saveOrThrow).not.toHaveBeenCalled(); + }); +}); +``` + +- [ ] **Step 2: Run RED test** + +Run: `corepack pnpm exec vitest run test/shutdown-flush.test.ts` + +Expected: FAIL because `src/functions/shutdown-flush.ts` does not exist.
+ +- [ ] **Step 3: Implement the function** + +Create `src/functions/shutdown-flush.ts`: + +```ts +import type { ISdk } from "iii-sdk"; +import type { IndexPersistence } from "../state/index-persistence.js"; + +type ShutdownFlushIndexPersistence = Pick<IndexPersistence, "saveOrThrow">; + +export function registerShutdownFlushFunction( + sdk: ISdk, + indexPersistence: ShutdownFlushIndexPersistence, + readiness: { isReady: () => boolean }, +): void { + sdk.registerFunction("mem::shutdown-flush", async () => { + if (!readiness.isReady()) { + return { success: false, error: "index_not_ready" }; + } + await indexPersistence.saveOrThrow(); + return { success: true, flushedAt: new Date().toISOString() }; + }); +} +``` + +- [ ] **Step 4: Run GREEN test** + +Run: `corepack pnpm exec vitest run test/shutdown-flush.test.ts` + +Expected: PASS. + +## Task 3: Register Flush Function And REST Endpoint + +Do not start this task until the user explicitly approves adding +`POST /agentmemory/shutdown/flush` and increasing the REST endpoint count from +135 to 136. If approval is withheld, stop and revise the plan to use a +non-public transport. + +**Files:** +- Modify: `src/index.ts` +- Modify: `src/triggers/api.ts` +- Modify: `test/api-boundary-coverage.test.ts` + +- [ ] **Step 1: Add failing API boundary test** + +In `test/api-boundary-coverage.test.ts`, add a test using the existing `mockSdk()` and `req()` helpers. Authenticate with `Bearer secret` because the `beforeEach` registers the API with secret `"secret"`. Use `sdk.registerTrigger.mock.calls` instead of a nonexistent `registeredTriggers` variable. Also extend the local `trigger` mock to return `{ success: true }` for `mem::shutdown-flush`.
+ +```ts +const handler = sdk.getFunction("api::shutdown-flush")!; +expect(sdk.registerTrigger).toHaveBeenCalledWith({ + type: "http", + function_id: "api::shutdown-flush", + config: { api_path: "/agentmemory/shutdown/flush", http_method: "POST" }, +}); +await expect(handler(req({ auth: "Bearer secret", body: { ignored: true } }))).resolves.toEqual({ + status_code: 200, + body: { success: true }, +}); +expect(sdk.trigger).toHaveBeenCalledWith({ + function_id: "mem::shutdown-flush", + payload: {}, +}); +``` + +Use the local helper names already present in that file rather than introducing a second SDK harness. + +- [ ] **Step 2: Run RED API test** + +Run: `corepack pnpm exec vitest run test/api-boundary-coverage.test.ts -t "shutdown flush"` + +Expected: FAIL because `api::shutdown-flush` is not registered. + +- [ ] **Step 3: Register API endpoint with auth and whitelisted empty payload** + +In `src/triggers/api.ts`, add: + +```ts + sdk.registerFunction("api::shutdown-flush", async (req: ApiRequest) => { + const authErr = checkAuth(req, secret); + if (authErr) return authErr; + const result = await sdk.trigger({ + function_id: "mem::shutdown-flush", + payload: {}, + }); + if ( + result && + typeof result === "object" && + (result as { success?: unknown }).success === false + ) { + return { status_code: 503, body: result }; + } + return { status_code: 200, body: result }; + }); + sdk.registerTrigger({ + type: "http", + function_id: "api::shutdown-flush", + config: { api_path: "/agentmemory/shutdown/flush", http_method: "POST" }, + }); +``` + +Do not forward `req.body`. 
+ +- [ ] **Step 4: Register worker function in reconnect replay** + +In `src/index.ts`: +- import `registerShutdownFlushFunction` +- create `indexPersistence` before `registerAllFunctions` +- call `setIndexPersistence(indexPersistence)` before function registration +- initialize `let shutdownFlushReady = false` +- call `registerShutdownFlushFunction(sdk, indexPersistence, { isReady: () => shutdownFlushReady })` inside `registerAllFunctions` +- keep `await indexPersistence.load()` after registration, but set `shutdownFlushReady = true` only after persisted BM25/vector state has loaded/restored or dimension recovery has completed +- update `test/reconnect-registration.test.ts` so the replayed registration closure includes `registerShutdownFlushFunction(...)` + +Expected shape: + +```ts + const indexPersistence = new IndexPersistence(kv, bm25Index, vectorIndex); + setIndexPersistence(indexPersistence); + let shutdownFlushReady = false; + + const registerAllFunctions = () => { + registerShutdownFlushFunction(sdk, indexPersistence, { + isReady: () => shutdownFlushReady, + }); + registerPrivacyFunction(sdk); + // existing registrations... + }; +``` + +- [ ] **Step 5: Run GREEN API and consistency-focused tests** + +Run: +- `corepack pnpm exec vitest run test/shutdown-flush.test.ts test/api-boundary-coverage.test.ts -t "shutdown flush|registerShutdownFlushFunction"` +- `corepack pnpm exec vitest run test/reconnect-registration.test.ts` +- `corepack pnpm exec vitest run test/consistency.test.ts` + +Expected: API tests pass; consistency likely fails until count docs are updated. + +## Task 4: CLI Pre-Stop Flush And Windows Fail-Closed Policy + +**Files:** +- Modify: `src/cli.ts` +- Modify: `src/cli/stop-processes.ts` +- Modify: `test/cli-stop-port-detection.test.ts` or create one focused CLI stop test. 
+ +- [ ] **Step 1: Add failing tests for a responsive native stop executor** + +Export a new responsive-stop executor from `src/cli/stop-processes.ts`; do not leave the behavior only inside `src/cli.ts`. The tests must call that exported executor directly and `test/cli-stop-port-detection.test.ts` must also keep a source assertion that `runStop()` delegates to it. + +The executor should accept injected effects: + +```ts +type ResponsiveNativeStopEffects = { + isWindows: boolean; + force: boolean; + flush: () => Promise; + signal: ( + pid: number, + signal: NodeJS.Signals, + timeoutMs: number, + role: "worker" | "engine", + ) => Promise; + clear: () => void; + warn: (message: string) => void; + error: (message: string) => void; + outro: (message: string) => void; +}; +``` + +Add a worker + engine ordering test: + +```ts +expect(events).toEqual([ + "flush", + "worker:40000:SIGTERM:5000", + "engine:39672:SIGTERM:3000", +]); +``` + +Add an engine-only test for the missing-worker-pidfile case: + +```ts +expect(events).toEqual([ + "flush", + "engine:39672:SIGTERM:3000", + "clear", + "outro:Stopped. Memories persisted to disk; restart anytime with: npx @agentmemory/agentmemory", +]); +``` + +- [ ] **Step 2: Add failing test for Windows fail-closed behavior** + +Expected event sequence for Windows non-force when flush fails: + +```ts +expect(result).toEqual({ action: "preserved", exitCode: 1 }); +expect(events).toEqual(["flush", "error"]); +``` + +No worker or engine signal may occur, and pidfiles must not be cleared. 
+ +- [ ] **Step 3: Add failing test for force warning** + +Expected behavior for force when flush fails: + +```ts +expect(events).toContain("warn:pre-stop flush failed; --force will terminate without confirmed persistence"); +expect(events).toContain("worker:40000:SIGTERM:5000"); +expect(events).toContain("engine:39672:SIGTERM:3000"); +expect(outro).not.toContain("Memories persisted"); +``` + +- [ ] **Step 4: Add failing tests for unresponsive Windows orphan and force wording** + +In `test/cli-stop-port-detection.test.ts`, update/add tests so: +- Windows `stop-orphaned-worker` does not signal the worker in non-force mode and returns a preserve/fail-closed result. +- Unresponsive `force-stop` still signals worker before engine, but the outro does not contain `Memories persisted`. + +- [ ] **Step 5: Run RED CLI tests** + +Run the selected focused CLI stop test command, for example: + +`corepack pnpm exec vitest run test/cli-stop-port-detection.test.ts` + +Expected: FAIL because the responsive executor and Windows orphan policy do not exist. + +- [ ] **Step 6: Implement pre-stop checkpoint** + +In `src/cli.ts`, add a `POST` helper that uses existing auth header policy: + +```ts +async function postShutdownFlush(base: string, timeoutMs = 5000): Promise<boolean> { + try { + const url = `${base}/agentmemory/shutdown/flush`; + const headers = buildJsonRequestHeaders(url); + if (!headers.ok) { + p.log.warn(headers.message); + return false; + } + const res = await fetch(url, { + method: "POST", + signal: AbortSignal.timeout(timeoutMs), + headers: headers.headers, + body: "{}", + }); + const body = await res.json().catch(() => null); + return ( + res.ok && + body !== null && + typeof body === "object" && + (body as { success?: unknown }).success === true + ); + } catch { + return false; + } +} +``` + +Call it once before any responsive native stop attempts to signal worker or engine candidates. This includes the case where `worker.pid` is missing and only an engine PID is known.
+ +Policy: +- if flush succeeds: proceed with existing worker-first stop. +- if flush fails on Windows and `force` is false: do not signal worker or engine; print an error explaining that Windows termination cannot run the worker shutdown handler; exit 1. +- if flush fails and `force` is true: warn and proceed, but final outro must not claim persistence. +- if flush fails on non-Windows: warn and proceed because POSIX signal handler remains the existing shutdown path. +- if no worker or engine PIDs are known: do not call flush. + +- [ ] **Step 7: Implement responsive executor delegation and unresponsive Windows orphan policy** + +In `src/cli/stop-processes.ts`: +- add `executeResponsiveNativeStop()` and use it from `runStop()` instead of inline loops. +- keep worker-first signaling and 5000 ms worker grace. +- change `executeUnresponsiveNativeStop()` so Windows non-force `stop-orphaned-worker` preserves/fails closed instead of signaling the worker. +- change unresponsive `force-stop` outro to avoid claiming `Memories persisted`. + +- [ ] **Step 8: Run GREEN CLI tests** + +Run: +- `corepack pnpm exec vitest run test/cli-stop-port-detection.test.ts` + +Expected: PASS. + +## Task 5: Endpoint Count And Docs Consistency + +Run this task only if the REST endpoint was explicitly approved and implemented. 
+ +**Files:** +- Modify: `README.md` +- Modify: `AGENTS.md` +- Modify: `src/index.ts` +- Modify: `assets/tags/section-api.svg` +- Modify: `assets/tags/light/section-api.svg` + +- [ ] **Step 1: Update count from 135 to 136** + +Update only the endpoint-count strings required by `test/consistency.test.ts`: +- `README.md`: `135 endpoints on port` -> `136 endpoints on port` +- `AGENTS.md`: `135 REST endpoints` -> `136 REST endpoints` +- `src/index.ts`: `REST API: 135 endpoints` -> `REST API: 136 endpoints` +- both SVG badges: `135 REST endpoints` -> `136 REST endpoints` + +- [ ] **Step 2: Run consistency test** + +Run: `corepack pnpm exec vitest run test/consistency.test.ts` + +Expected: PASS. + +## Task 6: Verification, Review, And GitHub Push Prepare + +**Files:** +- Modify: task record only for results. + +- [ ] **Step 1: Run targeted test set** + +Run: +- `corepack pnpm exec vitest run test/index-persistence.test.ts` +- `corepack pnpm exec vitest run test/shutdown-flush.test.ts` +- `corepack pnpm exec vitest run test/api-boundary-coverage.test.ts -t "shutdown flush|auth|boundary"` +- `corepack pnpm exec vitest run test/cli-stop-port-detection.test.ts` +- `corepack pnpm exec vitest run test/reconnect-registration.test.ts` +- `corepack pnpm exec vitest run test/engine-launch.test.ts test/runtime-config.test.ts test/cli-iii-config.test.ts test/consistency.test.ts` + +Expected: PASS. + +- [ ] **Step 2: Run broader repo-native tests if dependencies are present** + +Run: `corepack pnpm test` + +Expected: PASS. If dependencies are missing, run `corepack pnpm install --frozen-lockfile --ignore-scripts` from a sanitized package-manager environment, then rerun. + +- [ ] **Step 3: Run focused simplification pass** + +Inspect changed files and remove avoidable complexity without changing the approved boundary. Re-run affected targeted tests after any edit. 
+ +- [ ] **Step 4: Final review subagents** + +Dispatch final Security, Test Coverage, and Maintainability reviewers against the stable diff. Triage every finding with evidence. + +- [ ] **Step 5: Security gates** + +Run Semgrep because persistence/API/local-networking/shutdown behavior changed: + +`semgrep scan --config p/default --error --metrics=off .` + +After staging only task-owned files, run: + +`gitleaks protect --staged --redact` + +OSV is not required unless dependency, lockfile, container, or vendored surfaces change. + +- [ ] **Step 6: Commit and local GitHub push prep** + +Stage only task-owned files, inspect staged diff, run `verification-before-completion`, and commit with: + +`git commit -m "fix: checkpoint worker before native stop"` + +Then run `$github-push-prepare` local branch-prep. It may fetch `origin main` as part of that skill, but push and PR creation remain blocked until separately approved. diff --git a/docs/todos/2026-06-20-issue-338-stop-order-data-loss/todo.md b/docs/todos/2026-06-20-issue-338-stop-order-data-loss/todo.md new file mode 100644 index 000000000..22bfbbcce --- /dev/null +++ b/docs/todos/2026-06-20-issue-338-stop-order-data-loss/todo.md @@ -0,0 +1,165 @@ +# Issue 338 Stop Order Data Loss + +Task id: `2026-06-20-issue-338-stop-order-data-loss` + +## Scope + +Handle fork issue #338 on branch `issue/338-stop-order-data-loss` from +verified `origin/main` `51f926fe918a100228d6ce92dadb19933e46ccbf`. + +## Sprint Contract + +Goal: validate and, if approved at the required boundary, fix data loss on +`agentmemory stop` followed by restart without broadening persistence or +iii-engine lifecycle behavior beyond the issue. + +Scope: +- Validate issue #338 against current fork code and public unauthenticated issue + evidence. +- Use `$arena` before implementation. +- If the issue remains valid, keep implementation surgical around CLI stop, + shutdown flush, iii runtime config pathing, or focused startup reconciliation + as approved. 
+- Add focused red/green tests before production edits. +- Target PR workflow only at `origin` (`https://github.com/wbugitlab1/agentmemory.git`). + +Non-goals: +- Do not target `https://github.com/rohitg00/agentmemory/`. +- Do not change MCP tool counts, REST endpoint counts, schemas, auth, tenancy, + package versions, or storage model unless explicitly approved in this task. +- Do not import broad upstream patches or unrelated hook/installer behavior. +- Do not perform remote writes until implementation is verified and the PR + workflow reaches that step. + +Acceptance criteria: +- Current stop order, shutdown flush path, data-dir path handling, and relevant + Windows behavior are assessed with repo evidence. +- Arena synthesis records whether the issue is valid, stale, duplicate, already + fixed, or scope-expanded. +- Human checkpoint happens before any persistence path, iii-engine lifecycle, + startup reconcile, public API/tool/schema, installer, or engine-boundary + behavior change. +- If implementation proceeds, tests cover stop order, flush behavior, path + handling, and any shutdown grace or explicit flush behavior changed. +- Required verification and security gates pass or blockers are recorded. + +Known boundaries: +- Public issue metadata and comments were read through unauthenticated GitHub + API calls only. +- The base already contains worker-before-engine stop order, 5 second worker + shutdown grace in native stop paths, and bundled runtime config rewriting to + `~/.agentmemory/data`. +- Issue comments report that Windows still loses data because Node `SIGTERM` + behaves as a hard process termination there; fixing that may require an + explicit flush IPC/HTTP path, Windows-native graceful stop, or startup + reconciliation. These are persistence and engine-lifecycle boundaries. +- Current checkout contains task-owned implementation, test, generated count, + and task-state changes only. 
 The branch tracks `origin/main` and, per local + `git status`, is 4 commits behind it; the requested start ref is + preserved until local branch-prep explicitly handles the rebase/merge. + +Stop conditions: +- Stop before closing the issue as invalid, stale, duplicate, unreproducible, or + already fixed. +- Stop before scope expansion beyond the issue. +- Stop before persistence, iii-engine lifecycle, Windows control-flow, + startup-reconcile, public API/tool/schema, installer, auth, dependency, or + storage-boundary changes. +- Stop if required tests or security gates fail and the finding is not fixed. + +## Evidence + +- `git status -sb --untracked-files=all`: clean on detached `HEAD` before + branch creation; after branch creation it reports + `issue/338-stop-order-data-loss...origin/main [behind 4]`. +- `git remote -v`: `origin` is the target fork; `upstream` exists but is out of + bounds. +- Public issue #338 is open and imports upstream #843. Body reports POSIX stop + order, relative state paths, and too-short worker grace. +- Public comments add a Windows-specific repro on v0.9.27 where worker-first + `SIGTERM` is still a hard kill and no shutdown flush handler runs. +- `src/cli.ts` currently stops worker candidates before engine candidates in + responsive native stop and uses 5000 ms worker grace. +- `src/cli/stop-processes.ts` currently executes unresponsive `--force` plans + worker-first and uses 5000 ms worker grace. +- `src/cli/build-runtime.ts`, `src/cli/engine-launch.ts`, README, and tests + already cover absolute bundled runtime state paths under `~/.agentmemory/data`. +- Related task records read: issue 303 data-dir, issue 909 bounded SDK + shutdown, issue 796 full shutdown timeout, and engine exit supervision. 
+ +## Feature / Verification Matrix + +| Change / Decision | Verification method | Status | Evidence | +| --- | --- | --- | --- | +| Branch/worktree setup | Git status/remotes/worktree/ref checks | Done | Branch created from requested `origin/main` ref. | +| Issue metadata | Public unauthenticated GitHub API | Done | Issue #338 open; 2 comments read. | +| Existing body fixes | Source and test inspection | Done | Stop order, 5s worker grace, and data-dir fixes are present on current base. | +| Windows residual validity | Arena plus source/test inspection | Done | Arena synthesis finds #338 valid only for native Windows shutdown residual. | +| Human checkpoint | User decision | Done | User approved implementation with `$github-feature-loop`, then explicitly approved adding `POST /agentmemory/shutdown/flush` and updating REST endpoint counts from 135 to 136. | +| Implementation plan | `$github-feature-loop` / `$writing-plans` | Done | Plan saved at `docs/todos/2026-06-20-issue-338-stop-order-data-loss/plan.md`. | +| Implementation | TDD | Done | Added strict index checkpointing, worker shutdown flush, REST dispatch, CLI pre-stop flush, readiness gating, and Windows fail-closed stop behavior. | +| Verification/security gates | Repo-native checks plus required scans | Done | RED/GREEN targeted tests, lint, full test suite, build, three final review lanes, Semgrep, and staged Gitleaks are green. | + +## Subagent Ledger + +| Workstream | Scope | Edits allowed | Expected output | Result | Residual risk | +| --- | --- | --- | --- | --- | --- | +| Arena candidate A | Issue #338 validity and likely fix path | No | Validity report, evidence, fix recommendation | Done | Strong evidence, slightly overcommits to REST as a likely transport. | +| Arena candidate B | Issue #338 validity and likely fix path | No | Validity report, evidence, fix recommendation | Done | Selected as base report. 
| +| Arena candidate C | Issue #338 validity and likely fix path | No | Validity report, evidence, fix recommendation | Done | Concise framing, weaker concrete test/file detail. | +| Arena judge | Candidate reports and rubric | No | Scores and recommended base | Done | Recommended B as base, graft A evidence and C framing. | +| Plan reviewer: architecture | Plan architecture/integration/ownership risk | No | High/Medium findings before implementation | Done | Valid findings: checkpoint must not depend on `worker.pid`; flush must be gated until indexes are loaded; Windows orphan non-force path must preserve/fail closed. | +| Plan reviewer: verification | Plan TDD/test feasibility | No | High/Medium findings before implementation | Done | Valid findings: helper-only test insufficient; `runStop()` must delegate to tested executor or be directly harnessed; API harness names/secret needed correction; force wording must be mandatory. | +| Plan reviewer: boundary | API/persistence/scope approval risk | No | High/Medium findings before implementation | Done | Valid finding: explicit approval is still needed before adding `POST /agentmemory/shutdown/flush` and changing REST count. | +| Final security review | Stable implementation diff | No | ACCEPT or findings on persistence/API/auth/local networking | Done | Initial readiness/completeness findings were fixed; final result ACCEPT. | +| Final test coverage review | Stable implementation diff | No | ACCEPT or findings on RED/GREEN and regression coverage | Done | Initial auth/helper/wiring findings were fixed; final result ACCEPT. | +| Final maintainability review | Stable implementation diff | No | ACCEPT or findings on complexity/scope drift | Done | ACCEPT. | + +## Progress Notes + +- 2026-06-20: Read active repo instructions, repo-local triage workflow, + `$arena`, TDD, debugging, and feature-loop guidance. 
+- 2026-06-20: Confirmed worktree and remotes, verified the requested `origin/main` + ref, and created branch `issue/338-stop-order-data-loss`. +- 2026-06-20: Read referenced triage batch record; delegation says #338 + replaces completed #328, while the record still lists #328 active. +- 2026-06-20: Public unauthenticated issue read showed body fixes are partly + stale on current base, but comments preserve a likely valid Windows shutdown + data-loss path. +- 2026-06-20: `$arena` completed. Synthesis recorded in + `docs/todos/2026-06-20-issue-338-stop-order-data-loss/arena-synthesis.md`. + Decision: issue #338 is stale for original broad POSIX/data-dir claims and + valid for a narrower native Windows shutdown residual. +- 2026-06-20: User approved implementation with `$github-feature-loop`. Plan + saved in `docs/todos/2026-06-20-issue-338-stop-order-data-loss/plan.md`. + Remote fetch/push/PR creation remains unapproved, except that the later + `$github-push-prepare` local branch-prep phase may fetch `origin main` under + that skill's rules. +- 2026-06-20: Pre-code plan review completed with three reviewers. The plan was + updated to require a tested responsive native stop executor wired from + `runStop()`, checkpoint before engine-only responsive stop when `worker.pid` + is missing, readiness gating so startup-time flush cannot overwrite restored + indexes with an empty snapshot, Windows orphan-worker fail-closed behavior, + and force/unresponsive messaging that avoids unconfirmed persistence claims. + Explicit REST/API boundary approval remains pending before Task 3. +- 2026-06-20: User explicitly approved the REST/API boundary: add + `POST /agentmemory/shutdown/flush`, reuse existing `AGENTMEMORY_SECRET` auth + and plaintext protections, and update REST endpoint counts from 135 to 136. 
+- 2026-06-20: Added RED tests before production edits for strict + `saveOrThrow()`, shutdown flush registration/readiness/failure propagation, + REST dispatch/auth, reconnect replay/readiness, and CLI stop ordering/failure + policy. +- 2026-06-20: Implemented worker-owned `mem::shutdown-flush`, authenticated + `POST /agentmemory/shutdown/flush`, strict index checkpointing, diagnostic + index loads, rebuild completeness tracking, startup readiness gating, and a + tested responsive native stop executor that flushes before any signal. +- 2026-06-20: Verification passed: + `corepack pnpm exec vitest run test/index-persistence.test.ts test/search.test.ts test/shutdown-flush.test.ts test/api-boundary-coverage.test.ts test/cli-stop-port-detection.test.ts test/reconnect-registration.test.ts test/engine-launch.test.ts test/runtime-config.test.ts test/cli-iii-config.test.ts test/consistency.test.ts`, + `corepack pnpm run lint`, `corepack pnpm test` (2935 tests), and + `corepack pnpm run build`. +- 2026-06-20: Final security, test-coverage, and maintainability review lanes + returned ACCEPT after follow-up fixes for startup readiness and test wiring. +- 2026-06-20: Required final security gates passed: + `semgrep scan --config p/default --error --metrics=off .` scanned 958 + tracked files with 0 findings, and + `gitleaks protect --staged --redact` scanned staged content with no leaks. diff --git a/plugin/skills/agentmemory-rest-api/REFERENCE.md b/plugin/skills/agentmemory-rest-api/REFERENCE.md index 790166fc1..8a867285f 100644 --- a/plugin/skills/agentmemory-rest-api/REFERENCE.md +++ b/plugin/skills/agentmemory-rest-api/REFERENCE.md @@ -5,7 +5,7 @@ Generated from `src/triggers/api.ts`. Do not edit the block below by hand; run ` The REST API is the primary surface. All paths are under `http://localhost:3111` (override with `--port`). When `AGENTMEMORY_SECRET` is set, send `Authorization: Bearer $AGENTMEMORY_SECRET`; localhost is otherwise open. 
-135 registered endpoints: +136 registered endpoints: | Method | Path | | --- | --- | @@ -116,6 +116,7 @@ The REST API is the primary surface. All paths are under `http://localhost:3111` | POST | `/agentmemory/session/start` | | GET | `/agentmemory/sessions` | | POST | `/agentmemory/sessions/reap` | +| POST | `/agentmemory/shutdown/flush` | | GET | `/agentmemory/signals` | | POST | `/agentmemory/signals/send` | | GET | `/agentmemory/sketches` | diff --git a/src/cli.ts b/src/cli.ts index 2dffc9fb2..f60383035 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -50,6 +50,7 @@ import { getImportJsonlTimeoutMs, getCliRestPort, } from "./cli/http.js"; +import { postShutdownFlush } from "./cli/shutdown-flush.js"; import { applyRuntimeEnvFileValues, applyRuntimeHostArgs, @@ -67,6 +68,7 @@ import { planEngineRestart, } from "./cli/engine-supervisor.js"; import { + executeResponsiveNativeStop, executeUnresponsiveNativeStop, parseNetstatListeningPids, planUnresponsiveNativeStop, @@ -2859,38 +2861,51 @@ async function runStop(): Promise { process.exit(1); } - let allStopped = true; - // #843: stop worker first, then engine. The worker's shutdown - // handler calls indexPersistence.save() -> kv.set() -> iii state::set - // to flush BM25/vector snapshots + audit rows. Killing iii first - // leaves those writes with no engine to land on, and the index + - // observations end up as in-memory state the iii process never - // persists. Worker SIGTERM grace bumped 3s -> 5s to give a large - // index a real chance to commit before the engine goes away. - for (const pid of workerCandidates) { - const s = p.spinner(); - s.start(`Stopping agentmemory worker (pid ${pid})... [flushing state]`); - const ok = await signalAndWait(pid, "SIGTERM", 5000); - s.stop(ok ? 
`Stopped worker pid ${pid}` : `Failed to stop worker pid ${pid}`); - if (!ok) allStopped = false; - } - for (const pid of candidates) { - if (workerCandidates.has(pid)) continue; - const s = p.spinner(); - s.start(`Stopping iii-engine (pid ${pid})...`); - const ok = await signalAndWait(pid, "SIGTERM", 3000); - s.stop(ok ? `Stopped pid ${pid}` : `Failed to stop pid ${pid}`); - if (!ok) allStopped = false; - } - - clearEnginePidfile(); - clearEngineState(); - clearWorkerPidfile(); - if (!allStopped) { - p.log.error("One or more processes survived SIGKILL. Investigate with `ps`."); - process.exit(1); + // #843/#338: stop worker first, then engine. On POSIX, the worker's + // shutdown handler flushes BM25/vector snapshots before iii exits. + // On Windows, Node's process.kill(SIGTERM) can terminate without + // running that handler, so ask the live worker to checkpoint before + // any native worker or engine signal. + const stopResult = await executeResponsiveNativeStop( + { + workerPids: [...workerCandidates], + enginePids: [...candidates], + }, + { + isWindows: IS_WINDOWS, + force, + flush: () => postShutdownFlush(getBaseUrl(), { + timeoutMs: 5000, + warn: (message) => p.log.warn(message), + }), + signal: async (pid, signal, timeoutMs, role) => { + const s = p.spinner(); + if (role === "worker") { + s.start(`Stopping agentmemory worker (pid ${pid})... [flushing state]`); + } else { + s.start(`Stopping iii-engine (pid ${pid})...`); + } + const ok = await signalAndWait(pid, signal, timeoutMs); + if (role === "worker") { + s.stop(ok ? `Stopped worker pid ${pid}` : `Failed to stop worker pid ${pid}`); + } else { + s.stop(ok ? 
`Stopped pid ${pid}` : `Failed to stop pid ${pid}`); + } + return ok; + }, + clear: () => { + clearEnginePidfile(); + clearEngineState(); + clearWorkerPidfile(); + }, + warn: (message) => p.log.warn(message), + error: (message) => p.log.error(message), + outro: (message) => p.outro(message), + }, + ); + if (stopResult.exitCode !== 0) { + process.exit(stopResult.exitCode); } - p.outro("Stopped. Memories persisted to disk; restart anytime with: npx @agentmemory/agentmemory"); } async function runMcp(): Promise { diff --git a/src/cli/shutdown-flush.ts b/src/cli/shutdown-flush.ts new file mode 100644 index 000000000..322f452d1 --- /dev/null +++ b/src/cli/shutdown-flush.ts @@ -0,0 +1,53 @@ +import { buildJsonRequestHeaders } from "./http.js"; + +type Env = Record; + +type FetchLike = ( + input: string, + init: { + method: "POST"; + signal: AbortSignal; + headers: Record; + body: string; + }, +) => Promise<{ + ok: boolean; + json: () => Promise; +}>; + +export async function postShutdownFlush( + base: string, + options: { + timeoutMs?: number; + env?: Env; + fetchFn?: FetchLike; + warn?: (message: string) => void; + } = {}, +): Promise { + const timeoutMs = options.timeoutMs ?? 5000; + const url = `${base.replace(/\/+$/, "")}/agentmemory/shutdown/flush`; + const headers = buildJsonRequestHeaders(url, options.env); + if (!headers.ok) { + options.warn?.(headers.message); + return false; + } + + try { + const fetchFn = options.fetchFn ?? 
fetch; + const res = await fetchFn(url, { + method: "POST", + signal: AbortSignal.timeout(timeoutMs), + headers: headers.headers, + body: "{}", + }); + const body = await res.json().catch(() => null); + return ( + res.ok && + body !== null && + typeof body === "object" && + (body as { success?: unknown }).success === true + ); + } catch { + return false; + } +} diff --git a/src/cli/stop-processes.ts b/src/cli/stop-processes.ts index c63ee5e4c..497ec2b3a 100644 --- a/src/cli/stop-processes.ts +++ b/src/cli/stop-processes.ts @@ -11,6 +11,31 @@ export type UnresponsiveNativeStopResult = | { action: "preserved"; exitCode: 1 } | { action: "stopped"; allStopped: boolean; exitCode: 0 | 1 }; +export type ResponsiveNativeStopPlan = { + enginePids: number[]; + workerPids: number[]; +}; + +export type ResponsiveNativeStopResult = + | { action: "preserved"; exitCode: 1 } + | { action: "stopped"; allStopped: boolean; exitCode: 0 | 1 }; + +export type ResponsiveNativeStopEffects = { + isWindows: boolean; + force: boolean; + flush: () => Promise; + signal: ( + pid: number, + signal: NodeJS.Signals, + timeoutMs: number, + role: "worker" | "engine", + ) => Promise; + clear: () => void; + warn: (message: string) => void; + error: (message: string) => void; + outro: (message: string) => void; +}; + export type UnresponsiveNativeStopEffects = { port: number; isWindows: boolean; @@ -84,6 +109,57 @@ export function planUnresponsiveNativeStop(input: { }; } +export async function executeResponsiveNativeStop( + plan: ResponsiveNativeStopPlan, + effects: ResponsiveNativeStopEffects, +): Promise { + const workerPids = sortedPids(new Set(plan.workerPids)); + const enginePids = new Set(plan.enginePids); + for (const pid of workerPids) enginePids.delete(pid); + const dedupedEnginePids = sortedPids(enginePids); + + const hasProcesses = workerPids.length > 0 || dedupedEnginePids.length > 0; + let checkpointConfirmed = true; + if (hasProcesses) { + checkpointConfirmed = await effects.flush(); + if 
(!checkpointConfirmed) { + if (effects.isWindows && !effects.force) { + effects.error( + "Pre-stop checkpoint failed. On Windows, terminating the worker may skip its shutdown handler and lose in-memory index state. Processes were left running; retry after the API is ready or use `agentmemory stop --force` to terminate without confirmed persistence.", + ); + return { action: "preserved", exitCode: 1 }; + } + effects.warn( + effects.force + ? "pre-stop flush failed; --force will terminate without confirmed persistence" + : "pre-stop flush failed; relying on the worker shutdown signal path", + ); + } + } + + let allStopped = true; + for (const pid of workerPids) { + const ok = await effects.signal(pid, "SIGTERM", 5000, "worker"); + if (!ok) allStopped = false; + } + for (const pid of dedupedEnginePids) { + const ok = await effects.signal(pid, "SIGTERM", 3000, "engine"); + if (!ok) allStopped = false; + } + + effects.clear(); + if (!allStopped) { + effects.error("One or more processes survived SIGKILL. Investigate with `ps`."); + return { action: "stopped", allStopped, exitCode: 1 }; + } + if (checkpointConfirmed) { + effects.outro("Stopped. Memories persisted to disk; restart anytime with: npx @agentmemory/agentmemory"); + } else { + effects.outro("Stopped. 
Persistence was not confirmed before termination; restart anytime with: npx @agentmemory/agentmemory"); + } + return { action: "stopped", allStopped, exitCode: 0 }; +} + export async function executeUnresponsiveNativeStop( plan: UnresponsiveNativeStopPlan, effects: UnresponsiveNativeStopEffects, @@ -94,6 +170,15 @@ export async function executeUnresponsiveNativeStop( effects.outro("Nothing to stop."); return { action: "nothing", exitCode: 0 }; case "stop-orphaned-worker": { + if (effects.isWindows) { + effects.warn( + `Orphaned worker pid ${plan.workerPid} is still running, but no iii-engine API is responding on :${effects.port} to confirm a pre-stop checkpoint.`, + ); + effects.info( + `Preserving ~/.agentmemory/worker.pid. Re-run with \`agentmemory stop --force\` to terminate without confirmed persistence, or inspect manually:\n tasklist /FI "PID eq ${plan.workerPid}"`, + ); + return { action: "preserved", exitCode: 1 }; + } const ok = await effects.signal(plan.workerPid, "SIGTERM", 3000, "worker"); effects.clear(); if (!ok) { @@ -135,7 +220,7 @@ export async function executeUnresponsiveNativeStop( effects.error("One or more processes survived SIGKILL. Investigate manually."); return { action: "stopped", allStopped, exitCode: 1 }; } - effects.outro("Stopped. Memories persisted to disk; restart anytime with: npx @agentmemory/agentmemory"); + effects.outro("Stopped unresponsive process(es). Persistence was not confirmed before termination; restart anytime with: npx @agentmemory/agentmemory"); return { action: "stopped", allStopped, exitCode: 0 }; } } diff --git a/src/functions/search.ts b/src/functions/search.ts index 0e085f521..1cf3cf123 100644 --- a/src/functions/search.ts +++ b/src/functions/search.ts @@ -214,6 +214,12 @@ export async function vectorIndexAddBatchGuarded( // batches. Set to 1 to fall back to the legacy per-item path. 
const DEFAULT_REBUILD_EMBED_BATCH = 32 +export type RebuildIndexResult = { + count: number + complete: boolean + failedSessions: string[] +} + function getRebuildEmbedBatchSize(): number { const raw = process.env.REBUILD_EMBED_BATCH_SIZE if (!raw) return DEFAULT_REBUILD_EMBED_BATCH @@ -221,7 +227,7 @@ function getRebuildEmbedBatchSize(): number { return Number.isFinite(n) && n > 0 ? n : DEFAULT_REBUILD_EMBED_BATCH } -export async function rebuildIndex(kv: StateKV): Promise { +export async function rebuildIndexWithStatus(kv: StateKV): Promise { const idx = getSearchIndex() idx.clear() @@ -242,10 +248,12 @@ export async function rebuildIndex(kv: StateKV): Promise { } const pending: EmbedJob[] = [] let count = 0 + let complete = true const flush = async (): Promise => { if (pending.length === 0) return - await vectorIndexAddBatchGuarded(pending) + const result = await vectorIndexAddBatchGuarded(pending) + if (result.fail > 0) complete = false pending.length = 0 } const enqueue = async (job: EmbedJob): Promise => { @@ -272,6 +280,7 @@ export async function rebuildIndex(kv: StateKV): Promise { count++ } } catch (err) { + complete = false logger.warn('rebuildIndex: failed to load memories', { error: err instanceof Error ? err.message : String(err), }) @@ -280,7 +289,7 @@ export async function rebuildIndex(kv: StateKV): Promise { const sessions = await kv.list(KV.sessions) if (!sessions.length) { await flush() - return count + return { count, complete, failedSessions: [] } } const obsPerSession: CompressedObservation[][] = [] @@ -300,6 +309,7 @@ export async function rebuildIndex(kv: StateKV): Promise { obsPerSession.push(...results) } if (failedSessions.length > 0) { + complete = false logger.warn('rebuildIndex: failed to load observations for sessions', { failedSessions }) } for (const observations of obsPerSession) { @@ -319,7 +329,11 @@ export async function rebuildIndex(kv: StateKV): Promise { // Drain the last partial batch. 
await flush() - return count + return { count, complete, failedSessions } +} + +export async function rebuildIndex(kv: StateKV): Promise { + return (await rebuildIndexWithStatus(kv)).count } export function registerSearchFunction(sdk: ISdk, kv: StateKV): void { diff --git a/src/functions/shutdown-flush.ts b/src/functions/shutdown-flush.ts new file mode 100644 index 000000000..b44932bc1 --- /dev/null +++ b/src/functions/shutdown-flush.ts @@ -0,0 +1,18 @@ +import type { ISdk } from "iii-sdk"; +import type { IndexPersistence } from "../state/index-persistence.js"; + +type ShutdownFlushIndexPersistence = Pick; + +export function registerShutdownFlushFunction( + sdk: ISdk, + indexPersistence: ShutdownFlushIndexPersistence, + readiness: { isReady: () => boolean }, +): void { + sdk.registerFunction("mem::shutdown-flush", async () => { + if (!readiness.isReady()) { + return { success: false, error: "index_not_ready" }; + } + await indexPersistence.saveOrThrow(); + return { success: true, flushedAt: new Date().toISOString() }; + }); +} diff --git a/src/index.ts b/src/index.ts index 6d838bca4..6ce8ea268 100644 --- a/src/index.ts +++ b/src/index.ts @@ -39,7 +39,7 @@ import { registerDiskSizeManager } from "./functions/disk-size-manager.js"; import { registerCompressFunction } from "./functions/compress.js"; import { registerSearchFunction, - rebuildIndex, + rebuildIndexWithStatus, getSearchIndex, setVectorIndex, setEmbeddingProvider, @@ -108,6 +108,7 @@ import { registerTemporalGraphFunctions } from "./functions/temporal-graph.js"; import { registerRetentionFunctions } from "./functions/retention.js"; import { registerCompressFileFunction } from "./functions/compress-file.js"; import { registerReplayFunctions } from "./functions/replay.js"; +import { registerShutdownFlushFunction } from "./functions/shutdown-flush.js"; import { registerOutlineFunctions } from "./functions/outline.js"; import { registerApiTriggers } from "./triggers/api.js"; import { registerEventTriggers } 
from "./triggers/events.js"; @@ -352,7 +353,18 @@ async function main() { graphWeight, ); + const indexPersistence = new IndexPersistence(kv, bm25Index, vectorIndex); + // Wire the persistence hook so delete paths can flush BM25/vector + // index mutations to disk. Without this, an in-memory remove can be + // lost across a hard process exit and the persisted snapshot + // restores the deleted entry at next boot. + setIndexPersistence(indexPersistence); + let shutdownFlushState: "loading" | "rebuilding" | "ready" | "unavailable" = "loading"; + const registerAllFunctions = () => { + registerShutdownFlushFunction(sdk, indexPersistence, { + isReady: () => shutdownFlushState === "ready", + }); registerPrivacyFunction(sdk); registerObserveFunction(sdk, kv, dedupMap, config.maxObservationsPerSession); registerImageQuotaCleanup(sdk, kv); @@ -437,16 +449,9 @@ async function main() { const healthMonitor = registerHealthMonitor(sdk, kv); - const indexPersistence = new IndexPersistence(kv, bm25Index, vectorIndex); - // Wire the persistence hook so delete paths can flush BM25/vector - // index mutations to disk. Without this, an in-memory remove can be - // lost across a hard process exit and the persisted snapshot - // restores the deleted entry at next boot. - setIndexPersistence(indexPersistence); - - const loaded = await indexPersistence.load().catch((err) => { + const loaded = await indexPersistence.loadWithDiagnostics().catch((err) => { console.warn(`[agentmemory] Failed to load persisted index:`, err); - return null; + return { bm25: null, vector: null, complete: false }; }); if (loaded?.bm25 && loaded.bm25.size > 0) { bm25Index.restoreFrom(loaded.bm25); @@ -484,6 +489,7 @@ async function main() { const needsRebuild = bm25Index.size === 0; if (needsRebuild) { + shutdownFlushState = "rebuilding"; // Fire-and-forget. rebuildIndex iterates every observation across // every session and AWAITS an embedding-provider call per record. 
// On a large corpus + rate-limited embedding endpoint that can @@ -492,17 +498,26 @@ async function main() { // unbound for the duration). The index lazily fills in over time // and search degrades gracefully — partial coverage > no viewer // for hours. Errors still surface via the inner .catch. - void rebuildIndex(kv) - .then((indexCount) => { - if (indexCount > 0) { - bootLog(`Search index rebuilt: ${indexCount} entries`); - indexPersistence.scheduleSave(); + void rebuildIndexWithStatus(kv) + .then(async (result) => { + if (result.count > 0) { + bootLog(`Search index rebuilt: ${result.count} entries`); + if (!loaded.complete || !result.complete) { + shutdownFlushState = "unavailable"; + return; + } + await indexPersistence.saveOrThrow(); + shutdownFlushState = "ready"; + } else { + shutdownFlushState = loaded.complete && result.complete ? "ready" : "unavailable"; } }) .catch((err) => { + shutdownFlushState = "unavailable"; console.warn(`[agentmemory] Failed to rebuild search index:`, err); }); } else { + shutdownFlushState = loaded.complete ? "ready" : "unavailable"; // Backfill memories into BM25 for users upgrading from <0.9.5: prior // versions of mem::remember never indexed memories, so the persisted // BM25 covers observations only and `memory_smart_search` returns @@ -583,7 +598,7 @@ async function main() { `Ready. ${embeddingProvider ? 
"Triple-stream (BM25+Vector+Graph)" : "BM25+Graph"} search active.`, ); bootLog( - `REST API: 135 endpoints at http://localhost:${config.restPort}/agentmemory/*`, + `REST API: 136 endpoints at http://localhost:${config.restPort}/agentmemory/*`, ); bootLog( `MCP surface (opt-in via \`npx @agentmemory/mcp\`): ${getAllTools().length} tools · 6 resources · 3 prompts`, diff --git a/src/state/index-persistence.ts b/src/state/index-persistence.ts index 6df0e2fda..d601c2f11 100644 --- a/src/state/index-persistence.ts +++ b/src/state/index-persistence.ts @@ -24,6 +24,8 @@ type IndexShardManifest = { chars: number; }; +type LoadDataResult = { ok: true; data: string | null } | { ok: false; data: null }; + type IndexPersistenceOptions = { shardChars?: number; createGeneration?: () => string; @@ -88,38 +90,51 @@ export class IndexPersistence { } async save(): Promise { + try { + await this.saveOrThrow(); + } catch (err) { + this.logFailure(err); + } + } + + async saveOrThrow(): Promise { if (this.timer) { clearTimeout(this.timer); this.timer = null; } - try { - await this.saveBm25Index(this.bm25.serialize()); - if (this.vector) { - await this.saveVectorIndex(this.vector.serialize()); - } - } catch (err) { - this.logFailure(err); + await this.saveBm25Index(this.bm25.serialize()); + if (this.vector) { + await this.saveVectorIndex(this.vector.serialize()); } } async load(): Promise<{ bm25: SearchIndex | null; vector: VectorIndex | null; + }> { + const { bm25, vector } = await this.loadWithDiagnostics(); + return { bm25, vector }; + } + + async loadWithDiagnostics(): Promise<{ + bm25: SearchIndex | null; + vector: VectorIndex | null; + complete: boolean; }> { let bm25: SearchIndex | null = null; let vector: VectorIndex | null = null; - const bm25Data = await this.loadBm25Data(); - if (bm25Data && typeof bm25Data === "string") { - bm25 = SearchIndex.deserialize(bm25Data); + const bm25Data = await this.loadBm25DataResult(); + if (bm25Data.data && typeof bm25Data.data === "string") { 
+ bm25 = SearchIndex.deserialize(bm25Data.data); } - const vecData = await this.loadVectorData(); - if (vecData && typeof vecData === "string") { - vector = VectorIndex.deserialize(vecData); + const vecData = await this.loadVectorDataResult(); + if (vecData.data && typeof vecData.data === "string") { + vector = VectorIndex.deserialize(vecData.data); } - return { bm25, vector }; + return { bm25, vector, complete: bm25Data.ok && vecData.ok }; } stop(): void { @@ -338,11 +353,11 @@ export class IndexPersistence { }); } - private async loadBm25Data(): Promise { + private async loadBm25DataResult(): Promise { return this.loadShardedData(BM25_KEY, BM25_MANIFEST_KEY, "BM25"); } - private async loadVectorData(): Promise { + private async loadVectorDataResult(): Promise { return this.loadShardedData(VECTOR_KEY, VECTOR_MANIFEST_KEY, "vector"); } @@ -350,14 +365,14 @@ export class IndexPersistence { legacyKey: string, manifestKey: string, label: string, - ): Promise { + ): Promise { const manifest = await this.readIndexValue( KV.bm25Index, manifestKey, label, "manifest", ); - if (!manifest.ok) return null; + if (!manifest.ok) return { ok: false, data: null }; // #797: some iii-state adapters return `undefined` (not `null`) for // a missing key. 
The previous `value !== null` check passed // undefined through to loadManifestData, which then crashed on @@ -370,6 +385,10 @@ export class IndexPersistence { ) { return this.loadManifestData(manifest.value, label); } + if (manifest.value != null) { + logger.warn(`index persistence: ${label} shard manifest invalid`); + return { ok: false, data: null }; + } const legacy = await this.readIndexValue( KV.bm25Index, @@ -377,9 +396,11 @@ export class IndexPersistence { label, "legacy", ); - if (!legacy.ok) return null; - if (legacy.value && typeof legacy.value === "string") return legacy.value; - return null; + if (!legacy.ok) return { ok: false, data: null }; + if (legacy.value && typeof legacy.value === "string") { + return { ok: true, data: legacy.value }; + } + return { ok: true, data: null }; } private async readIndexValue( @@ -403,7 +424,7 @@ export class IndexPersistence { private async loadManifestData( manifest: IndexShardManifest, label: string, - ): Promise { + ): Promise { if ( manifest.v !== 1 || !Array.isArray(manifest.shards) || @@ -412,12 +433,12 @@ export class IndexPersistence { manifest.chars < 0 ) { logger.warn(`index persistence: ${label} shard manifest invalid`); - return null; + return { ok: false, data: null }; } for (const shard of manifest.shards) { if (!isValidShardDescriptor(shard)) { logger.warn(`index persistence: ${label} shard manifest invalid`); - return null; + return { ok: false, data: null }; } } const loadedShards = await Promise.all( @@ -434,7 +455,7 @@ export class IndexPersistence { scope: shard.scope, key: shard.key, }); - return null; + return { ok: false, data: null }; } if (chunk.length !== shard.chars) { logger.warn(`index persistence: ${label} shard length mismatch`, { @@ -443,7 +464,7 @@ export class IndexPersistence { expected: shard.chars, actual: chunk.length, }); - return null; + return { ok: false, data: null }; } chunks.push(chunk); chars += chunk.length; @@ -453,8 +474,8 @@ export class IndexPersistence { expected: 
manifest.chars, actual: chars, }); - return null; + return { ok: false, data: null }; } - return chunks.join(""); + return { ok: true, data: chunks.join("") }; } } diff --git a/src/triggers/api.ts b/src/triggers/api.ts index 5138e191e..59df39617 100644 --- a/src/triggers/api.ts +++ b/src/triggers/api.ts @@ -267,6 +267,28 @@ export function registerApiTriggers( config: { api_path: "/agentmemory/livez", http_method: "GET" }, }); + sdk.registerFunction("api::shutdown-flush", async (req: ApiRequest): Promise => { + const authErr = checkAuth(req, secret); + if (authErr) return authErr; + const result = await sdk.trigger({ + function_id: "mem::shutdown-flush", + payload: {}, + }); + if ( + result && + typeof result === "object" && + (result as { success?: unknown }).success === false + ) { + return { status_code: 503, body: result }; + } + return { status_code: 200, body: result }; + }); + sdk.registerTrigger({ + type: "http", + function_id: "api::shutdown-flush", + config: { api_path: "/agentmemory/shutdown/flush", http_method: "POST" }, + }); + sdk.registerFunction("api::config-flags", async (req: ApiRequest): Promise => { const authErr = checkAuth(req, secret); diff --git a/test/api-boundary-coverage.test.ts b/test/api-boundary-coverage.test.ts index 82bed619e..5b35240e3 100644 --- a/test/api-boundary-coverage.test.ts +++ b/test/api-boundary-coverage.test.ts @@ -187,6 +187,7 @@ function mockSdk() { if (input.function_id === "mem::slot-delete") return { success: true, payload: input.payload }; if (input.function_id === "mem::lesson-save") return { action: "created", payload: input.payload }; if (input.function_id === "mem::graph-extract") return { success: true, nodesAdded: 1, edgesAdded: 1 }; + if (input.function_id === "mem::shutdown-flush") return { success: true }; return { ok: true, function_id: input.function_id, payload: input.payload }; }); return { @@ -324,6 +325,29 @@ describe("REST API boundary coverage", () => { }); }); + it("POST 
/agentmemory/shutdown/flush dispatches a whitelisted empty checkpoint payload", async () => { + const shutdownFlush = sdk.getFunction("api::shutdown-flush")!; + + expect(sdk.registerTrigger).toHaveBeenCalledWith({ + type: "http", + function_id: "api::shutdown-flush", + config: { + api_path: "/agentmemory/shutdown/flush", + http_method: "POST", + }, + }); + await expect( + shutdownFlush(req({ auth: "Bearer secret", body: { ignored: true } })), + ).resolves.toEqual({ + status_code: 200, + body: { success: true }, + }); + expect(sdk.trigger).toHaveBeenCalledWith({ + function_id: "mem::shutdown-flush", + payload: {}, + }); + }); + it("rejects non-object metadata before remember dispatch", async () => { const remember = sdk.getFunction("api::remember")!; @@ -336,6 +360,7 @@ describe("REST API boundary coverage", () => { it("denies unauthenticated requests before direct-auth REST handlers reach state or memory functions", async () => { const protectedHandlers = [ + "api::shutdown-flush", "api::config-flags", "api::compress-file", "api::outline-build", diff --git a/test/cli-stop-port-detection.test.ts b/test/cli-stop-port-detection.test.ts index 83d60ce98..01230f40e 100644 --- a/test/cli-stop-port-detection.test.ts +++ b/test/cli-stop-port-detection.test.ts @@ -1,6 +1,8 @@ -import { describe, expect, it } from "vitest"; +import { describe, expect, it, vi } from "vitest"; import { readFileSync } from "node:fs"; +import { postShutdownFlush } from "../src/cli/shutdown-flush.js"; import { + executeResponsiveNativeStop, executeUnresponsiveNativeStop, parseNetstatListeningPids, planUnresponsiveNativeStop, @@ -8,6 +10,44 @@ import { } from "../src/cli/stop-processes.js"; describe("Windows stop port detection (#550)", () => { + it("posts the real shutdown flush request with auth headers and an empty body", async () => { + const fetchFn = vi.fn(async () => ({ + ok: true, + json: async () => ({ success: true }), + })); + + await expect( + 
postShutdownFlush("https://memory.example/", { + env: { AGENTMEMORY_SECRET: "secret" }, + fetchFn, + timeoutMs: 1234, + }), + ).resolves.toBe(true); + + expect(fetchFn).toHaveBeenCalledTimes(1); + expect(fetchFn.mock.calls[0]?.[0]).toBe("https://memory.example/agentmemory/shutdown/flush"); + expect(fetchFn.mock.calls[0]?.[1]).toMatchObject({ + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: "Bearer secret", + }, + body: "{}", + }); + expect(fetchFn.mock.calls[0]?.[1].signal).toBeInstanceOf(AbortSignal); + }); + + it("treats rejected shutdown flush responses as failed checkpoints", async () => { + const fetchFn = vi.fn(async () => ({ + ok: true, + json: async () => ({ success: false, error: "index_not_ready" }), + })); + + await expect( + postShutdownFlush("http://localhost:3111", { fetchFn }), + ).resolves.toBe(false); + }); + it("parses IPv4 and IPv6 TCP listeners from netstat output without relying on localized state text", () => { const output = [ " Proto Local Address Foreign Address State PID", @@ -132,10 +172,151 @@ describe("Windows stop port detection (#550)", () => { "40000:SIGTERM:5000", "39672:SIGTERM:3000", "clear", + "outro:Stopped unresponsive process(es). 
Persistence was not confirmed before termination; restart anytime with: npx @agentmemory/agentmemory", + ]); + }); + + it("executes responsive native stop by checkpointing before worker and engine signals", async () => { + const events: string[] = []; + + const result = await executeResponsiveNativeStop( + { + workerPids: [40000], + enginePids: [39672], + }, + { + isWindows: true, + force: false, + flush: async () => { + events.push("flush"); + return true; + }, + signal: async (pid, signal, timeoutMs, role) => { + events.push(`${role}:${pid}:${signal}:${timeoutMs}`); + return true; + }, + clear: () => events.push("clear"), + warn: (message) => events.push(`warn:${message}`), + error: (message) => events.push(`error:${message}`), + outro: (message) => events.push(`outro:${message}`), + }, + ); + + expect(result).toEqual({ action: "stopped", allStopped: true, exitCode: 0 }); + expect(events).toEqual([ + "flush", + "worker:40000:SIGTERM:5000", + "engine:39672:SIGTERM:3000", + "clear", + "outro:Stopped. Memories persisted to disk; restart anytime with: npx @agentmemory/agentmemory", + ]); + }); + + it("checkpoints responsive engine-only stop when the worker pidfile is missing", async () => { + const events: string[] = []; + + const result = await executeResponsiveNativeStop( + { + workerPids: [], + enginePids: [39672], + }, + { + isWindows: true, + force: false, + flush: async () => { + events.push("flush"); + return true; + }, + signal: async (pid, signal, timeoutMs, role) => { + events.push(`${role}:${pid}:${signal}:${timeoutMs}`); + return true; + }, + clear: () => events.push("clear"), + warn: (message) => events.push(`warn:${message}`), + error: (message) => events.push(`error:${message}`), + outro: (message) => events.push(`outro:${message}`), + }, + ); + + expect(result).toEqual({ action: "stopped", allStopped: true, exitCode: 0 }); + expect(events).toEqual([ + "flush", + "engine:39672:SIGTERM:3000", + "clear", "outro:Stopped. 
Memories persisted to disk; restart anytime with: npx @agentmemory/agentmemory", ]); }); + it("preserves Windows responsive stop without signaling when checkpointing fails", async () => { + const events: string[] = []; + + const result = await executeResponsiveNativeStop( + { + workerPids: [40000], + enginePids: [39672], + }, + { + isWindows: true, + force: false, + flush: async () => { + events.push("flush"); + return false; + }, + signal: async (pid) => { + events.push(`signal:${pid}`); + return true; + }, + clear: () => events.push("clear"), + warn: (message) => events.push(`warn:${message}`), + error: (message) => events.push(`error:${message}`), + outro: (message) => events.push(`outro:${message}`), + }, + ); + + expect(result).toEqual({ action: "preserved", exitCode: 1 }); + expect(events).toEqual([ + "flush", + "error:Pre-stop checkpoint failed. On Windows, terminating the worker may skip its shutdown handler and lose in-memory index state. Processes were left running; retry after the API is ready or use `agentmemory stop --force` to terminate without confirmed persistence.", + ]); + }); + + it("warns and avoids persistence claims when force proceeds after a failed checkpoint", async () => { + const events: string[] = []; + + const result = await executeResponsiveNativeStop( + { + workerPids: [40000], + enginePids: [39672], + }, + { + isWindows: true, + force: true, + flush: async () => { + events.push("flush"); + return false; + }, + signal: async (pid, signal, timeoutMs, role) => { + events.push(`${role}:${pid}:${signal}:${timeoutMs}`); + return true; + }, + clear: () => events.push("clear"), + warn: (message) => events.push(`warn:${message}`), + error: (message) => events.push(`error:${message}`), + outro: (message) => events.push(`outro:${message}`), + }, + ); + + expect(result).toEqual({ action: "stopped", allStopped: true, exitCode: 0 }); + expect(events).toEqual([ + "flush", + "warn:pre-stop flush failed; --force will terminate without confirmed 
persistence", + "worker:40000:SIGTERM:5000", + "engine:39672:SIGTERM:3000", + "clear", + "outro:Stopped. Persistence was not confirmed before termination; restart anytime with: npx @agentmemory/agentmemory", + ]); + }); + it("executes unresponsive preserve and failed force-stop plans without real process effects", async () => { const preserveEvents: string[] = []; const preserveResult = await executeUnresponsiveNativeStop( @@ -187,6 +368,33 @@ describe("Windows stop port detection (#550)", () => { ]); }); + it("preserves Windows orphan workers in non-force mode because SIGTERM cannot confirm a flush", async () => { + const events: string[] = []; + + const result = await executeUnresponsiveNativeStop( + { action: "stop-orphaned-worker", workerPid: 40000 }, + { + port: 3111, + isWindows: true, + signal: async (pid) => { + events.push(`signal:${pid}`); + return true; + }, + clear: () => events.push("clear"), + warn: (message) => events.push(`warn:${message}`), + error: (message) => events.push(`error:${message}`), + info: (message) => events.push(`info:${message}`), + outro: (message) => events.push(`outro:${message}`), + }, + ); + + expect(result).toEqual({ action: "preserved", exitCode: 1 }); + expect(events).toEqual([ + "warn:Orphaned worker pid 40000 is still running, but no iii-engine API is responding on :3111 to confirm a pre-stop checkpoint.", + "info:Preserving ~/.agentmemory/worker.pid. 
Re-run with `agentmemory stop --force` to terminate without confirmed persistence, or inspect manually:\n tasklist /FI \"PID eq 40000\"", + ]); + }); + it("wires runStop to the unresponsive planner and conservative Windows netstat failure handling", () => { const source = readFileSync("src/cli.ts", "utf-8"); expect(source).toContain("unresponsive"); @@ -194,6 +402,14 @@ describe("Windows stop port detection (#550)", () => { expect(source).toContain("findEnginePidsByPort"); expect(source).toContain("const stopPlan = planUnresponsiveNativeStop"); expect(source).toContain("const stopResult = await executeUnresponsiveNativeStop"); + expect(source).toContain("executeResponsiveNativeStop"); + expect(source).toContain('import { postShutdownFlush } from "./cli/shutdown-flush.js";'); + const responsiveStart = source.indexOf("const stopResult = await executeResponsiveNativeStop("); + const responsiveBlock = source.slice(responsiveStart, source.indexOf("if (stopResult.exitCode", responsiveStart)); + expect(responsiveBlock).toContain("flush: () => postShutdownFlush(getBaseUrl(), {"); + expect(responsiveBlock.indexOf("flush: () => postShutdownFlush(getBaseUrl(), {")).toBeLessThan( + responsiveBlock.indexOf("signal: async (pid, signal, timeoutMs, role) => {"), + ); expect(source).toContain("windowsNetstatArgs()"); expect(source).toContain("netstat -ano -p TCP"); expect(source).toMatch(/catch \(err\)[\s\S]{0,240}netstat/); diff --git a/test/index-persistence.test.ts b/test/index-persistence.test.ts index 929791657..338ebb26e 100644 --- a/test/index-persistence.test.ts +++ b/test/index-persistence.test.ts @@ -617,9 +617,10 @@ describe("IndexPersistence", () => { kv as never, new SearchIndex(), null, - ).load(); + ).loadWithDiagnostics(); expect(loaded.bm25).toBeNull(); + expect(loaded.complete).toBe(false); }); it("fails closed when a manifest shard length mismatches", async () => { @@ -637,9 +638,10 @@ describe("IndexPersistence", () => { kv as never, new SearchIndex(), null, - 
).load(); + ).loadWithDiagnostics(); expect(loaded.bm25).toBeNull(); + expect(loaded.complete).toBe(false); }); it("fails closed before reading invalid shard descriptors", async () => { @@ -756,6 +758,20 @@ describe("IndexPersistence", () => { await expect(persistence.save()).resolves.toBeUndefined(); }); + it("saveOrThrow rejects when a checkpoint write fails", async () => { + const failingKv = { + ...mockKV(), + set: vi.fn(async () => { + throw new Error("TIMEOUT"); + }), + }; + const bm25 = new SearchIndex(); + bm25.add(makeObs({ id: "obs_1", title: "auth handler" })); + const persistence = new IndexPersistence(failingKv as never, bm25, null); + + await expect(persistence.saveOrThrow()).rejects.toThrow("TIMEOUT"); + }); + // #797: first run after upgrading to 0.9.25 crashed with // 'TypeError: Cannot read properties of undefined (reading "v")' // because some iii-state adapters return `undefined` (not `null`) @@ -777,6 +793,28 @@ describe("IndexPersistence", () => { expect(loaded.vector).toBeNull(); }); + it("loadWithDiagnostics reports incomplete when a persisted index read fails", async () => { + const failingKv = { + ...mockKV(), + get: vi.fn(async (scope: string, key: string) => { + if (scope === BM25_SCOPE && key === BM25_MANIFEST_KEY) { + throw new Error("manifest offline"); + } + return null; + }), + }; + const persistence = new IndexPersistence( + failingKv as never, + new SearchIndex(), + null, + ); + + const loaded = await persistence.loadWithDiagnostics(); + + expect(loaded.bm25).toBeNull(); + expect(loaded.complete).toBe(false); + }); + it("load() does not crash when a manifest row value is the wrong shape (#797)", async () => { const wrongShapeKv = { ...mockKV(), @@ -789,5 +827,8 @@ describe("IndexPersistence", () => { ); await expect(persistence.load()).resolves.toBeDefined(); + await expect(persistence.loadWithDiagnostics()).resolves.toMatchObject({ + complete: false, + }); }); }); diff --git a/test/reconnect-registration.test.ts 
b/test/reconnect-registration.test.ts index 5d56a477f..784a89210 100644 --- a/test/reconnect-registration.test.ts +++ b/test/reconnect-registration.test.ts @@ -61,6 +61,7 @@ describe("registerWithReconnectReplay", () => { expect(replayCall).toBeGreaterThan(closureStart); const replayedSource = source.slice(closureStart, replayCall); + expect(replayedSource).toContain("registerShutdownFlushFunction(sdk, indexPersistence"); expect(replayedSource).toContain("registerApiTriggers(sdk, kv, secret, metricsStore, provider)"); expect(replayedSource).toContain("registerEventTriggers(sdk, kv)"); expect(replayedSource).toContain("registerMcpEndpoints(sdk, kv, secret)"); @@ -69,6 +70,25 @@ describe("registerWithReconnectReplay", () => { expect(source.slice(replayCall)).toContain("registerHealthMonitor(sdk, kv)"); }); + it("keeps shutdown flush unavailable while the initial index rebuild is in flight", () => { + const source = readFileSync("src/index.ts", "utf-8"); + + expect(source).toContain( + 'let shutdownFlushState: "loading" | "rebuilding" | "ready" | "unavailable" = "loading"', + ); + expect(source).toContain('isReady: () => shutdownFlushState === "ready"'); + expect(source).toContain('shutdownFlushState = "rebuilding";'); + expect(source).toContain("indexPersistence.loadWithDiagnostics()"); + expect(source).toContain("rebuildIndexWithStatus(kv)"); + expect(source).toContain("if (!loaded.complete || !result.complete)"); + expect(source).toContain("await indexPersistence.saveOrThrow();"); + expect(source).toContain('shutdownFlushState = "ready";'); + expect(source).toContain('shutdownFlushState = loaded.complete && result.complete ? "ready" : "unavailable";'); + expect(source).toContain('shutdownFlushState = loaded.complete ? 
"ready" : "unavailable";'); + expect(source).toContain('shutdownFlushState = "unavailable";'); + expect(source).not.toContain("shutdownFlushReady = true"); + }); + it("schedules audit retry sweeps every 15 minutes unless disabled by env", () => { const source = readFileSync("src/index.ts", "utf-8"); diff --git a/test/search.test.ts b/test/search.test.ts index 06db7d16d..3af639218 100644 --- a/test/search.test.ts +++ b/test/search.test.ts @@ -4,7 +4,7 @@ vi.mock("../src/logger.js", () => ({ logger: { info: vi.fn(), warn: vi.fn(), error: vi.fn() }, })); -import { registerSearchFunction, getSearchIndex, rebuildIndex, setVectorIndex, setEmbeddingProvider, getVectorIndex } from "../src/functions/search.js"; +import { registerSearchFunction, getSearchIndex, rebuildIndex, rebuildIndexWithStatus, setVectorIndex, setEmbeddingProvider, getVectorIndex } from "../src/functions/search.js"; import { VectorIndex } from "../src/state/vector-index.js"; import { KV } from "../src/state/schema.js"; import type { CompressedObservation, Memory, Session } from "../src/types.js"; @@ -228,6 +228,46 @@ describe("mem::search", () => { expect(result.results[0]?.session).toEqual({ id: "ses_1" }); }); + it("rebuildIndexWithStatus reports incomplete when memory listing fails", async () => { + const failingKv = { + ...kv, + list: async (scope: string): Promise => { + if (scope === KV.memories) { + throw new Error("memories offline"); + } + return kv.list(scope); + }, + }; + + const result = await rebuildIndexWithStatus(failingKv as never); + + expect(result).toMatchObject({ + complete: false, + failedSessions: [], + }); + expect(result.count).toBe(2); + }); + + it("rebuildIndexWithStatus reports incomplete when a session observation list fails", async () => { + const failingKv = { + ...kv, + list: async (scope: string): Promise => { + if (scope === KV.observations("ses_1")) { + throw new Error("observations offline"); + } + return kv.list(scope); + }, + }; + + const result = await 
rebuildIndexWithStatus(failingKv as never); + + expect(result).toMatchObject({ + complete: false, + failedSessions: ["ses_1"], + }); + expect(result.count).toBe(0); + }); + it("does not attach another agent's session label to filtered memory results", async () => { await kv.set(KV.sessions, "ses_1", { id: "ses_1", diff --git a/test/shutdown-flush.test.ts b/test/shutdown-flush.test.ts new file mode 100644 index 000000000..f911e9c76 --- /dev/null +++ b/test/shutdown-flush.test.ts @@ -0,0 +1,71 @@ +import { describe, expect, it, vi } from "vitest"; +import { registerShutdownFlushFunction } from "../src/functions/shutdown-flush.js"; + +describe("registerShutdownFlushFunction", () => { + it("registers mem::shutdown-flush and checkpoints the index", async () => { + const handlers = new Map Promise>(); + const sdk = { + registerFunction: vi.fn( + (id: string, handler: (payload: unknown) => Promise) => { + handlers.set(id, handler); + }, + ), + }; + const indexPersistence = { saveOrThrow: vi.fn(async () => {}) }; + + registerShutdownFlushFunction(sdk as never, indexPersistence, { + isReady: () => true, + }); + + const handler = handlers.get("mem::shutdown-flush"); + expect(handler).toBeDefined(); + await expect(handler!({})).resolves.toMatchObject({ success: true }); + expect(indexPersistence.saveOrThrow).toHaveBeenCalledTimes(1); + }); + + it("propagates checkpoint failures so callers can fail closed", async () => { + const handlers = new Map Promise>(); + const sdk = { + registerFunction: vi.fn( + (id: string, handler: (payload: unknown) => Promise) => { + handlers.set(id, handler); + }, + ), + }; + const indexPersistence = { + saveOrThrow: vi.fn(async () => { + throw new Error("state::set failed"); + }), + }; + + registerShutdownFlushFunction(sdk as never, indexPersistence, { + isReady: () => true, + }); + + await expect(handlers.get("mem::shutdown-flush")!({})).rejects.toThrow( + "state::set failed", + ); + }); + + it("does not save before restored indexes are ready", 
async () => { + const handlers = new Map Promise>(); + const sdk = { + registerFunction: vi.fn( + (id: string, handler: (payload: unknown) => Promise) => { + handlers.set(id, handler); + }, + ), + }; + const indexPersistence = { saveOrThrow: vi.fn(async () => {}) }; + + registerShutdownFlushFunction(sdk as never, indexPersistence, { + isReady: () => false, + }); + + await expect(handlers.get("mem::shutdown-flush")!({})).resolves.toEqual({ + success: false, + error: "index_not_ready", + }); + expect(indexPersistence.saveOrThrow).not.toHaveBeenCalled(); + }); +}); diff --git a/website/lib/generated-meta.json b/website/lib/generated-meta.json index bb8222a13..1ebf81c69 100644 --- a/website/lib/generated-meta.json +++ b/website/lib/generated-meta.json @@ -2,9 +2,9 @@ "version": "0.9.28", "mcpTools": 61, "hooks": 12, - "restEndpoints": 135, - "testsPassing": 2626, + "restEndpoints": 136, + "testsPassing": 2651, "connectAdapters": 20, "skills": 15, - "generatedAt": "2026-06-19T16:03:59.493Z" + "generatedAt": "2026-06-20T02:47:04.426Z" }