From c41474d864292f5d0cf4acbf3398dd012cee0a67 Mon Sep 17 00:00:00 2001 From: keeb Date: Mon, 4 May 2026 16:41:51 -0700 Subject: [PATCH] feat(datastore): thread relPath through markDirty for per-path dirty tracking (swamp-club#232) Adds an optional `relPath` field to `DatastoreSyncOptions` so swamp core can attribute each `markDirty` call to a specific cache-relative path. Strictly additive: existing extensions, call sites, and bulk-invalidate semantics are unchanged. Per-key-queryable backends (Mongo/Redis/etcd-style) can now record exactly which paths changed and skip the full-cache walk on `pushChanged`. The contract is documented in eight load-bearing rules on the JSDoc and in `design/datastores.md` "markDirty() contract" section: pre-write timing, absence-on-disk = delete, undefined = bulk, restart-loses-set, cache-relative + forward-slash with native-separator conversion on Windows, backward compatibility, field scope, and bulk-overrides-per-path-within-one-operation. Internal `MarkDirtyHook` becomes positional `(relPath?: string)` because repositories don't have an `AbortSignal` in scope; the wiring layer in `repo_context.ts` packs it into the public-interface options bag and owns the absolute-to-cache-relative + forward-slash normalization. Per-call granularity emitted by core: - save / append / allocateVersion: data-name directory (version not yet allocated) - removeLatestMarker: data-name directory - delete (with version): version directory - delete (no version): data-name directory (whole subtree) - finalizeVersion: version directory - yaml save / delete: per-yaml file path - rename / collectGarbage / deleteAllByWorkflowId / clearAll: undefined (bulk) End-to-end verified against an out-of-tree Mongo-backed datastore extension: relPath flows correctly, per-path pushChanged works with subtree expansion, tombstone safety gate holds. Closes swamp-club#232 --- .../references/api.md | 43 +++++-- .../references/examples.md | 85 ++++++++++--- design/datastores.md | 93 +++++++++++--- packages/testing/datastore_test_context.ts | 15 ++- packages/testing/datastore_types.ts | 9 ++ src/cli/repo_context.ts | 43 ++++++- src/cli/repo_context_test.ts | 119 ++++++++++++++++++ .../datastore/datastore_sync_service.ts | 73 ++++++++++- .../datastore/testing_package_compat_test.ts | 7 +- .../persistence/unified_data_repository.ts | 42 +++++-- .../unified_data_repository_test.ts | 99 ++++++++++++--- .../yaml_evaluated_definition_repository.ts | 12 +- ...ml_evaluated_definition_repository_test.ts | 90 +++++++++++++ .../persistence/yaml_output_repository.ts | 18 +-- .../yaml_output_repository_test.ts | 12 +- .../yaml_workflow_run_repository.ts | 9 +- .../yaml_workflow_run_repository_test.ts | 12 +- 17 files changed, 677 insertions(+), 104 deletions(-) create mode 100644 src/infrastructure/persistence/yaml_evaluated_definition_repository_test.ts diff --git a/.claude/skills/swamp-extension-datastore/references/api.md b/.claude/skills/swamp-extension-datastore/references/api.md index 46d8d63e..5bada722 100644 --- a/.claude/skills/swamp-extension-datastore/references/api.md +++ b/.claude/skills/swamp-extension-datastore/references/api.md @@ -163,34 +163,51 @@ Optional interface for remote datastore synchronization. ```typescript interface DatastoreSyncService { - pullChanged(): Promise; - pushChanged(): Promise; - markDirty(): Promise; + pullChanged(options?: DatastoreSyncOptions): Promise; + pushChanged(options?: DatastoreSyncOptions): Promise; + markDirty(options?: DatastoreSyncOptions): Promise; +} + +interface DatastoreSyncOptions { + signal?: AbortSignal; + /** Cache-relative path of the file about to be written or removed. */ + relPath?: string; } ``` -### `pullChanged()` +### `pullChanged(options?)` Pull changed files from the remote datastore to the local cache. Called before -read/write operations on remote datastores. +read/write operations on remote datastores. `options.relPath` has no defined +meaning here — core only sets it on `markDirty`. -### `pushChanged()` +### `pushChanged(options?)` Push changed files from the local cache to the remote datastore. Called after -write operations complete. +write operations complete. `options.relPath` has no defined meaning here — core +only sets it on `markDirty`. -### `markDirty()` +### `markDirty(options?)` Signal that the local cache has uncommitted work. Swamp core calls this at the start of every repository-layer mutation that writes into the cache (e.g. `save`, `delete`, `rename`), **before** the write begins — a crash mid-write still leaves the watermark dirty. -The method only matters for implementations that maintain a clean/dirty -watermark to short-circuit zero-diff syncs (the recommended fast-path pattern — -see `design/datastores.md`). Those implementations MUST flip the watermark to -dirty here so the next `pushChanged` cannot skip past core's writes. +When core can attribute the mutation to a single path, it sets `options.relPath` +to a forward-slash cache-relative path. Bulk mutations (`rename`, non-dry-run +`collectGarbage`, `deleteAllByWorkflowId`, `clearAll`) omit the field — +extensions MUST treat absence as "fall back to full walk." + +The full eight-rule contract — pre-write timing, absence-on-disk = delete, +`undefined` = bulk, restart-loses-set, cache-relative + forward-slash (consumers +convert to native separators on Windows), backward compatibility, field scope, +bulk-overrides-per-path-within-one-operation — is documented in +`design/datastores.md` "markDirty() contract." Read that section before +implementing per-path tracking. + Implementations that unconditionally walk the cache on every `pushChanged` have -nothing to invalidate and can return `Promise.resolve()`. +nothing to invalidate and can return `Promise.resolve()` — fully backward +compatible. `markDirty()` must be idempotent and cheap — core does not deduplicate calls. diff --git a/.claude/skills/swamp-extension-datastore/references/examples.md b/.claude/skills/swamp-extension-datastore/references/examples.md index b7015b8b..7787a618 100644 --- a/.claude/skills/swamp-extension-datastore/references/examples.md +++ b/.claude/skills/swamp-extension-datastore/references/examples.md @@ -7,6 +7,7 @@ A simple local filesystem variant that stores data in a custom directory: ```typescript // extensions/datastores/custom-fs/mod.ts import { z } from "npm:zod@4"; +import { join } from "@std/path"; const ConfigSchema = z.object({ basePath: z.string().describe("Base directory for data storage"), @@ -235,24 +236,72 @@ export const datastore = { }, }), - // Sync service for pull/push operations - createSyncService: (_repoDir: string, cachePath: string) => ({ - pullChanged: async () => { - // Download changed files from remote to cachePath - console.log(`Pulling from ${parsed.endpoint} to ${cachePath}`); - }, - pushChanged: async () => { - // Upload changed files from cachePath to remote - console.log(`Pushing from ${cachePath} to ${parsed.endpoint}`); - }, - markDirty: () => { - // Swamp core calls this before every cache write. If your - // pushChanged walks the cache unconditionally, there's nothing - // to invalidate — no-op. If you add a clean/dirty sidecar - // (fast-path pattern in design/datastores.md), flip it here. - return Promise.resolve(); - }, - }), + // Sync service for pull/push operations. + // + // The pattern below shows the per-key fast path enabled by + // markDirty's options.relPath: maintain a Set of dirty + // relPaths plus a bulkInvalidated boolean. pushChanged consumes + // the set when bulkInvalidated is false, doing exactly the work + // core attributed; otherwise it falls back to a full walk. See + // `design/datastores.md` "markDirty() contract" for the eight + // load-bearing rules — pre-write timing, absence-on-disk = delete, + // restart-loses-set, etc. + // + // For a no-op fallback (always do a full walk on every push), + // delete the dirty set and just `return Promise.resolve()` from + // markDirty. + createSyncService: (_repoDir: string, cachePath: string) => { + const dirty = new Set(); + let bulkInvalidated = false; + + return { + pullChanged: async () => { + // Download changed files from remote to cachePath + console.log(`Pulling from ${parsed.endpoint} to ${cachePath}`); + }, + pushChanged: async () => { + if (bulkInvalidated || dirty.size === 0) { + // Full walk path — bulk signal arrived, or nothing recorded + // (e.g. fresh process — see rule 4: "process restart loses + // the set; first pushChanged after start does a full walk"). + console.log(`Full-walk push from ${cachePath}`); + } else { + for (const relPath of dirty) { + // Wire format is forward-slash; convert to native path + // before disk access (rule 5). + const absPath = join(cachePath, ...relPath.split("/")); + try { + await Deno.stat(absPath); + // upsert remote record for relPath (file exists) + } catch (err) { + if (err instanceof Deno.errors.NotFound) { + // delete remote record (absence-on-disk = delete, + // rule 2 — collapses create/update/delete into one + // signal, no op-kind param needed). + } else throw err; + } + } + } + dirty.clear(); + bulkInvalidated = false; + }, + markDirty: ({ relPath } = {}) => { + // Pre-write timing (rule 1): the file isn't on disk yet. + // We only RECORD the path; the upload happens later in + // pushChanged. + if (relPath !== undefined) { + dirty.add(relPath); + } else { + // undefined = bulk (rule 3). Some core mutations also + // emit a bulk + per-path pair from one operation + // (rule 8) — bulk overrides any per-path entries we may + // have recorded for the same operation. + bulkInvalidated = true; + } + return Promise.resolve(); + }, + }; + }, resolveDatastorePath: (repoDir: string) => { // For remote datastores, return the cache path diff --git a/design/datastores.md b/design/datastores.md index fe46dda6..b3caf053 100644 --- a/design/datastores.md +++ b/design/datastores.md @@ -316,20 +316,85 @@ the fast path's local-dirty flag would stay `false` and the next `pushChanged` would short-circuit past real work. The `markDirty()` method on `DatastoreSyncService` is the contract that closes -this gap: - -- **Extension obligation.** Any sync service that caches a clean/dirty - watermark MUST flip it to dirty in `markDirty()` — same primitive as the - internal `pushFile`-equivalent. Sync services without a fast path MAY - no-op. `markDirty()` must be idempotent and cheap (not deduplicated by - core). -- **Core obligation.** Repositories writing into the cache call `markDirty()` - at the start of every public mutation method (`save`, `append`, `delete`, - `rename`, `allocateVersion`, `finalizeVersion`, `removeLatestMarker`, - non-dry-run `collectGarbage`, and the equivalents on the three yaml - repositories). The call happens **before** any write begins so a crash - mid-write leaves the watermark dirty — `markDirty()` then slow-walk is - always recoverable; a lost dirty-flip is not. +this gap. The signature accepts an options bag with an optional `relPath` so +extensions tracking per-path dirty state can record exactly which path +changed instead of only flipping a single global bit: + +```typescript +markDirty(options?: DatastoreSyncOptions): Promise; + +interface DatastoreSyncOptions { + signal?: AbortSignal; + /** Cache-relative path of the file about to be written or removed. */ + relPath?: string; +} +``` + +The contract is eight load-bearing rules: + +1. **Pre-write timing.** `markDirty` fires *before* the cache write begins. + Extensions MUST NOT act synchronously on `relPath` — the file isn't on + disk yet. Treat `relPath` as a hint to record for the next `pushChanged`. +2. **Absence-on-disk = delete.** When `pushChanged` later consumes a + recorded `relPath` and the file no longer exists in the cache, the + extension SHOULD delete the corresponding remote record. This collapses + create/update/delete into one signal — no separate op-kind needed. +3. **`undefined` `relPath` = bulk.** A call without `relPath` signals a + mutation core couldn't attribute to a single path (e.g. `rename`, + non-dry-run `collectGarbage`, `deleteAllByWorkflowId`, `clearAll`). + Extensions maintaining a per-path dirty set MUST honor this by either + invalidating the set or flagging the next `pushChanged` for a full + walk. +4. **Process restart loses the set.** Extensions holding the dirty set in + memory MUST fall back to a full walk on the first `pushChanged` after + process start. Persisting the set to a sidecar is allowed but optional. +5. **`relPath` is cache-relative + forward-slash.** Relative to the + directory returned by `DatastoreProvider.resolveCachePath`, with + forward-slash separators on the wire regardless of host OS — matching + the `.datastore-index.json` key convention. **Extensions consuming + `relPath` for disk access on Windows MUST convert to native separators** + (e.g. via `@std/path` `join`) before `Deno.stat`/`Deno.readFile`/etc. +6. **Backward compatibility.** `relPath` is optional. Existing + implementations (`@swamp/s3-datastore`, `@swamp/gcs-datastore`, + filesystem no-op, every test mock) keep working unchanged because the + old single-watermark pattern still satisfies the contract — any + `markDirty` call still flips the dirty flag. +7. **Field scope.** swamp core only sets `relPath` on `markDirty` calls. + The field has no defined meaning on `pullChanged` or `pushChanged` + (it lives on the shared `DatastoreSyncOptions` for source + compatibility, not because pull/push consume it). +8. **Bulk overrides per-path within one operation.** Some core mutations + emit a bulk signal AND one or more per-path signals from the same + logical operation — `rename` is the canonical example: the upfront + `markDirty()` call has no `relPath` (bulk, for the tombstone + + latest-marker writes that don't decompose), and the inner `save()` of + the new name then emits a per-path signal. Extensions MUST treat any + bulk signal as overriding per-path signals from the same operation. + Easiest implementation: keep both a `bulkInvalidated: boolean` flag + and the dirty set; in `pushChanged`, fall back to a full walk when + `bulkInvalidated` is true regardless of the set's contents. + +**Core obligation.** Repositories writing into the cache call the dirty +hook at the start of every public mutation method (`save`, `append`, +`delete`, `rename`, `allocateVersion`, `finalizeVersion`, +`removeLatestMarker`, non-dry-run `collectGarbage`, and the equivalents +on the three yaml repositories). The call happens **before** any write +begins so a crash mid-write leaves the watermark dirty — +markDirty-then-slow-walk is always recoverable; a lost dirty-flip is not. + +**Per-call granularity emitted by core.** + +| Method | `relPath` | +| ------------------------------------------ | --------------------------------------------------------------- | +| `save`, `append`, `allocateVersion` | data-name directory (version not yet allocated at notify time) | +| `removeLatestMarker` | data-name directory | +| `delete(version=specific)` | version directory | +| `delete(version=undefined)` | data-name directory (entire subtree removed) | +| `finalizeVersion` | version directory (version known) | +| `rename` | `undefined` (bulk; inner `save()` emits its own per-path signal) | +| `collectGarbage` (non-dry-run) | `undefined` (bulk) | +| Yaml repos: `save`, `delete` | per-yaml file path | +| `deleteAllByWorkflowId`, `clearAll` | `undefined` (bulk) | Filesystem datastores have no fast path and wire no sync service, so the markDirty plumbing is a no-op for them. diff --git a/packages/testing/datastore_test_context.ts b/packages/testing/datastore_test_context.ts index d0c90011..e824d8c1 100644 --- a/packages/testing/datastore_test_context.ts +++ b/packages/testing/datastore_test_context.ts @@ -38,6 +38,13 @@ export interface LockOperation { export interface SyncOperation { method: "pullChanged" | "pushChanged" | "markDirty"; timestamp: number; + /** + * Cache-relative path forwarded by core on this call. Only set for + * `markDirty` calls when core attributes the dirty signal to a single + * path. Undefined for bulk `markDirty` calls and always undefined for + * `pullChanged`/`pushChanged`. + */ + relPath?: string; } /** Options for creating a datastore test context. */ @@ -205,8 +212,12 @@ export function createDatastoreTestContext( syncOperations.push({ method: "pushChanged", timestamp: Date.now() }); return Promise.resolve(); }, - markDirty(): Promise { - syncOperations.push({ method: "markDirty", timestamp: Date.now() }); + markDirty(options?: { relPath?: string }): Promise { + syncOperations.push({ + method: "markDirty", + timestamp: Date.now(), + relPath: options?.relPath, + }); return Promise.resolve(); }, } diff --git a/packages/testing/datastore_types.ts b/packages/testing/datastore_types.ts index 8e555b25..75efa169 100644 --- a/packages/testing/datastore_types.ts +++ b/packages/testing/datastore_types.ts @@ -69,6 +69,15 @@ export interface DatastoreVerifier { /** Options accepted by sync service methods. */ export interface DatastoreSyncOptions { signal?: AbortSignal; + /** + * Cache-relative path of the file about to be written or removed. + * swamp core only sets this on `markDirty` calls; the field has no + * defined meaning on `pullChanged` or `pushChanged`. Path is + * forward-slash-normalized; extensions consuming it for disk access + * on Windows must convert to native separators. See the canonical + * `DatastoreSyncService.markDirty` JSDoc for the full contract. + */ + relPath?: string; } /** Interface for datastore synchronization services. */ diff --git a/src/cli/repo_context.ts b/src/cli/repo_context.ts index c0882899..9963fd79 100644 --- a/src/cli/repo_context.ts +++ b/src/cli/repo_context.ts @@ -25,7 +25,7 @@ * - Throwing clear errors when not initialized */ -import { isAbsolute, join, relative, resolve } from "@std/path"; +import { isAbsolute, join, relative, resolve, SEPARATOR } from "@std/path"; import type { OutputMode } from "../presentation/output/output.ts"; import { createRepositoryContext, @@ -69,7 +69,10 @@ import { resolveSyncTimeoutMs, } from "../domain/datastore/datastore_config.ts"; import type { DatastoreProvider } from "../domain/datastore/datastore_provider.ts"; -import type { DatastoreSyncService } from "../domain/datastore/datastore_sync_service.ts"; +import type { + DatastoreSyncService, + MarkDirtyHook, +} from "../domain/datastore/datastore_sync_service.ts"; import { datastoreTypeRegistry } from "../domain/datastore/datastore_type_registry.ts"; import { getSwampLogger } from "../infrastructure/logging/logger.ts"; import { withSpan } from "../infrastructure/tracing/mod.ts"; @@ -120,6 +123,32 @@ async function resolveCustomProvider( return typeInfo.createProvider(config.config); } +/** + * Bridges a `DatastoreSyncService` into a `MarkDirtyHook` that repositories + * can call without knowing the cache root. Repositories pass the absolute + * path of the about-to-be-written file (or `undefined` for bulk mutations); + * the hook computes the cache-relative form and forwards it to the sync + * service via `DatastoreSyncOptions.relPath`. + * + * Path conversion: `relative(cacheRoot, absPath)` then forward-slash + * normalize. Forward-slash on the wire is the cross-platform key + * convention (matching `.datastore-index.json`); extensions consuming + * `relPath` for disk access on Windows convert back to native separators. + */ +function buildMarkDirtyHook( + syncService: DatastoreSyncService, + cacheRoot: string, +): MarkDirtyHook { + return (absPath?: string) => { + if (absPath === undefined) { + return syncService.markDirty(); + } + const rel = relative(cacheRoot, absPath); + const relPath = SEPARATOR === "/" ? rel : rel.split(SEPARATOR).join("/"); + return syncService.markDirty({ relPath }); + }; +} + /** * Options for requireInitializedRepo. */ @@ -434,7 +463,10 @@ export function requireInitializedRepo( yamlWorkflowsDir, vaultsDir, datastoreResolver, - markDirty: syncService ? () => syncService!.markDirty() : undefined, + markDirty: syncService && isCustomDatastoreConfig(datastoreConfig) && + datastoreConfig.cachePath + ? buildMarkDirtyHook(syncService, datastoreConfig.cachePath) + : undefined, ...factoryConfig, }); @@ -563,7 +595,10 @@ export async function requireInitializedRepoUnlocked( yamlWorkflowsDir, vaultsDir, datastoreResolver, - markDirty: syncService ? () => syncService!.markDirty() : undefined, + markDirty: syncService && isCustomDatastoreConfig(datastoreConfig) && + datastoreConfig.cachePath + ? buildMarkDirtyHook(syncService, datastoreConfig.cachePath) + : undefined, ...factoryConfig, }); diff --git a/src/cli/repo_context_test.ts b/src/cli/repo_context_test.ts index 7bb24fc2..939d290b 100644 --- a/src/cli/repo_context_test.ts +++ b/src/cli/repo_context_test.ts @@ -722,3 +722,122 @@ Deno.test("requireInitializedRepo - default behavior still triggers coordinator await flushDatastoreSync(); }); }); + +Deno.test( + "requireInitializedRepo - wiring forwards forward-slash cache-relative relPath to markDirty", + async () => { + // End-to-end integration: a repository write under `requireInitializedRepo` + // should reach the sync service's `markDirty` with `options.relPath` set + // to a forward-slash cache-relative string. Pins the cacheRoot→relPath + // conversion in `buildMarkDirtyHook`. Uses a relPath that contains a + // directory separator so a regression that drops forward-slash + // normalization fails on the Windows CI runner. + const { datastoreTypeRegistry } = await import( + "../domain/datastore/datastore_type_registry.ts" + ); + const { Data } = await import("../domain/data/data.ts"); + const { ModelType } = await import("../domain/models/model_type.ts"); + + const typeName = "test-markdirty-relpath"; + const markDirtyCalls: Array<{ relPath?: string }> = []; + + if (!datastoreTypeRegistry.has(typeName)) { + datastoreTypeRegistry.register({ + type: typeName, + name: "Test markDirty relPath wiring", + description: "Captures markDirty options to assert relPath threading", + isBuiltIn: false, + createProvider: () => ({ + createLock: () => ({ + acquire: () => Promise.resolve(), + release: () => Promise.resolve(), + withLock: (fn: () => Promise) => fn(), + inspect: () => Promise.resolve(null), + forceRelease: () => Promise.resolve(true), + }), + createVerifier: () => ({ + verify: () => + Promise.resolve({ + healthy: true, + message: "ok", + latencyMs: 1, + datastoreType: typeName, + }), + }), + resolveDatastorePath: (repoDir: string) => `${repoDir}/.test-store`, + resolveCachePath: (repoDir: string) => `${repoDir}/.test-cache`, + createSyncService: () => ({ + pullChanged: () => Promise.resolve(0), + pushChanged: () => Promise.resolve(0), + markDirty: (options?: { relPath?: string }) => { + markDirtyCalls.push({ relPath: options?.relPath }); + return Promise.resolve(); + }, + }), + }), + }); + } + + await withTempDir(async (dir) => { + await initializeRepo(dir); + await configureExtensionDatastore(dir, typeName); + + markDirtyCalls.length = 0; + + const repo = await requireInitializedRepo({ + repoDir: dir, + outputMode: "json", + skipImplicitSync: true, + }); + + const testType = ModelType.create("test/relpath"); + const data = Data.create({ + name: "wiring-probe", + contentType: "text/plain", + lifetime: "infinite", + garbageCollection: 100, + tags: { type: "test" }, + ownerDefinition: { + ownerType: "manual", + ownerRef: "test-user", + }, + }); + + await repo.repoContext.unifiedDataRepo.save( + testType, + "model-x", + data, + new TextEncoder().encode("payload"), + ); + + // save fires one markDirty call with the data-name directory as relPath. + assertEquals(markDirtyCalls.length, 1); + const relPath = markDirtyCalls[0].relPath; + if (relPath === undefined) { + throw new Error("expected relPath to be set"); + } + // Forward-slash normalized — the data-name directory under the cache + // root contains at least one separator (data//.../wiring-probe). + if (relPath.includes("\\")) { + throw new Error( + `relPath must be forward-slash normalized, got: ${relPath}`, + ); + } + // Cache-relative — must not start with the cache root or be absolute. + if (relPath.startsWith("/") || relPath.includes(":")) { + throw new Error( + `relPath must be cache-relative, got: ${relPath}`, + ); + } + // Must contain at least one separator (data-name dir lives under + // data/.../) so the normalization is exercised. + if (!relPath.includes("/")) { + throw new Error( + `relPath must contain a separator to exercise normalization, got: ${relPath}`, + ); + } + + await flushDatastoreSync(); + }); + }, +); diff --git a/src/domain/datastore/datastore_sync_service.ts b/src/domain/datastore/datastore_sync_service.ts index c3e391d4..34aebbd8 100644 --- a/src/domain/datastore/datastore_sync_service.ts +++ b/src/domain/datastore/datastore_sync_service.ts @@ -30,6 +30,25 @@ export interface DatastoreSyncOptions { * coordinator, so unbounded extensions do not block the CLI indefinitely. */ signal?: AbortSignal; + /** + * Cache-relative path of the file about to be written or removed. + * Optional — omitted when core can't attribute the dirty signal to a + * single path (bulk mutations like `rename` or non-dry-run + * `collectGarbage`). Sync services that track per-path dirty state + * SHOULD treat absence as "fall back to full walk on next pushChanged." + * + * Path is forward-slash-normalized regardless of host OS, matching the + * cache-index key convention. Extensions consuming `relPath` for disk + * access on Windows MUST convert to native separators (e.g. via + * `@std/path` `join`) before `Deno.stat`/`Deno.readFile`/etc. + * + * Field scope: swamp core only sets this on `markDirty` calls. The + * field has no defined meaning on `pullChanged` or `pushChanged` — + * extensions can ignore it there. (It lives on the shared + * `DatastoreSyncOptions` for source compatibility, not because pull + * or push consume it.) + */ + relPath?: string; } /** @@ -57,6 +76,49 @@ export interface DatastoreSyncService { * the start of every repository-layer mutation that writes into the cache, * and calls are not deduplicated. Called before the write begins so a crash * mid-write still leaves the watermark dirty. + * + * Contract — the load-bearing rules every implementer needs to honor: + * + * 1. **Pre-write timing.** Fires *before* the cache write begins. + * Extensions MUST NOT act synchronously on `options.relPath` — the + * file isn't on disk yet. Treat `relPath` as a hint to record for + * the next `pushChanged`. + * 2. **Absence-on-disk = delete.** When `pushChanged` later consumes a + * recorded `relPath` and the file no longer exists in the cache, the + * extension SHOULD delete the corresponding remote record. This + * collapses create/update/delete into one signal — no separate + * op-kind needed. + * 3. **`undefined` `relPath` = bulk.** A call without `relPath` signals + * a mutation core couldn't attribute to a single path. Extensions + * maintaining a per-path dirty set MUST honor this by either + * invalidating the set or flagging the next `pushChanged` for a + * full walk. + * 4. **Process restart loses the set.** Extensions holding the dirty + * set in memory MUST fall back to a full walk on the first + * `pushChanged` after process start. Persisting the set to a + * sidecar is allowed but optional. + * 5. **`relPath` is cache-relative**, relative to the directory + * returned by `DatastoreProvider.resolveCachePath`. Path is + * forward-slash-normalized; extensions consuming it for disk access + * on Windows MUST convert to native separators. + * 6. **Backward compatibility.** `relPath` is optional; existing + * implementations (`@swamp/s3-datastore`, `@swamp/gcs-datastore`, + * filesystem no-op, every test mock) keep working unchanged because + * the old single-watermark pattern still satisfies the contract. + * 7. **Field scope.** swamp core only sets `relPath` on `markDirty` + * calls. The field has no defined meaning on `pullChanged` or + * `pushChanged`. + * 8. **Bulk overrides per-path within one operation.** Some core + * mutations emit a bulk signal AND one or more per-path signals + * from the same logical operation (e.g. `rename` calls the dirty + * hook upfront with no `relPath` for the tombstone + latest-marker + * writes that don't decompose, then its internal `save()` of the + * new name emits a per-path signal). Extensions MUST treat any + * bulk signal as overriding per-path signals from the same + * operation. Easiest implementation: keep both a `bulkInvalidated: + * boolean` flag and the dirty set; in `pushChanged`, fall back to + * a full walk when `bulkInvalidated` is true regardless of the + * set's contents. */ markDirty(options?: DatastoreSyncOptions): Promise; } @@ -72,8 +134,17 @@ export type SyncDirection = "push" | "pull"; * registered at all). Undefined when the repository is wired against a * datastore with no sync service (e.g. filesystem) — callers treat it as a * no-op. + * + * **Internal vs public contract.** Repositories pass an absolute path (or + * `undefined` for bulk mutations) — they don't have the cache root in scope. + * The composition root in `repo_context.ts` wraps this hook with a helper + * that converts the absolute path to a forward-slash cache-relative string + * and forwards it to {@link DatastoreSyncService.markDirty} via + * {@link DatastoreSyncOptions.relPath}. The full contract (8 rules + * including pre-write timing, absence-on-disk semantics, restart behavior) + * is documented on `DatastoreSyncService.markDirty`. */ -export type MarkDirtyHook = () => Promise; +export type MarkDirtyHook = (relPath?: string) => Promise; /** * Thrown when a datastore sync operation exceeds the configured timeout. diff --git a/src/domain/datastore/testing_package_compat_test.ts b/src/domain/datastore/testing_package_compat_test.ts index d236e450..28fc3466 100644 --- a/src/domain/datastore/testing_package_compat_test.ts +++ b/src/domain/datastore/testing_package_compat_test.ts @@ -70,7 +70,12 @@ function _checkDatastoreSyncServiceFields(sync: TestingDatastoreSyncService) { .pushChanged(); const _markDirty: ReturnType = sync.markDirty(); - void [_pull, _push, _markDirty]; + // Verify the optional relPath member on DatastoreSyncOptions type-checks + // through both the canonical and testing-package signatures. + const _markDirtyWithRelPath: ReturnType< + CanonicalDatastoreSyncService["markDirty"] + > = sync.markDirty({ relPath: "data/foo/v1/raw" }); + void [_pull, _push, _markDirty, _markDirtyWithRelPath]; } // DatastoreProvider: verify methods exist and return compatible types. diff --git a/src/infrastructure/persistence/unified_data_repository.ts b/src/infrastructure/persistence/unified_data_repository.ts index 6f978efd..dab7767e 100644 --- a/src/infrastructure/persistence/unified_data_repository.ts +++ b/src/infrastructure/persistence/unified_data_repository.ts @@ -451,9 +451,15 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { * the cache directory. The hook is no-op when no sync service is wired — * e.g. filesystem datastores, or when constructing the repository outside * a CLI sync lifecycle. See `design/datastores.md` for the contract. + * + * `relPath` is the absolute path of the file or directory about to be + * written or removed (when core can attribute the dirty signal to a single + * path); the wiring layer converts it to a cache-relative form before + * forwarding to the sync service. Pass `undefined` for genuine bulk + * mutations. */ - private async notifyDirty(): Promise { - if (this.markDirty) await this.markDirty(); + private async notifyDirty(relPath?: string): Promise { + if (this.markDirty) await this.markDirty(relPath); } /** @@ -755,7 +761,9 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { ); } - await this.notifyDirty(); + // Pre-write notify with the data-name directory: version not yet + // allocated, so the truthful signal is "this subtree is changing." + await this.notifyDirty(this.getDataNameDir(type, modelId, data.name)); // Check if data with this name already exists const existing = await this.findByName(type, modelId, data.name); @@ -823,7 +831,7 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { dataName: string, content: Uint8Array, ): Promise { - await this.notifyDirty(); + await this.notifyDirty(this.getDataNameDir(type, modelId, dataName)); const latestVersion = await this.getLatestVersion(type, modelId, dataName); if (latestVersion === null) { @@ -942,7 +950,13 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { dataName: string, version?: number, ): Promise { - await this.notifyDirty(); + // Per-version delete → version directory; full-name delete → data-name + // directory (entire subtree removed). + await this.notifyDirty( + version !== undefined + ? this.getPath(type, modelId, dataName, version) + : this.getDataNameDir(type, modelId, dataName), + ); if (version !== undefined) { // Delete specific version @@ -1009,9 +1023,9 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { modelId: string, dataName: string, ): Promise { - await this.notifyDirty(); - const dataNameDir = this.getDataNameDir(type, modelId, dataName); + await this.notifyDirty(dataNameDir); + const latestSymlink = join(dataNameDir, "latest"); try { @@ -1039,6 +1053,12 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { newVersion: number; } > { + // Bulk: rename writes a new data under newName (via the inner save() + // which emits its own per-path signal) plus a tombstone, content, + // and latest-marker writes under oldName. The upfront bulk-invalidate + // ensures extensions tracking per-path dirty state fall back to a + // full walk for this operation — see rule 8 on + // DatastoreSyncService.markDirty. await this.notifyDirty(); // Read the latest version of old data @@ -1197,7 +1217,9 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { ); } - await this.notifyDirty(); + // Pre-write notify with the data-name directory: version not yet + // allocated. Same granularity as save/append. + await this.notifyDirty(this.getDataNameDir(type, modelId, data.name)); // Validate ownership if data with this name already exists const existing = await this.findByName(type, modelId, data.name); @@ -1234,7 +1256,9 @@ export class FileSystemUnifiedDataRepository implements UnifiedDataRepository { data: Data, version: number, ): Promise<{ size: number; checksum: string }> { - await this.notifyDirty(); + // Version is known here (allocateVersion has already run); pass the + // version directory as the per-call signal. + await this.notifyDirty(this.getPath(type, modelId, data.name, version)); const contentPath = this.getContentPath( type, diff --git a/src/infrastructure/persistence/unified_data_repository_test.ts b/src/infrastructure/persistence/unified_data_repository_test.ts index bb40791a..7cefb6b1 100644 --- a/src/infrastructure/persistence/unified_data_repository_test.ts +++ b/src/infrastructure/persistence/unified_data_repository_test.ts @@ -556,16 +556,22 @@ Deno.test("findAllForModelSync returns empty for missing model", () => { // Pins the markDirty contract from design/datastores.md: every public mutation // that writes into the cache must call the sync service's markDirty hook before -// the write begins, so the fast-path sidecar cannot short-circuit past it. +// the write begins, so the fast-path sidecar cannot short-circuit past it. Also +// pins the per-call relPath granularity — pre-write notify sites (save, append, +// allocateVersion) pass the data-name directory because the version directory +// doesn't exist yet; finalizeVersion passes the version directory; delete +// passes the version dir or data-name dir based on whether a version was +// supplied; rename and collectGarbage pass undefined (bulk). +// // Regression coverage for the datastore fast-path contract violation that // silently lost writes when the sidecar stayed clean. Deno.test("mutations call markDirty before writing", async () => { const tmpDir = await Deno.makeTempDir(); try { const catalogStore = new CatalogStore(join(tmpDir, "_catalog.db")); - const calls: string[] = []; - const markDirty = () => { - calls.push("markDirty"); + const calls: Array = []; + const markDirty = (relPath?: string) => { + calls.push(relPath); return Promise.resolve(); }; const repo = new FileSystemUnifiedDataRepository( @@ -575,7 +581,7 @@ Deno.test("mutations call markDirty before writing", async () => { markDirty, ); - // save + // save → data-name directory (version not yet allocated) const data = makeData("mark-dirty-save"); await repo.save( testType, @@ -584,8 +590,12 @@ Deno.test("mutations call markDirty before writing", async () => { new TextEncoder().encode("hello"), ); assertEquals(calls.length, 1); + assertEquals( + calls[0], + repo.getDataNameDir(testType, "model-1", "mark-dirty-save"), + ); - // allocateVersion + finalizeVersion + // allocateVersion → data-name directory; finalizeVersion → version dir const data2 = makeData("mark-dirty-alloc"); const { version, contentPath } = await repo.allocateVersion( testType, @@ -593,30 +603,91 @@ Deno.test("mutations call markDirty before writing", async () => { data2, ); assertEquals(calls.length, 2); + assertEquals( + calls[1], + repo.getDataNameDir(testType, "model-1", "mark-dirty-alloc"), + ); await Deno.writeFile(contentPath, new TextEncoder().encode("direct")); await repo.finalizeVersion(testType, "model-1", data2, version); assertEquals(calls.length, 3); + assertEquals( + calls[2], + repo.getPath(testType, "model-1", "mark-dirty-alloc", version), + ); + + // append → data-name directory (matches save/allocateVersion granularity). + // notifyDirty fires before the streaming-configured check throws, so the + // signal lands even though the operation aborts. Tolerate the throw. + try { + await repo.append( + testType, + "model-1", + "mark-dirty-save", + new TextEncoder().encode("more"), + ); + } catch { + // Expected — mark-dirty-save isn't streaming-configured. + } + assertEquals(calls.length, 4); + assertEquals( + calls[3], + repo.getDataNameDir(testType, "model-1", "mark-dirty-save"), + ); + const afterAppend = calls.length; - // rename + // rename → bulk (undefined) at entry. Internal save() emits a per-path + // signal for the new name. Rule 8: bulk must arrive first within the + // operation so extensions can correctly fall back to a full walk. await repo.rename(testType, "model-1", "mark-dirty-save", "mark-dirty-ren"); - // rename calls markDirty once at entry + once per internal save() - // (the new-name save). Both are cache writes, so both signals are - // legitimate. Pin ≥ 4 to assert at-least-once-per-mutation without - // overspecifying internal call count. - if (calls.length < 4) { + if (calls.length < afterAppend + 2) { throw new Error(`rename did not call markDirty: ${calls.length}`); } + assertEquals( + calls[afterAppend], + undefined, + "rename's first markDirty call must be bulk (undefined relPath) — rule 8", + ); + // The inner save() emits a per-path signal for the new name. Verify by + // looking for the new-name data-name directory in the subsequent calls. + const renameTail = calls.slice(afterAppend + 1); + const expectedRenameInner = repo.getDataNameDir( + testType, + "model-1", + "mark-dirty-ren", + ); + if (!renameTail.some((c) => c === expectedRenameInner)) { + throw new Error( + `rename's inner save() did not emit per-path signal for new name: ${ + JSON.stringify(renameTail) + }`, + ); + } const afterRename = calls.length; - // delete (specific version then all) + // delete with specific version → version directory await repo.delete(testType, "model-1", "mark-dirty-ren", 1); assertEquals(calls.length, afterRename + 1); + assertEquals( + calls[afterRename], + repo.getPath(testType, "model-1", "mark-dirty-ren", 1), + ); + + // delete without version → data-name directory (whole subtree) await repo.delete(testType, "model-1", "mark-dirty-ren"); assertEquals(calls.length, afterRename + 2); + assertEquals( + calls[afterRename + 1], + repo.getDataNameDir(testType, "model-1", "mark-dirty-ren"), + ); - // collectGarbage (live) + // collectGarbage (live) → bulk (undefined) await repo.collectGarbage(testType, "model-1"); assertEquals(calls.length, afterRename + 3); + assertEquals( + calls[afterRename + 2], + undefined, + "collectGarbage must use bulk relPath", + ); // collectGarbage (dry-run) must not notify — it does not touch the cache const before = calls.length; diff --git a/src/infrastructure/persistence/yaml_evaluated_definition_repository.ts b/src/infrastructure/persistence/yaml_evaluated_definition_repository.ts index 1c274b7e..f0eb1917 100644 --- a/src/infrastructure/persistence/yaml_evaluated_definition_repository.ts +++ b/src/infrastructure/persistence/yaml_evaluated_definition_repository.ts @@ -55,8 +55,8 @@ export class YamlEvaluatedDefinitionRepository { swampPath(repoDir, SWAMP_SUBDIRS.definitionsEvaluated); } - private async notifyDirty(): Promise { - if (this.markDirty) await this.markDirty(); + private async notifyDirty(relPath?: string): Promise { + if (this.markDirty) await this.markDirty(relPath); } /** @@ -240,14 +240,13 @@ export class YamlEvaluatedDefinitionRepository { * @param definition - The evaluated definition to save */ async save(type: ModelType, definition: Definition): Promise { - await this.notifyDirty(); + const path = this.getPath(type, definition.id); + await this.notifyDirty(path); const dir = this.getTypeDir(type); await assertSafePath(dir, this.baseDir); await ensureDir(dir); - const path = this.getPath(type, definition.id); - const data = definition.toData(); // Ensure type metadata is always present in persisted YAML data.type = type.normalized; @@ -267,9 +266,8 @@ export class YamlEvaluatedDefinitionRepository { * @param id - The definition ID */ async delete(type: ModelType, id: DefinitionId): Promise { - await this.notifyDirty(); - const path = this.getPath(type, id); + await this.notifyDirty(path); try { await Deno.remove(path); diff --git a/src/infrastructure/persistence/yaml_evaluated_definition_repository_test.ts b/src/infrastructure/persistence/yaml_evaluated_definition_repository_test.ts new file mode 100644 index 00000000..508a7e8f --- /dev/null +++ b/src/infrastructure/persistence/yaml_evaluated_definition_repository_test.ts @@ -0,0 +1,90 @@ +// Swamp, an Automation Framework +// Copyright (C) 2026 System Initiative, Inc. +// +// This file is part of Swamp. +// +// Swamp is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License version 3 +// as published by the Free Software Foundation, with the Swamp +// Extension and Definition Exception (found in the "COPYING-EXCEPTION" +// file). +// +// Swamp is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with Swamp. If not, see . + +import { assertEquals } from "@std/assert"; +import { YamlEvaluatedDefinitionRepository } from "./yaml_evaluated_definition_repository.ts"; +import { Definition } from "../../domain/definitions/definition.ts"; +import { ModelType } from "../../domain/models/model_type.ts"; + +const testType = ModelType.create("test/model"); + +async function withTempDir( + fn: (dir: string) => Promise, +): Promise { + const dir = await Deno.makeTempDir({ + prefix: "swamp-yaml-evaluated-definition-", + }); + try { + await fn(dir); + } finally { + if (Deno.build.os === "windows") { + await Deno.remove(dir, { recursive: true }).catch(() => {}); + } else { + await Deno.remove(dir, { recursive: true }); + } + } +} + +Deno.test( + "YamlEvaluatedDefinitionRepository invokes markDirty with relPath on mutations", + async () => { + await withTempDir(async (dir) => { + const calls: Array = []; + const markDirty = (relPath?: string) => { + calls.push(relPath); + return Promise.resolve(); + }; + const repo = new YamlEvaluatedDefinitionRepository( + dir, + undefined, + markDirty, + ); + + const definition = Definition.create({ + type: testType.normalized, + typeVersion: "1", + name: "test-def", + }); + + const expectedPath = repo.getPath(testType, definition.id); + + // save → per-definition yaml path + await repo.save(testType, definition); + assertEquals(calls.length, 1); + assertEquals(calls[0], expectedPath); + + // delete → same per-definition yaml path + await repo.delete(testType, definition.id); + assertEquals(calls.length, 2); + assertEquals(calls[1], expectedPath); + + // Reads do not notify. + await repo.findAll(testType); + await repo.findById(testType, definition.id); + assertEquals(calls.length, 2); + + // clearAll → bulk (whole evaluated-definitions tree removed) + await repo.save(testType, definition); + assertEquals(calls.length, 3); + await repo.clearAll(); + assertEquals(calls.length, 4); + assertEquals(calls[3], undefined); + }); + }, +); diff --git a/src/infrastructure/persistence/yaml_output_repository.ts b/src/infrastructure/persistence/yaml_output_repository.ts index d05b3c52..7472fb5f 100644 --- a/src/infrastructure/persistence/yaml_output_repository.ts +++ b/src/infrastructure/persistence/yaml_output_repository.ts @@ -58,8 +58,8 @@ export class YamlOutputRepository implements OutputRepository { this.baseDir = baseDir ?? swampPath(repoDir, SWAMP_SUBDIRS.outputs); } - private async notifyDirty(): Promise { - if (this.markDirty) await this.markDirty(); + private async notifyDirty(relPath?: string): Promise { + if (this.markDirty) await this.markDirty(relPath); } async findById( @@ -175,13 +175,12 @@ export class YamlOutputRepository implements OutputRepository { method: string, output: ModelOutput, ): Promise { - await this.notifyDirty(); + const path = this.getPath(type, method, output); + await this.notifyDirty(path); const dir = this.getMethodDir(type, method); await assertSafePath(dir, this.baseDir); await ensureDir(dir); - - const path = this.getPath(type, method, output); const data = output.toData(); // Convert logFile to relative path for storage if (data.logFile) { @@ -198,9 +197,11 @@ export class YamlOutputRepository implements OutputRepository { method: string, id: ModelOutputId, ): Promise { - await this.notifyDirty(); - - // We need to find the file first since filename includes timestamp + // We need to find the file first since filename includes timestamp. + // Notify per-path inside the loop once the match is found, before + // the Deno.remove — same crash-safety as the unconditional pre-write + // notify. When no match is found, nothing is removed and no signal + // is needed. const dir = this.getMethodDir(type, method); try { for await (const entry of Deno.readDir(dir)) { @@ -209,6 +210,7 @@ export class YamlOutputRepository implements OutputRepository { const content = await Deno.readTextFile(path); const data = parseYaml(content) as ModelOutputData; if (data.id === id) { + await this.notifyDirty(path); await Deno.remove(path); // Clean up empty parent directories diff --git a/src/infrastructure/persistence/yaml_output_repository_test.ts b/src/infrastructure/persistence/yaml_output_repository_test.ts index 60372b7c..0fb3c2a2 100644 --- a/src/infrastructure/persistence/yaml_output_repository_test.ts +++ b/src/infrastructure/persistence/yaml_output_repository_test.ts @@ -371,11 +371,11 @@ Deno.test("YamlOutputRepository.getPath uses correct format", () => { assertPathStringIncludes(path, ".yaml"); }); -Deno.test("YamlOutputRepository invokes markDirty on mutations", async () => { +Deno.test("YamlOutputRepository invokes markDirty with relPath on mutations", async () => { await withTempDir(async (dir) => { - const calls: string[] = []; - const markDirty = () => { - calls.push("markDirty"); + const calls: Array = []; + const markDirty = (relPath?: string) => { + calls.push(relPath); return Promise.resolve(); }; const repo = new YamlOutputRepository(dir, undefined, markDirty); @@ -386,11 +386,15 @@ Deno.test("YamlOutputRepository invokes markDirty on mutations", async () => { provenance: defaultProvenance, }); + const expectedPath = repo.getPath(testType, "create", output); + await repo.save(testType, "create", output); assertEquals(calls.length, 1); + assertEquals(calls[0], expectedPath); await repo.delete(testType, "create", output.id); assertEquals(calls.length, 2); + assertEquals(calls[1], expectedPath); // Reads do not notify. await repo.findAll(testType); diff --git a/src/infrastructure/persistence/yaml_workflow_run_repository.ts b/src/infrastructure/persistence/yaml_workflow_run_repository.ts index 291ea1c6..e7c72ab1 100644 --- a/src/infrastructure/persistence/yaml_workflow_run_repository.ts +++ b/src/infrastructure/persistence/yaml_workflow_run_repository.ts @@ -64,8 +64,8 @@ export class YamlWorkflowRunRepository implements WorkflowRunRepository { this.baseDir = baseDir ?? swampPath(repoDir, SWAMP_SUBDIRS.workflowRuns); } - private async notifyDirty(): Promise { - if (this.markDirty) await this.markDirty(); + private async notifyDirty(relPath?: string): Promise { + if (this.markDirty) await this.markDirty(relPath); } async findById( @@ -181,14 +181,13 @@ export class YamlWorkflowRunRepository implements WorkflowRunRepository { } async save(workflowId: WorkflowId, run: WorkflowRun): Promise { - await this.notifyDirty(); + const path = this.getPath(workflowId, run.id); + await this.notifyDirty(path); const dir = this.getRunsDir(workflowId); await assertSafePath(dir, this.baseDir); await ensureDir(dir); - const path = this.getPath(workflowId, run.id); - // Get the previous status to detect state changes let previousStatus: string | undefined; if (this.eventBus) { diff --git a/src/infrastructure/persistence/yaml_workflow_run_repository_test.ts b/src/infrastructure/persistence/yaml_workflow_run_repository_test.ts index 48ef8c85..52780956 100644 --- a/src/infrastructure/persistence/yaml_workflow_run_repository_test.ts +++ b/src/infrastructure/persistence/yaml_workflow_run_repository_test.ts @@ -396,11 +396,11 @@ Deno.test("YamlWorkflowRunRepository.deleteAllByWorkflowId does not affect other }); }); -Deno.test("YamlWorkflowRunRepository invokes markDirty on mutations", async () => { +Deno.test("YamlWorkflowRunRepository invokes markDirty with relPath on mutations", async () => { await withTempDir(async (dir) => { - const calls: string[] = []; - const markDirty = () => { - calls.push("markDirty"); + const calls: Array = []; + const markDirty = (relPath?: string) => { + calls.push(relPath); return Promise.resolve(); }; const repo = new YamlWorkflowRunRepository( @@ -414,15 +414,19 @@ Deno.test("YamlWorkflowRunRepository invokes markDirty on mutations", async () = const run = WorkflowRun.create(workflow); run.start(); + // save → per-run yaml path await repo.save(workflow.id, run); assertEquals(calls.length, 1); + assertEquals(calls[0], repo.getPath(workflow.id, run.id)); await repo.findById(workflow.id, run.id); await repo.findAllByWorkflowId(workflow.id); assertEquals(calls.length, 1); + // deleteAllByWorkflowId → bulk (whole runs directory removed) await repo.deleteAllByWorkflowId(workflow.id); assertEquals(calls.length, 2); + assertEquals(calls[1], undefined); // deleteAllByWorkflowId on an empty workflow is a no-op and must not // notify — nothing was written or removed.