From b8a7f1a2097dcf83bd5638d2b075b4d0a9dba550 Mon Sep 17 00:00:00 2001 From: stack72 Date: Mon, 4 May 2026 17:24:40 +0100 Subject: [PATCH] fix(s3-datastore, gcs-datastore): persist localMtime + markSynced after non-zero pullChanged MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-fix, pullChanged's slow path skipped markSynced when pulled > 0 and left the on-disk index file carrying the original pusher's localMtime values (the raw remote payload pullIndex wrote at the start of the slow path). A subsequent fresh-process pushChanged then took the slow path (no sidecar), pullIndex(forceRemote) reloaded the pusher's mtimes into this.index, and the walk pushed every file — size matched the index but localMtime didn't (pusher's mtime ≠ local stat.mtime from Deno.writeFile). N redundant byte-identical PUTs per fresh-cache bootstrap (swamp-club #222). The fix mirrors the existing pulled === 0 markSynced path: when the local cache matches the remote index whose ETag we GET'd (whether by walk-finds-zero-diff OR by downloading the missing files), persist the in-memory index back to disk (carrying the puller's stat-derived localMtimes) AND mark synced so the next pushChanged hits the fast path. Ordering invariant preserved: atomicWriteTextFile runs BEFORE markSynced because markSynced derives lastVerifiedAt from indexPath mtime + 1ms. Out-of-scope follow-up: when a concurrent writer bumps the remote ETag between pullChanged and pushChanged, the sidecar mismatch forces the slow path, and pullIndex(forceRemote) overwrites this.index with the new remote payload (carrying the writer's localMtimes), and the walk re-fires redundant pushes for any local file whose mtime differs. Pre-existing behavior, not widened by this fix; will be filed separately post-merge. 
Verified: - 5-file regression test in each of s3 and gcs cache_sync_test.ts: fresh-process pushChanged after fresh-cache pullChanged uploads zero bytes, with positive assertions on sidecar presence/cleanliness and on-disk index localMtime values - s3 suite: 61/61 passing, gcs suite: 64/64 passing - MinIO end-to-end: pulled=5, pushed=0 - GCS end-to-end: pulled=5, pushed=0 Manifests bumped to 2026.05.04.1 in both extensions. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../datastores/_lib/gcs_cache_sync.ts | 47 +++++-- .../datastores/_lib/gcs_cache_sync_test.ts | 126 ++++++++++++++++++ datastore/gcs/manifest.yaml | 2 +- .../datastores/_lib/s3_cache_sync.ts | 45 +++++-- .../datastores/_lib/s3_cache_sync_test.ts | 124 +++++++++++++++++ datastore/s3/manifest.yaml | 2 +- 6 files changed, 323 insertions(+), 23 deletions(-) diff --git a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts index c5bec2b1..966c729a 100644 --- a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts +++ b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts @@ -804,18 +804,43 @@ export class GcsCacheSyncService implements DatastoreSyncService { throw new Error(formatBatchFailure("pull", failures)); } - // Verified zero-diff: local cache matches the remote index whose - // generation we captured from the `pullIndex` GET response. - // Persist THAT generation — the one we walked against — so the - // next `pullChanged` can take the fast path. We deliberately do - // NOT re-`getMetadata` here: a post-walk metadata call could - // observe a generation from a concurrent writer's push landing - // during our walk, and recording that generation would mask their - // data on the next fast-path sync (swamp-club #168). If - // generation is null (cache-hit pullIndex or NotFound brand-new - // bucket), the sidecar is skipped — next sync self-heals. 
- if (pulled === 0 && indexGeneration) { + // Local cache matches the remote index whose generation we captured + // from the `pullIndex` GET response — either the walk found zero + // diff (`pulled === 0`) or we just downloaded the missing files + // (`pulled > 0`). Persist THAT generation — the one we walked + // against — so the next `pullChanged` / `pushChanged` can take the + // fast path. We deliberately do NOT re-`getMetadata`: a post-walk + // metadata call could observe a generation from a concurrent + // writer's push landing during our walk, and recording that + // generation would mask their data on the next fast-path sync + // (swamp-club #168). If generation is null (cache-hit pullIndex or + // NotFound brand-new bucket), the sidecar is skipped — next sync + // self-heals on the slow path. + // + // When `pulled > 0`, also rewrite the on-disk index with the + // in-memory state so it carries the localMtime values we just + // recorded for each downloaded file. Pre-fix, the on-disk file was + // last written by `pullIndex` from the raw remote payload (carrying + // the original pusher's local mtimes), so a subsequent fresh-process + // `pushChanged` slow-path walk saw `existing.localMtime` (pusher's) + // ≠ `stat.mtime` (local mtime from `Deno.writeFile`) and pushed + // every file with byte-identical content (swamp-club #222). + // + // Ordering invariant — DO NOT REVERSE: `atomicWriteTextFile` MUST + // run before `markSynced`. `markSynced` derives `lastVerifiedAt` + // from `Deno.stat(this.indexPath).mtime + 1ms`. Reversing the order + // captures `lastVerifiedAt` against the pre-write mtime; the + // subsequent rewrite then bumps the index mtime forward, and the + // next `tryFastPullChanged` probe spuriously bails on + // `indexMtime >= verifiedAt`. 
+ if (indexGeneration) { try { + if (pulled > 0 && this.index) { + await atomicWriteTextFile( + this.indexPath, + JSON.stringify(this.index, null, 2), + ); + } await this.markSynced(indexGeneration); } catch { // Non-fatal: sidecar update is opportunistic. Disk-full / diff --git a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts index 1d585ec0..08e62971 100644 --- a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts +++ b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts @@ -1874,3 +1874,129 @@ Deno.test("pushChanged writeback: does NOT mark sidecar clean when local is miss await Deno.remove(cachePath, { recursive: true }); } }); + +// swamp-club #222: pushChanged in a fresh process after a fresh-cache +// pullChanged must not redundantly re-upload the just-downloaded files. +// Pre-fix, pullChanged left the on-disk index carrying the original +// pusher's localMtime values (the remote payload), and skipped markSynced +// because pulled > 0. The next process's pushChanged then took the slow +// path (no sidecar), pullIndex(forceRemote) reloaded the pusher's mtimes +// into this.index, and the walk pushed every file — size matched but +// mtime didn't. The fix persists the in-memory index (with the puller's +// stat-derived localMtimes) AND calls markSynced so the next pushChanged hits +// the fast path. Two positive assertions pin the persistence machinery +// (sidecar contents + on-disk index mtimes) so a future refactor can't +// silently drop them while still satisfying the symptom assertion. 
+Deno.test("pullChanged + pushChanged: fresh-process push after pull against populated remote does not redundantly upload (swamp-club #222)", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-issue-222-" }); + try { + const mock = createMockGcsClient(); + // Pin a known generation for the remote index so the sidecar + // assertion has a stable target. + mock.generationOverrides.set(".datastore-index.json", "42"); + + // Seed 5 files with varied sizes (5, 11, 27, 64, 128 bytes), each + // index entry carrying machine-A's localMtime (epoch — guaranteed + // distinct from any local stat.mtime that Deno.writeFile will set + // on this machine during pullChanged's downloads). + const aMtime = new Date(0).toISOString(); + const SIZES = [5, 11, 27, 64, 128]; + const seeded = SIZES.map((size, i) => ({ + rel: `data/@m/file-${i}.yaml`, + body: "x".repeat(size), + })); + const indexEntries: Record = {}; + for (const { rel, body } of seeded) { + mock.storage.set(rel, new TextEncoder().encode(body)); + indexEntries[rel] = { + key: rel, + size: body.length, + lastModified: aMtime, + localMtime: aMtime, + }; + } + mock.storage.set(".datastore-index.json", encodeIndex(indexEntries)); + + // Process A: fresh service, fresh cache. Pulls all 5 files. + const serviceA = new GcsCacheSyncService(mock, cachePath); + const pulled = await serviceA.pullChanged(); + assertEquals(pulled, SIZES.length, "must pull all 5 seeded files"); + + // Positive: sidecar must be written with the GET'd generation and + // clean state. Pre-fix, pulled > 0 skipped markSynced and the + // sidecar didn't exist. Post-fix, this is the load-bearing piece + // that lets the next pushChanged take the fast path. 
+ const sidecarText = await Deno.readTextFile( + join(cachePath, ".datastore-sync-state.json"), + ); + const sidecar = JSON.parse(sidecarText); + assertEquals( + sidecar.localDirty, + false, + "sidecar must be clean — local cache matches the freshly-pulled remote", + ); + assertEquals( + sidecar.remoteIndexGeneration, + "42", + "sidecar must record the generation from pullIndex's GET response", + ); + + // Positive: on-disk index must reflect the local mtimes pullChanged + // recorded for each downloaded file — NOT the seeded epoch value. + // Pre-fix, the on-disk file was last written by pullIndex from the + // raw remote payload and still carried machine-A's epoch mtimes. + const indexText = await Deno.readTextFile( + join(cachePath, ".datastore-index.json"), + ); + const onDiskIndex = JSON.parse(indexText) as { + entries: Record; + }; + for (const { rel } of seeded) { + const entry = onDiskIndex.entries[rel]; + assertExists(entry, `entry ${rel} must persist on disk`); + assertExists( + entry.localMtime, + `entry ${rel} must carry a localMtime on disk`, + ); + assert( + entry.localMtime !== aMtime, + `entry ${rel} on-disk localMtime must NOT be the seeded machine-A epoch — pre-fix the on-disk index still carried it`, + ); + const parsed = Date.parse(entry.localMtime!); + assert( + !Number.isNaN(parsed), + `entry ${rel} localMtime must be a parseable ISO timestamp`, + ); + } + + // Symptom: fresh service C against the same cache dir simulates the + // cross-process scenario from the issue (`swamp datastore sync + // --push` run as a separate command). Snapshot mock.puts BEFORE so + // any writeback PUT from pullChanged doesn't pollute the count. 
+ const putsBefore = mock.puts.length; + const serviceC = new GcsCacheSyncService(mock, cachePath); + const pushed = await serviceC.pushChanged(); + assertEquals( + pushed, + 0, + `pushChanged after a fresh-process pullChanged must not redundantly upload — saw ${pushed} pushes against ${SIZES.length} unchanged files`, + ); + const dataPuts = mock.puts.slice(putsBefore).filter((p) => + p.key !== ".datastore-index.json" + ); + assertEquals( + dataPuts.length, + 0, + `expected zero data PUTs against an unchanged cache, saw ${dataPuts.length}: ${ + dataPuts.map((p) => p.key).join(", ") + }`, + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); diff --git a/datastore/gcs/manifest.yaml b/datastore/gcs/manifest.yaml index 3cc421e6..d78829bd 100644 --- a/datastore/gcs/manifest.yaml +++ b/datastore/gcs/manifest.yaml @@ -1,6 +1,6 @@ manifestVersion: 1 name: "@swamp/gcs-datastore" -version: "2026.04.28.4" +version: "2026.05.04.1" description: | Store data in a Google Cloud Storage bucket with local cache synchronization. Provides distributed locking via GCS generation-based preconditions and diff --git a/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts b/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts index 44a42521..ca94945f 100644 --- a/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts +++ b/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts @@ -831,17 +831,42 @@ export class S3CacheSyncService implements DatastoreSyncService { throw new Error(formatBatchFailure("pull", failures)); } - // Verified zero-diff: local cache matches the remote index whose - // ETag we captured from the `pullIndex` GET response. Persist THAT - // ETag — the one we walked against — so the next `pullChanged` can - // take the fast path. 
We deliberately do NOT re-HEAD here: a - // post-walk HEAD could observe an ETag from a concurrent writer's - // push landing during our walk, and recording that ETag would mask - // their data on the next fast-path sync (swamp-club #168). If the - // ETag is null (cache-hit pullIndex or NotFound brand-new bucket), - // the sidecar is skipped — next sync self-heals on the slow path. - if (pulled === 0 && indexETag) { + // Local cache matches the remote index whose ETag we captured from + // the `pullIndex` GET response — either the walk found zero diff + // (`pulled === 0`) or we just downloaded the missing files + // (`pulled > 0`). Persist THAT ETag — the one we walked against — + // so the next `pullChanged` / `pushChanged` can take the fast path. + // We deliberately do NOT re-HEAD: a post-walk HEAD could observe an + // ETag from a concurrent writer's push landing during our walk, + // and recording that ETag would mask their data on the next + // fast-path sync (swamp-club #168). If the ETag is null (cache-hit + // pullIndex or NotFound brand-new bucket), the sidecar is skipped — + // next sync self-heals on the slow path. + // + // When `pulled > 0`, also rewrite the on-disk index with the + // in-memory state so it carries the localMtime values we just + // recorded for each downloaded file. Pre-fix, the on-disk file was + // last written by `pullIndex` from the raw remote payload (carrying + // the original pusher's local mtimes), so a subsequent fresh-process + // `pushChanged` slow-path walk saw `existing.localMtime` (pusher's) + // ≠ `stat.mtime` (local mtime from `Deno.writeFile`) and pushed + // every file with byte-identical content (swamp-club #222). + // + // Ordering invariant — DO NOT REVERSE: `atomicWriteTextFile` MUST + // run before `markSynced`. `markSynced` derives `lastVerifiedAt` + // from `Deno.stat(this.indexPath).mtime + 1ms`. 
Reversing the order + // captures `lastVerifiedAt` against the pre-write mtime; the + // subsequent rewrite then bumps the index mtime forward, and the + // next `tryFastPullChanged` probe spuriously bails on + // `indexMtime >= verifiedAt`. + if (indexETag) { try { + if (pulled > 0 && this.index) { + await atomicWriteTextFile( + this.indexPath, + JSON.stringify(this.index, null, 2), + ); + } await this.markSynced(indexETag); } catch { // Non-fatal: sidecar update is opportunistic. Disk-full / diff --git a/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts b/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts index 864332dd..650cfebb 100644 --- a/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts +++ b/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts @@ -2111,3 +2111,127 @@ Deno.test("pushChanged writeback: does NOT mark sidecar clean when local is miss await Deno.remove(cachePath, { recursive: true }); } }); + +// swamp-club #222: pushChanged in a fresh process after a fresh-cache +// pullChanged must not redundantly re-upload the just-downloaded files. +// Pre-fix, pullChanged left the on-disk index carrying the original +// pusher's localMtime values (the remote payload), and skipped markSynced +// because pulled > 0. The next process's pushChanged then took the slow +// path (no sidecar), pullIndex(forceRemote) reloaded the pusher's mtimes +// into this.index, and the walk pushed every file — size matched but +// mtime didn't. The fix persists the in-memory index (with the puller's +// stat-derived localMtimes) AND calls markSynced so the next pushChanged hits +// the fast path. Two positive assertions pin the persistence machinery +// (sidecar contents + on-disk index mtimes) so a future refactor can't +// silently drop them while still satisfying the symptom assertion. 
+Deno.test("pullChanged + pushChanged: fresh-process push after pull against populated remote does not redundantly upload (swamp-club #222)", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "s3sync-issue-222-" }); + try { + const mock = createMockS3Client(); + // Pin a known ETag for the remote index so the sidecar assertion + // doesn't have to recompute fakeETag from byte stitching. + mock.etagOverrides.set(".datastore-index.json", '"index-pre-pull"'); + + // Seed 5 files with varied sizes (5, 11, 27, 64, 128 bytes), each + // index entry carrying machine-A's localMtime (epoch — guaranteed + // distinct from any local stat.mtime that Deno.writeFile will set + // on this machine during pullChanged's downloads). + const aMtime = new Date(0).toISOString(); + const SIZES = [5, 11, 27, 64, 128]; + const seeded = SIZES.map((size, i) => ({ + rel: `data/@m/file-${i}.yaml`, + body: "x".repeat(size), + })); + const indexEntries: Record = {}; + for (const { rel, body } of seeded) { + mock.storage.set(rel, new TextEncoder().encode(body)); + indexEntries[rel] = { + key: rel, + size: body.length, + lastModified: aMtime, + localMtime: aMtime, + }; + } + mock.storage.set(".datastore-index.json", encodeIndex(indexEntries)); + + // Process A: fresh service, fresh cache. Pulls all 5 files. + const serviceA = new S3CacheSyncService(mock, cachePath); + const pulled = await serviceA.pullChanged(); + assertEquals(pulled, SIZES.length, "must pull all 5 seeded files"); + + // Positive: sidecar must be written with the GET'd ETag and clean + // state. Pre-fix, pulled > 0 skipped markSynced and this returned + // null. Post-fix, this is the load-bearing piece that lets the next + // pushChanged take the fast path. 
+ const sidecar = await readSidecar(cachePath); + assertExists(sidecar, "pullChanged with pulled > 0 must write the sidecar"); + assertEquals( + sidecar!.localDirty, + false, + "sidecar must be clean — local cache matches the freshly-pulled remote", + ); + assertEquals( + sidecar!.remoteIndexETag, + "index-pre-pull", + "sidecar must record the ETag from pullIndex's GET response", + ); + + // Positive: on-disk index must reflect the local mtimes pullChanged + // recorded for each downloaded file — NOT the seeded epoch value. + // Pre-fix, the on-disk file was last written by pullIndex from the + // raw remote payload and still carried machine-A's epoch mtimes. + const indexText = await Deno.readTextFile( + join(cachePath, ".datastore-index.json"), + ); + const onDiskIndex = JSON.parse(indexText) as { + entries: Record; + }; + for (const { rel } of seeded) { + const entry = onDiskIndex.entries[rel]; + assertExists(entry, `entry ${rel} must persist on disk`); + assertExists( + entry.localMtime, + `entry ${rel} must carry a localMtime on disk`, + ); + assert( + entry.localMtime !== aMtime, + `entry ${rel} on-disk localMtime must NOT be the seeded machine-A epoch — pre-fix the on-disk index still carried it`, + ); + const parsed = Date.parse(entry.localMtime!); + assert( + !Number.isNaN(parsed), + `entry ${rel} localMtime must be a parseable ISO timestamp`, + ); + } + + // Symptom: fresh service C against the same cache dir simulates the + // cross-process scenario from the issue (`swamp datastore sync + // --push` run as a separate command). Snapshot mock.puts BEFORE so + // any writeback PUT from pullChanged doesn't pollute the count. 
+ const putsBefore = mock.puts.length; + const serviceC = new S3CacheSyncService(mock, cachePath); + const pushed = await serviceC.pushChanged(); + assertEquals( + pushed, + 0, + `pushChanged after a fresh-process pullChanged must not redundantly upload — saw ${pushed} pushes against ${SIZES.length} unchanged files`, + ); + const dataPuts = mock.puts.slice(putsBefore).filter((p) => + p.key !== ".datastore-index.json" + ); + assertEquals( + dataPuts.length, + 0, + `expected zero data PUTs against an unchanged cache, saw ${dataPuts.length}: ${ + dataPuts.map((p) => p.key).join(", ") + }`, + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); diff --git a/datastore/s3/manifest.yaml b/datastore/s3/manifest.yaml index 5cd38f59..985d1ba8 100644 --- a/datastore/s3/manifest.yaml +++ b/datastore/s3/manifest.yaml @@ -1,6 +1,6 @@ manifestVersion: 1 name: "@swamp/s3-datastore" -version: "2026.04.28.4" +version: "2026.05.04.1" description: | Store data in an Amazon S3 bucket with local cache synchronization. Provides distributed locking via S3 conditional writes and bidirectional