From 08dca1e433a630e4063a784e07cda6fb8a473cbc Mon Sep 17 00:00:00 2001 From: stack72 Date: Tue, 5 May 2026 18:46:12 +0100 Subject: [PATCH 1/2] perf(catalog): batch version deletes, WAL checkpoint after GC, add datastore compact command (swamp-club#241) Three coordinated improvements to SQLite catalog performance under heavy GC load: 1. `bulkRemoveVersions` on `CatalogStore` wraps N individual DELETE statements in a single BEGIN IMMEDIATE transaction, eliminating per-row WAL fsync overhead during version GC. 2. `compactCatalog` dep added to `DataGcDeps`; after a live GC run `dataGc` calls `PRAGMA wal_checkpoint(TRUNCATE)` so the WAL file is reset to 0 bytes. WAL page stats are reported in the completed event and rendered in both log and JSON output modes. TRUNCATE returns (0,0,0) on full success, so the log renderer special-cases that result. 3. New `swamp datastore compact` command exposes on-demand WAL checkpoint + VACUUM so operators can reclaim space without running GC. Co-Authored-By: Claude Sonnet 4.6 --- src/cli/commands/datastore.ts | 4 +- src/cli/commands/datastore_compact.ts | 94 ++++++++++++++++ .../persistence/catalog_store.ts | 70 ++++++++++++ .../persistence/catalog_store_test.ts | 60 ++++++++++ .../persistence/unified_data_repository.ts | 16 ++- src/libswamp/data/gc.ts | 15 ++- src/libswamp/data/gc_test.ts | 53 +++++++++ src/libswamp/datastores/compact.ts | 72 ++++++++++++ src/libswamp/datastores/compact_test.ts | 106 ++++++++++++++++++ src/libswamp/mod.ts | 6 + src/presentation/renderers/data_gc.ts | 12 ++ .../renderers/datastore_compact.ts | 96 ++++++++++++++++ 12 files changed, 593 insertions(+), 11 deletions(-) create mode 100644 src/cli/commands/datastore_compact.ts create mode 100644 src/libswamp/datastores/compact.ts create mode 100644 src/libswamp/datastores/compact_test.ts create mode 100644 src/presentation/renderers/datastore_compact.ts diff --git a/src/cli/commands/datastore.ts b/src/cli/commands/datastore.ts index cf2c8dd6..bf5dc90e 
100644 --- a/src/cli/commands/datastore.ts +++ b/src/cli/commands/datastore.ts @@ -22,6 +22,7 @@ import { datastoreStatusCommand } from "./datastore_status.ts"; import { datastoreSetupCommand } from "./datastore_setup.ts"; import { datastoreSyncCommand } from "./datastore_sync.ts"; import { datastoreLockCommand } from "./datastore_lock.ts"; +import { datastoreCompactCommand } from "./datastore_compact.ts"; export const datastoreCommand = new Command() .description("Manage datastore configuration") @@ -31,4 +32,5 @@ export const datastoreCommand = new Command() .command("status", datastoreStatusCommand) .command("setup", datastoreSetupCommand) .command("sync", datastoreSyncCommand) - .command("lock", datastoreLockCommand); + .command("lock", datastoreLockCommand) + .command("compact", datastoreCompactCommand); diff --git a/src/cli/commands/datastore_compact.ts b/src/cli/commands/datastore_compact.ts new file mode 100644 index 00000000..827c733d --- /dev/null +++ b/src/cli/commands/datastore_compact.ts @@ -0,0 +1,94 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . 
+ +import { Command } from "@cliffy/command"; +import { + consumeStream, + createLibSwampContext, + datastoreCompact, + type DatastoreCompactDeps, +} from "../../libswamp/mod.ts"; +import { createDatastoreCompactRenderer } from "../../presentation/renderers/datastore_compact.ts"; +import { + createContext, + type GlobalOptions, + resolveRepoDir, +} from "../context.ts"; +import { requireInitializedRepo } from "../repo_context.ts"; +import { createCatalogStore } from "../../infrastructure/persistence/repository_factory.ts"; +import { + SWAMP_SUBDIRS, + swampPath, +} from "../../infrastructure/persistence/paths.ts"; +import { join } from "@std/path"; + +// deno-lint-ignore no-explicit-any +type AnyOptions = any; + +export const datastoreCompactCommand = new Command() + .name("compact") + .description( + "Checkpoint the WAL and vacuum the catalog database to reclaim disk space", + ) + .example( + "Compact the catalog database", + "swamp datastore compact", + ) + .example( + "Compact and output stats as JSON", + "swamp datastore compact --json", + ) + .option( + "--repo-dir ", + "Repository directory (env: SWAMP_REPO_DIR)", + ) + .action(async function (options: AnyOptions) { + const cliCtx = createContext(options as GlobalOptions, [ + "datastore", + "compact", + ]); + + const { repoDir, datastoreResolver } = await requireInitializedRepo({ + repoDir: resolveRepoDir(options.repoDir), + outputMode: cliCtx.outputMode, + }); + + const catalogStore = createCatalogStore(repoDir, datastoreResolver); + const dataBaseDir = datastoreResolver?.resolvePath(SWAMP_SUBDIRS.data) ?? 
+ swampPath(repoDir, SWAMP_SUBDIRS.data); + const catalogDbPath = join(dataBaseDir, "_catalog.db"); + + const deps: DatastoreCompactDeps = { + checkpoint: () => catalogStore.checkpoint(), + vacuum: () => catalogStore.vacuum(), + catalogDbSize: async () => { + try { + const stat = await Deno.stat(catalogDbPath); + return stat.size; + } catch { + return 0; + } + }, + }; + + const ctx = createLibSwampContext({ logger: cliCtx.logger }); + const renderer = createDatastoreCompactRenderer(cliCtx.outputMode); + await consumeStream(datastoreCompact(ctx, deps), renderer.handlers()); + catalogStore.close(); + }); diff --git a/src/infrastructure/persistence/catalog_store.ts b/src/infrastructure/persistence/catalog_store.ts index d5c10b05..0a6e2661 100644 --- a/src/infrastructure/persistence/catalog_store.ts +++ b/src/infrastructure/persistence/catalog_store.ts @@ -64,6 +64,14 @@ export interface CatalogRow { */ export const ITERATE_PAGE_SIZE = 1000; +/** Stats returned by {@link CatalogStore.checkpoint}. */ +export interface CatalogCheckpointStats { + /** Total WAL frames at the time of the checkpoint call. */ + walPagesTotal: number; + /** Frames successfully written to the main database file. */ + walPagesCheckpointed: number; +} + /** * Schema version. Bump this when the catalog table structure changes. * On startup, if the stored version differs, the catalog is dropped and @@ -494,6 +502,68 @@ export class CatalogStore { return [...values].sort(); } + /** + * Checkpoints the WAL file using TRUNCATE mode, which writes all WAL frames + * to the main database file and physically truncates the WAL to zero bytes. + * + * If active readers are still using WAL pages, SQLite returns fewer + * checkpointed frames than total frames (partial checkpoint). The caller + * should surface this discrepancy rather than treating it as an error — the + * next checkpoint will catch the remaining frames. 
+ */ + checkpoint(): CatalogCheckpointStats { + const row = this.db.prepare( + "PRAGMA wal_checkpoint(TRUNCATE)", + ).get() as { busy: number; log: number; checkpointed: number }; + return { + walPagesTotal: row.log, + walPagesCheckpointed: row.checkpointed, + }; + } + + /** + * Removes multiple versions for a single (type, model, name) triple in a + * single BEGIN IMMEDIATE transaction, replacing N individual removeVersion() + * calls with one fsync. + * + * If any statement fails, the transaction is rolled back and the error is + * rethrown. This method has no built-in fallback — callers may catch the + * error and retry with individual removeVersion() calls if needed. + */ + bulkRemoveVersions( + typeNormalized: string, + modelId: string, + dataName: string, + versions: readonly number[], + ): void { + if (versions.length === 0) return; + this.db.exec("BEGIN IMMEDIATE"); + try { + const stmt = this.db.prepare( + `DELETE FROM catalog + WHERE type_normalized = ? AND model_id = ? AND data_name = ? AND version = ?`, + ); + for (const version of versions) { + stmt.run(typeNormalized, modelId, dataName, version); + } + this.db.exec("COMMIT"); + } catch (error) { + this.db.exec("ROLLBACK"); + throw error; + } + } + + /** + * Runs VACUUM to rebuild the database file and reclaim freed pages. + * + * Must be called outside any open transaction. Acquires an exclusive lock + * and rewrites the entire database file — on a large catalog this may take + * several seconds. Safe to call only when no other connections are active. + */ + vacuum(): void { + this.db.exec("VACUUM"); + } + /** * Closes the database connection. 
*/ diff --git a/src/infrastructure/persistence/catalog_store_test.ts b/src/infrastructure/persistence/catalog_store_test.ts index 122645c3..2bfc588a 100644 --- a/src/infrastructure/persistence/catalog_store_test.ts +++ b/src/infrastructure/persistence/catalog_store_test.ts @@ -522,6 +522,66 @@ Deno.test("CatalogStore: migrates v1 catalog DB to v2 without throwing", () => { store.close(); }); +Deno.test("CatalogStore: bulkRemoveVersions deletes all specified versions atomically", () => { + const dbPath = makeTempDbPath(); + const store = new CatalogStore(dbPath); + + store.upsertNewVersion(makeRow({ version: 1 })); + store.upsertNewVersion(makeRow({ version: 2 })); + store.upsertNewVersion(makeRow({ version: 3 })); + store.upsertNewVersion(makeRow({ version: 4 })); + + store.bulkRemoveVersions("test-model", "model-001", "my-data", [1, 2, 3]); + + const rows = [...store.iterate()]; + assertEquals(rows.length, 1); + assertEquals(rows[0].version, 4); + store.close(); +}); + +Deno.test("CatalogStore: bulkRemoveVersions is a no-op for empty array", () => { + const dbPath = makeTempDbPath(); + const store = new CatalogStore(dbPath); + + store.upsertNewVersion(makeRow({ version: 1 })); + store.bulkRemoveVersions("test-model", "model-001", "my-data", []); + + assertEquals(store.count(), 1); + store.close(); +}); + +Deno.test("CatalogStore: checkpoint returns WAL page counts and truncates WAL", () => { + const dbPath = makeTempDbPath(); + const store = new CatalogStore(dbPath); + + // Write enough rows to force WAL pages to accumulate + for (let i = 0; i < 50; i++) { + store.upsertNewVersion( + makeRow({ model_id: `m-${i}`, data_name: `d-${i}`, version: 1 }), + ); + } + + const stats = store.checkpoint(); + + // WAL must have been checkpointed (all pages written to main db) + assertEquals( + stats.walPagesCheckpointed, + stats.walPagesTotal, + "Expected full checkpoint — all WAL pages should be written to main db", + ); + + // WAL file should be gone or empty after TRUNCATE 
+ try { + const walStat = Deno.statSync(dbPath + "-wal"); + assertEquals(walStat.size, 0, "WAL file should be truncated to zero bytes"); + } catch (e) { + if (!(e instanceof Deno.errors.NotFound)) throw e; + // WAL file not present — also correct after TRUNCATE + } + + store.close(); +}); + Deno.test("CatalogStore: invalidate clears populated flag but keeps data", () => { const dbPath = makeTempDbPath(); const store = new CatalogStore(dbPath); diff --git a/src/infrastructure/persistence/unified_data_repository.ts b/src/infrastructure/persistence/unified_data_repository.ts index d19dd29e..a431bcb9 100644 --- a/src/infrastructure/persistence/unified_data_repository.ts +++ b/src/infrastructure/persistence/unified_data_repository.ts @@ -1382,15 +1382,13 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { // Re-scan actual versions after parallel deletions to avoid stale marker. // Skip for dry-run — nothing was actually removed. if (!dryRun && versionsToRemove.length > 0) { - // Drop catalog rows for the versions that were removed from disk. - for (const removedVersion of versionsToRemove) { - this.catalogStore.removeVersion( - type.normalized, - modelId, - data.name, - removedVersion, - ); - } + // Drop catalog rows for all removed versions in one transaction. 
+ this.catalogStore.bulkRemoveVersions( + type.normalized, + modelId, + data.name, + versionsToRemove, + ); const currentVersions = await this.listVersions( type, diff --git a/src/libswamp/data/gc.ts b/src/libswamp/data/gc.ts index 6faeabee..0be83d3d 100644 --- a/src/libswamp/data/gc.ts +++ b/src/libswamp/data/gc.ts @@ -58,6 +58,8 @@ export interface DataGcPreview { export interface DataGcData { dataEntriesExpired: number; versionsDeleted: number; + walPagesTotal: number; + walPagesCheckpointed: number; bytesReclaimed: number; dryRun: boolean; expiredEntries: Array<{ @@ -85,6 +87,11 @@ export interface DataGcDeps { deleteExpiredData: (opts: { dryRun: boolean; }) => Promise; + /** Checkpoints the catalog WAL. Omit in tests that don't need compaction. */ + compactCatalog?: () => { + walPagesTotal: number; + walPagesCheckpointed: number; + }; } /** Wires real infrastructure into DataGcDeps. */ @@ -94,10 +101,11 @@ export function createDataGcDeps( ): DataGcDeps { const dsPath = (subdir: string): string | undefined => datastoreResolver?.resolvePath(subdir); + const catalogStore = createCatalogStore(repoDir, datastoreResolver); const unifiedDataRepo = new FileSystemUnifiedDataRepository( repoDir, dsPath(SWAMP_SUBDIRS.data), - createCatalogStore(repoDir, datastoreResolver), + catalogStore, ); const workflowRunRepo = new YamlWorkflowRunRepository( repoDir, @@ -112,6 +120,7 @@ export function createDataGcDeps( findExpiredData: () => service.findExpiredData(), previewVersionGarbage: () => service.previewVersionGarbage(), deleteExpiredData: (opts) => service.deleteExpiredData(opts), + compactCatalog: () => catalogStore.checkpoint(), }; } @@ -155,6 +164,8 @@ export async function* dataGc( const result = await deps.deleteExpiredData({ dryRun: input.dryRun }); + const compact = !input.dryRun ? 
deps.compactCatalog?.() : undefined; + yield { kind: "completed" as const, data: { @@ -163,6 +174,8 @@ export async function* dataGc( bytesReclaimed: result.bytesReclaimed, dryRun: result.dryRun, expiredEntries: result.expiredEntries, + walPagesTotal: compact?.walPagesTotal ?? 0, + walPagesCheckpointed: compact?.walPagesCheckpointed ?? 0, }, }; })(), diff --git a/src/libswamp/data/gc_test.ts b/src/libswamp/data/gc_test.ts index 9258ba67..0f11f56b 100644 --- a/src/libswamp/data/gc_test.ts +++ b/src/libswamp/data/gc_test.ts @@ -131,6 +131,59 @@ Deno.test("dataGc: yields completed with gc results", async () => { assertEquals(completed.data.dryRun, false); }); +Deno.test("dataGc: calls compactCatalog and includes WAL stats in completed event", async () => { + let compactCalled = false; + const deps = makeDeps({ + compactCatalog: () => { + compactCalled = true; + return { walPagesTotal: 20, walPagesCheckpointed: 20 }; + }, + }); + + const events = await collect( + dataGc(createLibSwampContext(), deps, { dryRun: false }), + ); + + assertEquals(compactCalled, true); + const completed = events.find((e) => e.kind === "completed") as Extract< + DataGcEvent, + { kind: "completed" } + >; + assertEquals(completed.data.walPagesTotal, 20); + assertEquals(completed.data.walPagesCheckpointed, 20); +}); + +Deno.test("dataGc: skips compactCatalog on dry-run", async () => { + let compactCalled = false; + const deps = makeDeps({ + compactCatalog: () => { + compactCalled = true; + return { walPagesTotal: 20, walPagesCheckpointed: 20 }; + }, + }); + + await collect( + dataGc(createLibSwampContext(), deps, { dryRun: true }), + ); + + assertEquals(compactCalled, false); +}); + +Deno.test("dataGc: WAL stats default to zero when compactCatalog is not provided", async () => { + const deps = makeDeps(); + + const events = await collect( + dataGc(createLibSwampContext(), deps, { dryRun: false }), + ); + + const completed = events.find((e) => e.kind === "completed") as Extract< + DataGcEvent, + 
{ kind: "completed" } + >; + assertEquals(completed.data.walPagesTotal, 0); + assertEquals(completed.data.walPagesCheckpointed, 0); +}); + Deno.test("dataGc: passes dryRun flag through", async () => { let receivedDryRun = false; const deps = makeDeps({ diff --git a/src/libswamp/datastores/compact.ts b/src/libswamp/datastores/compact.ts new file mode 100644 index 00000000..de7d07f6 --- /dev/null +++ b/src/libswamp/datastores/compact.ts @@ -0,0 +1,72 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +import type { LibSwampContext } from "../context.ts"; +import type { SwampError } from "../errors.ts"; +import { withGeneratorSpan } from "../../infrastructure/tracing/mod.ts"; + +/** Stats returned by the compact operation. */ +export interface DatastoreCompactData { + walPagesTotal: number; + walPagesCheckpointed: number; + /** Bytes reclaimed from the main database file after VACUUM. */ + dbBytesReclaimed: number; +} + +export type DatastoreCompactEvent = + | { kind: "checkpointing" } + | { kind: "vacuuming" } + | { kind: "completed"; data: DatastoreCompactData } + | { kind: "error"; error: SwampError }; + +/** Dependencies for the datastore compact operation. 
*/ +export interface DatastoreCompactDeps { + checkpoint: () => { walPagesTotal: number; walPagesCheckpointed: number }; + vacuum: () => void; + catalogDbSize: () => Promise; +} + +/** Checkpoints the catalog WAL and runs VACUUM to reclaim freed pages. */ +export async function* datastoreCompact( + _ctx: LibSwampContext, + deps: DatastoreCompactDeps, +): AsyncIterable { + yield* withGeneratorSpan( + "swamp.datastore.compact", + {}, + (async function* () { + yield { kind: "checkpointing" } as const; + const beforeSize = await deps.catalogDbSize(); + const stats = deps.checkpoint(); + + yield { kind: "vacuuming" } as const; + deps.vacuum(); + const afterSize = await deps.catalogDbSize(); + + yield { + kind: "completed" as const, + data: { + walPagesTotal: stats.walPagesTotal, + walPagesCheckpointed: stats.walPagesCheckpointed, + dbBytesReclaimed: Math.max(0, beforeSize - afterSize), + }, + }; + })(), + ); +} diff --git a/src/libswamp/datastores/compact_test.ts b/src/libswamp/datastores/compact_test.ts new file mode 100644 index 00000000..bedb8d61 --- /dev/null +++ b/src/libswamp/datastores/compact_test.ts @@ -0,0 +1,106 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . 
+ +import { assertEquals } from "@std/assert"; +import { + datastoreCompact, + type DatastoreCompactDeps, + type DatastoreCompactEvent, +} from "./compact.ts"; +import { createLibSwampContext } from "../context.ts"; +import { initializeLogging } from "../../infrastructure/logging/logger.ts"; + +async function collectEvents( + gen: AsyncIterable, +): Promise { + const events: DatastoreCompactEvent[] = []; + for await (const event of gen) { + events.push(event); + } + return events; +} + +function makeDeps( + overrides: Partial = {}, +): DatastoreCompactDeps { + return { + checkpoint: () => ({ walPagesTotal: 10, walPagesCheckpointed: 10 }), + vacuum: () => {}, + catalogDbSize: () => Promise.resolve(0), + ...overrides, + }; +} + +Deno.test("datastoreCompact: emits checkpointing, vacuuming, completed events", async () => { + await initializeLogging({}); + const ctx = createLibSwampContext({}); + const events = await collectEvents(datastoreCompact(ctx, makeDeps())); + + const kinds = events.map((e) => e.kind); + assertEquals(kinds, ["checkpointing", "vacuuming", "completed"]); +}); + +Deno.test("datastoreCompact: completed event includes WAL page counts", async () => { + await initializeLogging({}); + const ctx = createLibSwampContext({}); + const deps = makeDeps({ + checkpoint: () => ({ walPagesTotal: 42, walPagesCheckpointed: 40 }), + catalogDbSize: () => Promise.resolve(0), + }); + + const events = await collectEvents(datastoreCompact(ctx, deps)); + const completed = events.find((e) => e.kind === "completed"); + assertEquals(completed?.kind, "completed"); + if (completed?.kind === "completed") { + assertEquals(completed.data.walPagesTotal, 42); + assertEquals(completed.data.walPagesCheckpointed, 40); + } +}); + +Deno.test("datastoreCompact: reports db bytes reclaimed from before/after size", async () => { + await initializeLogging({}); + const ctx = createLibSwampContext({}); + let call = 0; + const deps = makeDeps({ + catalogDbSize: () => { + call++; + return 
Promise.resolve(call === 1 ? 1_000_000 : 600_000); + }, + }); + + const events = await collectEvents(datastoreCompact(ctx, deps)); + const completed = events.find((e) => e.kind === "completed"); + if (completed?.kind === "completed") { + assertEquals(completed.data.dbBytesReclaimed, 400_000); + } +}); + +Deno.test("datastoreCompact: dbBytesReclaimed is 0 when db size does not decrease", async () => { + await initializeLogging({}); + const ctx = createLibSwampContext({}); + const deps = makeDeps({ + catalogDbSize: () => Promise.resolve(500_000), + }); + + const events = await collectEvents(datastoreCompact(ctx, deps)); + const completed = events.find((e) => e.kind === "completed"); + if (completed?.kind === "completed") { + assertEquals(completed.data.dbBytesReclaimed, 0); + } +}); diff --git a/src/libswamp/mod.ts b/src/libswamp/mod.ts index 3515729c..91704a32 100644 --- a/src/libswamp/mod.ts +++ b/src/libswamp/mod.ts @@ -1033,3 +1033,9 @@ export { parseModelLockKey, parseModelSpec, } from "./datastores/lock.ts"; +export { + datastoreCompact, + type DatastoreCompactData, + type DatastoreCompactDeps, + type DatastoreCompactEvent, +} from "./datastores/compact.ts"; diff --git a/src/presentation/renderers/data_gc.ts b/src/presentation/renderers/data_gc.ts index 13ab9194..02f0eab2 100644 --- a/src/presentation/renderers/data_gc.ts +++ b/src/presentation/renderers/data_gc.ts @@ -35,6 +35,16 @@ class LogDataGcRenderer implements Renderer { completed: (e) => { logger .info`GC complete: deleted ${e.data.dataEntriesExpired} items`; + // wal_checkpoint(TRUNCATE) returns (0,0,0) on full success. 
+ if ( + e.data.walPagesTotal > 0 && + e.data.walPagesCheckpointed < e.data.walPagesTotal + ) { + logger + .info`WAL partial checkpoint: ${e.data.walPagesCheckpointed}/${e.data.walPagesTotal} pages (active readers present)`; + } else { + logger.info`WAL checkpointed and truncated`; + } }, error: (e) => { throw new UserError(e.error.message); @@ -55,6 +65,8 @@ class JsonDataGcRenderer implements Renderer { bytesReclaimed: e.data.bytesReclaimed, dryRun: e.data.dryRun, expiredEntries: e.data.expiredEntries, + walPagesTotal: e.data.walPagesTotal, + walPagesCheckpointed: e.data.walPagesCheckpointed, }, null, 2, diff --git a/src/presentation/renderers/datastore_compact.ts b/src/presentation/renderers/datastore_compact.ts new file mode 100644 index 00000000..3690f9ef --- /dev/null +++ b/src/presentation/renderers/datastore_compact.ts @@ -0,0 +1,96 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . 
+ +import type { + DatastoreCompactEvent, + EventHandlers, +} from "../../libswamp/mod.ts"; +import type { Renderer } from "../renderer.ts"; +import type { OutputMode } from "../output/output.ts"; +import { getSwampLogger } from "../../infrastructure/logging/logger.ts"; +import { UserError } from "../../domain/errors.ts"; + +class LogDatastoreCompactRenderer implements Renderer { + handlers(): EventHandlers { + const logger = getSwampLogger(["datastore", "compact"]); + return { + checkpointing: () => { + logger.info`Checkpointing WAL...`; + }, + vacuuming: () => { + logger.info`Vacuuming catalog database (this may take a moment)...`; + }, + completed: (e) => { + // wal_checkpoint(TRUNCATE) returns (0,0,0) on full success. + if ( + e.data.walPagesTotal > 0 && + e.data.walPagesCheckpointed < e.data.walPagesTotal + ) { + logger + .info`WAL partial checkpoint: ${e.data.walPagesCheckpointed}/${e.data.walPagesTotal} pages (active readers present)`; + } else { + logger.info`WAL checkpointed and truncated`; + } + if (e.data.dbBytesReclaimed > 0) { + logger + .info`Catalog compacted: reclaimed ${e.data.dbBytesReclaimed} bytes`; + } else { + logger.info`Catalog already compact`; + } + }, + error: (e) => { + throw new UserError(e.error.message); + }, + }; + } +} + +class JsonDatastoreCompactRenderer implements Renderer { + handlers(): EventHandlers { + return { + checkpointing: () => {}, + vacuuming: () => {}, + completed: (e) => { + console.log(JSON.stringify( + { + walPagesTotal: e.data.walPagesTotal, + walPagesCheckpointed: e.data.walPagesCheckpointed, + dbBytesReclaimed: e.data.dbBytesReclaimed, + }, + null, + 2, + )); + }, + error: (e) => { + throw new UserError(e.error.message); + }, + }; + } +} + +export function createDatastoreCompactRenderer( + mode: OutputMode, +): Renderer { + switch (mode) { + case "json": + return new JsonDatastoreCompactRenderer(); + case "log": + return new LogDatastoreCompactRenderer(); + } +} From 534af7270e67b1961336452c5f75594a191cb188 Mon 
Sep 17 00:00:00 2001 From: stack72 Date: Tue, 5 May 2026 19:07:21 +0100 Subject: [PATCH 2/2] fix(catalog): address PR review feedback - Gate WAL log lines on !dryRun in data_gc renderer; dry-run skips compactCatalog so walPagesTotal/walPagesCheckpointed both default to 0, causing the else branch to falsely log "WAL checkpointed and truncated" - Wrap consumeStream in try/finally so catalogStore.close() is guaranteed even when the renderer throws - Measure beforeSize after checkpoint (not before) so the WAL pages are already flushed to the main DB, giving an accurate pre-VACUUM baseline for dbBytesReclaimed Co-Authored-By: Claude Sonnet 4.6 --- src/cli/commands/datastore_compact.ts | 7 +++++-- src/libswamp/datastores/compact.ts | 4 +++- src/presentation/renderers/data_gc.ts | 20 +++++++++++--------- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/cli/commands/datastore_compact.ts b/src/cli/commands/datastore_compact.ts index 827c733d..df6720bc 100644 --- a/src/cli/commands/datastore_compact.ts +++ b/src/cli/commands/datastore_compact.ts @@ -89,6 +89,9 @@ export const datastoreCompactCommand = new Command() const ctx = createLibSwampContext({ logger: cliCtx.logger }); const renderer = createDatastoreCompactRenderer(cliCtx.outputMode); - await consumeStream(datastoreCompact(ctx, deps), renderer.handlers()); - catalogStore.close(); + try { + await consumeStream(datastoreCompact(ctx, deps), renderer.handlers()); + } finally { + catalogStore.close(); + } }); diff --git a/src/libswamp/datastores/compact.ts b/src/libswamp/datastores/compact.ts index de7d07f6..bf5b5d6c 100644 --- a/src/libswamp/datastores/compact.ts +++ b/src/libswamp/datastores/compact.ts @@ -52,8 +52,10 @@ export async function* datastoreCompact( {}, (async function* () { yield { kind: "checkpointing" } as const; - const beforeSize = await deps.catalogDbSize(); const stats = deps.checkpoint(); + // Measure after checkpoint so WAL pages are flushed to the main DB, + // giving an accurate 
before-VACUUM baseline. + const beforeSize = await deps.catalogDbSize(); yield { kind: "vacuuming" } as const; deps.vacuum(); diff --git a/src/presentation/renderers/data_gc.ts b/src/presentation/renderers/data_gc.ts index 02f0eab2..cc802d12 100644 --- a/src/presentation/renderers/data_gc.ts +++ b/src/presentation/renderers/data_gc.ts @@ -35,15 +35,17 @@ class LogDataGcRenderer implements Renderer { completed: (e) => { logger .info`GC complete: deleted ${e.data.dataEntriesExpired} items`; - // wal_checkpoint(TRUNCATE) returns (0,0,0) on full success. - if ( - e.data.walPagesTotal > 0 && - e.data.walPagesCheckpointed < e.data.walPagesTotal - ) { - logger - .info`WAL partial checkpoint: ${e.data.walPagesCheckpointed}/${e.data.walPagesTotal} pages (active readers present)`; - } else { - logger.info`WAL checkpointed and truncated`; + if (!e.data.dryRun) { + // wal_checkpoint(TRUNCATE) returns (0,0,0) on full success. + if ( + e.data.walPagesTotal > 0 && + e.data.walPagesCheckpointed < e.data.walPagesTotal + ) { + logger + .info`WAL partial checkpoint: ${e.data.walPagesCheckpointed}/${e.data.walPagesTotal} pages (active readers present)`; + } else { + logger.info`WAL checkpointed and truncated`; + } } }, error: (e) => {