diff --git a/__tests__/scripts/dev-safe.test.ts b/__tests__/scripts/dev-safe.test.ts index 2f540d14..97299821 100644 --- a/__tests__/scripts/dev-safe.test.ts +++ b/__tests__/scripts/dev-safe.test.ts @@ -20,9 +20,11 @@ import { buildNpmScriptCommand, buildAgentServerCommand, buildRuntimeServicesInfo, + cleanupStaleArtifacts, formatMissingUvxGuidance, formatMissingFrontendDependenciesGuidance, getMissingFrontendDependencyBins, + startConversationWebhookReceiver, validateFrontendDependencies, validateLocalAgentServerPath, findFreePort, @@ -31,10 +33,12 @@ import { resetPersistedSessionApiKeyCache, } from "../../scripts/dev-safe.mjs"; import { + existsSync, mkdtempSync, mkdirSync, readFileSync, rmSync, + utimesSync, writeFileSync, } from "node:fs"; import { tmpdir } from "node:os"; @@ -899,3 +903,238 @@ describe("buildRuntimeServicesInfo", () => { ); }); }); + +// ── cleanupStaleArtifacts ──────────────────────────────────────────────────── + +describe("cleanupStaleArtifacts", () => { + let tmpDir: string; + + afterEach(() => { + rmSync(tmpDir, { recursive: true, force: true }); + }); + + function setup() { + tmpDir = mkdtempSync(path.join(tmpdir(), "cleanup-test-")); + const conversationsDir = path.join(tmpDir, "conversations"); + const workspacesDir = path.join(tmpDir, "workspaces"); + const bashEventsDir = path.join(tmpDir, "bash_events"); + mkdirSync(conversationsDir, { recursive: true }); + mkdirSync(workspacesDir, { recursive: true }); + mkdirSync(bashEventsDir, { recursive: true }); + return { conversationsDir, workspacesDir, bashEventsDir }; + } + + it("removes workspace dir whose conversation no longer exists", () => { + const { conversationsDir, workspacesDir, bashEventsDir } = setup(); + const orphanId = "a".repeat(32); + mkdirSync(path.join(workspacesDir, orphanId)); + + const result = cleanupStaleArtifacts(conversationsDir, workspacesDir, bashEventsDir); + + expect(existsSync(path.join(workspacesDir, orphanId))).toBe(false); + expect(result.removedWorkspaces).toBe(1); + }); + + it("preserves workspace dir when conversation dir still exists", () => { + const { conversationsDir, workspacesDir, bashEventsDir } = setup(); + const convId = "b".repeat(32); + mkdirSync(path.join(conversationsDir, convId)); + mkdirSync(path.join(workspacesDir, convId)); + + const result = cleanupStaleArtifacts(conversationsDir, workspacesDir, bashEventsDir); + + expect(existsSync(path.join(workspacesDir, convId))).toBe(true); + expect(result.removedWorkspaces).toBe(0); + }); + + it("skips non-hex-named workspace dirs like automation-runs", () => { + const { conversationsDir, workspacesDir, bashEventsDir } = setup(); + mkdirSync(path.join(workspacesDir, "automation-runs")); + + const result = cleanupStaleArtifacts(conversationsDir, workspacesDir, bashEventsDir); + + expect(existsSync(path.join(workspacesDir, "automation-runs"))).toBe(true); + expect(result.removedWorkspaces).toBe(0); + }); + + it("removes bash event files older than 24 hours", () => { + const { conversationsDir, workspacesDir, bashEventsDir } = setup(); + const oldFile = path.join(bashEventsDir, "old_event"); + writeFileSync(oldFile, "{}"); + const yesterday = new Date(Date.now() - 25 * 60 * 60 * 1000); + utimesSync(oldFile, yesterday, yesterday); + + const result = cleanupStaleArtifacts(conversationsDir, workspacesDir, bashEventsDir); + + expect(existsSync(oldFile)).toBe(false); + expect(result.removedBashEvents).toBe(1); + }); + + it("preserves bash event files newer than 24 hours", () => { + const { conversationsDir, workspacesDir, bashEventsDir } = setup(); + const recentFile = path.join(bashEventsDir, "recent_event"); + writeFileSync(recentFile, "{}"); + + const result = cleanupStaleArtifacts(conversationsDir, workspacesDir, bashEventsDir); + + expect(existsSync(recentFile)).toBe(true); + expect(result.removedBashEvents).toBe(0); + }); + + it("returns combined counts across all categories", () => { + const { conversationsDir, workspacesDir, bashEventsDir } = setup(); + + // Two orphaned workspaces + mkdirSync(path.join(workspacesDir, "c".repeat(32))); + mkdirSync(path.join(workspacesDir, "d".repeat(32))); + + // Two old bash events + for (const name of ["evt1", "evt2"]) { + const f = path.join(bashEventsDir, name); + writeFileSync(f, "{}"); + const old = new Date(Date.now() - 25 * 60 * 60 * 1000); + utimesSync(f, old, old); + } + + const result = cleanupStaleArtifacts(conversationsDir, workspacesDir, bashEventsDir); + + expect(result.removedWorkspaces).toBe(2); + expect(result.removedBashEvents).toBe(2); + }); + + it("handles non-existent directories without throwing", () => { + tmpDir = mkdtempSync(path.join(tmpdir(), "cleanup-test-")); + expect(() => + cleanupStaleArtifacts( + path.join(tmpDir, "conversations"), + path.join(tmpDir, "workspaces"), + path.join(tmpDir, "bash_events"), + ), + ).not.toThrow(); + }); +}); + +// ── startConversationWebhookReceiver ───────────────────────────────────────── + +describe("startConversationWebhookReceiver", () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let server: any; + let tmpDir: string; + + afterEach(async () => { + await new Promise((resolve) => server?.close(resolve)); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + function setup() { + tmpDir = mkdtempSync(path.join(tmpdir(), "webhook-test-")); + const workspacesDir = path.join(tmpDir, "workspaces"); + mkdirSync(workspacesDir, { recursive: true }); + return workspacesDir; + } + + it("starts and returns a positive port number", async () => { + const workspacesDir = setup(); + const result = await startConversationWebhookReceiver(workspacesDir); + server = result.server; + expect(result.port).toBeGreaterThan(0); + }); + + it("removes the workspace when execution_status is 'deleting'", async () => { + const workspacesDir = setup(); + const convId = "e".repeat(32); + mkdirSync(path.join(workspacesDir, convId)); + + const { port, server: srv } = await startConversationWebhookReceiver(workspacesDir); + server = srv; + + const res = await fetch(`http://127.0.0.1:${port}/conversations`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ id: convId, execution_status: "deleting" }), + }); + + expect(res.status).toBe(200); + expect(existsSync(path.join(workspacesDir, convId))).toBe(false); + }); + + it("accepts UUID-with-dashes format and maps to hex workspace name", async () => { + const workspacesDir = setup(); + const hexId = "f".repeat(32); + // UUID-with-dashes equivalent: ffffffff-ffff-ffff-ffff-ffffffffffff + const uuidId = `${"f".repeat(8)}-${"f".repeat(4)}-${"f".repeat(4)}-${"f".repeat(4)}-${"f".repeat(12)}`; + mkdirSync(path.join(workspacesDir, hexId)); + + const { port, server: srv } = await startConversationWebhookReceiver(workspacesDir); + server = srv; + + await fetch(`http://127.0.0.1:${port}/conversations`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ id: uuidId, execution_status: "deleting" }), + }); + + expect(existsSync(path.join(workspacesDir, hexId))).toBe(false); + }); + + it("ignores non-deleting execution statuses", async () => { + const workspacesDir = setup(); + const convId = "a1b2c3d4e5f6".repeat(2) + "a1b2c3d4"; + mkdirSync(path.join(workspacesDir, convId)); + + const { port, server: srv } = await startConversationWebhookReceiver(workspacesDir); + server = srv; + + await fetch(`http://127.0.0.1:${port}/conversations`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ id: convId, execution_status: "running" }), + }); + + expect(existsSync(path.join(workspacesDir, convId))).toBe(true); + }); + + it("rejects non-hex conversation IDs to prevent path traversal", async () => { + const workspacesDir = setup(); + const { port, server: srv } = await startConversationWebhookReceiver(workspacesDir); + server = srv; + + // A path traversal attempt must not remove anything outside workspacesDir + const res = await fetch(`http://127.0.0.1:${port}/conversations`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify({ id: "../../../etc", execution_status: "deleting" }), + }); + + expect(res.status).toBe(200); + // workspacesDir itself must still exist + expect(existsSync(workspacesDir)).toBe(true); + }); + + it("returns 200 and does not throw on malformed JSON", async () => { + const workspacesDir = setup(); + const { port, server: srv } = await startConversationWebhookReceiver(workspacesDir); + server = srv; + + const res = await fetch(`http://127.0.0.1:${port}/conversations`, { + method: "POST", + headers: { "content-type": "application/json" }, + body: "not-json{{{", + }); + + expect(res.status).toBe(200); + }); + + it("returns 404 for unrecognised paths", async () => { + const workspacesDir = setup(); + const { port, server: srv } = await startConversationWebhookReceiver(workspacesDir); + server = srv; + + const res = await fetch(`http://127.0.0.1:${port}/events`, { + method: "POST", + body: "{}", + }); + + expect(res.status).toBe(404); + }); +}); diff --git a/scripts/dev-safe.mjs b/scripts/dev-safe.mjs index 437a7d1c..8d10b766 100644 --- a/scripts/dev-safe.mjs +++ b/scripts/dev-safe.mjs @@ -5,10 +5,12 @@ import { mkdirSync, readdirSync, readFileSync, + rmSync, statSync, unlinkSync, writeFileSync, } from "node:fs"; +import http from "node:http"; import net from "node:net"; import { homedir, tmpdir } from "node:os"; import path from "node:path"; @@ -855,6 +857,21 @@ async function main() { mkdirSync(dir, { recursive: true }); } + const { removedWorkspaces, removedBashEvents } = cleanupStaleArtifacts( + config.conversationsPath, + config.workspacesPath, + config.bashEventsDir, + ); + if (removedWorkspaces > 0 || removedBashEvents > 0) { + console.log( + `- cleanup: removed ${removedWorkspaces} orphaned workspace(s), ` + + `${removedBashEvents} expired bash event(s)`, + ); + } + + const { port: webhookPort, server: webhookServer } = + await startConversationWebhookReceiver(config.workspacesPath); + const agentServerCmd = buildAgentServerCommand(); const secretKeySource = process.env.OH_SECRET_KEY @@ -893,6 +910,7 @@ async function main() { env: { ...process.env, ...buildAgentServerEnv(config), + OH_WEBHOOKS_0_BASE_URL: `http://127.0.0.1:${webhookPort}`, }, }, ); @@ -906,6 +924,7 @@ async function main() { } shuttingDown = true; + webhookServer.close(); if (frontend) { signalProcessTree(frontend, signal); } @@ -1060,6 +1079,124 @@ export function releaseStaleConversationLeases(conversationsDir) { return removed; } +/** Bash events older than this are deleted at startup. */ +const BASH_EVENTS_MAX_AGE_MS = 24 * 60 * 60 * 1000; + +/** + * Remove stale workspace directories and old bash events at server startup. + * + * Workspaces: removes any {workspacesDir}/{id} directory where the + * corresponding {conversationsDir}/{id} no longer exists. Only considers + * 32-character hex-named entries (conversation IDs), leaving unrelated + * directories such as automation-runs untouched. + * + * Bash events: removes individual files in {bashEventsDir} whose mtime + * is older than BASH_EVENTS_MAX_AGE_MS (24 hours). + * + * Must be called before starting the agent-server. + * + * @param {string} conversationsDir + * @param {string} workspacesDir + * @param {string} bashEventsDir + * @returns {{ removedWorkspaces: number, removedBashEvents: number }} + */ +export function cleanupStaleArtifacts(conversationsDir, workspacesDir, bashEventsDir) { + const CONV_ID_RE = /^[0-9a-f]{32}$/i; + + let removedWorkspaces = 0; + if (existsSync(workspacesDir)) { + for (const name of readdirSync(workspacesDir)) { + if (!CONV_ID_RE.test(name)) continue; + if (!existsSync(path.join(conversationsDir, name))) { + try { + rmSync(path.join(workspacesDir, name), { recursive: true, force: true }); + removedWorkspaces++; + } catch { + // best effort — leave it for the next startup + } + } + } + } + + let removedBashEvents = 0; + const cutoffMs = Date.now() - BASH_EVENTS_MAX_AGE_MS; + if (existsSync(bashEventsDir)) { + for (const name of readdirSync(bashEventsDir)) { + const filePath = path.join(bashEventsDir, name); + try { + if (statSync(filePath).mtimeMs < cutoffMs) { + unlinkSync(filePath); + removedBashEvents++; + } + } catch { + // best effort + } + } + } + + return { removedWorkspaces, removedBashEvents }; +} + +/** + * Start a lightweight HTTP server that receives conversation lifecycle webhooks + * from the agent-server. + * + * When the agent-server posts execution_status "deleting" for a conversation, + * the corresponding workspace directory is removed immediately. + * + * The server binds to 127.0.0.1 on an OS-assigned ephemeral port. The caller + * is responsible for closing the returned server on process exit. + * + * @param {string} workspacesDir Path to the workspaces root directory. + * @returns {Promise<{ port: number, server: import("node:http").Server }>} + */ +export async function startConversationWebhookReceiver(workspacesDir) { + const CONV_ID_RE = /^[0-9a-f]{32}$/i; + + const server = http.createServer((req, res) => { + if (req.method !== "POST" || !req.url?.startsWith("/conversations")) { + res.writeHead(404).end(); + return; + } + + let body = ""; + req.on("data", (chunk) => { + body += chunk; + }); + req.on("end", () => { + try { + const info = JSON.parse(body); + if (info.execution_status === "deleting" && info.id) { + // Conversation IDs arrive as UUID strings (with dashes); workspace + // directories use the 32-char hex form (no dashes). + const hex = String(info.id).replace(/-/g, ""); + if (CONV_ID_RE.test(hex)) { + const workspaceDir = path.join(workspacesDir, hex); + if (existsSync(workspaceDir)) { + try { + rmSync(workspaceDir, { recursive: true, force: true }); + } catch { + // best effort + } + } + } + } + } catch { + // malformed JSON — ignore + } + res.writeHead(200).end(); + }); + }); + + return new Promise((resolve, reject) => { + server.once("error", reject); + server.listen(0, "127.0.0.1", () => { + const addr = /** @type {import("node:net").AddressInfo} */ (server.address()); + resolve({ port: addr.port, server }); + }); + }); +} + if ( process.argv[1] && import.meta.url === pathToFileURL(process.argv[1]).href diff --git a/scripts/dev-static.mjs b/scripts/dev-static.mjs index 16644019..8ed2251e 100644 --- a/scripts/dev-static.mjs +++ b/scripts/dev-static.mjs @@ -47,9 +47,11 @@ import { buildAgentServerCommand, buildSafeDevConfig, buildAgentServerEnv, + cleanupStaleArtifacts, formatMissingUvxGuidance, isPortBusy, releaseStaleConversationLeases, + startConversationWebhookReceiver, } from "./dev-safe.mjs"; import { getProcessTreeSpawnOptions, @@ -212,6 +214,8 @@ function checkPrerequisites() { const processes = new Map(); let shuttingDown = false; +/** @type {import("node:http").Server | null} */ +let activeWebhookServer = null; function spawnService(name, command, args, options = {}) { const proc = spawn( @@ -303,6 +307,9 @@ function startAgentServer(config) { const agentServerEnv = { ...buildAgentServerEnv(safeConfig), ...buildAgentServerAutomationEnv(config), + ...(config.webhookReceiverPort != null + ? { OH_WEBHOOKS_0_BASE_URL: `http://127.0.0.1:${config.webhookReceiverPort}` } + : {}), }; spawnService( @@ -468,6 +475,8 @@ function shutdown() { console.log(""); console.log(`${c.yellow}Shutting down...${c.reset}`); + activeWebhookServer?.close(); + for (const [name, proc] of processes) { logService(name, "Stopping...", c.dim); signalProcessTree(proc, "SIGTERM"); @@ -592,6 +601,24 @@ async function main() { ); } + const { removedWorkspaces, removedBashEvents } = cleanupStaleArtifacts( + conversationsPath, + join(config.stateDir, "workspaces"), + join(config.stateDir, "bash_events"), + ); + if (removedWorkspaces > 0 || removedBashEvents > 0) { + logService( + "cleanup", + `Removed ${removedWorkspaces} orphaned workspace(s), ${removedBashEvents} expired bash event(s)`, + c.dim, + ); + } + + const { port: webhookReceiverPort, server: webhookServer } = + await startConversationWebhookReceiver(join(config.stateDir, "workspaces")); + config.webhookReceiverPort = webhookReceiverPort; + activeWebhookServer = webhookServer; + startAgentServer(config); await waitForService( "agent-server", diff --git a/scripts/dev-with-automation.mjs b/scripts/dev-with-automation.mjs index a8a82b84..1daf931e 100644 --- a/scripts/dev-with-automation.mjs +++ b/scripts/dev-with-automation.mjs @@ -57,9 +57,11 @@ import { buildAgentServerEnv, buildNpmScriptCommand, buildRuntimeServicesInfo, + cleanupStaleArtifacts, formatMissingUvxGuidance, findFreePorts, getOrCreatePersistedApiKey, + startConversationWebhookReceiver, validateFrontendDependencies, validateLocalAgentServerPath, } from "./dev-safe.mjs"; @@ -560,6 +562,9 @@ function startAgentServer(config) { const agentServerEnv = { ...buildAgentServerEnv(safeConfig), ...buildAgentServerAutomationEnv(config), + ...(config.webhookReceiverPort != null + ? { OH_WEBHOOKS_0_BASE_URL: `http://127.0.0.1:${config.webhookReceiverPort}` } + : {}), }; spawnService( @@ -1009,6 +1014,28 @@ async function main(options = {}) { extraPrereqs(config); } + // Cleanup stale artifacts from previous sessions + const { removedWorkspaces, removedBashEvents } = cleanupStaleArtifacts( + join(config.stateDir, "conversations"), + join(config.stateDir, "workspaces"), + join(config.stateDir, "bash_events"), + ); + if (removedWorkspaces > 0 || removedBashEvents > 0) { + logService( + "cleanup", + `Removed ${removedWorkspaces} orphaned workspace(s), ${removedBashEvents} expired bash event(s)`, + c.dim, + ); + } + + // Start webhook receiver for real-time workspace cleanup on conversation deletion + const { port: webhookReceiverPort, server: webhookServer } = + await startConversationWebhookReceiver(join(config.stateDir, "workspaces")); + config.webhookReceiverPort = webhookReceiverPort; + registerShutdownHook( + () => new Promise((resolve) => webhookServer.close(resolve)), + ); + if (useStaticMode && typeof buildStaticFrontend === "function") { buildStaticFrontend(config, args); }