diff --git a/design/datastores.md b/design/datastores.md
index b3caf053..b7cd2a63 100644
--- a/design/datastores.md
+++ b/design/datastores.md
@@ -509,6 +509,55 @@ calls `forceRelease(expectedNonce)` to clear it — without this, the
 post-acquire TOCTOU re-check would re-detect the same stale lock on
 every iteration and recurse indefinitely.
 
+#### Symmetric Drain (structural commands)
+
+Structural commands (`requireInitializedRepo`) acquire the global lock with
+a **symmetric drain** — `waitForPerModelLocks` is invoked twice, once
+before the global lock is acquired and once after:
+
+1. **First drain (pre-acquire).** Wait for any per-model locks visible at
+   command start to be released. A writer that is already past its own
+   TOCTOU recheck (in `acquireModelLocks`) is committed to writing data
+   and must be allowed to finish.
+2. **Acquire global lock.** From this point on, any writer that runs
+   `inspect()` against the global lock will see it held and back off.
+3. **Second drain (post-acquire).** Wait for any per-model locks that
+   slipped past the first drain — i.e., writers that inspected the global
+   lock between the first drain ending and the global-lock acquisition,
+   saw it not held, and went on to acquire a per-model lock.
+
+The second drain is what closes the symmetric TOCTOU window between the
+two sides of the protocol. Without it, a writer can take the following
+path (see the writer-side sketch below):
+
+1. Inspect global → not held (deleter has not yet acquired)
+2. Take per-model lock
+3. Pass its TOCTOU recheck → not held (deleter still has not acquired)
+4. Begin writing a new version directory
+
+…while the deleter completes its first drain, acquires the global lock,
+and runs `Deno.remove(dataNameDir, { recursive: true })`. The recursive
+removal then races with the writer's newly created version subdirectory
+and fails with ENOTEMPTY (Linux: `os error 39`, macOS: `os error 66`) —
+the failure mode behind swamp-club#234.
+
+The second drain catches the writer's still-held per-model lock and waits
+for the writer to finish (and either commit cleanly or release on its own
+TOCTOU recheck) before structural work proceeds. Because the writer's
+recheck runs *immediately* after taking the per-model lock — before any
+data I/O — there is no remaining window where the writer can write data
+without the deleter's second drain seeing the per-model lock.
+
+> **Maintainer note.** Both drain calls are required to keep this
+> contract sound. The bidirectional citation
+> (`src/cli/repo_context.ts:requireInitializedRepo` ↔ this section) is
+> there so a future change cannot silently remove one of the two waits
+> without confronting the contract. When changing this lifecycle, update
+> both sites.
+
+Caveat: `waitForPerModelLocks` only scans the local filesystem. Custom
+(S3, distributed) datastores use their own `DistributedLock`
+implementation and rely on its semantics rather than the local drain.
+
 A SIGINT handler ensures best-effort lock release on Ctrl-C. If the
 process crashes without releasing, the lock expires after the TTL (30
 seconds by default).
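+
+For orientation, the writer-side half of the protocol looks roughly like
+this. It is a condensed sketch, not the real `acquireModelLocks` body:
+`acquirePerModelLock` and `writeNewVersionDirectory` are illustrative
+stand-ins, and `inspect()` is assumed to resolve truthy when the global
+lock is held. Step numbers match the race sequence above.
+
+```ts
+// Illustrative declarations — stand-ins for this sketch, not the real API.
+type PerModelLock = { release(): Promise<void> };
+declare function acquirePerModelLock(
+  datastorePath: string,
+  model: string,
+): Promise<PerModelLock>;
+declare function writeNewVersionDirectory(
+  datastorePath: string,
+  model: string,
+): Promise<void>;
+declare class FileLock {
+  constructor(datastorePath: string);
+  inspect(): Promise<boolean>; // assumed: resolves true iff the lock is held
+}
+
+// Writer-side sequence the two drains bracket.
+async function writerSketch(datastorePath: string, model: string) {
+  const global = new FileLock(datastorePath);
+  if (await global.inspect()) return; // (1) global held → back off
+  const perModel = await acquirePerModelLock(datastorePath, model); // (2)
+  if (await global.inspect()) { // (3) TOCTOU recheck — before any data I/O
+    await perModel.release(); // deleter won the race: abandon, write nothing
+    return;
+  }
+  try {
+    // (4) Committed: the deleter's second drain now waits on this lock.
+    await writeNewVersionDirectory(datastorePath, model);
+  } finally {
+    await perModel.release();
+  }
+}
+```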
diff --git a/integration/data_delete_test.ts b/integration/data_delete_test.ts
index 22604a29..86ad8bf6 100644
--- a/integration/data_delete_test.ts
+++ b/integration/data_delete_test.ts
@@ -30,6 +30,7 @@
 import { assertEquals, assertExists, assertRejects } from "@std/assert";
 import { join } from "@std/path";
 import { ensureDir } from "@std/fs";
+import { stringify as stringifyYaml } from "@std/yaml";
 import { Data } from "../src/domain/data/data.ts";
 import type { OwnerDefinition } from "../src/domain/data/data_metadata.ts";
 import { ModelType } from "../src/domain/models/model_type.ts";
@@ -38,6 +39,8 @@ import { FileSystemUnifiedDataRepository } from "../src/infrastructure/persisten
 import { YamlDefinitionRepository } from "../src/infrastructure/persistence/yaml_definition_repository.ts";
 import { CatalogStore } from "../src/infrastructure/persistence/catalog_store.ts";
 import { DataDeleteService } from "../src/domain/data/data_delete_service.ts";
+import { SHELL_MODEL_TYPE } from "../src/domain/models/command/shell/shell_model.ts";
+import { CLI_ARGS } from "./test_helpers.ts";
 
 async function withTempDir(fn: (dir: string) => Promise<void>): Promise<void> {
   const dir = await Deno.makeTempDir({ prefix: "swamp-data-delete-" });
@@ -206,3 +209,144 @@ Deno.test("Data Delete: --version against non-existent version names available v
   assertEquals(after.sort((a, b) => a - b), [1, 2, 3]);
   });
 });
+
+// ============================================================================
+// Concurrent writer / delete smoke test (regression for swamp-club#234)
+// ============================================================================
+//
+// Smoke regression for the symmetric drain in `requireInitializedRepo`: a
+// concurrent writer creating a new version directory between the deleter's
+// pre-acquire drain and the recursive remove caused
+// `Deno.remove(dataNameDir, { recursive: true })` to fail with ENOTEMPTY
+// (Linux: os error 39, macOS: os error 66). The architectural fix lives in
+// src/cli/repo_context.ts; the deterministic regression tests for the
+// polling primitive live in src/cli/repo_context_test.ts.
+//
+// This test is non-deterministic by design — it spawns real CLI processes
+// across N iterations and asserts that no ENOTEMPTY error appears. The
+// original bug reproduced in ~40 attempts under high writer pressure
+// (4 parallel writers + a tight delete loop), so N=50 single-writer
+// iterations gives a margin large enough to catch a future regression with
+// reasonable probability while staying within integration-suite wall-clock
+// budgets. Do not silently shrink N below 50 — read swamp-club#234 first.
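+//
+// Back-of-envelope detection odds (an extrapolation, not a measured rate):
+// if a single iteration reproduces the race with probability p ≈ 1/40 —
+// taken from the historical repro rate, which was observed under heavier
+// writer pressure — then 50 independent iterations all miss the bug with
+// probability (1 - 1/40)^50 ≈ e^(-1.25) ≈ 0.29, i.e. roughly a 70% chance
+// per suite run of surfacing a regression, compounding across CI runs.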
+
+const RACE_ITERATIONS = 50;
+const SHELL_SLEEP_MS = 300;
+
+async function runSwamp(
+  args: string[],
+  cwd: string,
+): Promise<{ stdout: string; stderr: string; code: number }> {
+  const { code, stdout, stderr } = await new Deno.Command(Deno.execPath(), {
+    args: [...CLI_ARGS, ...args],
+    stdout: "piped",
+    stderr: "piped",
+    cwd,
+  }).output();
+  return {
+    stdout: new TextDecoder().decode(stdout),
+    stderr: new TextDecoder().decode(stderr),
+    code,
+  };
+}
+
+async function initializeShellRepo(repoDir: string): Promise<void> {
+  for (
+    const sub of [
+      "models",
+      ".swamp/outputs",
+      ".swamp/data",
+      ".swamp/logs",
+    ]
+  ) {
+    await ensureDir(join(repoDir, sub));
+  }
+  await Deno.writeTextFile(
+    join(repoDir, ".swamp.yaml"),
+    stringifyYaml({
+      swampVersion: "0.0.0",
+      initializedAt: new Date().toISOString(),
+    } as Record<string, unknown>),
+  );
+
+  // Shell model that holds the per-model lock for ~SHELL_SLEEP_MS, giving
+  // the concurrent delete a window to race the writer.
+  const definitionRepo = new YamlDefinitionRepository(repoDir);
+  const definition = Definition.create({
+    name: "race-writer",
+    methods: {
+      execute: {
+        arguments: {
+          run: `sleep ${SHELL_SLEEP_MS / 1000}; echo done`,
+        },
+      },
+    },
+  });
+  await definitionRepo.save(SHELL_MODEL_TYPE, definition);
+}
+
+function assertNoEnoTempty(label: string, combined: string): void {
+  for (
+    const marker of [
+      "Directory not empty",
+      "os error 39",
+      "os error 66",
+    ]
+  ) {
+    assertEquals(
+      combined.includes(marker),
+      false,
+      `${label} contains race marker "${marker}":\n${combined}`,
+    );
+  }
+}
+
+Deno.test({
+  name:
+    "Data Delete: 50 iterations of concurrent writer + delete leave no ENOTEMPTY (swamp-club#234)",
+  // Subprocess spawning means file handles can outlive the test scope on
+  // some platforms; the existing CLI integration tests share this exemption.
+  sanitizeResources: false,
+  sanitizeOps: false,
+  fn: async () => {
+    await withTempDir(async (repoDir) => {
+      await initializeShellRepo(repoDir);
+
+      // Seed one version so the first delete has something to remove.
+      const seed = await runSwamp(
+        ["model", "method", "run", "race-writer", "execute", "--json"],
+        repoDir,
+      );
+      assertEquals(
+        seed.code,
+        0,
+        `Seed write must succeed. stderr: ${seed.stderr}`,
+      );
+
+      for (let i = 0; i < RACE_ITERATIONS; i++) {
+        const writer = runSwamp(
+          ["model", "method", "run", "race-writer", "execute", "--json"],
+          repoDir,
+        );
+        // Brief delay so the writer establishes its per-model lock before
+        // the deleter runs its first drain.
+        await new Promise((resolve) => setTimeout(resolve, 30));
+        const deleter = runSwamp(
+          ["data", "delete", "race-writer", "result", "--force", "--json"],
+          repoDir,
+        );
+
+        const [w, d] = await Promise.all([writer, deleter]);
+
+        assertNoEnoTempty(
+          `iteration ${i} writer (code=${w.code})`,
+          `${w.stdout}\n${w.stderr}`,
+        );
+        assertNoEnoTempty(
+          `iteration ${i} deleter (code=${d.code})`,
+          `${d.stdout}\n${d.stderr}`,
+        );
+      }
+    });
+  },
+});
diff --git a/src/cli/repo_context.ts b/src/cli/repo_context.ts
index 9963fd79..2f26052b 100644
--- a/src/cli/repo_context.ts
+++ b/src/cli/repo_context.ts
@@ -425,13 +425,31 @@ export function requireInitializedRepo(
     }
   }
 
-  // Wait for any held per-model locks to be released before acquiring global lock.
-  // This prevents the global lock from racing with in-progress per-model operations.
+  // Symmetric drain — see design/datastores.md "Lock Lifecycle".
+  //
+  // First drain: wait for any per-model locks visible at command start
+  // to be released.
+  // A writer that has *already* acquired a per-model lock and passed
+  // its TOCTOU recheck (acquireModelLocks, this file) is committed to
+  // writing data; we must let it finish.
   await waitForPerModelLocks(datastoreConfig.path);
 
-  // Wire file-based lock for filesystem datastores
+  // Acquire the global lock. From here on, any writer that inspects
+  // the global lock will back off — but a writer that slipped past the
+  // first drain may have acquired a per-model lock between the drain
+  // ending and this acquisition, so its in-flight write would still
+  // race our structural work.
   const lock = new FileLock(datastoreConfig.path);
   await registerDatastoreSync({ lock });
+
+  // Second drain: with the global lock now held, wait for any such
+  // straggling per-model locks to release. Writers in the middle of
+  // their own TOCTOU recheck will see the global lock and abandon
+  // their per-model lock; writers already past the recheck and into
+  // the data write must finish before we touch the filesystem. This
+  // closes the symmetric TOCTOU window — do not remove without
+  // updating the lock-lifecycle contract documented in
+  // design/datastores.md.
+  await waitForPerModelLocks(datastoreConfig.path);
 }
 
 // Compute top-level directories for definitions, workflows, and vaults
@@ -632,50 +650,61 @@ export async function createModelLock(
 /**
  * Waits for any held per-model locks to be released.
  *
- * Called before acquiring the global lock so that structural commands
- * (data gc, model create/delete) don't race with in-progress per-model
- * operations. Only works for filesystem datastores — S3 datastores use
- * distributed locks that cannot be scanned locally.
+ * Called twice during structural command setup (`requireInitializedRepo`):
+ * once before acquiring the global lock to drain in-flight writers, and
+ * once after to catch writers that slipped past the first drain. See
+ * design/datastores.md "Lock Lifecycle" for the full contract.
+ *
+ * Only works for filesystem datastores — S3 datastores use distributed
+ * locks that cannot be scanned locally.
+ *
+ * Test seam: `findModelLocksOverride` is for unit tests only — production
+ * callers must omit it. Not exported from any barrel; used solely by
+ * `repo_context_test.ts`.
  */
-async function waitForPerModelLocks(datastorePath: string): Promise<void> {
+export async function waitForPerModelLocks(
+  datastorePath: string,
+  findModelLocksOverride?: () => Promise<number>,
+): Promise<void> {
   const logger = getSwampLogger(["datastore", "lock"]);
 
-  const findModelLocks = async (): Promise<number> => {
-    let count = 0;
-    try {
-      for await (
-        const entry of walk(datastorePath, {
-          includeDirs: false,
-          match: [/\.lock$/],
-        })
-      ) {
-        const rel = relative(datastorePath, entry.path);
-        if (!parseModelLockKey(rel)) continue;
-        try {
-          const content = await Deno.readTextFile(entry.path);
-          const info = JSON.parse(content) as {
-            acquiredAt: string;
-            ttlMs: number;
-          };
-          // Only count non-stale locks
-          const acquiredAt = new Date(info.acquiredAt).getTime();
-          if (Date.now() - acquiredAt <= info.ttlMs) {
-            count++;
-          }
-        } catch {
-          // Skip unreadable lock files
-        }
-      }
-    } catch {
-      // Datastore directory may not exist yet
-    }
-    return count;
-  };
+  const findModelLocks = findModelLocksOverride ??
+    (async (): Promise<number> => {
+      let count = 0;
+      try {
+        for await (
+          const entry of walk(datastorePath, {
+            includeDirs: false,
+            match: [/\.lock$/],
+          })
+        ) {
+          const rel = relative(datastorePath, entry.path);
+          if (!parseModelLockKey(rel)) continue;
+          try {
+            const content = await Deno.readTextFile(entry.path);
+            const info = JSON.parse(content) as {
+              acquiredAt: string;
+              ttlMs: number;
+            };
+            // Only count non-stale locks
+            const acquiredAt = new Date(info.acquiredAt).getTime();
+            if (Date.now() - acquiredAt <= info.ttlMs) {
+              count++;
+            }
+          } catch {
+            // Skip unreadable lock files
+          }
+        }
+      } catch {
+        // Datastore directory may not exist yet
+      }
+      return count;
+    });
 
   const held = await findModelLocks();
   if (held > 0) {
     logger.info(
-      "Waiting for {count} per-model lock(s) to be released before acquiring global lock",
+      "Waiting for {count} per-model lock(s) to be released",
       { count: held },
     );
     while (true) {
@@ -683,7 +712,7 @@ async function waitForPerModelLocks(datastorePath: string): Promise<void> {
       const remaining = await findModelLocks();
       if (remaining === 0) break;
     }
-    logger.info`Per-model locks released, proceeding with global lock`;
+    logger.info`Per-model locks released`;
   }
 }
diff --git a/src/cli/repo_context_test.ts b/src/cli/repo_context_test.ts
index 939d290b..860d5467 100644
--- a/src/cli/repo_context_test.ts
+++ b/src/cli/repo_context_test.ts
@@ -28,6 +28,7 @@ import {
   requireInitializedRepoReadOnly,
   requireInitializedRepoUnlocked,
   resolveDatastoreForRepo,
+  waitForPerModelLocks,
 } from "./repo_context.ts";
 import { flushDatastoreSync } from "../infrastructure/persistence/datastore_sync_coordinator.ts";
 import { isCustomDatastoreConfig } from "../domain/datastore/datastore_config.ts";
@@ -841,3 +842,120 @@ Deno.test(
     });
   },
 );
+
+// ============================================================================
+// waitForPerModelLocks Tests
+// ============================================================================
+//
+// `waitForPerModelLocks` is called twice by `requireInitializedRepo` —
+// once before acquiring the global lock, and once after — to close the
+// symmetric TOCTOU window between drain and global-lock acquisition that
+// caused issue #234 (data delete failing with ENOTEMPTY against a
+// concurrent writer). These tests exercise the polling primitive with an
+// injected scanner so the regression coverage is deterministic.
+
+Deno.test(
+  "waitForPerModelLocks - returns immediately when no locks are held",
+  async () => {
+    let calls = 0;
+    const scanner = (): Promise<number> => {
+      calls++;
+      return Promise.resolve(0);
+    };
+
+    const start = Date.now();
+    await waitForPerModelLocks("/unused/datastore/path", scanner);
+    const elapsed = Date.now() - start;
+
+    // No polling loop entered — a single scan call, no setTimeout delay.
+    assertEquals(calls, 1);
+    assertEquals(
+      elapsed < 500,
+      true,
+      `expected immediate return, elapsed=${elapsed}ms`,
+    );
+  },
+);
+
+Deno.test(
+  "waitForPerModelLocks - polls until scanner reports no held locks",
+  async () => {
+    // Sequence simulates a per-model lock that releases after the second
+    // poll: initial scan sees 1 (enter wait loop), first poll sees 1
+    // (keep polling), second poll sees 0 (exit). The drain must not
+    // return until the scanner reports 0.
+    const sequence = [1, 1, 0];
+    let i = 0;
+    const scanner = (): Promise<number> => {
+      const next = sequence[Math.min(i, sequence.length - 1)];
+      i++;
+      return Promise.resolve(next);
+    };
+
+    const start = Date.now();
+    await waitForPerModelLocks("/unused/datastore/path", scanner);
+    const elapsed = Date.now() - start;
+
+    // 3 calls: the initial check + 2 polls (the second poll observes 0
+    // and breaks). Polling cadence is 1s; allow some slack for scheduler
+    // overhead but require at least one poll interval.
+    assertEquals(i, 3);
+    assertEquals(
+      elapsed >= 1_000,
+      true,
+      `expected to wait at least one poll interval, elapsed=${elapsed}ms`,
+    );
+  },
+);
+
+Deno.test(
+  "requireInitializedRepo - symmetric drain traverses both waits and " +
+    "ignores a stale per-model lock " +
+    "(wiring check for issue #234)",
+  async () => {
+    // End-to-end coverage that the symmetric drain is wired through
+    // `requireInitializedRepo`. Higher-fidelity timing-driven coverage
+    // (a real concurrent writer racing a real delete) lives in
+    // integration/data_delete_test.ts; this test only verifies the
+    // wiring exists by writing a lock file and confirming that
+    // `requireInitializedRepo` succeeds without being blocked by it.
+    await withTempDir(async (dir) => {
+      await initializeRepo(dir);
+      const { datastoreConfig } = await resolveDatastoreForRepo(dir);
+      if (isCustomDatastoreConfig(datastoreConfig)) {
+        throw new Error("expected filesystem datastore for this test");
+      }
+
+      // Pre-write a stale per-model lock file. Stale locks are ignored
+      // by the scanner (the bug is about *live* writers); this test
+      // confirms `requireInitializedRepo` traverses the symmetric-drain
+      // code path twice without the stale lock blocking it.
+      const lockDir = join(
+        datastoreConfig.path,
+        "data",
+        "aws-ec2",
+        "test-server",
+      );
+      await ensureDir(lockDir);
+      const lockFile = join(lockDir, ".lock");
+      await Deno.writeTextFile(
+        lockFile,
+        JSON.stringify({
+          holder: "stale@host",
+          hostname: "host",
+          pid: 1,
+          // 10 minutes old with a 30s TTL = stale
+          acquiredAt: new Date(Date.now() - 600_000).toISOString(),
+          ttlMs: 30_000,
+        }),
+      );
+
+      const ctx = await requireInitializedRepo({
+        repoDir: dir,
+        outputMode: "json",
+      });
+      assertEquals(typeof ctx.repoDir, "string");
+      await flushDatastoreSync();
+    });
+  },
+);