Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 36 additions & 11 deletions datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -804,18 +804,43 @@ export class GcsCacheSyncService implements DatastoreSyncService {
throw new Error(formatBatchFailure("pull", failures));
}

// Verified zero-diff: local cache matches the remote index whose
// generation we captured from the `pullIndex` GET response.
// Persist THAT generation — the one we walked against — so the
// next `pullChanged` can take the fast path. We deliberately do
// NOT re-`getMetadata` here: a post-walk metadata call could
// observe a generation from a concurrent writer's push landing
// during our walk, and recording that generation would mask their
// data on the next fast-path sync (swamp-club #168). If
// generation is null (cache-hit pullIndex or NotFound brand-new
// bucket), the sidecar is skipped — next sync self-heals.
if (pulled === 0 && indexGeneration) {
// Local cache matches the remote index whose generation we captured
// from the `pullIndex` GET response — either the walk found zero
// diff (`pulled === 0`) or we just downloaded the missing files
// (`pulled > 0`). Persist THAT generation — the one we walked
// against — so the next `pullChanged` / `pushChanged` can take the
// fast path. We deliberately do NOT re-`getMetadata`: a post-walk
// metadata call could observe a generation from a concurrent
// writer's push landing during our walk, and recording that
// generation would mask their data on the next fast-path sync
// (swamp-club #168). If generation is null (cache-hit pullIndex or
// NotFound brand-new bucket), the sidecar is skipped — next sync
// self-heals on the slow path.
//
// When `pulled > 0`, also rewrite the on-disk index with the
// in-memory state so it carries the localMtime values we just
// recorded for each downloaded file. Pre-fix, the on-disk file was
// last written by `pullIndex` from the raw remote payload (carrying
// the original pusher's local mtimes), so a subsequent fresh-process
// `pushChanged` slow-path walk saw `existing.localMtime` (pusher's)
// ≠ `stat.mtime` (local mtime from `Deno.writeFile`) and pushed
// every file with byte-identical content (swamp-club #222).
//
// Ordering invariant — DO NOT REVERSE: `atomicWriteTextFile` MUST
// run before `markSynced`. `markSynced` derives `lastVerifiedAt`
// from `Deno.stat(this.indexPath).mtime + 1ms`. Reversing the order
// captures `lastVerifiedAt` against the pre-write mtime; the
// subsequent rewrite then bumps the index mtime forward, and the
// next `tryFastPullChanged` probe spuriously bails on
// `indexMtime >= verifiedAt`.
if (indexGeneration) {
try {
if (pulled > 0 && this.index) {
await atomicWriteTextFile(
this.indexPath,
JSON.stringify(this.index, null, 2),
);
}
await this.markSynced(indexGeneration);
} catch {
// Non-fatal: sidecar update is opportunistic. Disk-full /
Expand Down
126 changes: 126 additions & 0 deletions datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1874,3 +1874,129 @@ Deno.test("pushChanged writeback: does NOT mark sidecar clean when local is miss
await Deno.remove(cachePath, { recursive: true });
}
});

// swamp-club #222: pushChanged in a fresh process after a fresh-cache
// pullChanged must not redundantly re-upload the just-downloaded files.
// Pre-fix, pullChanged left the on-disk index carrying the original
// pusher's localMtime values (the remote payload), and skipped markSynced
// because pulled > 0. The next process's pushChanged then took the slow
// path (no sidecar), pullIndex(forceRemote) reloaded the pusher's mtimes
// into this.index, and the walk pushed every file — size matched but
// mtime didn't. The fix persists the in-memory index (with the puller's
// stat-derived localMtimes) AND marksSynced so the next pushChanged hits
// the fast path. Two positive assertions pin the persistence machinery
// (sidecar contents + on-disk index mtimes) so a future refactor can't
// silently drop them while still satisfying the symptom assertion.
Deno.test("pullChanged + pushChanged: fresh-process push after pull against populated remote does not redundantly upload (swamp-club #222)", async () => {
const cachePath = await Deno.makeTempDir({ prefix: "gcssync-issue-222-" });
try {
const mock = createMockGcsClient();
// Pin a known generation for the remote index so the sidecar
// assertion has a stable target.
mock.generationOverrides.set(".datastore-index.json", "42");

// Seed 5 files with varied sizes (5, 11, 27, 64, 128 bytes), each
// index entry carrying machine-A's localMtime (epoch — guaranteed
// distinct from any local stat.mtime that Deno.writeFile will set
// on this machine during pullChanged's downloads).
const aMtime = new Date(0).toISOString();
const SIZES = [5, 11, 27, 64, 128];
const seeded = SIZES.map((size, i) => ({
rel: `data/@m/file-${i}.yaml`,
body: "x".repeat(size),
}));
const indexEntries: Record<string, {
key: string;
size: number;
lastModified: string;
localMtime: string;
}> = {};
for (const { rel, body } of seeded) {
mock.storage.set(rel, new TextEncoder().encode(body));
indexEntries[rel] = {
key: rel,
size: body.length,
lastModified: aMtime,
localMtime: aMtime,
};
}
mock.storage.set(".datastore-index.json", encodeIndex(indexEntries));

// Process A: fresh service, fresh cache. Pulls all 5 files.
const serviceA = new GcsCacheSyncService(mock, cachePath);
const pulled = await serviceA.pullChanged();
assertEquals(pulled, SIZES.length, "must pull all 5 seeded files");

// Positive: sidecar must be written with the GET'd generation and
// clean state. Pre-fix, pulled > 0 skipped markSynced and the
// sidecar didn't exist. Post-fix, this is the load-bearing piece
// that lets the next pushChanged take the fast path.
const sidecarText = await Deno.readTextFile(
join(cachePath, ".datastore-sync-state.json"),
);
const sidecar = JSON.parse(sidecarText);
assertEquals(
sidecar.localDirty,
false,
"sidecar must be clean — local cache matches the freshly-pulled remote",
);
assertEquals(
sidecar.remoteIndexGeneration,
"42",
"sidecar must record the generation from pullIndex's GET response",
);

// Positive: on-disk index must reflect the local mtimes pullChanged
// recorded for each downloaded file — NOT the seeded epoch value.
// Pre-fix, the on-disk file was last written by pullIndex from the
// raw remote payload and still carried machine-A's epoch mtimes.
const indexText = await Deno.readTextFile(
join(cachePath, ".datastore-index.json"),
);
const onDiskIndex = JSON.parse(indexText) as {
entries: Record<string, { size: number; localMtime?: string }>;
};
for (const { rel } of seeded) {
const entry = onDiskIndex.entries[rel];
assertExists(entry, `entry ${rel} must persist on disk`);
assertExists(
entry.localMtime,
`entry ${rel} must carry a localMtime on disk`,
);
assert(
entry.localMtime !== aMtime,
`entry ${rel} on-disk localMtime must NOT be the seeded machine-A epoch — pre-fix the on-disk index still carried it`,
);
const parsed = Date.parse(entry.localMtime!);
assert(
!Number.isNaN(parsed),
`entry ${rel} localMtime must be a parseable ISO timestamp`,
);
}

// Symptom: fresh service C against the same cache dir simulates the
// cross-process scenario from the issue (`swamp datastore sync
// --push` run as a separate command). Snapshot mock.puts BEFORE so
// any writeback PUT from pullChanged doesn't pollute the count.
const putsBefore = mock.puts.length;
const serviceC = new GcsCacheSyncService(mock, cachePath);
const pushed = await serviceC.pushChanged();
assertEquals(
pushed,
0,
`pushChanged after a fresh-process pullChanged must not redundantly upload — saw ${pushed} pushes against ${SIZES.length} unchanged files`,
);
const dataPuts = mock.puts.slice(putsBefore).filter((p) =>
p.key !== ".datastore-index.json"
);
assertEquals(
dataPuts.length,
0,
`expected zero data PUTs against an unchanged cache, saw ${dataPuts.length}: ${
dataPuts.map((p) => p.key).join(", ")
}`,
);
} finally {
await Deno.remove(cachePath, { recursive: true });
}
});
2 changes: 1 addition & 1 deletion datastore/gcs/manifest.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
manifestVersion: 1
name: "@swamp/gcs-datastore"
version: "2026.04.28.4"
version: "2026.05.04.1"
description: |
Store data in a Google Cloud Storage bucket with local cache synchronization.
Provides distributed locking via GCS generation-based preconditions and
Expand Down
45 changes: 35 additions & 10 deletions datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -831,17 +831,42 @@ export class S3CacheSyncService implements DatastoreSyncService {
throw new Error(formatBatchFailure("pull", failures));
}

// Verified zero-diff: local cache matches the remote index whose
// ETag we captured from the `pullIndex` GET response. Persist THAT
// ETag — the one we walked against — so the next `pullChanged` can
// take the fast path. We deliberately do NOT re-HEAD here: a
// post-walk HEAD could observe an ETag from a concurrent writer's
// push landing during our walk, and recording that ETag would mask
// their data on the next fast-path sync (swamp-club #168). If the
// ETag is null (cache-hit pullIndex or NotFound brand-new bucket),
// the sidecar is skipped — next sync self-heals on the slow path.
if (pulled === 0 && indexETag) {
// Local cache matches the remote index whose ETag we captured from
// the `pullIndex` GET response — either the walk found zero diff
// (`pulled === 0`) or we just downloaded the missing files
// (`pulled > 0`). Persist THAT ETag — the one we walked against —
// so the next `pullChanged` / `pushChanged` can take the fast path.
// We deliberately do NOT re-HEAD: a post-walk HEAD could observe an
// ETag from a concurrent writer's push landing during our walk,
// and recording that ETag would mask their data on the next
// fast-path sync (swamp-club #168). If the ETag is null (cache-hit
// pullIndex or NotFound brand-new bucket), the sidecar is skipped —
// next sync self-heals on the slow path.
//
// When `pulled > 0`, also rewrite the on-disk index with the
// in-memory state so it carries the localMtime values we just
// recorded for each downloaded file. Pre-fix, the on-disk file was
// last written by `pullIndex` from the raw remote payload (carrying
// the original pusher's local mtimes), so a subsequent fresh-process
// `pushChanged` slow-path walk saw `existing.localMtime` (pusher's)
// ≠ `stat.mtime` (local mtime from `Deno.writeFile`) and pushed
// every file with byte-identical content (swamp-club #222).
//
// Ordering invariant — DO NOT REVERSE: `atomicWriteTextFile` MUST
// run before `markSynced`. `markSynced` derives `lastVerifiedAt`
// from `Deno.stat(this.indexPath).mtime + 1ms`. Reversing the order
// captures `lastVerifiedAt` against the pre-write mtime; the
// subsequent rewrite then bumps the index mtime forward, and the
// next `tryFastPullChanged` probe spuriously bails on
// `indexMtime >= verifiedAt`.
if (indexETag) {
try {
if (pulled > 0 && this.index) {
await atomicWriteTextFile(
this.indexPath,
JSON.stringify(this.index, null, 2),
);
}
await this.markSynced(indexETag);
} catch {
// Non-fatal: sidecar update is opportunistic. Disk-full /
Expand Down
124 changes: 124 additions & 0 deletions datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2111,3 +2111,127 @@ Deno.test("pushChanged writeback: does NOT mark sidecar clean when local is miss
await Deno.remove(cachePath, { recursive: true });
}
});

// swamp-club #222: pushChanged in a fresh process after a fresh-cache
// pullChanged must not redundantly re-upload the just-downloaded files.
// Pre-fix, pullChanged left the on-disk index carrying the original
// pusher's localMtime values (the remote payload), and skipped markSynced
// because pulled > 0. The next process's pushChanged then took the slow
// path (no sidecar), pullIndex(forceRemote) reloaded the pusher's mtimes
// into this.index, and the walk pushed every file — size matched but
// mtime didn't. The fix persists the in-memory index (with the puller's
// stat-derived localMtimes) AND marksSynced so the next pushChanged hits
// the fast path. Two positive assertions pin the persistence machinery
// (sidecar contents + on-disk index mtimes) so a future refactor can't
// silently drop them while still satisfying the symptom assertion.
Deno.test("pullChanged + pushChanged: fresh-process push after pull against populated remote does not redundantly upload (swamp-club #222)", async () => {
const cachePath = await Deno.makeTempDir({ prefix: "s3sync-issue-222-" });
try {
const mock = createMockS3Client();
// Pin a known ETag for the remote index so the sidecar assertion
// doesn't have to recompute fakeETag from byte stitching.
mock.etagOverrides.set(".datastore-index.json", '"index-pre-pull"');

// Seed 5 files with varied sizes (5, 11, 27, 64, 128 bytes), each
// index entry carrying machine-A's localMtime (epoch — guaranteed
// distinct from any local stat.mtime that Deno.writeFile will set
// on this machine during pullChanged's downloads).
const aMtime = new Date(0).toISOString();
const SIZES = [5, 11, 27, 64, 128];
const seeded = SIZES.map((size, i) => ({
rel: `data/@m/file-${i}.yaml`,
body: "x".repeat(size),
}));
const indexEntries: Record<string, {
key: string;
size: number;
lastModified: string;
localMtime: string;
}> = {};
for (const { rel, body } of seeded) {
mock.storage.set(rel, new TextEncoder().encode(body));
indexEntries[rel] = {
key: rel,
size: body.length,
lastModified: aMtime,
localMtime: aMtime,
};
}
mock.storage.set(".datastore-index.json", encodeIndex(indexEntries));

// Process A: fresh service, fresh cache. Pulls all 5 files.
const serviceA = new S3CacheSyncService(mock, cachePath);
const pulled = await serviceA.pullChanged();
assertEquals(pulled, SIZES.length, "must pull all 5 seeded files");

// Positive: sidecar must be written with the GET'd ETag and clean
// state. Pre-fix, pulled > 0 skipped markSynced and this returned
// null. Post-fix, this is the load-bearing piece that lets the next
// pushChanged take the fast path.
const sidecar = await readSidecar(cachePath);
assertExists(sidecar, "pullChanged with pulled > 0 must write the sidecar");
assertEquals(
sidecar!.localDirty,
false,
"sidecar must be clean — local cache matches the freshly-pulled remote",
);
assertEquals(
sidecar!.remoteIndexETag,
"index-pre-pull",
"sidecar must record the ETag from pullIndex's GET response",
);

// Positive: on-disk index must reflect the local mtimes pullChanged
// recorded for each downloaded file — NOT the seeded epoch value.
// Pre-fix, the on-disk file was last written by pullIndex from the
// raw remote payload and still carried machine-A's epoch mtimes.
const indexText = await Deno.readTextFile(
join(cachePath, ".datastore-index.json"),
);
const onDiskIndex = JSON.parse(indexText) as {
entries: Record<string, { size: number; localMtime?: string }>;
};
for (const { rel } of seeded) {
const entry = onDiskIndex.entries[rel];
assertExists(entry, `entry ${rel} must persist on disk`);
assertExists(
entry.localMtime,
`entry ${rel} must carry a localMtime on disk`,
);
assert(
entry.localMtime !== aMtime,
`entry ${rel} on-disk localMtime must NOT be the seeded machine-A epoch — pre-fix the on-disk index still carried it`,
);
const parsed = Date.parse(entry.localMtime!);
assert(
!Number.isNaN(parsed),
`entry ${rel} localMtime must be a parseable ISO timestamp`,
);
}

// Symptom: fresh service C against the same cache dir simulates the
// cross-process scenario from the issue (`swamp datastore sync
// --push` run as a separate command). Snapshot mock.puts BEFORE so
// any writeback PUT from pullChanged doesn't pollute the count.
const putsBefore = mock.puts.length;
const serviceC = new S3CacheSyncService(mock, cachePath);
const pushed = await serviceC.pushChanged();
assertEquals(
pushed,
0,
`pushChanged after a fresh-process pullChanged must not redundantly upload — saw ${pushed} pushes against ${SIZES.length} unchanged files`,
);
const dataPuts = mock.puts.slice(putsBefore).filter((p) =>
p.key !== ".datastore-index.json"
);
assertEquals(
dataPuts.length,
0,
`expected zero data PUTs against an unchanged cache, saw ${dataPuts.length}: ${
dataPuts.map((p) => p.key).join(", ")
}`,
);
} finally {
await Deno.remove(cachePath, { recursive: true });
}
});
2 changes: 1 addition & 1 deletion datastore/s3/manifest.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
manifestVersion: 1
name: "@swamp/s3-datastore"
version: "2026.04.28.4"
version: "2026.05.04.1"
description: |
Store data in an Amazon S3 bucket with local cache synchronization.
Provides distributed locking via S3 conditional writes and bidirectional
Expand Down
Loading