Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions .claude/skills/swamp-extension-datastore/references/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -163,34 +163,51 @@ Optional interface for remote datastore synchronization.

```typescript
interface DatastoreSyncService {
pullChanged(): Promise<void>;
pushChanged(): Promise<void>;
markDirty(): Promise<void>;
pullChanged(options?: DatastoreSyncOptions): Promise<number | void>;
pushChanged(options?: DatastoreSyncOptions): Promise<number | void>;
markDirty(options?: DatastoreSyncOptions): Promise<void>;
}

interface DatastoreSyncOptions {
signal?: AbortSignal;
/** Cache-relative path of the file about to be written or removed. */
relPath?: string;
}
```

### `pullChanged()`
### `pullChanged(options?)`

Pull changed files from the remote datastore to the local cache. Called before
read/write operations on remote datastores.
read/write operations on remote datastores. `options.relPath` has no defined
meaning here — core only sets it on `markDirty`.

### `pushChanged()`
### `pushChanged(options?)`

Push changed files from the local cache to the remote datastore. Called after
write operations complete.
write operations complete. `options.relPath` has no defined meaning here — core
only sets it on `markDirty`.

### `markDirty()`
### `markDirty(options?)`

Signal that the local cache has uncommitted work. Swamp core calls this at the
start of every repository-layer mutation that writes into the cache (e.g.
`save`, `delete`, `rename`), **before** the write begins — a crash mid-write
still leaves the watermark dirty.

The method only matters for implementations that maintain a clean/dirty
watermark to short-circuit zero-diff syncs (the recommended fast-path pattern —
see `design/datastores.md`). Those implementations MUST flip the watermark to
dirty here so the next `pushChanged` cannot skip past core's writes.
When core can attribute the mutation to a single path, it sets `options.relPath`
to a forward-slash cache-relative path. Bulk mutations (`rename`, non-dry-run
`collectGarbage`, `deleteAllByWorkflowId`, `clearAll`) omit the field —
extensions MUST treat absence as "fall back to full walk."

The full eight-rule contract — pre-write timing, absence-on-disk = delete,
`undefined` = bulk, restart-loses-set, cache-relative + forward-slash (consumers
convert to native separators on Windows), backward compatibility, field scope,
bulk-overrides-per-path-within-one-operation — is documented in
`design/datastores.md` "markDirty() contract." Read that section before
implementing per-path tracking.

Implementations that unconditionally walk the cache on every `pushChanged` have
nothing to invalidate and can return `Promise.resolve()`.
nothing to invalidate and can return `Promise.resolve()` — fully backward
compatible.

`markDirty()` must be idempotent and cheap — core does not deduplicate calls.
85 changes: 67 additions & 18 deletions .claude/skills/swamp-extension-datastore/references/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ A simple local filesystem variant that stores data in a custom directory:
```typescript
// extensions/datastores/custom-fs/mod.ts
import { z } from "npm:zod@4";
import { join } from "@std/path";

const ConfigSchema = z.object({
basePath: z.string().describe("Base directory for data storage"),
Expand Down Expand Up @@ -235,24 +236,72 @@ export const datastore = {
},
}),

// Sync service for pull/push operations
createSyncService: (_repoDir: string, cachePath: string) => ({
pullChanged: async () => {
// Download changed files from remote to cachePath
console.log(`Pulling from ${parsed.endpoint} to ${cachePath}`);
},
pushChanged: async () => {
// Upload changed files from cachePath to remote
console.log(`Pushing from ${cachePath} to ${parsed.endpoint}`);
},
markDirty: () => {
// Swamp core calls this before every cache write. If your
// pushChanged walks the cache unconditionally, there's nothing
// to invalidate — no-op. If you add a clean/dirty sidecar
// (fast-path pattern in design/datastores.md), flip it here.
return Promise.resolve();
},
}),
// Sync service for pull/push operations.
//
// The pattern below shows the per-key fast path enabled by
// markDirty's options.relPath: maintain a Set<string> of dirty
// relPaths plus a bulkInvalidated boolean. pushChanged consumes
// the set when bulkInvalidated is false, doing exactly the work
// core attributed; otherwise it falls back to a full walk. See
// `design/datastores.md` "markDirty() contract" for the eight
// load-bearing rules — pre-write timing, absence-on-disk = delete,
// restart-loses-set, etc.
//
// For a no-op fallback (always do a full walk on every push),
// delete the dirty set and just `return Promise.resolve()` from
// markDirty.
createSyncService: (_repoDir: string, cachePath: string) => {
const dirty = new Set<string>();
let bulkInvalidated = false;

return {
pullChanged: async () => {
// Download changed files from remote to cachePath
console.log(`Pulling from ${parsed.endpoint} to ${cachePath}`);
},
pushChanged: async () => {
if (bulkInvalidated || dirty.size === 0) {
// Full walk path — bulk signal arrived, or nothing recorded
// (e.g. fresh process — see rule 4: "process restart loses
// the set; first pushChanged after start does a full walk").
console.log(`Full-walk push from ${cachePath}`);
} else {
for (const relPath of dirty) {
// Wire format is forward-slash; convert to native path
// before disk access (rule 5).
const absPath = join(cachePath, ...relPath.split("/"));
try {
await Deno.stat(absPath);
// upsert remote record for relPath (file exists)
} catch (err) {
if (err instanceof Deno.errors.NotFound) {
// delete remote record (absence-on-disk = delete,
// rule 2 — collapses create/update/delete into one
// signal, no op-kind param needed).
} else throw err;
}
}
}
dirty.clear();
bulkInvalidated = false;
},
markDirty: ({ relPath } = {}) => {
// Pre-write timing (rule 1): the file isn't on disk yet.
// We only RECORD the path; the upload happens later in
// pushChanged.
if (relPath !== undefined) {
dirty.add(relPath);
} else {
// undefined = bulk (rule 3). Some core mutations also
// emit a bulk + per-path pair from one operation
// (rule 8) — bulk overrides any per-path entries we may
// have recorded for the same operation.
bulkInvalidated = true;
}
return Promise.resolve();
},
};
},

resolveDatastorePath: (repoDir: string) => {
// For remote datastores, return the cache path
Expand Down
93 changes: 79 additions & 14 deletions design/datastores.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,20 +316,85 @@ the fast path's local-dirty flag would stay `false` and the next `pushChanged`
would short-circuit past real work.

The `markDirty()` method on `DatastoreSyncService` is the contract that closes
this gap:

- **Extension obligation.** Any sync service that caches a clean/dirty
watermark MUST flip it to dirty in `markDirty()` — same primitive as the
internal `pushFile`-equivalent. Sync services without a fast path MAY
no-op. `markDirty()` must be idempotent and cheap (not deduplicated by
core).
- **Core obligation.** Repositories writing into the cache call `markDirty()`
at the start of every public mutation method (`save`, `append`, `delete`,
`rename`, `allocateVersion`, `finalizeVersion`, `removeLatestMarker`,
non-dry-run `collectGarbage`, and the equivalents on the three yaml
repositories). The call happens **before** any write begins so a crash
mid-write leaves the watermark dirty — `markDirty()` then slow-walk is
always recoverable; a lost dirty-flip is not.
this gap. The signature accepts an options bag with an optional `relPath` so
extensions tracking per-path dirty state can record exactly which path
changed instead of only flipping a single global bit:

```typescript
markDirty(options?: DatastoreSyncOptions): Promise<void>;

interface DatastoreSyncOptions {
signal?: AbortSignal;
/** Cache-relative path of the file about to be written or removed. */
relPath?: string;
}
```

The contract is eight load-bearing rules:

1. **Pre-write timing.** `markDirty` fires *before* the cache write begins.
Extensions MUST NOT act synchronously on `relPath` — the file isn't on
disk yet. Treat `relPath` as a hint to record for the next `pushChanged`.
2. **Absence-on-disk = delete.** When `pushChanged` later consumes a
recorded `relPath` and the file no longer exists in the cache, the
extension SHOULD delete the corresponding remote record. This collapses
create/update/delete into one signal — no separate op-kind needed.
3. **`undefined` `relPath` = bulk.** A call without `relPath` signals a
mutation core couldn't attribute to a single path (e.g. `rename`,
non-dry-run `collectGarbage`, `deleteAllByWorkflowId`, `clearAll`).
Extensions maintaining a per-path dirty set MUST honor this by either
invalidating the set or flagging the next `pushChanged` for a full
walk.
4. **Process restart loses the set.** Extensions holding the dirty set in
memory MUST fall back to a full walk on the first `pushChanged` after
process start. Persisting the set to a sidecar is allowed but optional.
5. **`relPath` is cache-relative + forward-slash.** Relative to the
directory returned by `DatastoreProvider.resolveCachePath`, with
forward-slash separators on the wire regardless of host OS — matching
the `.datastore-index.json` key convention. **Extensions consuming
`relPath` for disk access on Windows MUST convert to native separators**
(e.g. via `@std/path` `join`) before `Deno.stat`/`Deno.readFile`/etc.
6. **Backward compatibility.** `relPath` is optional. Existing
implementations (`@swamp/s3-datastore`, `@swamp/gcs-datastore`,
filesystem no-op, every test mock) keep working unchanged because the
old single-watermark pattern still satisfies the contract — any
`markDirty` call still flips the dirty flag.
7. **Field scope.** swamp core only sets `relPath` on `markDirty` calls.
The field has no defined meaning on `pullChanged` or `pushChanged`
(it lives on the shared `DatastoreSyncOptions` for source
compatibility, not because pull/push consume it).
8. **Bulk overrides per-path within one operation.** Some core mutations
emit a bulk signal AND one or more per-path signals from the same
logical operation — `rename` is the canonical example: the upfront
`markDirty()` call has no `relPath` (bulk, for the tombstone +
latest-marker writes that don't decompose), and the inner `save()` of
the new name then emits a per-path signal. Extensions MUST treat any
bulk signal as overriding per-path signals from the same operation.
Easiest implementation: keep both a `bulkInvalidated: boolean` flag
and the dirty set; in `pushChanged`, fall back to a full walk when
`bulkInvalidated` is true regardless of the set's contents.

**Core obligation.** Repositories writing into the cache call the dirty
hook at the start of every public mutation method (`save`, `append`,
`delete`, `rename`, `allocateVersion`, `finalizeVersion`,
`removeLatestMarker`, non-dry-run `collectGarbage`, and the equivalents
on the three yaml repositories). The call happens **before** any write
begins so a crash mid-write leaves the watermark dirty —
markDirty-then-slow-walk is always recoverable; a lost dirty-flip is not.

**Per-call granularity emitted by core.**

| Method | `relPath` |
| ------------------------------------------ | --------------------------------------------------------------- |
| `save`, `append`, `allocateVersion` | data-name directory (version not yet allocated at notify time) |
| `removeLatestMarker` | data-name directory |
| `delete(version=specific)` | version directory |
| `delete(version=undefined)` | data-name directory (entire subtree removed) |
| `finalizeVersion` | version directory (version known) |
| `rename` | `undefined` (bulk; inner `save()` emits its own per-path signal) |
| `collectGarbage` (non-dry-run) | `undefined` (bulk) |
| Yaml repos: `save`, `delete` | per-yaml file path |
| `deleteAllByWorkflowId`, `clearAll` | `undefined` (bulk) |

Filesystem datastores have no fast path and wire no sync service, so the
markDirty plumbing is a no-op for them.
Expand Down
15 changes: 13 additions & 2 deletions packages/testing/datastore_test_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ export interface LockOperation {
export interface SyncOperation {
method: "pullChanged" | "pushChanged" | "markDirty";
timestamp: number;
/**
* Cache-relative path forwarded by core on this call. Only set for
* `markDirty` calls when core attributes the dirty signal to a single
* path. Undefined for bulk `markDirty` calls and always undefined for
* `pullChanged`/`pushChanged`.
*/
relPath?: string;
}

/** Options for creating a datastore test context. */
Expand Down Expand Up @@ -205,8 +212,12 @@ export function createDatastoreTestContext(
syncOperations.push({ method: "pushChanged", timestamp: Date.now() });
return Promise.resolve();
},
markDirty(): Promise<void> {
syncOperations.push({ method: "markDirty", timestamp: Date.now() });
markDirty(options?: { relPath?: string }): Promise<void> {
syncOperations.push({
method: "markDirty",
timestamp: Date.now(),
relPath: options?.relPath,
});
return Promise.resolve();
},
}
Expand Down
9 changes: 9 additions & 0 deletions packages/testing/datastore_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ export interface DatastoreVerifier {
/** Options accepted by sync service methods. */
export interface DatastoreSyncOptions {
signal?: AbortSignal;
/**
* Cache-relative path of the file about to be written or removed.
* swamp core only sets this on `markDirty` calls; the field has no
* defined meaning on `pullChanged` or `pushChanged`. Path is
* forward-slash-normalized; extensions consuming it for disk access
* on Windows must convert to native separators. See the canonical
* `DatastoreSyncService.markDirty` JSDoc for the full contract.
*/
relPath?: string;
}

/** Interface for datastore synchronization services. */
Expand Down
43 changes: 39 additions & 4 deletions src/cli/repo_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* - Throwing clear errors when not initialized
*/

import { isAbsolute, join, relative, resolve } from "@std/path";
import { isAbsolute, join, relative, resolve, SEPARATOR } from "@std/path";
import type { OutputMode } from "../presentation/output/output.ts";
import {
createRepositoryContext,
Expand Down Expand Up @@ -69,7 +69,10 @@ import {
resolveSyncTimeoutMs,
} from "../domain/datastore/datastore_config.ts";
import type { DatastoreProvider } from "../domain/datastore/datastore_provider.ts";
import type { DatastoreSyncService } from "../domain/datastore/datastore_sync_service.ts";
import type {
DatastoreSyncService,
MarkDirtyHook,
} from "../domain/datastore/datastore_sync_service.ts";
import { datastoreTypeRegistry } from "../domain/datastore/datastore_type_registry.ts";
import { getSwampLogger } from "../infrastructure/logging/logger.ts";
import { withSpan } from "../infrastructure/tracing/mod.ts";
Expand Down Expand Up @@ -120,6 +123,32 @@ async function resolveCustomProvider(
return typeInfo.createProvider(config.config);
}

/**
* Bridges a `DatastoreSyncService` into a `MarkDirtyHook` that repositories
* can call without knowing the cache root. Repositories pass the absolute
* path of the about-to-be-written file (or `undefined` for bulk mutations);
* the hook computes the cache-relative form and forwards it to the sync
* service via `DatastoreSyncOptions.relPath`.
*
* Path conversion: `relative(cacheRoot, absPath)` then forward-slash
* normalize. Forward-slash on the wire is the cross-platform key
* convention (matching `.datastore-index.json`); extensions consuming
* `relPath` for disk access on Windows convert back to native separators.
*/
function buildMarkDirtyHook(
syncService: DatastoreSyncService,
cacheRoot: string,
): MarkDirtyHook {
return (absPath?: string) => {
if (absPath === undefined) {
return syncService.markDirty();
}
const rel = relative(cacheRoot, absPath);
const relPath = SEPARATOR === "/" ? rel : rel.split(SEPARATOR).join("/");
return syncService.markDirty({ relPath });
};
}

/**
* Options for requireInitializedRepo.
*/
Expand Down Expand Up @@ -434,7 +463,10 @@ export function requireInitializedRepo(
yamlWorkflowsDir,
vaultsDir,
datastoreResolver,
markDirty: syncService ? () => syncService!.markDirty() : undefined,
markDirty: syncService && isCustomDatastoreConfig(datastoreConfig) &&
datastoreConfig.cachePath
? buildMarkDirtyHook(syncService, datastoreConfig.cachePath)
: undefined,
...factoryConfig,
});

Expand Down Expand Up @@ -563,7 +595,10 @@ export async function requireInitializedRepoUnlocked(
yamlWorkflowsDir,
vaultsDir,
datastoreResolver,
markDirty: syncService ? () => syncService!.markDirty() : undefined,
markDirty: syncService && isCustomDatastoreConfig(datastoreConfig) &&
datastoreConfig.cachePath
? buildMarkDirtyHook(syncService, datastoreConfig.cachePath)
: undefined,
...factoryConfig,
});

Expand Down
Loading
Loading