26 commits
dca1aae
docs: add portable runtime v1 spec
yourbuddyconner May 6, 2026
ac0e54c
feat(engine): scaffold portable runtime engine prototype
yourbuddyconner May 6, 2026
72b017e
docs: spec and plan for persistent store + restart-safe gates
yourbuddyconner May 6, 2026
1a4c732
chore(engine): add drizzle-orm and better-sqlite3 deps
yourbuddyconner May 6, 2026
a0ceb41
feat(engine): add Drizzle SQLite schema for engine tables
yourbuddyconner May 6, 2026
cd27d3d
feat(engine): generate initial sqlite migration
yourbuddyconner May 6, 2026
d5842c7
test(engine): add SessionStore contract test suite
yourbuddyconner May 6, 2026
22c0b97
test(engine): run contract suite against InMemorySessionStore
yourbuddyconner May 6, 2026
a7d4bd5
feat(engine): SqliteSessionStore implementation
yourbuddyconner May 6, 2026
f507e23
test(engine): run contract suite against SqliteSessionStore
yourbuddyconner May 6, 2026
1e00e33
feat(engine): deterministic gate IDs derived from resumeKey
yourbuddyconner May 6, 2026
982ee04
feat(engine): requestDecision short-circuits on suspendedDecision replay
yourbuddyconner May 6, 2026
34b17a4
feat(engine): persist real tool call id and args on gate suspension
yourbuddyconner May 6, 2026
a9db358
test(engine): unit-test the gate short-circuit predicate
yourbuddyconner May 6, 2026
53ab91b
feat(engine): restoreSession rehydrates session, threads, and transcr…
yourbuddyconner May 6, 2026
fa0b3bc
feat(engine): replay blocked tool turns on session restore
yourbuddyconner May 6, 2026
ba7c74a
test(engine): full restart cycle restoreSession + resolve resumes turn
yourbuddyconner May 6, 2026
fc3a487
docs(engine): document persistent store and restart-safe gates
yourbuddyconner May 6, 2026
9720a48
feat(engine): add REPL bin for end-to-end Anthropic verification
yourbuddyconner May 6, 2026
8cecc6b
feat(engine): LocalSandbox provider for dev/testing on the host machine
yourbuddyconner May 6, 2026
cdac013
feat(engine): plugin Action Bridge via list_tools + call_tool indirec…
yourbuddyconner May 6, 2026
5c96805
spec: detail compaction design (pruning + summarization)
yourbuddyconner May 6, 2026
355e5c6
feat(engine): compaction (pruning + iterative summarization)
yourbuddyconner May 6, 2026
e7165f1
spec: add SessionStore.updateEntry for in-place entry mutation
yourbuddyconner May 6, 2026
0de190f
feat(engine): persist pruning + auto-continue after proactive compaction
yourbuddyconner May 6, 2026
05d89d1
feat(engine): REPL knobs for dogfooding compaction
yourbuddyconner May 6, 2026
2,709 changes: 2,709 additions & 0 deletions docs/plans/2026-05-05-persistent-store-restart-safe-gates.md

Large diffs are not rendered by default.

2,013 changes: 2,013 additions & 0 deletions docs/specs/2026-05-02-portable-runtime-engine-design.md

Large diffs are not rendered by default.

109 changes: 109 additions & 0 deletions packages/engine/README.md
@@ -0,0 +1,109 @@
# @valet/engine

Prototype implementation of the portable runtime engine described in
[`docs/specs/2026-05-02-portable-runtime-engine-design.md`](../../docs/specs/2026-05-02-portable-runtime-engine-design.md).

This is the V1 in-repo engine library. It runs the agent loop, owns
session/thread state, executes tools, and emits typed events. Platform
adapters (Cloudflare, Kubernetes) host this library; this package itself
has zero platform dependencies.

## What works in this prototype

- Engine public API: `createSession`, `restoreSession({ sessionId, options })`,
`getSession`, `deleteSession`, `Session.prompt`, `Session.thread()`,
`Session.resolveDecision`, `Session.withdrawDecision`,
`Session.abort/pause/resume`.
- Per-thread state: each thread gets its own `pi-agent-core` `Agent`
instance with its own queue and DAG history.
- Per-thread queue modes: `followup` (FIFO), `steer` (abort + start),
`collect` (buffered window).
- Decision gates: a tool calls `ctx.requestDecision({...})` to suspend the
turn. The gate is persisted, a `DecisionGateEntry` lands in the DAG, the
engine emits `decision_gate`, and the turn resumes when the user calls
`session.resolveDecision()`. Pending gates are withdrawn on `steer` or
`abort` and expire after `expiresAt`.
- **Restart-safe re-entrant decision gates.** Gate IDs are deterministic:
`gate:{sessionId}:{threadId}:{queueItemId}:{resumeKey}`. Tools must
supply a stable `resumeKey`. On `restoreSession`, the engine re-arms
pending gates and replays the suspended tool with `ctx.suspendedDecision`
populated; `requestDecision` short-circuits and returns the stored
resolution instead of opening a new gate. Validated by an end-to-end
test that opens a gate, throws away the engine, builds a new one on the
same store, calls `restoreSession`, then `resolveDecision`, and verifies
the agent's continuation message is persisted.
- Multi-thread: threads run concurrently, share the sandbox, and have
isolated histories. Aborting one thread doesn't affect siblings.
- Built-in `thread_read` tool: a thread can read recent messages from a
sibling, parent, or child thread.
- Built-in tools: `read`, `write`, `edit`, `bash`, `thread_read`.
- **Persistent SessionStore.** `SqliteSessionStore` (Drizzle SQLite schema,
migrations, in-process via `better-sqlite3`) implements the same
`SessionStore` interface as `InMemorySessionStore`. Both pass an
identical 10-test contract suite. Schema mirrors the V1 spec's required
tables: `engine_sessions`, `engine_threads`, `engine_entries`,
`engine_queue_state`, `engine_decision_gates`,
`engine_decision_gate_refs`, `engine_suspended_turns`, plus stubbed
`engine_queue_items` for future per-item visibility.
- In-memory providers: `InMemorySessionStore`, `InMemoryEventBus`,
`InMemoryBlobStore`, `InMemoryCredentialStore`, `VirtualSandbox` /
`VirtualSandboxProvider` (in-memory FS + a small whitelist of safe shell
commands). These double as test fixtures.
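
A minimal sketch of how the deterministic gate ID and the replay short-circuit described above could fit together. The `GateScope`, `SuspendedDecision`, `gateId`, and `shortCircuit` names are hypothetical and only illustrate the idea; the engine's real types and predicate may differ.

```ts
// Hypothetical shapes for illustration; the real engine types may differ.
interface GateScope {
  sessionId: string;
  threadId: string;
  queueItemId: string;
}

interface Resolution {
  actionId: string;
}

interface SuspendedDecision {
  gateId: string;
  resolution?: Resolution;
}

// The same inputs always yield the same ID, so a tool call replayed after a
// restart maps onto the gate that was persisted before the restart.
function gateId(scope: GateScope, resumeKey: string): string {
  return `gate:${scope.sessionId}:${scope.threadId}:${scope.queueItemId}:${resumeKey}`;
}

// Returns the stored resolution when the replayed call targets a gate that
// already has one; returns undefined when a new gate has to be opened.
function shortCircuit(
  scope: GateScope,
  resumeKey: string,
  suspended?: SuspendedDecision,
): Resolution | undefined {
  if (!suspended) return undefined;
  if (suspended.gateId !== gateId(scope, resumeKey)) return undefined;
  return suspended.resolution;
}

const scope: GateScope = { sessionId: "s1", threadId: "t1", queueItemId: "q1" };
const id = gateId(scope, "confirm-delete");
const replayed = shortCircuit(scope, "confirm-delete", {
  gateId: id,
  resolution: { actionId: "approve" },
});
console.log(id); // gate:s1:t1:q1:confirm-delete
console.log(replayed?.actionId); // approve
```

Because the ID depends on `resumeKey`, a tool that generated a fresh key on every call would never match its stored gate, which is why the key has to be stable.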

## What's deferred (post-prototype)

- **D1 wiring.** `SqliteSessionStore` uses `better-sqlite3`. The Cloudflare
adapter will reuse the same Drizzle queries through `drizzle-orm/d1`.
- **Postgres dialect mirror.** The K8s adapter contract requires a
pg-core schema mirror. Same logical schema, different column helpers;
doable in one task once the K8s adapter is on deck.
- **Per-queue-item rows.** Today the active and pending queue items are
persisted via the JSON-encoded `engine_queue_state.pending` column.
`engine_queue_items` exists as a schema stub; populating it gives the
adapter visibility into individual items but isn't a correctness
requirement.
- **Compaction.** Token-aware context compression is not implemented.
`CompactionEntry` is in the DAG schema; the algorithm itself is a
follow-up.
- **Roles & skills loading.** The types are defined, but role and skill
resolution at prompt time is not wired in.
- **Model failover.** Single-model only for now.
- **Plugin Action Bridge.** The `actionSourceToTools` adapter described
in the spec is not implemented yet — plugins should currently export
`ToolDef[]` directly.
- **Structured results.** Schema-validated output extraction with
`---RESULT_START---` delimiters is not implemented.

## Spec-vs-reality deltas (notes from the pi-ai/pi-agent-core spike)

The spec was written before pinning the API surface of `pi-ai` /
`pi-agent-core`. The implementation reconciles the following differences:

1. **`ToolDef.execute(args, ctx)` vs `AgentTool.execute(toolCallId, params, signal, onUpdate)`.**
The engine keeps the spec-faithful `ToolDef` shape as the public type;
internally, `tool-bridge.ts` wraps each `ToolDef` in a pi `AgentTool`
and captures `ToolContext` in a closure.
2. **No native turn-suspension primitive.** pi-agent-core's
`beforeToolCall` can block a call with `{ block: true }` (the deny
path), but it cannot pause one. For a "wait for human" gate we await a
Promise inside the tool.
3. **`message_start` vs `message_update`.** pi-agent-core fires
`message_start` once per assistant message; `message_update` carries
delta events (text, thinking, tool calls). The engine subscribes to
both.
4. **Custom `AgentMessage` types via `convertToLlm`.** The engine could
later persist `DecisionGateEntry` etc. as custom AgentMessages
alongside the LLM transcript, then filter them out before each LLM
call. We don't need this in the prototype because we persist via the
SessionStore directly, but the pattern is useful when we want
in-context awareness of past gates.
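
Items 1 and 2 can be sketched together: a wrapper adapts the two-argument tool shape to a call-id-first shape, and the gate is simply a Promise that the tool awaits. Everything below (`AgentToolLike`, `bridge`, `pending`, `resolveDecision`, and the reduced `ToolDef` / `ToolContext` interfaces) is a hypothetical stand-in, not the pi-agent-core or engine API.

```ts
// All names below are illustrative stand-ins, not the real APIs.
interface ToolContext {
  requestDecision(title: string): Promise<string>;
}

interface ToolDef {
  name: string;
  execute(args: unknown, ctx: ToolContext): Promise<string>;
}

// Reduced stand-in for the call-id-first tool shape named in item 1.
interface AgentToolLike {
  name: string;
  execute(toolCallId: string, params: unknown, signal?: AbortSignal): Promise<string>;
}

// Pending gates keyed by tool call id; resolving one lets the awaiting tool continue.
const pending = new Map<string, (actionId: string) => void>();

function bridge(def: ToolDef): AgentToolLike {
  return {
    name: def.name,
    execute(toolCallId, params) {
      // The context is built per call and captured in a closure, so the
      // ToolDef never has to handle the toolCallId itself.
      const ctx: ToolContext = {
        requestDecision: () =>
          new Promise<string>((resolve) => pending.set(toolCallId, resolve)),
      };
      return def.execute(params, ctx);
    },
  };
}

function resolveDecision(toolCallId: string, actionId: string): void {
  const resolve = pending.get(toolCallId);
  if (!resolve) return;
  pending.delete(toolCallId);
  resolve(actionId);
}

const deleteFile: ToolDef = {
  name: "delete_file",
  async execute(_args, ctx) {
    // The turn is "suspended" here only in the sense that this await is pending.
    const action = await ctx.requestDecision("Delete file?");
    return action === "approve" ? "deleted" : "skipped";
  },
};

const running = bridge(deleteFile).execute("call-1", { path: "a.txt" });
resolveDecision("call-1", "approve");
running.then((result) => console.log(result)); // prints "deleted"
```

An in-memory Promise like this does not survive a process restart, which is the gap the persisted gates and `restoreSession` replay are meant to cover.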

## Tests

```sh
pnpm --filter @valet/engine test
```

Covers: happy path (3), decision gates (4), queue modes (4),
multi-thread + thread_read (3), short-circuit predicate unit tests (6),
SessionStore contract suite × 2 backends (20), full restart cycle (1) —
41 tests total.
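
The "contract suite × 2 backends" idea can be sketched without a test runner: one list of checks, executed against a factory for each backend. `MiniStore`, `MapStore`, `ListStore`, and `runContract` are hypothetical and far smaller than the real `SessionStore` interface and suite.

```ts
// Hypothetical, reduced store interface; the real SessionStore has more methods.
interface MiniStore {
  put(id: string, value: string): Promise<void>;
  get(id: string): Promise<string | undefined>;
}

class MapStore implements MiniStore {
  private rows = new Map<string, string>();
  async put(id: string, value: string): Promise<void> {
    this.rows.set(id, value);
  }
  async get(id: string): Promise<string | undefined> {
    return this.rows.get(id);
  }
}

class ListStore implements MiniStore {
  private rows: Array<[string, string]> = [];
  async put(id: string, value: string): Promise<void> {
    const row = this.rows.find(([key]) => key === id);
    if (row) row[1] = value;
    else this.rows.push([id, value]);
  }
  async get(id: string): Promise<string | undefined> {
    return this.rows.find(([key]) => key === id)?.[1];
  }
}

interface Check {
  name: string;
  run(store: MiniStore): Promise<boolean>;
}

// The contract is written once, against the interface only.
const contract: Check[] = [
  {
    name: "get returns undefined for an unknown id",
    run: async (s) => (await s.get("missing")) === undefined,
  },
  {
    name: "get returns the last written value",
    run: async (s) => {
      await s.put("a", "1");
      await s.put("a", "2");
      return (await s.get("a")) === "2";
    },
  },
];

// Runs every check against a fresh store and collects the names of failures.
async function runContract(label: string, makeStore: () => MiniStore): Promise<string[]> {
  const failures: string[] = [];
  for (const check of contract) {
    const ok = await check.run(makeStore());
    if (!ok) failures.push(`${label}: ${check.name}`);
  }
  return failures;
}

const results = Promise.all([
  runContract("map", () => new MapStore()),
  runContract("list", () => new ListStore()),
]);
results.then(([a, b]) => console.log([...a, ...b].length === 0 ? "all passed" : [...a, ...b]));
```

Each backend contributes the same number of checks, which matches the count above: 10 contract tests × 2 backends = 20.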
265 changes: 265 additions & 0 deletions packages/engine/bin/repl.ts
@@ -0,0 +1,265 @@
#!/usr/bin/env -S node --import tsx
/**
 * End-to-end smoke REPL for @valet/engine.
 *
 * Wires up:
 *   - InMemorySessionStore + InMemoryEventBus
 *   - VirtualSandbox (default) or LocalSandbox (real host filesystem + shell)
 *   - The engine's built-in tools (read/write/edit/bash/thread_read)
 *   - A real Anthropic model via pi-ai (defaults to claude-haiku-4-5)
 *
 * Env:
 *   ANTHROPIC_API_KEY     required
 *   VALET_MODEL           pi-ai anthropic model id (default claude-haiku-4-5)
 *   VALET_SANDBOX         virtual | local (default virtual)
 *   VALET_WORKSPACE       workspace dir for local sandbox (default cwd)
 *   VALET_SYSTEM_PROMPT   override the system prompt
 *   GITHUB_TOKEN          when set, registers @valet/plugin-github actions via
 *                         the actionSourceToTools bridge (read/write GitHub)
 *   VALET_CONTEXT_WINDOW  override the model's local contextWindow (forces
 *                         compaction at a smaller budget for dogfooding)
 *   VALET_MAX_TOKENS      override the model's local maxTokens
 *   VALET_DEBUG           set to 1 to print otherwise unhandled event types
 *
 * Usage:
 *
 *   # in-memory sandbox, single prompt:
 *   pnpm --filter @valet/engine repl "say hi"
 *
 *   # local sandbox pointed at the current repo, interactive:
 *   VALET_SANDBOX=local pnpm --filter @valet/engine repl
 *
 *   # local sandbox pointed at an explicit dir:
 *   VALET_SANDBOX=local VALET_WORKSPACE=/path/to/repo pnpm --filter @valet/engine repl "list the top-level files"
 */
import { createInterface } from "node:readline/promises";
import { stdin, stdout } from "node:process";
import { resolve } from "node:path";
import { getModel } from "@mariozechner/pi-ai";
import { githubActions } from "@valet/plugin-github/actions";
import {
  actionBridgeTools,
  Engine,
  InMemoryCredentialStore,
  InMemoryEventBus,
  InMemorySessionStore,
  LocalSandboxProvider,
  VirtualSandboxProvider,
  type ActionSourceConfig,
  type BusEvent,
  type SandboxProvider,
  type Session,
  type ToolDef,
} from "../src/index.js";

const MODEL_ID = process.env.VALET_MODEL ?? "claude-haiku-4-5";
const SANDBOX_KIND = (process.env.VALET_SANDBOX ?? "virtual").toLowerCase();
const WORKSPACE =
  process.env.VALET_WORKSPACE ??
  (SANDBOX_KIND === "local" ? process.cwd() : "/");

const SYSTEM_PROMPT_VIRTUAL =
  "You are a helpful coding assistant running inside an in-memory virtual sandbox. " +
  "You have built-in tools: read, write, edit, bash, thread_read. " +
  "The sandbox starts empty at /. Be concise.";

const SYSTEM_PROMPT_LOCAL =
  `You are a helpful coding assistant running on a local developer machine. ` +
  `Your workspace is ${WORKSPACE}. Relative paths resolve there. ` +
  `You have built-in tools: read, write, edit, bash, thread_read. ` +
  `Be concise. Confirm with the user before making destructive changes.`;

const SYSTEM_PROMPT =
  process.env.VALET_SYSTEM_PROMPT ??
  (SANDBOX_KIND === "local" ? SYSTEM_PROMPT_LOCAL : SYSTEM_PROMPT_VIRTUAL);

function fail(message: string, code = 1): never {
  process.stderr.write(`error: ${message}\n`);
  process.exit(code);
}

async function loadPluginTools(): Promise<ToolDef[]> {
  const sources: ActionSourceConfig[] = [];
  if (process.env.GITHUB_TOKEN) {
    sources.push({ service: "github", actions: githubActions });
  }
  if (sources.length === 0) return [];
  return actionBridgeTools({ sources });
}

async function buildSession(): Promise<{ session: Session; bus: InMemoryEventBus }> {
  if (!process.env.ANTHROPIC_API_KEY) {
    fail(
      "ANTHROPIC_API_KEY is not set. Export it in your shell before running this REPL.",
    );
  }
  // pi-ai's `getModel` is typed against MODELS at compile time; we cast the
  // env-supplied id at the boundary because it's user input.
  const baseModel = getModel("anthropic", MODEL_ID as "claude-haiku-4-5");
  if (!baseModel) {
    fail(
      `unknown anthropic model "${MODEL_ID}". Check VALET_MODEL or pi-ai's MODELS table.`,
    );
  }
  // VALET_CONTEXT_WINDOW + VALET_MAX_TOKENS let us force compaction at low
  // budgets for dogfooding: the engine uses these to compute `usable`,
  // while Anthropic's API still accepts the real (much larger) context.
  // Values that do not parse to a positive integer are ignored, so a typo
  // cannot leak NaN into the model config.
  const parseOverride = (raw: string | undefined): number | undefined => {
    if (!raw) return undefined;
    const n = Number.parseInt(raw, 10);
    return Number.isFinite(n) && n > 0 ? n : undefined;
  };
  const overrideCtx = parseOverride(process.env.VALET_CONTEXT_WINDOW);
  const overrideMax = parseOverride(process.env.VALET_MAX_TOKENS);
  const model =
    overrideCtx !== undefined || overrideMax !== undefined
      ? {
          ...baseModel,
          contextWindow: overrideCtx ?? baseModel.contextWindow,
          maxTokens: overrideMax ?? baseModel.maxTokens,
        }
      : baseModel;

  const store = new InMemorySessionStore();
  const bus = new InMemoryEventBus();
  const credentials = new InMemoryCredentialStore();
  const sandboxProvider: SandboxProvider =
    SANDBOX_KIND === "local"
      ? new LocalSandboxProvider()
      : new VirtualSandboxProvider();
  const engine = new Engine({
    providers: { store, bus, credentials, sandboxProvider },
  });

  const userId = "repl-user";
  const tools: ToolDef[] = [];

  // Plugin sources: when their respective env tokens are set, save the
  // credential and add the source to the bridge. The bridge then exposes
  // a single (list_tools, call_tool) pair regardless of how many sources
  // are wired in.
  if (process.env.GITHUB_TOKEN) {
    await credentials.save({ type: "user", id: userId }, "github", {
      type: "oauth2",
      accessToken: process.env.GITHUB_TOKEN,
    });
  }
  const pluginTools = await loadPluginTools();
  if (pluginTools.length > 0) {
    tools.push(...pluginTools);
    stdout.write(
      `\x1b[90m[plugins] ${pluginTools.length} bridge tools (list_tools + call_tool)\x1b[0m\n`,
    );
  }

  const workspace = SANDBOX_KIND === "local" ? resolve(WORKSPACE) : WORKSPACE;
  const session = await engine.createSession({
    userId,
    orgId: "repl-org",
    workspace,
    sandbox: { workspace },
    model,
    systemPrompt: SYSTEM_PROMPT,
    tools,
  });

  return { session, bus };
}

function subscribePrinter(bus: InMemoryEventBus): void {
  bus.subscribe({}, (e: BusEvent) => {
    const ev = e.event;
    switch (ev.type) {
      case "text_delta":
        stdout.write(ev.text);
        break;
      case "tool_start":
        stdout.write(
          `\n\x1b[90m[tool] ${ev.tool}(${JSON.stringify(ev.args)})\x1b[0m\n`,
        );
        break;
      case "tool_end":
        stdout.write(
          `\x1b[90m[tool] ${ev.tool} -> ${ev.isError ? "ERROR" : "ok"}: ${truncate(ev.result, 200)}\x1b[0m\n`,
        );
        break;
      case "decision_gate":
        stdout.write(
          `\n\x1b[33m[gate] ${ev.gate.type}: ${ev.gate.title}\x1b[0m\n` +
            ` id=${ev.gate.id}\n actions=${ev.gate.actions.map((a) => a.id).join(", ")}\n`,
        );
        break;
      case "decision_gate_resolved":
        stdout.write(`\x1b[33m[gate] resolved=${ev.resolution.actionId}\x1b[0m\n`);
        break;
      case "compaction_start":
        stdout.write(`\n\x1b[35m[compaction] started…\x1b[0m\n`);
        break;
      case "compaction_end":
        stdout.write(`\x1b[35m[compaction] done\x1b[0m\n`);
        break;
      case "turn_end":
        stdout.write(`\n\x1b[90m[turn ended: ${ev.reason}]\x1b[0m\n`);
        break;
      case "error":
        stdout.write(`\n\x1b[31m[error] ${ev.code}: ${ev.error}\x1b[0m\n`);
        break;
      default:
        if (process.env.VALET_DEBUG === "1") {
          stdout.write(`\x1b[90m[debug] ${ev.type}\x1b[0m\n`);
        }
        break;
    }
  });
}

function truncate(s: string, max: number): string {
  if (s.length <= max) return s;
  return s.slice(0, max) + `…(+${s.length - max} chars)`;
}

async function waitForIdle(bus: InMemoryEventBus, threadId: string): Promise<void> {
  return new Promise((resolve) => {
    const unsub = bus.subscribe({}, (e) => {
      if (
        e.event.type === "status" &&
        e.event.threadId === threadId &&
        e.event.status === "idle"
      ) {
        unsub();
        resolve();
      }
    });
  });
}

async function runOneShot(prompt: string): Promise<void> {
  const { session, bus } = await buildSession();
  subscribePrinter(bus);
  const receipt = await session.prompt(prompt);
  await waitForIdle(bus, receipt.threadId);
}

async function runInteractive(): Promise<void> {
  const { session, bus } = await buildSession();
  subscribePrinter(bus);
  const rl = createInterface({ input: stdin, output: stdout });
  stdout.write(
    `\nvalet engine repl — model=${MODEL_ID} sandbox=${SANDBOX_KIND}` +
      (SANDBOX_KIND === "local" ? ` workspace=${WORKSPACE}` : "") +
      `\ntype a prompt, 'exit' to quit.\n`,
  );
  while (true) {
    const line = (await rl.question("\n> ")).trim();
    if (line === "") continue;
    if (line === "exit" || line === "quit") break;
    const receipt = await session.prompt(line);
    await waitForIdle(bus, receipt.threadId);
  }
  rl.close();
}

const args = process.argv.slice(2);
if (args.length > 0) {
  await runOneShot(args.join(" "));
} else {
  await runInteractive();
}
7 changes: 7 additions & 0 deletions packages/engine/drizzle.config.ts
@@ -0,0 +1,7 @@
import { defineConfig } from "drizzle-kit";

export default defineConfig({
  dialect: "sqlite",
  schema: "./src/schema/sqlite.ts",
  out: "./migrations/sqlite",
});