diff --git a/src/index.ts b/src/index.ts index d2ea568..f6de307 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,7 +1,7 @@ import Database from "better-sqlite3"; import { mkdirSync } from "node:fs"; -import { mkdir, readFile, stat, writeFile } from "node:fs/promises"; -import { dirname, join } from "node:path"; +import { appendFile, mkdir, readFile, stat, writeFile } from "node:fs/promises"; +import { dirname, join, resolve, sep } from "node:path"; import { createId, nowIso, type ArtifactRecord, type LogChunk, type RuntimeEvent, type RuntimeRecord, type SessionRecord, type TaskRecord, type TaskStore } from "@agent-dispatch/core"; export interface SqliteTaskStoreOptions { @@ -75,9 +75,7 @@ export class SqliteTaskStore implements TaskStore { async appendLog(taskId: string, chunk: string): Promise { await this.ensureReady(); - const path = this.logPath(taskId); - const existing = await readFile(path, "utf8").catch(() => ""); - await writeFile(path, `${existing}${chunk}`); + await appendFile(this.logPath(taskId), chunk); } async readLogs(taskId: string, cursor = 0, limit = 64_000): Promise { @@ -101,7 +99,7 @@ export class SqliteTaskStore implements TaskStore { async saveArtifactFile(taskId: string, name: string, bytes: Uint8Array, contentType = "application/octet-stream"): Promise { await this.ensureReady(); - const uri = join(this.stateDir, "artifacts", taskId, name); + const uri = this.artifactPath(taskId, name); await mkdir(dirname(uri), { recursive: true }); await writeFile(uri, bytes); const sizeBytes = (await stat(uri)).size; @@ -110,6 +108,10 @@ export class SqliteTaskStore implements TaskStore { return artifact; } + close(): void { + this.db.close(); + } + private initialize(): void { this.db.exec("create table if not exists schema_migrations (id text primary key, applied_at text not null)"); for (const migration of migrations) { @@ -123,6 +125,15 @@ export class SqliteTaskStore implements TaskStore { } } + private artifactPath(taskId: string, name: string): string { + const root = resolve(this.stateDir, "artifacts", taskId); + const uri = resolve(root, name); + if (uri !== root && !uri.startsWith(`${root}${sep}`)) { + throw new Error("Artifact name must stay within the task artifact directory."); + } + return uri; + } + private logPath(taskId: string): string { return join(this.stateDir, "logs", `${taskId}.log`); } diff --git a/test/store.test.ts b/test/store.test.ts index 1a33a30..dd2016e 100644 --- a/test/store.test.ts +++ b/test/store.test.ts @@ -1,4 +1,4 @@ -import { mkdtemp, rm } from "node:fs/promises"; +import { mkdtemp, readFile, rm, stat } from "node:fs/promises"; import { join } from "node:path"; import { tmpdir } from "node:os"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; @@ -14,6 +14,7 @@ beforeEach(async () => { }); afterEach(async () => { + store.close(); await rm(stateDir, { recursive: true, force: true }); }); @@ -67,12 +68,35 @@ describe("SqliteTaskStore", () => { expect(second).toMatchObject({ data: "cde", nextCursor: 5 }); }); + it("appends log chunks without rewriting existing content", async () => { + await Promise.all(Array.from({ length: 25 }, (_, index) => store.appendLog("task_1", `${index}\n`))); + const logs = await store.readLogs("task_1", 0, 10_000); + + for (let index = 0; index < 25; index += 1) { + expect(logs.data).toContain(`${index}\n`); + } + }); + it("persists artifact metadata and files", async () => { const artifact = await store.saveArtifactFile("task_1", "result.txt", Buffer.from("done"), "text/plain"); const artifacts = await store.listArtifacts("task_1"); expect(artifacts).toHaveLength(1); expect(artifacts[0]).toMatchObject({ id: artifact.id, kind: "file", contentType: "text/plain", sizeBytes: 4 }); + await expect(readFile(artifact.uri, "utf8")).resolves.toBe("done"); + }); + + it("rejects artifact paths outside the task artifact directory", async () => { + await expect(store.saveArtifactFile("task_1", "../escape.txt", Buffer.from("nope"))).rejects.toThrow("Artifact name"); + await expect(store.saveArtifactFile("task_1", "/tmp/escape.txt", Buffer.from("nope"))).rejects.toThrow("Artifact name"); + await expect(stat(join(stateDir, "artifacts", "escape.txt"))).rejects.toThrow(); + }); + + it("closes the database connection explicitly", async () => { + store.close(); + expect(() => store.appliedMigrations()).toThrow(); + store = new SqliteTaskStore({ stateDir }); + expect(store.appliedMigrations()).toEqual(["001_initial"]); }); });