Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import Database from "better-sqlite3";
import { mkdirSync } from "node:fs";
import { mkdir, readFile, stat, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
import { appendFile, mkdir, readFile, stat, writeFile } from "node:fs/promises";
import { dirname, join, resolve, sep } from "node:path";
import { createId, nowIso, type ArtifactRecord, type LogChunk, type RuntimeEvent, type RuntimeRecord, type SessionRecord, type TaskRecord, type TaskStore } from "@agent-dispatch/core";

export interface SqliteTaskStoreOptions {
Expand Down Expand Up @@ -75,9 +75,7 @@ export class SqliteTaskStore implements TaskStore {

async appendLog(taskId: string, chunk: string): Promise<void> {
await this.ensureReady();
const path = this.logPath(taskId);
const existing = await readFile(path, "utf8").catch(() => "");
await writeFile(path, `${existing}${chunk}`);
await appendFile(this.logPath(taskId), chunk);
}

async readLogs(taskId: string, cursor = 0, limit = 64_000): Promise<LogChunk> {
Expand All @@ -101,7 +99,7 @@ export class SqliteTaskStore implements TaskStore {

async saveArtifactFile(taskId: string, name: string, bytes: Uint8Array, contentType = "application/octet-stream"): Promise<ArtifactRecord> {
await this.ensureReady();
const uri = join(this.stateDir, "artifacts", taskId, name);
const uri = this.artifactPath(taskId, name);
await mkdir(dirname(uri), { recursive: true });
await writeFile(uri, bytes);
const sizeBytes = (await stat(uri)).size;
Expand All @@ -110,6 +108,10 @@ export class SqliteTaskStore implements TaskStore {
return artifact;
}

close(): void {
this.db.close();
}

private initialize(): void {
this.db.exec("create table if not exists schema_migrations (id text primary key, applied_at text not null)");
for (const migration of migrations) {
Expand All @@ -123,6 +125,15 @@ export class SqliteTaskStore implements TaskStore {
}
}

private artifactPath(taskId: string, name: string): string {
const root = resolve(this.stateDir, "artifacts", taskId);
const uri = resolve(root, name);
if (uri !== root && !uri.startsWith(`${root}${sep}`)) {
throw new Error("Artifact name must stay within the task artifact directory.");
}
return uri;
}

private logPath(taskId: string): string {
return join(this.stateDir, "logs", `${taskId}.log`);
}
Expand Down
26 changes: 25 additions & 1 deletion test/store.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { mkdtemp, rm } from "node:fs/promises";
import { mkdtemp, readFile, rm, stat } from "node:fs/promises";
import { join } from "node:path";
import { tmpdir } from "node:os";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
Expand All @@ -14,6 +14,7 @@ beforeEach(async () => {
});

afterEach(async () => {
store.close();
await rm(stateDir, { recursive: true, force: true });
});

Expand Down Expand Up @@ -67,12 +68,35 @@ describe("SqliteTaskStore", () => {
expect(second).toMatchObject({ data: "cde", nextCursor: 5 });
});

it("appends log chunks without rewriting existing content", async () => {
await Promise.all(Array.from({ length: 25 }, (_, index) => store.appendLog("task_1", `${index}\n`)));
const logs = await store.readLogs("task_1", 0, 10_000);

for (let index = 0; index < 25; index += 1) {
expect(logs.data).toContain(`${index}\n`);
}
});

it("persists artifact metadata and files", async () => {
const artifact = await store.saveArtifactFile("task_1", "result.txt", Buffer.from("done"), "text/plain");
const artifacts = await store.listArtifacts("task_1");

expect(artifacts).toHaveLength(1);
expect(artifacts[0]).toMatchObject({ id: artifact.id, kind: "file", contentType: "text/plain", sizeBytes: 4 });
await expect(readFile(artifact.uri, "utf8")).resolves.toBe("done");
});

it("rejects artifact paths outside the task artifact directory", async () => {
await expect(store.saveArtifactFile("task_1", "../escape.txt", Buffer.from("nope"))).rejects.toThrow("Artifact name");
await expect(store.saveArtifactFile("task_1", "/tmp/escape.txt", Buffer.from("nope"))).rejects.toThrow("Artifact name");
await expect(stat(join(stateDir, "artifacts", "escape.txt"))).rejects.toThrow();
});

it("closes the database connection explicitly", async () => {
store.close();
expect(() => store.appliedMigrations()).toThrow();
store = new SqliteTaskStore({ stateDir });
expect(store.appliedMigrations()).toEqual(["001_initial"]);
});
});

Expand Down
Loading