Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 99 additions & 7 deletions datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -657,13 +657,17 @@ export class GcsCacheSyncService implements DatastoreSyncService {
generation = response.generation;
} catch (err) {
if (err instanceof NotFoundError) {
this.index = {
version: 1,
lastPulled: new Date().toISOString(),
entries: {},
};
// No remote object yet → no fingerprint to return.
return null;
// Bucket has no index file. Two sub-cases:
// (1) Brand-new empty bucket — fall back to an empty
// in-memory index, return null. Existing behaviour.
// (2) Bucket pre-dates the indexed-sync model and holds data
// under standard prefixes but was never written by a
// swamp version that publishes `.datastore-index.json`.
// Discovery: list, filter via isInternalCacheFile, build an
// index, publish it, and continue. See discoverIndexFromBucket
// for the full inline rationale (matches the @swamp/s3-datastore
// sibling implementation; swamp-club#225 follow-up to #220).
return await this.discoverIndexFromBucket(signal);
}
throw err;
}
Expand All @@ -683,6 +687,94 @@ export class GcsCacheSyncService implements DatastoreSyncService {
return generation ?? null;
}

/**
 * Rebuilds the datastore index from a raw bucket listing. `pullIndex`
 * falls back to this when fetching `.datastore-index.json` raises
 * `NotFoundError`.
 *
 * Sequence:
 *   1. List the bucket and drop every key `isInternalCacheFile` flags.
 *   2. Replace `this.index` with an index built from what is left.
 *   3. Nothing left → trace and return `null`; nothing is published and
 *      no local file is written (the brand-new-bucket case).
 *   4. Otherwise publish the index as `.datastore-index.json` through
 *      `retryWithBackoff`, then write the same JSON to `this.indexPath`.
 *
 * Deliberate choices (this mirrors the s3-datastore sibling — do not
 * "fix" any of them without weighing the trade-off):
 *
 *   - The put carries no precondition. Peers racing through discovery
 *     produce the same keys and sizes, but `lastPulled` and the
 *     `lastModified` fallback (used when the listing has no `updated`
 *     value) are stamped per peer, so the JSON bodies are NOT
 *     byte-identical. Sync behaviour depends on keys + sizes, not
 *     timestamps, so last-writer-wins is benign. Do not add a
 *     content-fingerprint shortcut that assumes byte-equality. GCS
 *     offers `ifGenerationMatch=0`; it is skipped to stay symmetric with
 *     the s3-datastore sibling and the existing pushChanged writeback
 *     pattern.
 *
 *   - `this.index` is assigned BEFORE the put. If the put throws, memory
 *     holds an unpublished snapshot and no local file exists (the local
 *     write comes after the put), so the next forceRemote pull simply
 *     re-runs discovery with no orphaned state.
 *
 *   - Every caller of `pullIndex` (pullChanged and pushChanged alike)
 *     inherits this fallback automatically.
 *
 * Listing consistency: GCS listings have generally been strongly
 * consistent since 2020, but a freshly uploaded object may briefly be
 * absent. Discovery is convergent — a later pass picks up anything
 * missed.
 *
 * @param signal Optional abort signal, forwarded to the list call, the
 *   put call, and the retry helper.
 * @returns The `generation` reported by the put, or `null` when no
 *   non-internal objects were found or the put result carries none.
 */
private async discoverIndexFromBucket(
  signal?: AbortSignal,
): Promise<string | null> {
  const startedAt = Date.now();
  const objects = await this.gcs.listAllObjects(undefined, signal);
  const userObjects = objects.filter((obj) => !isInternalCacheFile(obj.key));
  const traceDetail = `n=${userObjects.length}`;

  // One index entry per surviving object; an empty listing yields `{}`.
  const entries: Record<string, IndexEntry> = {};
  for (const obj of userObjects) {
    // Fall back to "now" when the listing carries no `updated` value.
    const modifiedAt = obj.updated ?? new Date();
    entries[obj.key] = {
      key: obj.key,
      size: obj.size,
      lastModified: modifiedAt.toISOString(),
    };
  }

  // In-memory state is set before any remote write (see the header note).
  this.index = {
    version: 1,
    lastPulled: new Date().toISOString(),
    entries,
  };

  if (userObjects.length === 0) {
    // Nothing to index: keep the empty in-memory index, publish nothing.
    tracePhase("pullIndex.discover", startedAt, traceDetail);
    return null;
  }

  const serialized = JSON.stringify(this.index, null, 2);
  const payload = new TextEncoder().encode(serialized);
  const uploadIndex = () =>
    this.gcs.putObject(".datastore-index.json", payload, signal);
  const published = await retryWithBackoff(uploadIndex, { signal });

  // The local copy is written only after the remote publish succeeded.
  await ensureDir(this.cachePath);
  await atomicWriteTextFile(this.indexPath, serialized);
  tracePhase("pullIndex.discover", startedAt, traceDetail);

  // `generation` is returned exactly as the put response reports it (a
  // numeric string). This matches the post-fetch path's contract:
  // callers compare it byte-for-byte against the sidecar's recorded
  // generation, so no normalization is applied (unlike S3's quoted ETag).
  return published?.generation ?? null;
}

/** Fetches a single file from GCS to the local cache. */
async pullFile(relativePath: string, signal?: AbortSignal): Promise<void> {
const localPath = assertSafePath(this.cachePath, relativePath);
Expand Down
225 changes: 225 additions & 0 deletions datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,27 @@ function createMockGcsClient(): GcsClient & {
generation: genFor(key),
});
},

// Mock bucket listing: reports one entry per object currently held in
// `storage`. `_subPrefix` is ignored, and no `updated` value is reported
// for any entry.
listAllObjects(
  _subPrefix?: string,
  signal?: AbortSignal,
): Promise<
  Array<{ key: string; size: number; generation?: string; updated?: Date }>
> {
  // The abort check runs before the listing is built.
  throwIfAborted(signal);
  const listing = Array.from(storage, ([key, body]) => ({
    key,
    size: body.length,
    generation: genFor(key),
  }));
  return Promise.resolve(listing);
},
} as unknown as GcsClient & {
storage: Map<string, Uint8Array>;
generationOverrides: Map<string, string>;
Expand Down Expand Up @@ -476,6 +497,210 @@ Deno.test("pullIndex cache-hit path: scrubs in-memory and leaves local file unto
}
});

// -- (i) self-healing discovery for unindexed buckets (swamp-club #225) ---

// Test A — a populated bucket with no index object: pullIndex must
// synthesize an index from the listing, publish it, return a fingerprint,
// and hold the same entries in memory.
Deno.test("pullIndex: discovers files when remote index is missing (swamp-club #225)", async () => {
  const firstKey = "data/@org/m/payload-1.yaml";
  const secondKey = "data/@org/m/payload-2.yaml";
  const cachePath = await Deno.makeTempDir({ prefix: "gcssync-225-A-" });
  try {
    const encoder = new TextEncoder();
    const mock = createMockGcsClient();
    mock.storage.set(firstKey, encoder.encode("hello\n"));
    mock.storage.set(secondKey, encoder.encode("world!\n"));

    const service = new GcsCacheSyncService(mock, cachePath);
    // Structural cast so the test can invoke pullIndex directly.
    const internals = service as unknown as {
      pullIndex: (opts: { forceRemote: boolean }) => Promise<string | null>;
    };
    const fingerprint = await internals.pullIndex({ forceRemote: true });

    const publishedIndex = mock.puts.find((put) =>
      put.key === ".datastore-index.json"
    );
    assertExists(publishedIndex, "discovery must publish a synthesized index");

    const synthesized = JSON.parse(
      new TextDecoder().decode(publishedIndex.body),
    );
    assertEquals(
      Object.keys(synthesized.entries).sort(),
      [firstKey, secondKey],
      "synthesized index keys must match the bucket listing exactly",
    );
    // Sizes are the byte lengths of "hello\n" and "world!\n".
    assertEquals(synthesized.entries[firstKey].size, 6);
    assertEquals(synthesized.entries[secondKey].size, 7);

    assertExists(fingerprint, "discovery must return a fingerprint generation");

    const { index } = privateState(service);
    assertExists(index);
    assertEquals(Object.keys(index.entries).sort(), [firstKey, secondKey]);
  } finally {
    await Deno.remove(cachePath, { recursive: true });
  }
});

// Test B — pullChanged on a bucket that has data but no index: both
// remote files must end up in the local cache with their contents intact.
Deno.test("pullChanged: hydrates an unindexed bucket via discovery (swamp-club #225)", async () => {
  const remoteFiles: Array<[string, string]> = [
    ["data/@org/m/a.yaml", "alpha\n"],
    ["data/@org/m/b.yaml", "beta\n"],
  ];
  const cachePath = await Deno.makeTempDir({ prefix: "gcssync-225-B-" });
  try {
    const encoder = new TextEncoder();
    const mock = createMockGcsClient();
    for (const [key, text] of remoteFiles) {
      mock.storage.set(key, encoder.encode(text));
    }

    const service = new GcsCacheSyncService(mock, cachePath);
    const pulled = await service.pullChanged();
    assertEquals(pulled, 2, "both files must be downloaded");

    // Read every file first, then compare contents.
    const onDisk: string[] = [];
    for (const [key] of remoteFiles) {
      onDisk.push(await Deno.readTextFile(join(cachePath, key)));
    }
    remoteFiles.forEach(([, text], i) => assertEquals(onDisk[i], text));
  } finally {
    await Deno.remove(cachePath, { recursive: true });
  }
});

// Test C — the synthesized index must list only the legitimate data file;
// the lock, push-queue and catalog objects seeded next to it must be
// filtered out.
Deno.test("pullIndex discovery: skips internal cache files (swamp-club #225)", async () => {
  const legitKey = "data/@org/m/legit.yaml";
  const internalKeys = [
    ".datastore.lock",
    ".push-queue.json",
    "data/_catalog.db",
  ];
  const cachePath = await Deno.makeTempDir({ prefix: "gcssync-225-C-" });
  try {
    const encoder = new TextEncoder();
    const mock = createMockGcsClient();
    mock.storage.set(legitKey, encoder.encode("ok\n"));
    mock.storage.set(".datastore.lock", encoder.encode("lock-data"));
    mock.storage.set(".push-queue.json", encoder.encode("[]"));
    mock.storage.set("data/_catalog.db", encoder.encode("SQLITE-MAIN"));

    const service = new GcsCacheSyncService(mock, cachePath);
    // Structural cast so the test can invoke pullIndex directly.
    const internals = service as unknown as {
      pullIndex: (opts: { forceRemote: boolean }) => Promise<string | null>;
    };
    await internals.pullIndex({ forceRemote: true });

    const publishedIndex = mock.puts.find((put) =>
      put.key === ".datastore-index.json"
    );
    assertExists(publishedIndex);
    const synthesized = JSON.parse(
      new TextDecoder().decode(publishedIndex.body),
    );
    assertEquals(
      Object.keys(synthesized.entries).sort(),
      [legitKey],
      "synthesized index must contain only the legit file",
    );
    // Per-key check so a failure names the offending internal file.
    for (const internal of internalKeys) {
      assertEquals(
        synthesized.entries[internal],
        undefined,
        `internal cache file must not appear in synthesized index: ${internal}`,
      );
    }
  } finally {
    await Deno.remove(cachePath, { recursive: true });
  }
});

// Test D — regression pin for a genuinely empty bucket: discovery must
// return a null fingerprint, publish no index, and leave an empty
// in-memory index.
Deno.test("pullIndex discovery: empty bucket falls through to empty index (swamp-club #225 regression pin)", async () => {
  const cachePath = await Deno.makeTempDir({ prefix: "gcssync-225-D-" });
  try {
    const mock = createMockGcsClient();
    const service = new GcsCacheSyncService(mock, cachePath);
    // Structural cast so the test can invoke pullIndex directly.
    const internals = service as unknown as {
      pullIndex: (opts: { forceRemote: boolean }) => Promise<string | null>;
    };
    const fingerprint = await internals.pullIndex({ forceRemote: true });

    assertEquals(
      fingerprint,
      null,
      "empty-bucket fallthrough must return null fingerprint",
    );

    const publishedIndex = mock.puts.find((put) =>
      put.key === ".datastore-index.json"
    );
    assertEquals(
      publishedIndex,
      undefined,
      "no PutObject must fire on a genuinely empty bucket",
    );

    const { index } = privateState(service);
    assertExists(index);
    const entryCount = Object.keys(index.entries).length;
    assertEquals(
      entryCount,
      0,
      "in-memory entries must be empty for a brand-new bucket",
    );
  } finally {
    await Deno.remove(cachePath, { recursive: true });
  }
});

// Test E — pushChanged against a populated bucket with no index: the last
// index it publishes must list both the pre-existing remote object and the
// newly pushed local file.
Deno.test("pushChanged: against unindexed populated bucket builds a complete index (swamp-club #225 side benefit)", async () => {
  const remoteKey = "data/@org/m/remote-only.yaml";
  const localKey = "data/@org/m/local-only.yaml";
  const cachePath = await Deno.makeTempDir({ prefix: "gcssync-225-E-" });
  try {
    const mock = createMockGcsClient();
    mock.storage.set(remoteKey, new TextEncoder().encode("remote\n"));
    await seedFile(cachePath, localKey, "local\n");

    const service = new GcsCacheSyncService(mock, cachePath);
    const pushed = await service.pushChanged();
    assertEquals(pushed, 1, "exactly one local file should push");

    // Only the most recent index put is inspected.
    const lastIndexPut = mock.puts
      .filter((put) => put.key === ".datastore-index.json")
      .at(-1);
    assertExists(lastIndexPut, "pushChanged must publish an index");
    const finalIndex = JSON.parse(new TextDecoder().decode(lastIndexPut.body));
    assertEquals(
      Object.keys(finalIndex.entries).sort(),
      [localKey, remoteKey],
      "merged index must contain both the remote-discovered and local-pushed entries",
    );
  } finally {
    await Deno.remove(cachePath, { recursive: true });
  }
});

// -- (g) push merges remote index instead of clobbering it ----------------

// Regression test for swamp-club#30: without the fix, a client whose
Expand Down
Loading
Loading