Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/cli/commands/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import { datastoreStatusCommand } from "./datastore_status.ts";
import { datastoreSetupCommand } from "./datastore_setup.ts";
import { datastoreSyncCommand } from "./datastore_sync.ts";
import { datastoreLockCommand } from "./datastore_lock.ts";
import { datastoreCompactCommand } from "./datastore_compact.ts";

export const datastoreCommand = new Command()
.description("Manage datastore configuration")
Expand All @@ -31,4 +32,5 @@ export const datastoreCommand = new Command()
.command("status", datastoreStatusCommand)
.command("setup", datastoreSetupCommand)
.command("sync", datastoreSyncCommand)
.command("lock", datastoreLockCommand);
.command("lock", datastoreLockCommand)
.command("compact", datastoreCompactCommand);
97 changes: 97 additions & 0 deletions src/cli/commands/datastore_compact.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Swamp, an Automation Framework
// Copyright (C) 2026 System Initiative, Inc.
//
// This file is part of Swamp.
//
// Swamp is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation, with the Swamp
// Extension and Definition Exception (found in the "COPYING-EXCEPTION"
// file).
//
// Swamp is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with Swamp. If not, see <https://www.gnu.org/licenses/>.

import { Command } from "@cliffy/command";
import {
consumeStream,
createLibSwampContext,
datastoreCompact,
type DatastoreCompactDeps,
} from "../../libswamp/mod.ts";
import { createDatastoreCompactRenderer } from "../../presentation/renderers/datastore_compact.ts";
import {
createContext,
type GlobalOptions,
resolveRepoDir,
} from "../context.ts";
import { requireInitializedRepo } from "../repo_context.ts";
import { createCatalogStore } from "../../infrastructure/persistence/repository_factory.ts";
import {
SWAMP_SUBDIRS,
swampPath,
} from "../../infrastructure/persistence/paths.ts";
import { join } from "@std/path";

// Loosely-typed alias for the options object handed to the command action.
// The handler below casts it to GlobalOptions for createContext() and reads
// `repoDir` from it directly.
// deno-lint-ignore no-explicit-any
type AnyOptions = any;

/**
 * `swamp datastore compact` — wires the catalog store's checkpoint and vacuum
 * operations into the libswamp `datastoreCompact` stream and renders the
 * resulting events in the CLI's output mode.
 *
 * The catalog store is closed in a `finally` block that covers everything
 * executed after it is opened, so a failure while building the context, the
 * renderer, or while consuming the stream never leaves the connection open.
 */
export const datastoreCompactCommand = new Command()
  .name("compact")
  .description(
    "Checkpoint the WAL and vacuum the catalog database to reclaim disk space",
  )
  .example(
    "Compact the catalog database",
    "swamp datastore compact",
  )
  .example(
    "Compact and output stats as JSON",
    "swamp datastore compact --json",
  )
  .option(
    "--repo-dir <dir:string>",
    "Repository directory (env: SWAMP_REPO_DIR)",
  )
  .action(async function (options: AnyOptions) {
    const cliCtx = createContext(options as GlobalOptions, [
      "datastore",
      "compact",
    ]);

    const { repoDir, datastoreResolver } = await requireInitializedRepo({
      repoDir: resolveRepoDir(options.repoDir),
      outputMode: cliCtx.outputMode,
    });

    // Resolve the on-disk location of the catalog file before opening the
    // store, so a failure here cannot leak an open connection.
    // NOTE(review): presumably this is the same file createCatalogStore
    // opens — confirm against repository_factory.ts.
    const dataBaseDir = datastoreResolver?.resolvePath(SWAMP_SUBDIRS.data) ??
      swampPath(repoDir, SWAMP_SUBDIRS.data);
    const catalogDbPath = join(dataBaseDir, "_catalog.db");

    const catalogStore = createCatalogStore(repoDir, datastoreResolver);
    try {
      const deps: DatastoreCompactDeps = {
        checkpoint: () => catalogStore.checkpoint(),
        vacuum: () => catalogStore.vacuum(),
        catalogDbSize: async () => {
          try {
            const stat = await Deno.stat(catalogDbPath);
            return stat.size;
          } catch {
            // Best-effort: size is informational only, so any stat failure
            // (e.g. missing file) is reported as 0 bytes instead of aborting.
            return 0;
          }
        },
      };

      const ctx = createLibSwampContext({ logger: cliCtx.logger });
      const renderer = createDatastoreCompactRenderer(cliCtx.outputMode);
      await consumeStream(datastoreCompact(ctx, deps), renderer.handlers());
    } finally {
      catalogStore.close();
    }
  });
70 changes: 70 additions & 0 deletions src/infrastructure/persistence/catalog_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ export interface CatalogRow {
*/
export const ITERATE_PAGE_SIZE = 1000;

/**
 * Result of a WAL checkpoint, as reported by {@link CatalogStore.checkpoint}.
 */
export interface CatalogCheckpointStats {
  /** Number of WAL frames that were written into the main database file. */
  walPagesCheckpointed: number;
  /** Number of frames present in the WAL when the checkpoint was requested. */
  walPagesTotal: number;
}

/**
* Schema version. Bump this when the catalog table structure changes.
* On startup, if the stored version differs, the catalog is dropped and
Expand Down Expand Up @@ -494,6 +502,68 @@ export class CatalogStore {
return [...values].sort();
}

/**
 * Checkpoints the WAL file using TRUNCATE mode, which writes all WAL frames
 * to the main database file and physically truncates the WAL to zero bytes.
 *
 * If active readers are still using WAL pages, SQLite returns fewer
 * checkpointed frames than total frames (partial checkpoint). The caller
 * should surface this discrepancy rather than treating it as an error — the
 * next checkpoint will catch the remaining frames.
 *
 * SQLite reports -1 for both counters when the connection is not in WAL mode
 * (there is no write-ahead log to checkpoint). Those sentinels, and a missing
 * result row, are normalized to 0 so callers always receive non-negative
 * frame counts.
 *
 * @returns Total and checkpointed WAL frame counts, each >= 0.
 */
checkpoint(): CatalogCheckpointStats {
  const row = this.db.prepare(
    "PRAGMA wal_checkpoint(TRUNCATE)",
  ).get() as
    | { busy: number; log: number; checkpointed: number }
    | undefined;
  // Clamp the "-1 = no WAL" sentinel (and an absent row) to zero frames.
  const nonNegative = (frames: number | undefined): number =>
    Math.max(0, frames ?? 0);
  return {
    walPagesTotal: nonNegative(row?.log),
    walPagesCheckpointed: nonNegative(row?.checkpointed),
  };
}

/**
 * Removes multiple versions for a single (type, model, name) triple in a
 * single BEGIN IMMEDIATE transaction, replacing N individual removeVersion()
 * calls with one commit.
 *
 * If any statement or the COMMIT fails, a ROLLBACK is attempted and the
 * original error is rethrown, so the catalog remains consistent. Callers may
 * catch the error and retry with individual removeVersion() calls if needed.
 *
 * @param typeNormalized Value matched against the `type_normalized` column.
 * @param modelId Value matched against the `model_id` column.
 * @param dataName Value matched against the `data_name` column.
 * @param versions Versions to delete; an empty list is a no-op and opens no
 *   transaction.
 * @throws Whatever error the underlying database raised while deleting or
 *   committing.
 */
bulkRemoveVersions(
  typeNormalized: string,
  modelId: string,
  dataName: string,
  versions: readonly number[],
): void {
  if (versions.length === 0) return;
  this.db.exec("BEGIN IMMEDIATE");
  try {
    const stmt = this.db.prepare(
      `DELETE FROM catalog
WHERE type_normalized = ? AND model_id = ? AND data_name = ? AND version = ?`,
    );
    for (const version of versions) {
      stmt.run(typeNormalized, modelId, dataName, version);
    }
    this.db.exec("COMMIT");
  } catch (error) {
    try {
      this.db.exec("ROLLBACK");
    } catch {
      // SQLite automatically rolls back on some failures (e.g. disk full,
      // I/O error); an explicit ROLLBACK then fails because no transaction
      // is active. Ignore that so the original error is what callers see.
    }
    throw error;
  }
}

/**
 * Issues a VACUUM against the catalog database, rebuilding the file so that
 * pages freed by earlier deletes are reclaimed.
 *
 * Call this only when no transaction is open. VACUUM takes an exclusive lock
 * and rewrites the whole database file, so it can take several seconds on a
 * large catalog and is safe only while no other connections are active.
 */
vacuum(): void {
  const statement = "VACUUM";
  this.db.exec(statement);
}

/**
* Closes the database connection.
*/
Expand Down
60 changes: 60 additions & 0 deletions src/infrastructure/persistence/catalog_store_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,66 @@ Deno.test("CatalogStore: migrates v1 catalog DB to v2 without throwing", () => {
store.close();
});

// Verifies that one bulkRemoveVersions() call removes every listed version
// and leaves the unlisted one in place.
Deno.test("CatalogStore: bulkRemoveVersions deletes all specified versions atomically", () => {
  const store = new CatalogStore(makeTempDbPath());

  // Seed versions 1 through 4 using makeRow's defaults for the other fields.
  for (const version of [1, 2, 3, 4]) {
    store.upsertNewVersion(makeRow({ version }));
  }

  const doomed = [1, 2, 3];
  store.bulkRemoveVersions("test-model", "model-001", "my-data", doomed);

  // Only version 4 should survive.
  const remaining = Array.from(store.iterate());
  assertEquals(remaining.length, 1);
  assertEquals(remaining[0].version, 4);
  store.close();
});

// Verifies that passing an empty version list leaves the catalog untouched.
Deno.test("CatalogStore: bulkRemoveVersions is a no-op for empty array", () => {
  const store = new CatalogStore(makeTempDbPath());
  const seedRow = makeRow({ version: 1 });
  store.upsertNewVersion(seedRow);

  const nothingToRemove: number[] = [];
  store.bulkRemoveVersions(
    "test-model",
    "model-001",
    "my-data",
    nothingToRemove,
  );

  // The single seeded row must still be present.
  assertEquals(store.count(), 1);
  store.close();
});

// Verifies that checkpoint() reports a full checkpoint and that the "-wal"
// sidecar file is either absent or empty afterwards.
Deno.test("CatalogStore: checkpoint returns WAL page counts and truncates WAL", () => {
  const dbPath = makeTempDbPath();
  const store = new CatalogStore(dbPath);

  // Insert 50 distinct rows so the WAL has frames to checkpoint.
  let index = 0;
  while (index < 50) {
    const row = makeRow({
      model_id: `m-${index}`,
      data_name: `d-${index}`,
      version: 1,
    });
    store.upsertNewVersion(row);
    index += 1;
  }

  const { walPagesCheckpointed, walPagesTotal } = store.checkpoint();

  // Every WAL frame must have been copied into the main database file.
  assertEquals(
    walPagesCheckpointed,
    walPagesTotal,
    "Expected full checkpoint — all WAL pages should be written to main db",
  );

  // After TRUNCATE the WAL is either missing (treated as size 0) or empty.
  let walSize = 0;
  try {
    walSize = Deno.statSync(dbPath + "-wal").size;
  } catch (err) {
    if (!(err instanceof Deno.errors.NotFound)) throw err;
  }
  assertEquals(walSize, 0, "WAL file should be truncated to zero bytes");

  store.close();
});

Deno.test("CatalogStore: invalidate clears populated flag but keeps data", () => {
const dbPath = makeTempDbPath();
const store = new CatalogStore(dbPath);
Expand Down
16 changes: 7 additions & 9 deletions src/infrastructure/persistence/unified_data_repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1382,15 +1382,13 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository {
// Re-scan actual versions after parallel deletions to avoid stale marker.
// Skip for dry-run — nothing was actually removed.
if (!dryRun && versionsToRemove.length > 0) {
// Drop catalog rows for the versions that were removed from disk.
for (const removedVersion of versionsToRemove) {
this.catalogStore.removeVersion(
type.normalized,
modelId,
data.name,
removedVersion,
);
}
// Drop catalog rows for all removed versions in one transaction.
this.catalogStore.bulkRemoveVersions(
type.normalized,
modelId,
data.name,
versionsToRemove,
);

const currentVersions = await this.listVersions(
type,
Expand Down
15 changes: 14 additions & 1 deletion src/libswamp/data/gc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ export interface DataGcPreview {
export interface DataGcData {
dataEntriesExpired: number;
versionsDeleted: number;
walPagesTotal: number;
walPagesCheckpointed: number;
bytesReclaimed: number;
dryRun: boolean;
expiredEntries: Array<{
Expand Down Expand Up @@ -85,6 +87,11 @@ export interface DataGcDeps {
deleteExpiredData: (opts: {
dryRun: boolean;
}) => Promise<LifecycleGCResult>;
/** Checkpoints the catalog WAL. Omit in tests that don't need compaction. */
compactCatalog?: () => {
walPagesTotal: number;
walPagesCheckpointed: number;
};
}

/** Wires real infrastructure into DataGcDeps. */
Expand All @@ -94,10 +101,11 @@ export function createDataGcDeps(
): DataGcDeps {
const dsPath = (subdir: string): string | undefined =>
datastoreResolver?.resolvePath(subdir);
const catalogStore = createCatalogStore(repoDir, datastoreResolver);
const unifiedDataRepo = new FileSystemUnifiedDataRepository(
repoDir,
dsPath(SWAMP_SUBDIRS.data),
createCatalogStore(repoDir, datastoreResolver),
catalogStore,
);
const workflowRunRepo = new YamlWorkflowRunRepository(
repoDir,
Expand All @@ -112,6 +120,7 @@ export function createDataGcDeps(
findExpiredData: () => service.findExpiredData(),
previewVersionGarbage: () => service.previewVersionGarbage(),
deleteExpiredData: (opts) => service.deleteExpiredData(opts),
compactCatalog: () => catalogStore.checkpoint(),
};
}

Expand Down Expand Up @@ -155,6 +164,8 @@ export async function* dataGc(

const result = await deps.deleteExpiredData({ dryRun: input.dryRun });

const compact = !input.dryRun ? deps.compactCatalog?.() : undefined;

yield {
kind: "completed" as const,
data: {
Expand All @@ -163,6 +174,8 @@ export async function* dataGc(
bytesReclaimed: result.bytesReclaimed,
dryRun: result.dryRun,
expiredEntries: result.expiredEntries,
walPagesTotal: compact?.walPagesTotal ?? 0,
walPagesCheckpointed: compact?.walPagesCheckpointed ?? 0,
},
};
})(),
Expand Down
Loading
Loading