From 47b50a4892c8be7fbaee38916d0e8f00d1a3dc82 Mon Sep 17 00:00:00 2001 From: stack72 Date: Tue, 5 May 2026 13:54:31 +0100 Subject: [PATCH 1/2] perf(summarise): skip pre-cutoff scans and cap output (swamp-club#240) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `swamp summarise --since 1d` hung on repos with thousands of workflow runs because SummaryService called `findAllGlobal()` on three repositories to load every output, run, and data record into memory before filtering by the cutoff. Pushes the time bound into the repository contract via a new `findAllGlobalSince(cutoff)` method on `WorkflowRunRepository`, `OutputRepository`, and `DataRepositoryReader`. The YAML implementations use a two-stage filter: an mtime pre-filter (`Deno.stat`, skip parse if mtime < cutoff — runs are rewritten on every status transition, so mtime < cutoff implies the run started, and completed, before the cutoff) and a parse-verify fallback that re-checks `startedAt`/`createdAt` after parse to handle long-running workflows and backup-restore scenarios where mtime is unreliable. Adds an opt-in `--limit N` flag that caps each per-group `runs[]` array; counts (`succeeded`, `failed`, `total`) always reflect every matching run in the window. When truncation occurs, the group sets an optional `truncated: true` field — omitted otherwise so existing JSON consumers see no shape change. Default is unlimited, preserving prior behavior. Hand-measured against synthetic 3000- and 10000-run fixtures (warm cache, macOS APFS): N=3000, in-window=50: findAllGlobal 343ms → findAllGlobalSince 88ms (3.9x) N=3000, in-window=500: findAllGlobal 345ms → findAllGlobalSince 144ms (2.4x) N=10000, in-window=100: findAllGlobal 1141ms → findAllGlobalSince 281ms (4.1x) Cold-cache wins are larger because `Deno.stat` is much cheaper than `readFile + parseYaml`. 
Speedup grows with the ratio of out-of-window to in-window runs — exactly the case the issue describes (`--since 1d` over months of accumulated runs). `integration/summarise_perf_test.ts` pins the optimization deterministically by counting parses against a 500-run fixture; if the mtime pre-filter ever regresses, parse count jumps from ~10 back to 500 and the test fails with a concrete error. Wall-clock is intentionally not asserted (flaky on shared CI). New CLI test in `src/cli/commands/summarise_test.ts` covers the flag surface. Service tests cover limit truncation, the optional `truncated` field, and that counts remain authoritative when `runs[]` is capped. Streaming JSON output, an interactive progress indicator, and a `--sample` mode (also proposed in the issue) are deferred to follow-ups; they require invasive renderer reshaping that is out of scope here. Co-Authored-By: Claude Opus 4.7 (1M context) --- integration/summarise_perf_test.ts | 157 ++++++++++++++++++ src/cli/commands/summarise.ts | 9 + src/cli/commands/summarise_test.ts | 63 +++++++ src/domain/models/repositories.ts | 13 ++ src/domain/summary/summary_service.ts | 72 ++++---- src/domain/summary/summary_service_test.ts | 121 ++++++++++++++ src/domain/summary/summary_types.ts | 24 +++ .../workflows/execution_service_test.ts | 15 ++ src/domain/workflows/repositories.ts | 12 ++ .../persistence/unified_data_repository.ts | 93 ++++++++++- .../persistence/yaml_output_repository.ts | 57 +++++++ .../yaml_workflow_run_repository.ts | 94 +++++++++++ .../yaml_workflow_run_repository_test.ts | 91 ++++++++++ src/libswamp/summary/summarise.ts | 15 +- src/libswamp/workflows/run_test.ts | 8 + src/presentation/renderers/summarise.ts | 6 + 16 files changed, 817 insertions(+), 33 deletions(-) create mode 100644 integration/summarise_perf_test.ts create mode 100644 src/cli/commands/summarise_test.ts diff --git a/integration/summarise_perf_test.ts b/integration/summarise_perf_test.ts new file mode 100644 index 
00000000..70b7f67f --- /dev/null +++ b/integration/summarise_perf_test.ts @@ -0,0 +1,157 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +import { assertEquals } from "@std/assert"; +import { ensureDir } from "@std/fs"; +import { join } from "@std/path"; +import { stringify as stringifyYaml } from "@std/yaml"; +import { YamlWorkflowRunRepository } from "../src/infrastructure/persistence/yaml_workflow_run_repository.ts"; + +// Regression test for swamp-club#240: pin the mtime pre-filter on +// `findAllGlobalSince`. Counts the number of files actually parsed vs the +// number of files in the fixture; if the pre-filter ever stops working, the +// parse count would jump back to N_TOTAL and this test would fail with a +// concrete, deterministic error. Wall-clock is intentionally NOT asserted — +// it's flaky across CI runners and we don't need it: the parse count is the +// thing we actually care about. + +const N_TOTAL = 500; +const N_IN_WINDOW = 10; + +async function withTempDir(fn: (dir: string) => Promise): Promise { + const tempDir = await Deno.makeTempDir({ prefix: "swamp-issue-240-" }); + try { + await fn(tempDir); + } finally { + if (Deno.build.os === "windows") { + // Best-effort: EBUSY can fire when V8 hasn't GC'd native handles yet. 
+ await Deno.remove(tempDir, { recursive: true }).catch(() => {}); + } else { + await Deno.remove(tempDir, { recursive: true }); + } + } +} + +async function seedWorkflowRuns( + repoDir: string, + cutoff: Date, +): Promise<{ inWindow: number; outOfWindow: number }> { + const workflowId = "550e8400-e29b-41d4-a716-446655440000"; + const runsDir = join(repoDir, ".swamp", "workflow-runs", workflowId); + await ensureDir(runsDir); + + const now = Date.now(); + const oldDate = new Date(cutoff.getTime() - 7 * 24 * 60 * 60 * 1000); + let inWindow = 0; + let outOfWindow = 0; + + for (let i = 0; i < N_TOTAL; i++) { + const isFresh = i < N_IN_WINDOW; + const startedAt = isFresh + ? new Date(now - 1000 * 60) + : new Date(cutoff.getTime() - 1000 * 60 * 60 * (i + 1)); + const runId = crypto.randomUUID(); + const data = { + id: runId, + workflowId, + workflowName: "synthetic", + status: "succeeded", + startedAt: startedAt.toISOString(), + completedAt: new Date(startedAt.getTime() + 1000).toISOString(), + jobs: [{ + jobName: "job1", + status: "succeeded", + steps: [{ stepName: "step1", status: "succeeded" }], + }], + tags: {}, + }; + const path = join(runsDir, `workflow-run-${runId}.yaml`); + await Deno.writeTextFile(path, stringifyYaml(data)); + if (isFresh) { + inWindow++; + } else { + // Stamp mtime in the past so the mtime pre-filter rejects this file + // without parsing it. Mirrors what swamp itself produces — old runs + // aren't re-saved, so their mtime stays at last-completion time. + await Deno.utime(path, oldDate, oldDate); + outOfWindow++; + } + } + + return { inWindow, outOfWindow }; +} + +/** + * Counts every Deno.readTextFile call against the YAML run files. Wraps the + * builtin so we don't need to plumb instrumentation through the repo class. 
+ */ +function instrumentReadCounter(): { count: () => number; restore: () => void } { + const original = Deno.readTextFile.bind(Deno); + let parses = 0; + Deno.readTextFile = (( + path: string | URL, + options?: Deno.ReadFileOptions, + ) => { + const p = typeof path === "string" ? path : path.pathname; + if (p.includes("workflow-run-") && p.endsWith(".yaml")) { + parses++; + } + return original(path, options); + }) as typeof Deno.readTextFile; + return { + count: () => parses, + restore: () => { + Deno.readTextFile = original; + }, + }; +} + +Deno.test( + "swamp-club#240: findAllGlobalSince mtime-pre-filter skips parse for out-of-window runs", + async () => { + await withTempDir(async (repoDir) => { + const cutoff = new Date(Date.now() - 60 * 60 * 1000); // 1 hour ago + const seeded = await seedWorkflowRuns(repoDir, cutoff); + assertEquals(seeded.inWindow, N_IN_WINDOW); + assertEquals(seeded.outOfWindow, N_TOTAL - N_IN_WINDOW); + + const repo = new YamlWorkflowRunRepository(repoDir); + const counter = instrumentReadCounter(); + try { + const results = await repo.findAllGlobalSince(cutoff); + + // Correctness: only in-window runs come back. + assertEquals(results.length, N_IN_WINDOW); + + // Performance: the parse count stays bounded by in-window count. + // Allow a small constant for any directory-walk artifacts. If the + // pre-filter regresses, this jumps to N_TOTAL. + const parses = counter.count(); + const budget = N_IN_WINDOW + 5; + if (parses > budget) { + throw new Error( + `mtime pre-filter regressed: parsed ${parses} files (budget ${budget}, fixture ${N_TOTAL})`, + ); + } + } finally { + counter.restore(); + } + }); + }, +); diff --git a/src/cli/commands/summarise.ts b/src/cli/commands/summarise.ts index aa9933f5..26e9df02 100644 --- a/src/cli/commands/summarise.ts +++ b/src/cli/commands/summarise.ts @@ -54,10 +54,18 @@ export const summariseCommand = new Command() .option("--since ", "Time window (e.g. 
1h, 1d, 7d, 1w)", { default: "7d", }) + .option( + "--limit ", + "Cap per-group run details (counts still reflect all matching runs)", + ) .action(async function (options) { const ctx = createContext(options as GlobalOptions, ["summarise"]); ctx.logger.debug`Generating activity summary`; + if (options.limit !== undefined && options.limit <= 0) { + throw new Error("--limit must be a positive integer"); + } + const { repoContext } = await requireInitializedRepoReadOnly({ repoDir: resolveRepoDir(options.repoDir), outputMode: ctx.outputMode, @@ -79,6 +87,7 @@ export const summariseCommand = new Command() summarise(libCtx, deps, { since: cutoffDate, sinceLabel: options.since, + limit: options.limit, }), renderer.handlers(), ); diff --git a/src/cli/commands/summarise_test.ts b/src/cli/commands/summarise_test.ts new file mode 100644 index 00000000..6d829482 --- /dev/null +++ b/src/cli/commands/summarise_test.ts @@ -0,0 +1,63 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . 
+ +import { assertEquals } from "@std/assert"; +import { initializeLogging } from "../../infrastructure/logging/logger.ts"; + +// Import models barrel to trigger self-registration +import "../../domain/models/models.ts"; + +// Initialize logging for tests +await initializeLogging({}); + +Deno.test("summariseCommand module loads", async () => { + const { summariseCommand } = await import("./summarise.ts"); + assertEquals(summariseCommand.getName(), "summarise"); +}); + +Deno.test("summariseCommand has correct description", async () => { + const { summariseCommand } = await import("./summarise.ts"); + assertEquals( + summariseCommand.getDescription(), + "Show a high-level overview of repo activity (method executions, workflows, data)", + ); +}); + +Deno.test("summariseCommand has --since option with default 7d", async () => { + const { summariseCommand } = await import("./summarise.ts"); + const options = summariseCommand.getOptions(); + const since = options.find((o) => o.name === "since"); + assertEquals(since !== undefined, true); + assertEquals(since!.default, "7d"); +}); + +Deno.test("summariseCommand has --limit option (opt-in, no default)", async () => { + const { summariseCommand } = await import("./summarise.ts"); + const options = summariseCommand.getOptions(); + const limit = options.find((o) => o.name === "limit"); + assertEquals(limit !== undefined, true); + // Default is unlimited — no `default` set on the option, preserving the + // pre-issue-240 JSON shape for callers who don't opt in. 
+ assertEquals(limit!.default, undefined); +}); + +Deno.test("summariseCommand exposes the summarize alias", async () => { + const { summariseCommand } = await import("./summarise.ts"); + assertEquals(summariseCommand.getAliases().includes("summarize"), true); +}); diff --git a/src/domain/models/repositories.ts b/src/domain/models/repositories.ts index b0fb8e40..2de8bb7e 100644 --- a/src/domain/models/repositories.ts +++ b/src/domain/models/repositories.ts @@ -83,6 +83,19 @@ export interface OutputRepository { { output: ModelOutput; type: ModelType; method: string }[] >; + /** + * Finds all outputs across all types whose `startedAt` is at or after the + * given cutoff. Prefer this over `findAllGlobal()` when the caller has a + * time bound; implementations may short-circuit the underlying scan and skip + * work that `findAllGlobal()` would do unconditionally. + * + * @param cutoff - Earliest `startedAt` to include (inclusive) + * @returns Array of outputs whose `startedAt >= cutoff` + */ + findAllGlobalSince( + cutoff: Date, + ): Promise<{ output: ModelOutput; type: ModelType; method: string }[]>; + /** * Saves an output. * diff --git a/src/domain/summary/summary_service.ts b/src/domain/summary/summary_service.ts index b2dc7992..5fc41fc1 100644 --- a/src/domain/summary/summary_service.ts +++ b/src/domain/summary/summary_service.ts @@ -55,16 +55,31 @@ export class SummaryService { /** * Produces an activity summary for everything since the given cutoff date. + * + * @param cutoffDate Earliest startedAt/createdAt to include (inclusive). + * @param options.limit If set, caps each per-group `runs[]` array to N + * most-recent entries. Counts (succeeded/failed/total) always reflect + * ALL matching runs in the window — `limit` only bounds the detail + * array. When truncation occurs, the group's optional `truncated` flag + * is set to `true`. 
*/ - async summarise(cutoffDate: Date): Promise { - const [allOutputs, allWorkflowRuns, allData, allDefinitions, allWorkflows] = - await Promise.all([ - this.outputRepo.findAllGlobal(), - this.workflowRunRepo.findAllGlobal(), - this.dataRepo.findAllGlobal(), - this.definitionRepo?.findAllGlobal() ?? Promise.resolve([]), - this.workflowRepo?.findAll() ?? Promise.resolve([]), - ]); + async summarise( + cutoffDate: Date, + options: { limit?: number } = {}, + ): Promise { + const [ + filteredOutputs, + filteredRuns, + filteredData, + allDefinitions, + allWorkflows, + ] = await Promise.all([ + this.outputRepo.findAllGlobalSince(cutoffDate), + this.workflowRunRepo.findAllGlobalSince(cutoffDate), + this.dataRepo.findAllGlobalSince(cutoffDate), + this.definitionRepo?.findAllGlobal() ?? Promise.resolve([]), + this.workflowRepo?.findAll() ?? Promise.resolve([]), + ]); // Build definition ID → name lookup const defNames = new Map(); @@ -89,30 +104,19 @@ export class SummaryService { workflowStepModels.set(workflow.id, stepMap); } - // Filter outputs by cutoff - const filteredOutputs = allOutputs.filter( - ({ output }) => output.startedAt >= cutoffDate, - ); - - // Filter workflow runs by cutoff - const filteredRuns = allWorkflowRuns.filter(({ run }) => { - const startedAt = run.startedAt; - return startedAt !== undefined && startedAt >= cutoffDate; - }); - - // Filter data by cutoff - const filteredData = allData.filter( - ({ data }) => data.createdAt >= cutoffDate, - ); - // Group method executions const methodExecutions = this.groupMethodExecutions( filteredOutputs, defNames, + options.limit, ); // Group workflow runs - const workflows = this.groupWorkflowRuns(filteredRuns, workflowStepModels); + const workflows = this.groupWorkflowRuns( + filteredRuns, + workflowStepModels, + options.limit, + ); // Summarise data const dataSummary = this.summariseData(filteredData); @@ -128,6 +132,7 @@ export class SummaryService { private groupMethodExecutions( outputs: { output: 
ModelOutput; type: ModelType; method: string }[], defNames: Map, + limit?: number, ): ModelExecutionGroup[] { // Group by model (definitionId), then by method within each model const models = new Map< @@ -185,11 +190,16 @@ export class SummaryService { b.total - a.total ); - // Sort runs within each method by startedAt descending + // Sort runs within each method by startedAt descending, then truncate + // if a limit was supplied. Counts already reflect all matching runs. for (const m of methods) { m.runs.sort((a, b) => new Date(b.startedAt).getTime() - new Date(a.startedAt).getTime() ); + if (limit !== undefined && m.runs.length > limit) { + m.runs = m.runs.slice(0, limit); + m.truncated = true; + } } const total = methods.reduce((s, m) => s + m.total, 0); @@ -215,6 +225,7 @@ export class SummaryService { private groupWorkflowRuns( runs: Awaited>, workflowStepModels: Map>, + limit?: number, ): WorkflowRunGroup[] { const groups = new Map(); @@ -287,13 +298,18 @@ export class SummaryService { // Sort groups by total count descending const sorted = [...groups.values()].sort((a, b) => b.total - a.total); - // Sort runs within each group by startedAt descending + // Sort runs within each group by startedAt descending, then truncate + // if a limit was supplied. Counts already reflect all matching runs. for (const group of sorted) { group.runs.sort((a, b) => { const aTime = a.startedAt ? new Date(a.startedAt).getTime() : 0; const bTime = b.startedAt ? 
new Date(b.startedAt).getTime() : 0; return bTime - aTime; }); + if (limit !== undefined && group.runs.length > limit) { + group.runs = group.runs.slice(0, limit); + group.truncated = true; + } } return sorted; diff --git a/src/domain/summary/summary_service_test.ts b/src/domain/summary/summary_service_test.ts index 1878e461..ded91344 100644 --- a/src/domain/summary/summary_service_test.ts +++ b/src/domain/summary/summary_service_test.ts @@ -127,6 +127,8 @@ function createMockOutputRepo( ): OutputRepository { return { findAllGlobal: () => Promise.resolve(items), + findAllGlobalSince: (cutoff: Date) => + Promise.resolve(items.filter(({ output }) => output.startedAt >= cutoff)), findById: () => Promise.resolve(null), findByDefinition: () => Promise.resolve([]), findLatestByDefinition: () => Promise.resolve(null), @@ -143,6 +145,12 @@ function createMockWorkflowRunRepo( ): WorkflowRunRepository { return { findAllGlobal: () => Promise.resolve(items), + findAllGlobalSince: (cutoff: Date) => + Promise.resolve( + items.filter(({ run }) => + run.startedAt !== undefined && run.startedAt >= cutoff + ), + ), findById: () => Promise.resolve(null), findAllByWorkflowId: () => Promise.resolve([]), findLatestByWorkflowId: () => Promise.resolve(null), @@ -158,6 +166,8 @@ function createMockDataRepo( ): DataRepositoryReader { return { findAllGlobal: () => Promise.resolve(items), + findAllGlobalSince: (cutoff: Date) => + Promise.resolve(items.filter(({ data }) => data.createdAt >= cutoff)), }; } @@ -419,3 +429,114 @@ Deno.test("summarise - counts data items, versions, and unique models", async () assertEquals(result.data.byModelType[1].items, 1); assertEquals(result.data.byModelType[1].versions, 5); }); + +Deno.test("summarise - limit truncates per-group runs but counts reflect all matching runs", async () => { + const type = ModelType.create("aws/s3-bucket"); + const wfId = createWorkflowId(crypto.randomUUID()); + const sharedDefId = crypto.randomUUID(); + + const outputs = 
Array.from({ length: 10 }, (_, i) => ({ + output: makeOutput({ + definitionId: sharedDefId, + status: i % 2 === 0 ? "succeeded" : "failed", + startedAt: new Date(2026, 0, 1, 0, i), + }), + type, + method: "apply", + })); + + const runs = Array.from({ length: 8 }, (_, i) => ({ + run: makeWorkflowRun({ + status: "succeeded", + startedAt: new Date(2026, 0, 1, 0, i), + }), + workflowId: wfId, + })); + + const service = new SummaryService( + createMockOutputRepo(outputs), + createMockWorkflowRunRepo(runs), + createMockDataRepo([]), + ); + + const result = await service.summarise(new Date(0), { limit: 3 }); + + // Method group: counts reflect all 10, runs[] is capped at 3, truncated set + assertEquals(result.methodExecutions.length, 1); + const methodGroup = result.methodExecutions[0].methods[0]; + assertEquals(methodGroup.total, 10); + assertEquals(methodGroup.succeeded, 5); + assertEquals(methodGroup.failed, 5); + assertEquals(methodGroup.runs.length, 3); + assertEquals(methodGroup.truncated, true); + + // Workflow group: counts reflect all 8, runs[] is capped at 3, truncated set + assertEquals(result.workflows.length, 1); + const wfGroup = result.workflows[0]; + assertEquals(wfGroup.total, 8); + assertEquals(wfGroup.succeeded, 8); + assertEquals(wfGroup.runs.length, 3); + assertEquals(wfGroup.truncated, true); +}); + +Deno.test("summarise - omits truncated when limit is not exceeded", async () => { + const type = ModelType.create("aws/s3-bucket"); + const service = new SummaryService( + createMockOutputRepo([ + { + output: makeOutput({ status: "succeeded" }), + type, + method: "apply", + }, + { + output: makeOutput({ status: "succeeded" }), + type, + method: "apply", + }, + ]), + createMockWorkflowRunRepo([ + { + run: makeWorkflowRun({ status: "succeeded" }), + workflowId: createWorkflowId(crypto.randomUUID()), + }, + ]), + createMockDataRepo([]), + ); + + const result = await service.summarise(new Date(0), { limit: 100 }); + + assertEquals( + 
result.methodExecutions[0].methods[0].truncated, + undefined, + "truncated should be omitted when no truncation occurred", + ); + assertEquals( + result.workflows[0].truncated, + undefined, + "truncated should be omitted when no truncation occurred", + ); +}); + +Deno.test("summarise - default (no limit) preserves all runs in details", async () => { + const type = ModelType.create("aws/s3-bucket"); + const sharedDefId = crypto.randomUUID(); + const service = new SummaryService( + createMockOutputRepo( + Array.from({ length: 50 }, () => ({ + output: makeOutput({ + definitionId: sharedDefId, + status: "succeeded", + }), + type, + method: "apply", + })), + ), + createMockWorkflowRunRepo([]), + createMockDataRepo([]), + ); + + const result = await service.summarise(new Date(0)); + + assertEquals(result.methodExecutions[0].methods[0].runs.length, 50); + assertEquals(result.methodExecutions[0].methods[0].truncated, undefined); +}); diff --git a/src/domain/summary/summary_types.ts b/src/domain/summary/summary_types.ts index 289c6be5..95f85705 100644 --- a/src/domain/summary/summary_types.ts +++ b/src/domain/summary/summary_types.ts @@ -28,6 +28,16 @@ export interface DataRepositoryReader { findAllGlobal(): Promise< Array<{ data: Data; modelType: ModelType; modelId: string }> >; + + /** + * Finds all data items whose `createdAt` is at or after the given cutoff. + * Prefer this over `findAllGlobal()` when the caller has a time bound; + * implementations may short-circuit the underlying scan and skip work that + * `findAllGlobal()` would do unconditionally. + */ + findAllGlobalSince( + cutoff: Date, + ): Promise>; } /** @@ -45,6 +55,12 @@ export interface MethodRunDetail { /** * A group of runs for a single method on a model. + * + * `runs` may be capped by an opt-in `--limit` flag; when that happens the + * group sets `truncated: true` so consumers can tell the detail list was + * shortened. 
Counts (`succeeded`, `failed`, `total`) always reflect every + * matching run in the window — `truncated` only signals that `runs` does + * not. The field is omitted when no truncation occurred. */ export interface MethodGroup { method: string; @@ -52,6 +68,7 @@ export interface MethodGroup { failed: number; total: number; runs: MethodRunDetail[]; + truncated?: boolean; } /** @@ -92,6 +109,12 @@ export interface WorkflowRunDetail { /** * A group of workflow runs for a given workflow name. + * + * `runs` may be capped by an opt-in `--limit` flag; when that happens the + * group sets `truncated: true` so consumers can tell the detail list was + * shortened. Counts (`succeeded`, `failed`, `total`) always reflect every + * matching run in the window. The field is omitted when no truncation + * occurred. */ export interface WorkflowRunGroup { workflowName: string; @@ -99,6 +122,7 @@ export interface WorkflowRunGroup { failed: number; total: number; runs: WorkflowRunDetail[]; + truncated?: boolean; } /** diff --git a/src/domain/workflows/execution_service_test.ts b/src/domain/workflows/execution_service_test.ts index 3ec4e932..f5e0182e 100644 --- a/src/domain/workflows/execution_service_test.ts +++ b/src/domain/workflows/execution_service_test.ts @@ -153,6 +153,15 @@ class InMemoryWorkflowRunRepository implements WorkflowRunRepository { return Promise.resolve(results); } + async findAllGlobalSince( + cutoff: Date, + ): Promise<{ run: WorkflowRun; workflowId: WorkflowId }[]> { + const all = await this.findAllGlobal(); + return all.filter(({ run }) => + run.startedAt !== undefined && run.startedAt >= cutoff + ); + } + save(workflowId: WorkflowId, run: WorkflowRun): Promise { const existing = this.runs.get(workflowId) ?? 
[]; const idx = existing.findIndex((r) => r.id === run.id); @@ -2770,6 +2779,12 @@ class TrackingRunRepository implements WorkflowRunRepository { return this.inner.findAllGlobal(); } + findAllGlobalSince( + cutoff: Date, + ): Promise<{ run: WorkflowRun; workflowId: WorkflowId }[]> { + return this.inner.findAllGlobalSince(cutoff); + } + save(workflowId: WorkflowId, run: WorkflowRun): Promise { this.saves.push({ runId: run.id, status: run.status }); return this.inner.save(workflowId, run); diff --git a/src/domain/workflows/repositories.ts b/src/domain/workflows/repositories.ts index 8f795f46..363b012f 100644 --- a/src/domain/workflows/repositories.ts +++ b/src/domain/workflows/repositories.ts @@ -88,6 +88,18 @@ export interface WorkflowRunRepository { */ findAllGlobal(): Promise<{ run: WorkflowRun; workflowId: WorkflowId }[]>; + /** + * Finds all workflow runs across all workflows whose `startedAt` is at or + * after the given cutoff. Prefer this over `findAllGlobal()` when the caller + * has a time bound; implementations may short-circuit the underlying scan and + * skip work that `findAllGlobal()` would do unconditionally. + * + * Runs with no `startedAt` are excluded. + */ + findAllGlobalSince( + cutoff: Date, + ): Promise<{ run: WorkflowRun; workflowId: WorkflowId }[]>; + /** * Saves a workflow run. */ diff --git a/src/infrastructure/persistence/unified_data_repository.ts b/src/infrastructure/persistence/unified_data_repository.ts index 3b02542a..d19dd29e 100644 --- a/src/infrastructure/persistence/unified_data_repository.ts +++ b/src/infrastructure/persistence/unified_data_repository.ts @@ -151,6 +151,29 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { return results; } + /** + * Finds all data items whose `createdAt` is at or after the cutoff using a + * two-stage filter (mtime pre-filter, then parse-and-verify) on each + * version's metadata.yaml. Old version files are skipped without parse. 
+ * + * The `_catalog.db` SQLite catalog does not track `createdAt`, so the only + * source of truth for time-bounded filtering is the metadata YAML — hence + * the same walk shape as `findAllGlobal`, plus a stat per file before + * parse. + */ + async findAllGlobalSince( + cutoff: Date, + ): Promise> { + const results: Array< + { data: Data; modelType: ModelType; modelId: string } + > = []; + const baseDir = this.getBaseDir(); + + await this.collectAllData(baseDir, [], results, cutoff); + + return results; + } + /** * Recursively collects all data from the directory tree. * Walks .swamp/data/{type-segments...}/{model-id}/{data-name}/ structure. @@ -163,6 +186,7 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { currentDir: string, pathSegments: string[], results: Array<{ data: Data; modelType: ModelType; modelId: string }>, + cutoff?: Date, ): Promise { try { const entries: { name: string; isDirectory: boolean }[] = []; @@ -190,7 +214,9 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { try { const modelType = ModelType.create(typeStr); - const dataItems = await this.findAllForModel(modelType, modelId); + const dataItems = cutoff + ? await this.findAllForModelSince(modelType, modelId, cutoff) + : await this.findAllForModel(modelType, modelId); for (const data of dataItems) { results.push({ data, modelType, modelId }); } @@ -199,7 +225,7 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { } } else { // Keep recursing deeper into type directories - await this.collectAllData(childPath, childSegments, results); + await this.collectAllData(childPath, childSegments, results, cutoff); } } } catch (error) { @@ -209,6 +235,69 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { } } + /** + * Like `findAllForModel`, but only returns data items whose latest-version + * `createdAt` is at or after the cutoff. 
Stats the metadata file first + * (Stage A) so old data is skipped without parsing. + */ + private async findAllForModelSince( + type: ModelType, + modelId: string, + cutoff: Date, + ): Promise { + const dataDir = this.getModelDataDir(type, modelId); + const results: Data[] = []; + const seen = new Set(); + const cutoffMs = cutoff.getTime(); + + try { + for await (const entry of Deno.readDir(dataDir)) { + if (!entry.isDirectory) continue; + const dataName = entry.name; + + const latestVersion = await this.getLatestVersion( + type, + modelId, + dataName, + ); + if (latestVersion === null) continue; + + const metadataPath = this.getMetadataPath( + type, + modelId, + dataName, + latestVersion, + ); + + // Stage A: mtime pre-filter + try { + const stat = await Deno.stat(metadataPath); + const mtimeMs = stat.mtime?.getTime(); + if (mtimeMs !== undefined && mtimeMs < cutoffMs) continue; + } catch (error) { + if (error instanceof Deno.errors.NotFound) continue; + throw error; + } + + // Stage B: parse and verify + const data = await this.findByName(type, modelId, dataName); + if (!data) continue; + if (data.createdAt.getTime() < cutoffMs) continue; + if (seen.has(data.name)) continue; + + seen.add(data.name); + results.push(data); + } + } catch (error) { + if (error instanceof Deno.errors.NotFound) { + return []; + } + throw error; + } + + return results; + } + /** * Checks if a directory looks like a model-id directory by examining * if its children contain version subdirectories or a "latest" symlink. 
diff --git a/src/infrastructure/persistence/yaml_output_repository.ts b/src/infrastructure/persistence/yaml_output_repository.ts index 7472fb5f..c21d428f 100644 --- a/src/infrastructure/persistence/yaml_output_repository.ts +++ b/src/infrastructure/persistence/yaml_output_repository.ts @@ -170,6 +170,63 @@ export class YamlOutputRepository implements OutputRepository { return results; } + /** + * Finds all outputs whose `startedAt` is at or after the cutoff using a + * two-stage filter (mtime pre-filter, then parse-and-verify) so old YAML + * files are skipped without being parsed. See the matching docstring on + * `YamlWorkflowRunRepository.findAllGlobalSince` for the invariant. + * + * Output YAML is written exactly once at completion (`save()` does not + * rewrite an existing file), so mtime ≈ startedAt + duration on disk. + * Files older than the cutoff cannot have a startedAt on or after it. + */ + async findAllGlobalSince( + cutoff: Date, + ): Promise<{ output: ModelOutput; type: ModelType; method: string }[]> { + const results: { output: ModelOutput; type: ModelType; method: string }[] = + []; + const cutoffMs = cutoff.getTime(); + + for (const modelType of modelRegistry.types()) { + const typeDir = this.getTypeDir(modelType); + try { + for await (const methodEntry of Deno.readDir(typeDir)) { + if (!methodEntry.isDirectory) continue; + const methodDir = join(typeDir, methodEntry.name); + for await (const entry of Deno.readDir(methodDir)) { + if (!entry.isFile || !entry.name.endsWith(".yaml")) continue; + const path = join(methodDir, entry.name); + + // Stage A: mtime pre-filter + const stat = await Deno.stat(path); + const mtimeMs = stat.mtime?.getTime(); + if (mtimeMs !== undefined && mtimeMs < cutoffMs) continue; + + // Stage B: parse and verify + const content = await Deno.readTextFile(path); + const data = parseYaml(content) as ModelOutputData; + if (data.logFile) { + data.logFile = toAbsolutePath(this.repoDir, data.logFile); + } + const output = 
ModelOutput.fromData(data); + if (output.startedAt.getTime() < cutoffMs) continue; + + results.push({ + output, + type: modelType, + method: output.methodName, + }); + } + } + } catch (error) { + if (error instanceof Deno.errors.NotFound) continue; + throw error; + } + } + + return results; + } + async save( type: ModelType, method: string, diff --git a/src/infrastructure/persistence/yaml_workflow_run_repository.ts b/src/infrastructure/persistence/yaml_workflow_run_repository.ts index e7c72ab1..4f71b472 100644 --- a/src/infrastructure/persistence/yaml_workflow_run_repository.ts +++ b/src/infrastructure/persistence/yaml_workflow_run_repository.ts @@ -180,6 +180,100 @@ export class YamlWorkflowRunRepository implements WorkflowRunRepository { }); } + /** + * Finds all workflow runs across all workflows whose startedAt is at or + * after the cutoff, using a two-stage filter to avoid parsing every YAML + * file on large repos. + * + * Stage A — mtime pre-filter: stat each candidate file and skip if mtime + * is strictly before the cutoff. `save()` rewrites the YAML on every + * status transition (pending → running → succeeded/failed), so a file + * with mtime < cutoff cannot have startedAt >= cutoff: any startedAt on + * or after the cutoff would have triggered a save on or after the cutoff. + * + * Stage B — parse and verify: parse files that pass Stage A and re-check + * `startedAt >= cutoff`. This rejects long-running workflows that started + * before the cutoff but were still being saved into after it (mtime > cutoff + * but startedAt < cutoff). + * + * Backup-restore scenarios that scramble mtime cannot cause incorrect + * inclusion — Stage B is the source of truth for inclusion. They can only + * defeat the optimization (degrading to current `findAllGlobal()` cost), + * which is acceptable. 
+ */ + async findAllGlobalSince( + cutoff: Date, + ): Promise<{ run: WorkflowRun; workflowId: WorkflowId }[]> { + const results: { run: WorkflowRun; workflowId: WorkflowId }[] = []; + const cutoffMs = cutoff.getTime(); + + try { + for await (const entry of Deno.readDir(this.baseDir)) { + if (!entry.isDirectory) continue; + const workflowId = entry.name as WorkflowId; + const runs = await this.findRunsSinceByWorkflowId(workflowId, cutoffMs); + for (const run of runs) { + results.push({ run, workflowId }); + } + } + } catch (error) { + if (error instanceof Deno.errors.NotFound) { + return []; + } + throw error; + } + + return results.sort((a, b) => { + const aTime = a.run.startedAt?.getTime() ?? 0; + const bTime = b.run.startedAt?.getTime() ?? 0; + return bTime - aTime; + }); + } + + private async findRunsSinceByWorkflowId( + workflowId: WorkflowId, + cutoffMs: number, + ): Promise { + const dir = this.getRunsDir(workflowId); + const runs: WorkflowRun[] = []; + + try { + for await (const entry of Deno.readDir(dir)) { + if ( + !entry.isFile || !entry.name.startsWith("workflow-run-") || + !entry.name.endsWith(".yaml") + ) { + continue; + } + const path = join(dir, entry.name); + + // Stage A: mtime pre-filter + const stat = await Deno.stat(path); + const mtimeMs = stat.mtime?.getTime(); + if (mtimeMs !== undefined && mtimeMs < cutoffMs) continue; + + // Stage B: parse and verify + const content = await Deno.readTextFile(path); + const data = parseYaml(content) as WorkflowRunData; + if (data.logFile) { + data.logFile = toAbsolutePath(this.repoDir, data.logFile); + } + const run = WorkflowRun.fromData(data); + const startedAtMs = run.startedAt?.getTime(); + if (startedAtMs === undefined || startedAtMs < cutoffMs) continue; + + runs.push(run); + } + } catch (error) { + if (error instanceof Deno.errors.NotFound) { + return []; + } + throw error; + } + + return runs; + } + async save(workflowId: WorkflowId, run: WorkflowRun): Promise { const path = this.getPath(workflowId, 
run.id); await this.notifyDirty(path); diff --git a/src/infrastructure/persistence/yaml_workflow_run_repository_test.ts b/src/infrastructure/persistence/yaml_workflow_run_repository_test.ts index 52780956..ee8092b9 100644 --- a/src/infrastructure/persistence/yaml_workflow_run_repository_test.ts +++ b/src/infrastructure/persistence/yaml_workflow_run_repository_test.ts @@ -434,3 +434,94 @@ Deno.test("YamlWorkflowRunRepository invokes markDirty with relPath on mutations assertEquals(calls.length, 2); }); }); + +Deno.test("findAllGlobalSince: returns only in-window runs", async () => { + await withTempDir(async (dir) => { + const repo = new YamlWorkflowRunRepository(dir); + const workflow = createTestWorkflow(); + + const old = WorkflowRun.create(workflow); + old.start(); + await repo.save(workflow.id, old); + + // Backdate the file so its mtime falls before the cutoff. + const oldPath = repo.getPath(workflow.id, old.id); + const oldDate = new Date(Date.now() - 7 * 24 * 60 * 60 * 1000); + await Deno.utime(oldPath, oldDate, oldDate); + + const fresh = WorkflowRun.create(workflow); + fresh.start(); + await repo.save(workflow.id, fresh); + + const cutoff = new Date(Date.now() - 60 * 60 * 1000); + const found = await repo.findAllGlobalSince(cutoff); + + assertEquals(found.length, 1); + assertEquals(found[0].run.id, fresh.id); + }); +}); + +Deno.test( + "findAllGlobalSince: rejects long-running runs that started before the cutoff", + async () => { + await withTempDir(async (dir) => { + const repo = new YamlWorkflowRunRepository(dir); + const workflow = createTestWorkflow(); + + // A run that started 2 days ago. 
+ const longRunning = WorkflowRun.create(workflow); + longRunning.start(); + await repo.save(workflow.id, longRunning); + const path = repo.getPath(workflow.id, longRunning.id); + const startedLongAgo = new Date(Date.now() - 2 * 24 * 60 * 60 * 1000); + // Re-write the YAML with a startedAt 2 days in the past, then bump + // mtime to "now" to simulate a long-running workflow that the engine + // saved into recently — Stage A passes, Stage B must reject. + const content = await Deno.readTextFile(path); + const patched = content.replace( + /startedAt: .*/, + `startedAt: "${startedLongAgo.toISOString()}"`, + ); + await Deno.writeTextFile(path, patched); + const now = new Date(); + await Deno.utime(path, now, now); + + const cutoff = new Date(Date.now() - 60 * 60 * 1000); + const found = await repo.findAllGlobalSince(cutoff); + + assertEquals(found.length, 0); + }); + }, +); + +Deno.test( + "findAllGlobalSince: backup-restore (mtime in the future) keeps correctness", + async () => { + await withTempDir(async (dir) => { + const repo = new YamlWorkflowRunRepository(dir); + const workflow = createTestWorkflow(); + + // An old run whose mtime got scrambled by a backup-restore — mtime is + // "now" but the YAML startedAt is far in the past. Stage A is defeated + // (we won't skip), but Stage B catches it. 
+ const old = WorkflowRun.create(workflow); + old.start(); + await repo.save(workflow.id, old); + const path = repo.getPath(workflow.id, old.id); + const startedLongAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); + const content = await Deno.readTextFile(path); + const patched = content.replace( + /startedAt: .*/, + `startedAt: "${startedLongAgo.toISOString()}"`, + ); + await Deno.writeTextFile(path, patched); + const now = new Date(); + await Deno.utime(path, now, now); + + const cutoff = new Date(Date.now() - 60 * 60 * 1000); + const found = await repo.findAllGlobalSince(cutoff); + + assertEquals(found.length, 0); + }); + }, +); diff --git a/src/libswamp/summary/summarise.ts b/src/libswamp/summary/summarise.ts index 0f9d084f..4201d69d 100644 --- a/src/libswamp/summary/summarise.ts +++ b/src/libswamp/summary/summarise.ts @@ -44,11 +44,20 @@ export type SummariseEvent = export interface SummariseInput { since: Date; sinceLabel: string; + /** + * If set, caps each per-group `runs[]` array to the N most-recent entries. + * Counts always reflect every matching run in the window. Default is + * unlimited (preserves prior behavior — no silent JSON shape change). + */ + limit?: number; } /** Dependencies for the summarise operation. */ export interface SummariseDeps { - summarise: (cutoffDate: Date) => Promise; + summarise: ( + cutoffDate: Date, + options: { limit?: number }, + ) => Promise; } /** Wires real infrastructure into SummariseDeps. 
*/ @@ -67,7 +76,7 @@ export function createSummariseDeps(repos: { repos.workflowRepo, ); return { - summarise: (cutoffDate) => service.summarise(cutoffDate), + summarise: (cutoffDate, options) => service.summarise(cutoffDate, options), }; } @@ -79,7 +88,7 @@ export async function* summarise( ): AsyncIterable { ctx.logger.debug`Generating activity summary since ${input.sinceLabel}`; - const summary = await deps.summarise(input.since); + const summary = await deps.summarise(input.since, { limit: input.limit }); const hasActivity = summary.methodExecutions.length > 0 || summary.workflows.length > 0 || diff --git a/src/libswamp/workflows/run_test.ts b/src/libswamp/workflows/run_test.ts index 72c3b0e5..5dfdb3c0 100644 --- a/src/libswamp/workflows/run_test.ts +++ b/src/libswamp/workflows/run_test.ts @@ -107,6 +107,14 @@ class InMemoryWorkflowRunRepository implements WorkflowRunRepository { } return Promise.resolve(results); } + async findAllGlobalSince( + cutoff: Date, + ): Promise<{ run: WorkflowRun; workflowId: WorkflowId }[]> { + const all = await this.findAllGlobal(); + return all.filter(({ run }) => + run.startedAt !== undefined && run.startedAt >= cutoff + ); + } save(wfId: WorkflowId, run: WorkflowRun): Promise { const existing = this.runs.get(wfId) ?? 
[]; const idx = existing.findIndex((r) => r.id === run.id); diff --git a/src/presentation/renderers/summarise.ts b/src/presentation/renderers/summarise.ts index 96831b6b..b9f76969 100644 --- a/src/presentation/renderers/summarise.ts +++ b/src/presentation/renderers/summarise.ts @@ -77,6 +77,9 @@ function renderMethodExecutions( const parts: string[] = []; if (method.succeeded > 0) parts.push(green(`\u2713 ${method.succeeded}`)); if (method.failed > 0) parts.push(red(`\u2717 ${method.failed}`)); + if (method.truncated) { + parts.push(dim(`(showing ${method.runs.length} of ${method.total})`)); + } writeOutput(` ${label}${parts.join(" ")}`); if (verbose) { @@ -141,6 +144,9 @@ function renderWorkflowRuns( const parts: string[] = []; if (group.succeeded > 0) parts.push(green(`\u2713 ${group.succeeded}`)); if (group.failed > 0) parts.push(red(`\u2717 ${group.failed}`)); + if (group.truncated) { + parts.push(dim(`(showing ${group.runs.length} of ${group.total})`)); + } writeOutput(` ${label} ${parts.join(" ")}`); if (verbose) { From 8518e8b6fcf2b987b0df69a2731c0329027a9cc0 Mon Sep 17 00:00:00 2001 From: stack72 Date: Tue, 5 May 2026 14:16:00 +0100 Subject: [PATCH 2/2] fix(summarise): use UserError and reject non-integer --limit (review) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Review feedback on systeminit/swamp#1306: 1. The --limit validation threw `new Error(...)`, which prints a stack trace to the user. Every other CLI-level validation in the codebase throws `UserError` (issue_ripple.ts, datastore_sync.ts, vault_put.ts, vault_create.ts); the CLI's error handler suppresses the stack and prints only the message for UserError. Switch to UserError for consistency. 2. Cliffy's `:number` type accepts floats, so `--limit 2.5` slipped past the `<= 0` guard and silently truncated via `Array.prototype.slice(0, 2.5) → slice(0, 2)`. 
The error message says "positive integer" — make the validation match by adding `!Number.isInteger(options.limit)` to the guard. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/cli/commands/summarise.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/cli/commands/summarise.ts b/src/cli/commands/summarise.ts index 26e9df02..bc1041b5 100644 --- a/src/cli/commands/summarise.ts +++ b/src/cli/commands/summarise.ts @@ -32,6 +32,7 @@ import { summarise, } from "../../libswamp/mod.ts"; import { createSummariseRenderer } from "../../presentation/renderers/summarise.ts"; +import { UserError } from "../../domain/errors.ts"; /** * `swamp summarise` @@ -62,8 +63,11 @@ export const summariseCommand = new Command() const ctx = createContext(options as GlobalOptions, ["summarise"]); ctx.logger.debug`Generating activity summary`; - if (options.limit !== undefined && options.limit <= 0) { - throw new Error("--limit must be a positive integer"); + if ( + options.limit !== undefined && + (options.limit <= 0 || !Number.isInteger(options.limit)) + ) { + throw new UserError("--limit must be a positive integer"); } const { repoContext } = await requireInitializedRepoReadOnly({