diff --git a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts index c5bec2b1..966c729a 100644 --- a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts +++ b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts @@ -804,18 +804,43 @@ export class GcsCacheSyncService implements DatastoreSyncService { throw new Error(formatBatchFailure("pull", failures)); } - // Verified zero-diff: local cache matches the remote index whose - // generation we captured from the `pullIndex` GET response. - // Persist THAT generation — the one we walked against — so the - // next `pullChanged` can take the fast path. We deliberately do - // NOT re-`getMetadata` here: a post-walk metadata call could - // observe a generation from a concurrent writer's push landing - // during our walk, and recording that generation would mask their - // data on the next fast-path sync (swamp-club #168). If - // generation is null (cache-hit pullIndex or NotFound brand-new - // bucket), the sidecar is skipped — next sync self-heals. - if (pulled === 0 && indexGeneration) { + // Local cache matches the remote index whose generation we captured + // from the `pullIndex` GET response — either the walk found zero + // diff (`pulled === 0`) or we just downloaded the missing files + // (`pulled > 0`). Persist THAT generation — the one we walked + // against — so the next `pullChanged` / `pushChanged` can take the + // fast path. We deliberately do NOT re-`getMetadata`: a post-walk + // metadata call could observe a generation from a concurrent + // writer's push landing during our walk, and recording that + // generation would mask their data on the next fast-path sync + // (swamp-club #168). If generation is null (cache-hit pullIndex or + // NotFound brand-new bucket), the sidecar is skipped — next sync + // self-heals on the slow path. + // + // When `pulled > 0`, also rewrite the on-disk index with the + // in-memory state so it carries the localMtime values we just + // recorded for each downloaded file. Pre-fix, the on-disk file was + // last written by `pullIndex` from the raw remote payload (carrying + // the original pusher's local mtimes), so a subsequent fresh-process + // `pushChanged` slow-path walk saw `existing.localMtime` (pusher's) + // ≠ `stat.mtime` (local mtime from `Deno.writeFile`) and pushed + // every file with byte-identical content (swamp-club #222). + // + // Ordering invariant — DO NOT REVERSE: `atomicWriteTextFile` MUST + // run before `markSynced`. `markSynced` derives `lastVerifiedAt` + // from `Deno.stat(this.indexPath).mtime + 1ms`. Reversing the order + // captures `lastVerifiedAt` against the pre-write mtime; the + // subsequent rewrite then bumps the index mtime forward, and the + // next `tryFastPullChanged` probe spuriously bails on + // `indexMtime >= verifiedAt`. + if (indexGeneration) { try { + if (pulled > 0 && this.index) { + await atomicWriteTextFile( + this.indexPath, + JSON.stringify(this.index, null, 2), + ); + } await this.markSynced(indexGeneration); } catch { // Non-fatal: sidecar update is opportunistic. Disk-full / diff --git a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts index 1d585ec0..08e62971 100644 --- a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts +++ b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts @@ -1874,3 +1874,129 @@ Deno.test("pushChanged writeback: does NOT mark sidecar clean when local is miss await Deno.remove(cachePath, { recursive: true }); } }); + +// swamp-club #222: pushChanged in a fresh process after a fresh-cache +// pullChanged must not redundantly re-upload the just-downloaded files. +// Pre-fix, pullChanged left the on-disk index carrying the original +// pusher's localMtime values (the remote payload), and skipped markSynced +// because pulled > 0. The next process's pushChanged then took the slow +// path (no sidecar), pullIndex(forceRemote) reloaded the pusher's mtimes +// into this.index, and the walk pushed every file — size matched but +// mtime didn't. The fix persists the in-memory index (with the puller's +// stat-derived localMtimes) AND marksSynced so the next pushChanged hits +// the fast path. Two positive assertions pin the persistence machinery +// (sidecar contents + on-disk index mtimes) so a future refactor can't +// silently drop them while still satisfying the symptom assertion. +Deno.test("pullChanged + pushChanged: fresh-process push after pull against populated remote does not redundantly upload (swamp-club #222)", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-issue-222-" }); + try { + const mock = createMockGcsClient(); + // Pin a known generation for the remote index so the sidecar + // assertion has a stable target. + mock.generationOverrides.set(".datastore-index.json", "42"); + + // Seed 5 files with varied sizes (5, 11, 27, 64, 128 bytes), each + // index entry carrying machine-A's localMtime (epoch — guaranteed + // distinct from any local stat.mtime that Deno.writeFile will set + // on this machine during pullChanged's downloads). + const aMtime = new Date(0).toISOString(); + const SIZES = [5, 11, 27, 64, 128]; + const seeded = SIZES.map((size, i) => ({ + rel: `data/@m/file-${i}.yaml`, + body: "x".repeat(size), + })); + const indexEntries: Record = {}; + for (const { rel, body } of seeded) { + mock.storage.set(rel, new TextEncoder().encode(body)); + indexEntries[rel] = { + key: rel, + size: body.length, + lastModified: aMtime, + localMtime: aMtime, + }; + } + mock.storage.set(".datastore-index.json", encodeIndex(indexEntries)); + + // Process A: fresh service, fresh cache. Pulls all 5 files. + const serviceA = new GcsCacheSyncService(mock, cachePath); + const pulled = await serviceA.pullChanged(); + assertEquals(pulled, SIZES.length, "must pull all 5 seeded files"); + + // Positive: sidecar must be written with the GET'd generation and + // clean state. Pre-fix, pulled > 0 skipped markSynced and the + // sidecar didn't exist. Post-fix, this is the load-bearing piece + // that lets the next pushChanged take the fast path. + const sidecarText = await Deno.readTextFile( + join(cachePath, ".datastore-sync-state.json"), + ); + const sidecar = JSON.parse(sidecarText); + assertEquals( + sidecar.localDirty, + false, + "sidecar must be clean — local cache matches the freshly-pulled remote", + ); + assertEquals( + sidecar.remoteIndexGeneration, + "42", + "sidecar must record the generation from pullIndex's GET response", + ); + + // Positive: on-disk index must reflect the local mtimes pullChanged + // recorded for each downloaded file — NOT the seeded epoch value. + // Pre-fix, the on-disk file was last written by pullIndex from the + // raw remote payload and still carried machine-A's epoch mtimes. + const indexText = await Deno.readTextFile( + join(cachePath, ".datastore-index.json"), + ); + const onDiskIndex = JSON.parse(indexText) as { + entries: Record; + }; + for (const { rel } of seeded) { + const entry = onDiskIndex.entries[rel]; + assertExists(entry, `entry ${rel} must persist on disk`); + assertExists( + entry.localMtime, + `entry ${rel} must carry a localMtime on disk`, + ); + assert( + entry.localMtime !== aMtime, + `entry ${rel} on-disk localMtime must NOT be the seeded machine-A epoch — pre-fix the on-disk index still carried it`, + ); + const parsed = Date.parse(entry.localMtime!); + assert( + !Number.isNaN(parsed), + `entry ${rel} localMtime must be a parseable ISO timestamp`, + ); + } + + // Symptom: fresh service C against the same cache dir simulates the + // cross-process scenario from the issue (`swamp datastore sync + // --push` run as a separate command). Snapshot mock.puts BEFORE so + // any writeback PUT from pullChanged doesn't pollute the count. + const putsBefore = mock.puts.length; + const serviceC = new GcsCacheSyncService(mock, cachePath); + const pushed = await serviceC.pushChanged(); + assertEquals( + pushed, + 0, + `pushChanged after a fresh-process pullChanged must not redundantly upload — saw ${pushed} pushes against ${SIZES.length} unchanged files`, + ); + const dataPuts = mock.puts.slice(putsBefore).filter((p) => + p.key !== ".datastore-index.json" + ); + assertEquals( + dataPuts.length, + 0, + `expected zero data PUTs against an unchanged cache, saw ${dataPuts.length}: ${ + dataPuts.map((p) => p.key).join(", ") + }`, + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); diff --git a/datastore/gcs/manifest.yaml b/datastore/gcs/manifest.yaml index 3cc421e6..d78829bd 100644 --- a/datastore/gcs/manifest.yaml +++ b/datastore/gcs/manifest.yaml @@ -1,6 +1,6 @@ manifestVersion: 1 name: "@swamp/gcs-datastore" -version: "2026.04.28.4" +version: "2026.05.04.1" description: | Store data in a Google Cloud Storage bucket with local cache synchronization. Provides distributed locking via GCS generation-based preconditions and diff --git a/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts b/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts index 44a42521..ca94945f 100644 --- a/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts +++ b/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts @@ -831,17 +831,42 @@ export class S3CacheSyncService implements DatastoreSyncService { throw new Error(formatBatchFailure("pull", failures)); } - // Verified zero-diff: local cache matches the remote index whose - // ETag we captured from the `pullIndex` GET response. Persist THAT - // ETag — the one we walked against — so the next `pullChanged` can - // take the fast path. We deliberately do NOT re-HEAD here: a - // post-walk HEAD could observe an ETag from a concurrent writer's - // push landing during our walk, and recording that ETag would mask - // their data on the next fast-path sync (swamp-club #168). If the - // ETag is null (cache-hit pullIndex or NotFound brand-new bucket), - // the sidecar is skipped — next sync self-heals on the slow path. - if (pulled === 0 && indexETag) { + // Local cache matches the remote index whose ETag we captured from + // the `pullIndex` GET response — either the walk found zero diff + // (`pulled === 0`) or we just downloaded the missing files + // (`pulled > 0`). Persist THAT ETag — the one we walked against — + // so the next `pullChanged` / `pushChanged` can take the fast path. + // We deliberately do NOT re-HEAD: a post-walk HEAD could observe an + // ETag from a concurrent writer's push landing during our walk, + // and recording that ETag would mask their data on the next + // fast-path sync (swamp-club #168). If the ETag is null (cache-hit + // pullIndex or NotFound brand-new bucket), the sidecar is skipped — + // next sync self-heals on the slow path. + // + // When `pulled > 0`, also rewrite the on-disk index with the + // in-memory state so it carries the localMtime values we just + // recorded for each downloaded file. Pre-fix, the on-disk file was + // last written by `pullIndex` from the raw remote payload (carrying + // the original pusher's local mtimes), so a subsequent fresh-process + // `pushChanged` slow-path walk saw `existing.localMtime` (pusher's) + // ≠ `stat.mtime` (local mtime from `Deno.writeFile`) and pushed + // every file with byte-identical content (swamp-club #222). + // + // Ordering invariant — DO NOT REVERSE: `atomicWriteTextFile` MUST + // run before `markSynced`. `markSynced` derives `lastVerifiedAt` + // from `Deno.stat(this.indexPath).mtime + 1ms`. Reversing the order + // captures `lastVerifiedAt` against the pre-write mtime; the + // subsequent rewrite then bumps the index mtime forward, and the + // next `tryFastPullChanged` probe spuriously bails on + // `indexMtime >= verifiedAt`. + if (indexETag) { try { + if (pulled > 0 && this.index) { + await atomicWriteTextFile( + this.indexPath, + JSON.stringify(this.index, null, 2), + ); + } await this.markSynced(indexETag); } catch { // Non-fatal: sidecar update is opportunistic. Disk-full / diff --git a/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts b/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts index 864332dd..650cfebb 100644 --- a/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts +++ b/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts @@ -2111,3 +2111,127 @@ Deno.test("pushChanged writeback: does NOT mark sidecar clean when local is miss await Deno.remove(cachePath, { recursive: true }); } }); + +// swamp-club #222: pushChanged in a fresh process after a fresh-cache +// pullChanged must not redundantly re-upload the just-downloaded files. +// Pre-fix, pullChanged left the on-disk index carrying the original +// pusher's localMtime values (the remote payload), and skipped markSynced +// because pulled > 0. The next process's pushChanged then took the slow +// path (no sidecar), pullIndex(forceRemote) reloaded the pusher's mtimes +// into this.index, and the walk pushed every file — size matched but +// mtime didn't. The fix persists the in-memory index (with the puller's +// stat-derived localMtimes) AND marksSynced so the next pushChanged hits +// the fast path. Two positive assertions pin the persistence machinery +// (sidecar contents + on-disk index mtimes) so a future refactor can't +// silently drop them while still satisfying the symptom assertion. +Deno.test("pullChanged + pushChanged: fresh-process push after pull against populated remote does not redundantly upload (swamp-club #222)", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "s3sync-issue-222-" }); + try { + const mock = createMockS3Client(); + // Pin a known ETag for the remote index so the sidecar assertion + // doesn't have to recompute fakeETag from byte stitching. + mock.etagOverrides.set(".datastore-index.json", '"index-pre-pull"'); + + // Seed 5 files with varied sizes (5, 11, 27, 64, 128 bytes), each + // index entry carrying machine-A's localMtime (epoch — guaranteed + // distinct from any local stat.mtime that Deno.writeFile will set + // on this machine during pullChanged's downloads). + const aMtime = new Date(0).toISOString(); + const SIZES = [5, 11, 27, 64, 128]; + const seeded = SIZES.map((size, i) => ({ + rel: `data/@m/file-${i}.yaml`, + body: "x".repeat(size), + })); + const indexEntries: Record = {}; + for (const { rel, body } of seeded) { + mock.storage.set(rel, new TextEncoder().encode(body)); + indexEntries[rel] = { + key: rel, + size: body.length, + lastModified: aMtime, + localMtime: aMtime, + }; + } + mock.storage.set(".datastore-index.json", encodeIndex(indexEntries)); + + // Process A: fresh service, fresh cache. Pulls all 5 files. + const serviceA = new S3CacheSyncService(mock, cachePath); + const pulled = await serviceA.pullChanged(); + assertEquals(pulled, SIZES.length, "must pull all 5 seeded files"); + + // Positive: sidecar must be written with the GET'd ETag and clean + // state. Pre-fix, pulled > 0 skipped markSynced and this returned + // null. Post-fix, this is the load-bearing piece that lets the next + // pushChanged take the fast path. + const sidecar = await readSidecar(cachePath); + assertExists(sidecar, "pullChanged with pulled > 0 must write the sidecar"); + assertEquals( + sidecar!.localDirty, + false, + "sidecar must be clean — local cache matches the freshly-pulled remote", + ); + assertEquals( + sidecar!.remoteIndexETag, + "index-pre-pull", + "sidecar must record the ETag from pullIndex's GET response", + ); + + // Positive: on-disk index must reflect the local mtimes pullChanged + // recorded for each downloaded file — NOT the seeded epoch value. + // Pre-fix, the on-disk file was last written by pullIndex from the + // raw remote payload and still carried machine-A's epoch mtimes. + const indexText = await Deno.readTextFile( + join(cachePath, ".datastore-index.json"), + ); + const onDiskIndex = JSON.parse(indexText) as { + entries: Record; + }; + for (const { rel } of seeded) { + const entry = onDiskIndex.entries[rel]; + assertExists(entry, `entry ${rel} must persist on disk`); + assertExists( + entry.localMtime, + `entry ${rel} must carry a localMtime on disk`, + ); + assert( + entry.localMtime !== aMtime, + `entry ${rel} on-disk localMtime must NOT be the seeded machine-A epoch — pre-fix the on-disk index still carried it`, + ); + const parsed = Date.parse(entry.localMtime!); + assert( + !Number.isNaN(parsed), + `entry ${rel} localMtime must be a parseable ISO timestamp`, + ); + } + + // Symptom: fresh service C against the same cache dir simulates the + // cross-process scenario from the issue (`swamp datastore sync + // --push` run as a separate command). Snapshot mock.puts BEFORE so + // any writeback PUT from pullChanged doesn't pollute the count. + const putsBefore = mock.puts.length; + const serviceC = new S3CacheSyncService(mock, cachePath); + const pushed = await serviceC.pushChanged(); + assertEquals( + pushed, + 0, + `pushChanged after a fresh-process pullChanged must not redundantly upload — saw ${pushed} pushes against ${SIZES.length} unchanged files`, + ); + const dataPuts = mock.puts.slice(putsBefore).filter((p) => + p.key !== ".datastore-index.json" + ); + assertEquals( + dataPuts.length, + 0, + `expected zero data PUTs against an unchanged cache, saw ${dataPuts.length}: ${ + dataPuts.map((p) => p.key).join(", ") + }`, + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); diff --git a/datastore/s3/manifest.yaml b/datastore/s3/manifest.yaml index 5cd38f59..985d1ba8 100644 --- a/datastore/s3/manifest.yaml +++ b/datastore/s3/manifest.yaml @@ -1,6 +1,6 @@ manifestVersion: 1 name: "@swamp/s3-datastore" -version: "2026.04.28.4" +version: "2026.05.04.1" description: | Store data in an Amazon S3 bucket with local cache synchronization. Provides distributed locking via S3 conditional writes and bidirectional