49 changes: 49 additions & 0 deletions design/datastores.md
@@ -509,6 +509,55 @@ calls `forceRelease(expectedNonce)` to clear it — without this, the
post-acquire TOCTOU re-check would re-detect the same stale lock on every
iteration and recurse indefinitely.

#### Symmetric Drain (structural commands)

Structural commands (those that go through `requireInitializedRepo`) acquire
the global lock with a **symmetric drain** — `waitForPerModelLocks` is
invoked twice, once before the global lock is acquired and once after:

1. **First drain (pre-acquire).** Wait for any per-model locks visible at
command start to be released. A writer that is already past its own
TOCTOU recheck (in `acquireModelLocks`) is committed to writing data
and must be allowed to finish.
2. **Acquire global lock.** From this point on, any writer that runs
`inspect()` against the global lock will see it held and back off.
3. **Second drain (post-acquire).** Wait for any per-model locks that
slipped past the first drain — i.e., writers that inspected the global
lock between the first drain ending and the global-lock acquisition,
saw it not held, and went on to acquire a per-model lock.
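
The three steps above can be sketched with an in-memory model. The
`LockState` shape, poll interval, and function bodies below are
illustrative only; the real `requireInitializedRepo` polls lock files on
the filesystem and acquires a `FileLock`:

```typescript
// In-memory sketch of the symmetric drain (hypothetical LockState; the
// real implementation scans on-disk per-model lock files).
type LockState = { globalHeld: boolean; perModelLocks: Set<string> };

async function waitForPerModelLocks(
  state: LockState,
  pollMs = 10,
): Promise<void> {
  while (state.perModelLocks.size > 0) {
    await new Promise((resolve) => setTimeout(resolve, pollMs));
  }
}

async function runStructuralCommand(
  state: LockState,
  work: () => void,
): Promise<void> {
  await waitForPerModelLocks(state); // 1. drain writers visible at start
  state.globalHeld = true; //           2. writers inspecting now back off
  await waitForPerModelLocks(state); // 3. drain writers that slipped past
  try {
    work(); //                           structural work (e.g. recursive rm)
  } finally {
    state.globalHeld = false;
  }
}
```

With both drains in place, `work()` can only run once no writer holds a
per-model lock and every later writer sees `globalHeld` and backs off.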

The second drain is what closes the symmetric TOCTOU window between the
two sides of the protocol. Without it, a writer can:

1. Inspect global → not held (deleter has not yet acquired)
2. Take per-model lock
3. Pass its TOCTOU recheck → not held (deleter still has not acquired)
4. Begin writing a new version directory

…while the deleter completes its first drain, acquires the global lock,
and runs `Deno.remove(dataNameDir, { recursive: true })`. The recursive
removal then races the writer's new version subdirectory and fails with
ENOTEMPTY (Linux: `os error 39`, macOS: `os error 66`) — the failure mode
behind swamp-club#234.

The second drain catches the writer's still-held per-model lock and waits
for the writer to finish (and either commit cleanly or release on its own
TOCTOU recheck) before structural work proceeds. Because the writer's
recheck runs *immediately* after taking the per-model lock — before any
data I/O — there is no remaining window where the writer can write data
without the deleter's second drain seeing the per-model lock.
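
The writer side of the contract can be sketched in the same in-memory
model. The function name and `LockState` shape are hypothetical; the real
`acquireModelLocks` inspects the on-disk global lock:

```typescript
// Writer-side protocol sketch: pre-check, take per-model lock, then
// recheck immediately, before any data I/O. Names are illustrative.
type LockState = { globalHeld: boolean; perModelLocks: Set<string> };

function tryAcquireModelLock(state: LockState, model: string): boolean {
  if (state.globalHeld) return false; // inspect global: held, so back off
  state.perModelLocks.add(model); //     take per-model lock
  if (state.globalHeld) { //             TOCTOU recheck, before any I/O
    state.perModelLocks.delete(model); // deleter won the race: back off
    return false;
  }
  return true; // committed: the deleter's second drain must now wait
}
```

Because the recheck sits between taking the lock and returning `true`,
every committed writer is visible to the deleter's second drain.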

> **Maintainer note.** Both drain calls are required to keep this
> contract sound. The bidirectional citation
> (`src/cli/repo_context.ts:requireInitializedRepo` ↔ this section) is
> there so a future change cannot silently remove one of the two waits
> without confronting the contract. When changing this lifecycle, update
> both sites.

Caveat: `waitForPerModelLocks` only scans the local filesystem. Custom
(S3, distributed) datastores use their own `DistributedLock`
implementation and rely on its semantics rather than the local drain.

A SIGINT handler ensures best-effort lock release on Ctrl-C. If the process
crashes without releasing, the lock expires after the TTL (30 seconds by
default).
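
The TTL expiry reduces to a pure check. The field names match the on-disk
JSON read by `waitForPerModelLocks` later in this PR; the helper itself is
a sketch, not an existing function:

```typescript
// A lock is stale once its TTL has elapsed since acquisition; a crashed
// process's lock is then ignored rather than blocking forever.
interface LockInfo {
  acquiredAt: string; // ISO timestamp written at acquisition
  ttlMs: number; //      30_000 by default
}

function isLockStale(info: LockInfo, now: number = Date.now()): boolean {
  const acquiredAt = new Date(info.acquiredAt).getTime();
  return now - acquiredAt > info.ttlMs;
}
```

This is the complement of the non-stale test in `waitForPerModelLocks`
(`Date.now() - acquiredAt <= info.ttlMs`).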
144 changes: 144 additions & 0 deletions integration/data_delete_test.ts
@@ -30,6 +30,7 @@
import { assertEquals, assertExists, assertRejects } from "@std/assert";
import { join } from "@std/path";
import { ensureDir } from "@std/fs";
import { stringify as stringifyYaml } from "@std/yaml";
import { Data } from "../src/domain/data/data.ts";
import type { OwnerDefinition } from "../src/domain/data/data_metadata.ts";
import { ModelType } from "../src/domain/models/model_type.ts";
@@ -38,6 +39,8 @@ import { FileSystemUnifiedDataRepository } from "../src/infrastructure/persisten
import { YamlDefinitionRepository } from "../src/infrastructure/persistence/yaml_definition_repository.ts";
import { CatalogStore } from "../src/infrastructure/persistence/catalog_store.ts";
import { DataDeleteService } from "../src/domain/data/data_delete_service.ts";
import { SHELL_MODEL_TYPE } from "../src/domain/models/command/shell/shell_model.ts";
import { CLI_ARGS } from "./test_helpers.ts";

async function withTempDir(fn: (dir: string) => Promise<void>): Promise<void> {
const dir = await Deno.makeTempDir({ prefix: "swamp-data-delete-" });
@@ -206,3 +209,144 @@ Deno.test("Data Delete: --version against non-existent version names available v
assertEquals(after.sort((a, b) => a - b), [1, 2, 3]);
});
});

// ============================================================================
// Concurrent writer / delete smoke test (regression for swamp-club#234)
// ============================================================================
//
// Smoke regression for the symmetric drain in `requireInitializedRepo`: a
// concurrent writer creating a new version directory between the deleter's
// pre-acquire drain and the rmdir caused
// `Deno.remove(dataNameDir, { recursive: true })` to fail with ENOTEMPTY
// (Linux: os error 39, macOS: os error 66). The architectural fix lives in
// src/cli/repo_context.ts; the deterministic regression for the polling
// primitive lives in src/cli/repo_context_test.ts.
//
// This test is non-deterministic by design — it spawns real CLI processes
// across N iterations and asserts no ENOTEMPTY error appears. The original
// bug reproduced in ~40 attempts under high writer pressure (4 parallel
// writers + tight delete loop), so N=50 single-writer iterations gives a
// margin large enough to catch a future regression with reasonable
// probability while staying within integration-suite wall-clock budgets.
// Do not silently shrink N below 50 — read swamp-club#234 first.

const RACE_ITERATIONS = 50;
const SHELL_SLEEP_MS = 300;

async function runSwamp(
args: string[],
cwd: string,
): Promise<{ stdout: string; stderr: string; code: number }> {
const { code, stdout, stderr } = await new Deno.Command(Deno.execPath(), {
args: [...CLI_ARGS, ...args],
stdout: "piped",
stderr: "piped",
cwd,
}).output();
return {
stdout: new TextDecoder().decode(stdout),
stderr: new TextDecoder().decode(stderr),
code,
};
}

async function initializeShellRepo(repoDir: string): Promise<void> {
for (
const sub of [
"models",
".swamp/outputs",
".swamp/data",
".swamp/logs",
]
) {
await ensureDir(join(repoDir, sub));
}
await Deno.writeTextFile(
join(repoDir, ".swamp.yaml"),
stringifyYaml({
swampVersion: "0.0.0",
initializedAt: new Date().toISOString(),
} as Record<string, unknown>),
);

// Shell model that holds the per-model lock for ~SHELL_SLEEP_MS, giving
// the concurrent delete a window to race the writer.
const definitionRepo = new YamlDefinitionRepository(repoDir);
const definition = Definition.create({
name: "race-writer",
methods: {
execute: {
arguments: {
run: `sleep ${SHELL_SLEEP_MS / 1000}; echo done`,
},
},
},
});
await definitionRepo.save(SHELL_MODEL_TYPE, definition);
}

function assertNoEnoTempty(label: string, combined: string): void {
for (
const marker of [
"Directory not empty",
"os error 39",
"os error 66",
]
) {
assertEquals(
combined.includes(marker),
false,
`${label} contains race marker "${marker}":\n${combined}`,
);
}
}

Deno.test({
name:
"Data Delete: 50 iterations of concurrent writer + delete leave no ENOTEMPTY (swamp-club#234)",
// Subprocess spawn means file handles outlive the test scope on some
// platforms; the existing CLI integration tests share this exemption.
sanitizeResources: false,
sanitizeOps: false,
fn: async () => {
await withTempDir(async (repoDir) => {
await initializeShellRepo(repoDir);

// Seed one version so the first delete has something to remove.
const seed = await runSwamp(
["model", "method", "run", "race-writer", "execute", "--json"],
repoDir,
);
assertEquals(
seed.code,
0,
`Seed write must succeed. stderr: ${seed.stderr}`,
);

for (let i = 0; i < RACE_ITERATIONS; i++) {
const writer = runSwamp(
["model", "method", "run", "race-writer", "execute", "--json"],
repoDir,
);
// Brief delay so the writer establishes its per-model lock before
// the deleter runs its first drain.
await new Promise((resolve) => setTimeout(resolve, 30));
const deleter = runSwamp(
["data", "delete", "race-writer", "result", "--force", "--json"],
repoDir,
);

const [w, d] = await Promise.all([writer, deleter]);

assertNoEnoTempty(
`iteration ${i} writer (code=${w.code})`,
`${w.stdout}\n${w.stderr}`,
);
assertNoEnoTempty(
`iteration ${i} deleter (code=${d.code})`,
`${d.stdout}\n${d.stderr}`,
);
}
});
},
});
105 changes: 67 additions & 38 deletions src/cli/repo_context.ts
@@ -425,13 +425,31 @@ export function requireInitializedRepo(
}
}

// Wait for any held per-model locks to be released before acquiring global lock.
// This prevents the global lock from racing with in-progress per-model operations.
// Symmetric drain — see design/datastores.md "Lock Lifecycle".
//
// First drain: wait for any per-model locks visible at command start
// to be released. A writer that has *already* acquired a per-model
// lock and passed its TOCTOU recheck (acquireModelLocks, this file)
// is committed to writing data; we must let it finish.
await waitForPerModelLocks(datastoreConfig.path);

// Wire file-based lock for filesystem datastores
// Acquire the global lock. From here on, any writer that inspects
// the global lock will back off — but a writer that slipped past the
// first drain may have acquired a per-model lock between the drain
// ending and this acquisition, so its in-flight write would still
// race our structural work.
const lock = new FileLock(datastoreConfig.path);
await registerDatastoreSync({ lock });

// Second drain: with the global lock now held, wait for any such
// straggling per-model locks to release. Writers in the middle of
// their own TOCTOU recheck will see the global lock and abandon
// their per-model lock; writers already past the recheck and into
// the data write must finish before we touch the filesystem. This
// closes the symmetric TOCTOU window — do not remove without
// updating the lock-lifecycle contract documented in
// design/datastores.md.
await waitForPerModelLocks(datastoreConfig.path);
}

// Compute top-level directories for definitions, workflows, and vaults
@@ -632,58 +650,69 @@ export async function createModelLock(
/**
* Waits for any held per-model locks to be released.
*
* Called before acquiring the global lock so that structural commands
* (data gc, model create/delete) don't race with in-progress per-model
* operations. Only works for filesystem datastores — S3 datastores use
* distributed locks that cannot be scanned locally.
* Called twice during structural command setup (`requireInitializedRepo`):
* once before acquiring the global lock to drain in-flight writers, and
* once after to catch writers that slipped past the first drain. See
* design/datastores.md "Lock Lifecycle" for the full contract.
*
* Only works for filesystem datastores — S3 datastores use distributed
* locks that cannot be scanned locally.
*
* Test seam: `findModelLocksOverride` is for unit tests only — production
* callers must omit it. Not exported from any barrel; used solely by
* `repo_context_test.ts`.
*/
async function waitForPerModelLocks(datastorePath: string): Promise<void> {
export async function waitForPerModelLocks(
datastorePath: string,
findModelLocksOverride?: () => Promise<number>,
): Promise<void> {
const logger = getSwampLogger(["datastore", "lock"]);

const findModelLocks = async (): Promise<number> => {
let count = 0;
try {
for await (
const entry of walk(datastorePath, {
includeDirs: false,
match: [/\.lock$/],
})
) {
const rel = relative(datastorePath, entry.path);
if (!parseModelLockKey(rel)) continue;
try {
const content = await Deno.readTextFile(entry.path);
const info = JSON.parse(content) as {
acquiredAt: string;
ttlMs: number;
};
// Only count non-stale locks
const acquiredAt = new Date(info.acquiredAt).getTime();
if (Date.now() - acquiredAt <= info.ttlMs) {
count++;
const findModelLocks = findModelLocksOverride ??
(async (): Promise<number> => {
let count = 0;
try {
for await (
const entry of walk(datastorePath, {
includeDirs: false,
match: [/\.lock$/],
})
) {
const rel = relative(datastorePath, entry.path);
if (!parseModelLockKey(rel)) continue;
try {
const content = await Deno.readTextFile(entry.path);
const info = JSON.parse(content) as {
acquiredAt: string;
ttlMs: number;
};
// Only count non-stale locks
const acquiredAt = new Date(info.acquiredAt).getTime();
if (Date.now() - acquiredAt <= info.ttlMs) {
count++;
}
} catch {
// Skip unreadable lock files
}
} catch {
// Skip unreadable lock files
}
} catch {
// Datastore directory may not exist yet
}
} catch {
// Datastore directory may not exist yet
}
return count;
};
return count;
});

const held = await findModelLocks();
if (held > 0) {
logger.info(
"Waiting for {count} per-model lock(s) to be released before acquiring global lock",
"Waiting for {count} per-model lock(s) to be released",
{ count: held },
);
while (true) {
await new Promise((resolve) => setTimeout(resolve, 1_000));
const remaining = await findModelLocks();
if (remaining === 0) break;
}
logger.info`Per-model locks released, proceeding with global lock`;
logger.info`Per-model locks released`;
}
}
