diff --git a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts index 966c729a..264ac743 100644 --- a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts +++ b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync.ts @@ -657,13 +657,17 @@ export class GcsCacheSyncService implements DatastoreSyncService { generation = response.generation; } catch (err) { if (err instanceof NotFoundError) { - this.index = { - version: 1, - lastPulled: new Date().toISOString(), - entries: {}, - }; - // No remote object yet → no fingerprint to return. - return null; + // Bucket has no index file. Two sub-cases: + // (1) Brand-new empty bucket — fall back to an empty + // in-memory index, return null. Existing behaviour. + // (2) Bucket pre-dates the indexed-sync model and holds data + // under standard prefixes but was never written by a + // swamp version that publishes `.datastore-index.json`. + // Discovery: list, filter via isInternalCacheFile, build an + // index, publish it, and continue. See discoverIndexFromBucket + // for the full inline rationale (matches the @swamp/s3-datastore + // sibling implementation; swamp-club#225 follow-up to #220). + return await this.discoverIndexFromBucket(signal); } throw err; } @@ -683,6 +687,94 @@ export class GcsCacheSyncService implements DatastoreSyncService { return generation ?? null; } + /** + * Self-healing fallback for `pullIndex` when the remote + * `.datastore-index.json` is absent. Lists the bucket, filters + * internal cache files, builds an index from the listing, publishes + * it, and writes the local copy — leaving the caller's slow-path + * bookkeeping behaving identically to a normal index fetch. + * + * Inline-comment requirements (mirror of the s3-datastore sibling; + * a future reader must not "fix" any of these without understanding + * the trade-off): + * + * i. The PutObject is unconditional. 
Discovery is functionally + * idempotent across racing peers — entry keys and sizes match, + * but metadata timestamps (`lastPulled`, and the `lastModified` + * fallback when the listing didn't surface an `updated` value) + * are evaluated per peer so the JSON bodies are NOT byte- + * identical. Sync behaviour depends on keys + sizes, not + * timestamps, so last-writer-wins is benign. Do NOT add a + * content-fingerprint optimization here on the assumption of + * byte-equality. Matches the existing pushChanged writeback + * pattern; `ifGenerationMatch=0` is available on GCS but skipped + * to keep symmetry with the s3-datastore sibling. + * + * ii. `this.index` is set in-memory BEFORE the put, mirroring the + * post-fetch order. If put throws, the in-memory state reflects + * an unpublished snapshot, no local file is written + * (atomicWriteTextFile happens after put), and the next + * forceRemote call re-triggers discovery — idempotent with no + * orphaned state. + * + * iii. All callers of pullIndex (both pullChanged and pushChanged) + * inherit this fallback automatically. + * + * GCS list eventual consistency: generally strong since 2020, but a + * freshly-uploaded object may briefly be missing from a listing. Discovery + * is convergent — re-running setup picks up any missed objects on + * the next pass. + */ + private async discoverIndexFromBucket( + signal?: AbortSignal, + ): Promise { + const discoverStart = Date.now(); + const listing = await this.gcs.listAllObjects(undefined, signal); + const filtered = listing.filter((entry) => !isInternalCacheFile(entry.key)); + + if (filtered.length === 0) { + this.index = { + version: 1, + lastPulled: new Date().toISOString(), + entries: {}, + }; + tracePhase("pullIndex.discover", discoverStart, "n=0"); + return null; + } + + const entries: Record = {}; + for (const entry of filtered) { + entries[entry.key] = { + key: entry.key, + size: entry.size, + lastModified: (entry.updated ??
new Date()).toISOString(), + }; + } + this.index = { + version: 1, + lastPulled: new Date().toISOString(), + entries, + }; + const indexJson = JSON.stringify(this.index, null, 2); + const indexData = new TextEncoder().encode(indexJson); + const putResult = await retryWithBackoff( + () => this.gcs.putObject(".datastore-index.json", indexData, signal), + { signal }, + ); + await ensureDir(this.cachePath); + await atomicWriteTextFile(this.indexPath, indexJson); + tracePhase( + "pullIndex.discover", + discoverStart, + `n=${filtered.length}`, + ); + // The returned `generation` is the raw form from the PUT response + // (a numeric string). Matches the post-fetch path's contract — + // callers compare it byte-for-byte against the sidecar's recorded + // generation. No normalization needed (unlike S3's quoted ETag). + return putResult?.generation ?? null; + } + /** Fetches a single file from GCS to the local cache. */ async pullFile(relativePath: string, signal?: AbortSignal): Promise { const localPath = assertSafePath(this.cachePath, relativePath); diff --git a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts index 10fe0e2d..3052b332 100644 --- a/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts +++ b/datastore/gcs/extensions/datastores/_lib/gcs_cache_sync_test.ts @@ -141,6 +141,27 @@ function createMockGcsClient(): GcsClient & { generation: genFor(key), }); }, + + listAllObjects( + _subPrefix?: string, + signal?: AbortSignal, + ): Promise< + Array<{ + key: string; + size: number; + generation?: string; + updated?: Date; + }> + > { + throwIfAborted(signal); + return Promise.resolve( + [...storage.entries()].map(([key, body]) => ({ + key, + size: body.length, + generation: genFor(key), + })), + ); + }, } as unknown as GcsClient & { storage: Map; generationOverrides: Map; @@ -476,6 +497,210 @@ Deno.test("pullIndex cache-hit path: scrubs in-memory and leaves local file unto } }); 
+// -- (i) self-healing discovery for unindexed buckets (swamp-club #225) --- + +// Test A — discovery from a populated bucket without an index. +Deno.test("pullIndex: discovers files when remote index is missing (swamp-club #225)", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-225-A-" }); + try { + const mock = createMockGcsClient(); + mock.storage.set( + "data/@org/m/payload-1.yaml", + new TextEncoder().encode("hello\n"), + ); + mock.storage.set( + "data/@org/m/payload-2.yaml", + new TextEncoder().encode("world!\n"), + ); + + const service = new GcsCacheSyncService(mock, cachePath); + const fingerprint = await (service as unknown as { + pullIndex: ( + opts: { forceRemote: boolean }, + ) => Promise; + }).pullIndex({ forceRemote: true }); + + const indexPut = mock.puts.find((p) => p.key === ".datastore-index.json"); + assertExists(indexPut, "discovery must publish a synthesized index"); + + const synthesized = JSON.parse(new TextDecoder().decode(indexPut.body)); + assertEquals( + Object.keys(synthesized.entries).sort(), + ["data/@org/m/payload-1.yaml", "data/@org/m/payload-2.yaml"], + "synthesized index keys must match the bucket listing exactly", + ); + assertEquals(synthesized.entries["data/@org/m/payload-1.yaml"].size, 6); + assertEquals(synthesized.entries["data/@org/m/payload-2.yaml"].size, 7); + + assertExists(fingerprint, "discovery must return a fingerprint generation"); + + const state = privateState(service); + assertExists(state.index); + assertEquals( + Object.keys(state.index.entries).sort(), + ["data/@org/m/payload-1.yaml", "data/@org/m/payload-2.yaml"], + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// Test B — pullChanged hydrates everything after discovery. 
+Deno.test("pullChanged: hydrates an unindexed bucket via discovery (swamp-club #225)", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-225-B-" }); + try { + const mock = createMockGcsClient(); + mock.storage.set( + "data/@org/m/a.yaml", + new TextEncoder().encode("alpha\n"), + ); + mock.storage.set( + "data/@org/m/b.yaml", + new TextEncoder().encode("beta\n"), + ); + + const service = new GcsCacheSyncService(mock, cachePath); + const pulled = await service.pullChanged(); + assertEquals(pulled, 2, "both files must be downloaded"); + + const aOnDisk = await Deno.readTextFile( + join(cachePath, "data/@org/m/a.yaml"), + ); + const bOnDisk = await Deno.readTextFile( + join(cachePath, "data/@org/m/b.yaml"), + ); + assertEquals(aOnDisk, "alpha\n"); + assertEquals(bOnDisk, "beta\n"); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// Test C — discovery filters internal cache files. +Deno.test("pullIndex discovery: skips internal cache files (swamp-club #225)", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-225-C-" }); + try { + const mock = createMockGcsClient(); + mock.storage.set( + "data/@org/m/legit.yaml", + new TextEncoder().encode("ok\n"), + ); + mock.storage.set( + ".datastore.lock", + new TextEncoder().encode("lock-data"), + ); + mock.storage.set( + ".push-queue.json", + new TextEncoder().encode("[]"), + ); + mock.storage.set( + "data/_catalog.db", + new TextEncoder().encode("SQLITE-MAIN"), + ); + + const service = new GcsCacheSyncService(mock, cachePath); + await (service as unknown as { + pullIndex: ( + opts: { forceRemote: boolean }, + ) => Promise; + }).pullIndex({ forceRemote: true }); + + const indexPut = mock.puts.find((p) => p.key === ".datastore-index.json"); + assertExists(indexPut); + const synthesized = JSON.parse(new TextDecoder().decode(indexPut.body)); + assertEquals( + Object.keys(synthesized.entries).sort(), + ["data/@org/m/legit.yaml"], + "synthesized index 
must contain only the legit file", + ); + for ( + const internal of [ + ".datastore.lock", + ".push-queue.json", + "data/_catalog.db", + ] + ) { + assertEquals( + synthesized.entries[internal], + undefined, + `internal cache file must not appear in synthesized index: ${internal}`, + ); + } + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// Test D — empty bucket regression pin: brand-new-bucket fallthrough. +Deno.test("pullIndex discovery: empty bucket falls through to empty index (swamp-club #225 regression pin)", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-225-D-" }); + try { + const mock = createMockGcsClient(); + const service = new GcsCacheSyncService(mock, cachePath); + const fingerprint = await (service as unknown as { + pullIndex: ( + opts: { forceRemote: boolean }, + ) => Promise; + }).pullIndex({ forceRemote: true }); + + assertEquals( + fingerprint, + null, + "empty-bucket fallthrough must return null fingerprint", + ); + const indexPut = mock.puts.find((p) => p.key === ".datastore-index.json"); + assertEquals( + indexPut, + undefined, + "no PutObject must fire on a genuinely empty bucket", + ); + const state = privateState(service); + assertExists(state.index); + assertEquals( + Object.keys(state.index.entries).length, + 0, + "in-memory entries must be empty for a brand-new bucket", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// Test E — pushChanged side benefit on an unindexed populated bucket. 
+Deno.test("pushChanged: against unindexed populated bucket builds a complete index (swamp-club #225 side benefit)", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "gcssync-225-E-" }); + try { + const mock = createMockGcsClient(); + mock.storage.set( + "data/@org/m/remote-only.yaml", + new TextEncoder().encode("remote\n"), + ); + await seedFile(cachePath, "data/@org/m/local-only.yaml", "local\n"); + + const service = new GcsCacheSyncService(mock, cachePath); + const pushed = await service.pushChanged(); + assertEquals(pushed, 1, "exactly one local file should push"); + + const indexPuts = mock.puts.filter((p) => + p.key === ".datastore-index.json" + ); + assertExists(indexPuts.at(-1), "pushChanged must publish an index"); + const finalIndex = JSON.parse( + new TextDecoder().decode(indexPuts.at(-1)!.body), + ); + assertEquals( + Object.keys(finalIndex.entries).sort(), + [ + "data/@org/m/local-only.yaml", + "data/@org/m/remote-only.yaml", + ], + "merged index must contain both the remote-discovered and local-pushed entries", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + // -- (g) push merges remote index instead of clobbering it ---------------- // Regression test for swamp-club#30: without the fix, a client whose diff --git a/datastore/gcs/extensions/datastores/_lib/gcs_client.ts b/datastore/gcs/extensions/datastores/_lib/gcs_client.ts index 7aad2bdd..fd83a1a6 100644 --- a/datastore/gcs/extensions/datastores/_lib/gcs_client.ts +++ b/datastore/gcs/extensions/datastores/_lib/gcs_client.ts @@ -62,8 +62,15 @@ export interface GcsObjectMetadata { generation?: string; } +export interface GcsListEntry { + key: string; + size: number; + generation?: string; + updated?: Date; +} + export interface GcsListResult { - keys: string[]; + entries: GcsListEntry[]; truncated: boolean; pageToken?: string; } @@ -995,12 +1002,26 @@ export class GcsClient { const data = await resp.json(); const prefixLen = this.prefix ? 
this.prefix.length + 1 : 0; - const keys = (data.items ?? []) - .map((obj: { name: string }) => obj.name) - .map((name: string) => name.slice(prefixLen)); + const entries: GcsListEntry[] = (data.items ?? []).map( + (obj: { + name: string; + size?: string; + generation?: string; + updated?: string; + }) => ({ + key: obj.name.slice(prefixLen), + // GCS JSON API returns `size` as a numeric string. A missing + // field parses to NaN and "" to 0; `|| 0` folds both to 0. The + // S3 sibling gets a native number from the SDK and avoids + // parsing altogether. + size: Number(obj.size) || 0, + generation: obj.generation, + updated: obj.updated ? new Date(obj.updated) : undefined, + }), + ); return { - keys, + entries, truncated: !!data.nextPageToken, pageToken: data.nextPageToken, }; @@ -1010,16 +1031,16 @@ export class GcsClient { async listAllObjects( subPrefix?: string, signal?: AbortSignal, - ): Promise { - const allKeys: string[] = []; + ): Promise { + const all: GcsListEntry[] = []; let pageToken: string | undefined; do { const result = await this.listObjects(subPrefix, pageToken, signal); - allKeys.push(...result.keys); + all.push(...result.entries); pageToken = result.truncated ? result.pageToken : undefined; } while (pageToken); - return allKeys; + return all; } } diff --git a/datastore/gcs/manifest.yaml b/datastore/gcs/manifest.yaml index fc651409..254fd619 100644 --- a/datastore/gcs/manifest.yaml +++ b/datastore/gcs/manifest.yaml @@ -1,32 +1,32 @@ manifestVersion: 1 name: "@swamp/gcs-datastore" -version: "2026.05.04.3" +version: "2026.05.04.4" description: | Store data in a Google Cloud Storage bucket with local cache synchronization. Provides distributed locking via GCS generation-based preconditions and bidirectional sync between a local cache directory and GCS. - + ## Authentication - + Uses Google Cloud Application Default Credentials (ADC) — no credentials in config.
Provide credentials via one of: - Environment variable: `GOOGLE_APPLICATION_CREDENTIALS` pointing to a service account key JSON file - User credentials: `gcloud auth application-default login` - Attached service account on GCE, Cloud Run, or GKE - + ## Required IAM Permissions - + - `storage.buckets.get` - `storage.objects.create` - `storage.objects.get` - `storage.objects.delete` - `storage.objects.list` - + The predefined role `roles/storage.objectAdmin` covers all of these. - + ## Usage - + ```bash swamp datastore setup @swamp/gcs-datastore \ --config '{"bucket": "my-bucket", "prefix": "swamp"}' --json diff --git a/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts b/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts index ca94945f..eee167e3 100644 --- a/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts +++ b/datastore/s3/extensions/datastores/_lib/s3_cache_sync.ts @@ -677,13 +677,22 @@ export class S3CacheSyncService implements DatastoreSyncService { err instanceof Error && "name" in err && (err.name === "NotFound" || err.name === "NoSuchKey") ) { - this.index = { - version: 1, - lastPulled: new Date().toISOString(), - entries: {}, - }; - // No remote object yet → no fingerprint to return. - return null; + // Bucket has no index file. Two sub-cases: + // (1) Brand-new empty bucket — fall back to an empty + // in-memory index, return null. Existing behaviour. + // (2) Bucket pre-dates the indexed-sync model and holds data + // under standard prefixes (data/, workflow-runs/, …) but + // was never written by a swamp version that publishes + // `.datastore-index.json`. Without discovery, hydrate + // reports `Hydrated: 0 pulled` and silently leaves the + // cache empty (swamp-club#225, residual from #220). + // Discovery: list the bucket, filter via isInternalCacheFile, + // build an index from the listing, publish it, and continue. 
+ // Functionally idempotent across racing peers (entry keys and + // sizes match; only metadata timestamps like `lastPulled` and + // the `lastModified` fallback differ), so the unconditional + // PutObject is benign — see inline notes i, ii, iii below. + return await this.discoverIndexFromBucket(signal); } throw err; } @@ -707,6 +716,106 @@ export class S3CacheSyncService implements DatastoreSyncService { return etag ?? null; } + /** + * Self-healing fallback for `pullIndex` when the remote + * `.datastore-index.json` is absent. Lists the bucket, filters + * internal cache files, builds an index from the listing, publishes + * it, and writes the local copy — leaving the caller's slow-path + * bookkeeping behaving identically to a normal index fetch. + * + * Inline-comment requirements (a future reader must not "fix" any of + * these without understanding the trade-off): + * + * i. The PutObject is unconditional. Discovery is functionally + * idempotent across racing peers — entry keys and sizes match + * (same listing in, same fields populated out), but metadata + * timestamps (`lastPulled`, and the `lastModified` fallback + * when the SDK didn't surface one) are evaluated per peer so + * the JSON bodies are NOT byte-identical. That's still safe: + * sync behaviour depends on keys + sizes, not timestamps. Do + * NOT add a content-fingerprint optimization here on the + * assumption of byte-equality. Matches the existing + * pushChanged writeback (see line ~1029); `If-None-Match: *` + * is not portable across all S3-compatible backends this + * extension supports (older MinIO/Spaces/R2 implement it + * inconsistently). + * + * ii. `this.index` is set in-memory BEFORE the PutObject attempt, + * mirroring the existing post-fetch path's order. 
If PutObject + * throws, this.index reflects an unpublished state but no local + * file is written (atomicWriteTextFile happens after put), and + * the next forceRemote call re-triggers discovery — idempotent + * with no orphaned state. Do not reorder this — flipping it + * would diverge from the post-fetch path and complicate the + * mental model. + * + * iii. All callers of pullIndex (both pullChanged and pushChanged) + * inherit this fallback automatically — no separate change to + * pushChanged is needed. Side benefit: previously, pushChanged + * against an unindexed populated bucket would write an index + * reflecting only LOCAL files, dropping the existing remote + * entries from the index even though the storage still held + * them. With discovery here, push first builds a complete + * merged view, then walks local against it. + */ + private async discoverIndexFromBucket( + signal?: AbortSignal, + ): Promise { + const discoverStart = Date.now(); + const listing = await this.s3.listAllObjects(undefined, signal); + const filtered = listing.filter((entry) => !isInternalCacheFile(entry.key)); + + // Sub-case (1): genuinely empty bucket. Preserve existing + // brand-new-bucket semantics — empty in-memory index, no put, + // null fingerprint. + if (filtered.length === 0) { + this.index = { + version: 1, + lastPulled: new Date().toISOString(), + entries: {}, + }; + tracePhase("pullIndex.discover", discoverStart, "n=0"); + return null; + } + + // Sub-case (2): bucket holds data but no index. Build entries from + // the listing — no localMtime since nothing has been pulled yet; + // pullChanged will reconcile mtimes as it downloads each file. + const entries: Record = {}; + for (const entry of filtered) { + entries[entry.key] = { + key: entry.key, + size: entry.size, + lastModified: (entry.lastModified ?? 
new Date()).toISOString(), + }; + } + this.index = { + version: 1, + lastPulled: new Date().toISOString(), + entries, + }; + const indexJson = JSON.stringify(this.index, null, 2); + const indexData = new TextEncoder().encode(indexJson); + const putResult = await retryWithBackoff( + () => this.s3.putObject(".datastore-index.json", indexData, signal), + { signal }, + ); + await ensureDir(this.cachePath); + await atomicWriteTextFile(this.indexPath, indexJson); + tracePhase( + "pullIndex.discover", + discoverStart, + `n=${filtered.length}`, + ); + // The returned ETag is the raw form from the PUT response, with + // S3's surrounding double-quotes intact (e.g. `"abc123"`). This + // matches the post-fetch path's contract — `normalizeETag()` is + // what callers apply for byte-level comparison against sidecar + // values. Don't strip them here; doing so would diverge from the + // existing fingerprint convention. + return putResult?.etag ?? null; + } + /** Fetches a single file from S3 to the local cache. */ async pullFile( relativePath: string, diff --git a/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts b/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts index 650cfebb..56222099 100644 --- a/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts +++ b/datastore/s3/extensions/datastores/_lib/s3_cache_sync_test.ts @@ -65,9 +65,10 @@ function fakeETag(body: Uint8Array): string { } /** - * In-memory mock of S3Client recording getObject/putObject/headObject - * calls. `etagOverrides` lets a test pin a specific ETag (for example - * a multipart-shaped one) regardless of stored content. + * In-memory mock of S3Client recording getObject/putObject/headObject/ + * listAllObjects calls. `etagOverrides` lets a test pin a specific + * ETag (for example a multipart-shaped one) regardless of stored + * content. 
*/ function createMockS3Client(): S3Client & { storage: Map; @@ -75,12 +76,14 @@ function createMockS3Client(): S3Client & { puts: PutCall[]; gets: string[]; heads: string[]; + lists: number; } { const storage = new Map(); const etagOverrides = new Map(); const puts: PutCall[] = []; const gets: string[] = []; const heads: string[] = []; + const counters = { lists: 0 }; const etagFor = (key: string, body: Uint8Array): string => etagOverrides.get(key) ?? fakeETag(body); @@ -91,6 +94,9 @@ function createMockS3Client(): S3Client & { puts, gets, heads, + get lists() { + return counters.lists; + }, putObject(key: string, body: Uint8Array): Promise<{ etag: string }> { storage.set(key, body); @@ -121,12 +127,31 @@ function createMockS3Client(): S3Client & { etag: etagFor(key, data), }); }, + + listAllObjects(): Promise< + Array<{ + key: string; + size: number; + etag?: string; + lastModified?: Date; + }> + > { + counters.lists++; + return Promise.resolve( + [...storage.entries()].map(([key, body]) => ({ + key, + size: body.length, + etag: etagFor(key, body), + })), + ); + }, } as unknown as S3Client & { storage: Map; etagOverrides: Map; puts: PutCall[]; gets: string[]; heads: string[]; + lists: number; }; } @@ -487,6 +512,232 @@ Deno.test("pullIndex cache-hit path: scrubs in-memory and leaves local file unto } }); +// -- (h) self-healing discovery for unindexed buckets (swamp-club #225) --- + +// Test A — discovery from a populated bucket without an index. +// pullIndex's NotFound branch must list, build, and publish a synthesized +// index whose entries match the listing exactly (no extras, no +// omissions, sizes match). +Deno.test("pullIndex: discovers files when remote index is missing (swamp-club #225)", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "s3sync-225-A-" }); + try { + const mock = createMockS3Client(); + // Bucket has data files but no .datastore-index.json. 
+ mock.storage.set( + "data/@org/m/payload-1.yaml", + new TextEncoder().encode("hello\n"), + ); + mock.storage.set( + "data/@org/m/payload-2.yaml", + new TextEncoder().encode("world!\n"), + ); + + const service = new S3CacheSyncService(mock, cachePath); + const fingerprint = await (service as unknown as { + pullIndex: ( + opts: { forceRemote: boolean }, + ) => Promise; + }).pullIndex({ forceRemote: true }); + + // 1. discovery wrote the index back + const indexPut = mock.puts.find((p) => p.key === ".datastore-index.json"); + assertExists(indexPut, "discovery must publish a synthesized index"); + + // 2. synthesized index entries match the listing EXACTLY + const synthesized = JSON.parse(new TextDecoder().decode(indexPut.body)); + assertEquals( + Object.keys(synthesized.entries).sort(), + ["data/@org/m/payload-1.yaml", "data/@org/m/payload-2.yaml"], + "synthesized index keys must match the bucket listing exactly", + ); + assertEquals(synthesized.entries["data/@org/m/payload-1.yaml"].size, 6); + assertEquals(synthesized.entries["data/@org/m/payload-2.yaml"].size, 7); + + // 3. fingerprint returned (the put response ETag) so the caller's + // slow-path bookkeeping behaves like a normal index fetch + assertExists(fingerprint, "discovery must return a fingerprint ETag"); + + // 4. in-memory index reflects discovery + const state = privateState(service); + assertExists(state.index); + assertEquals( + Object.keys(state.index.entries).sort(), + ["data/@org/m/payload-1.yaml", "data/@org/m/payload-2.yaml"], + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// Test B — pullChanged hydrates everything after discovery. 
+Deno.test("pullChanged: hydrates an unindexed bucket via discovery (swamp-club #225)", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "s3sync-225-B-" }); + try { + const mock = createMockS3Client(); + mock.storage.set( + "data/@org/m/a.yaml", + new TextEncoder().encode("alpha\n"), + ); + mock.storage.set( + "data/@org/m/b.yaml", + new TextEncoder().encode("beta\n"), + ); + + const service = new S3CacheSyncService(mock, cachePath); + const pulled = await service.pullChanged(); + assertEquals(pulled, 2, "both files must be downloaded"); + + const aOnDisk = await Deno.readTextFile( + join(cachePath, "data/@org/m/a.yaml"), + ); + const bOnDisk = await Deno.readTextFile( + join(cachePath, "data/@org/m/b.yaml"), + ); + assertEquals(aOnDisk, "alpha\n"); + assertEquals(bOnDisk, "beta\n"); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// Test C — discovery filters internal cache files. +Deno.test("pullIndex discovery: skips internal cache files (swamp-club #225)", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "s3sync-225-C-" }); + try { + const mock = createMockS3Client(); + // One legitimate data file, plus three classes of internal files + // that must NEVER appear in a synthesized index: lock, push-queue, + // and a SQLite catalog file. 
+ mock.storage.set( + "data/@org/m/legit.yaml", + new TextEncoder().encode("ok\n"), + ); + mock.storage.set( + ".datastore.lock", + new TextEncoder().encode("lock-data"), + ); + mock.storage.set( + ".push-queue.json", + new TextEncoder().encode("[]"), + ); + mock.storage.set( + "data/_catalog.db", + new TextEncoder().encode("SQLITE-MAIN"), + ); + + const service = new S3CacheSyncService(mock, cachePath); + await (service as unknown as { + pullIndex: ( + opts: { forceRemote: boolean }, + ) => Promise; + }).pullIndex({ forceRemote: true }); + + const indexPut = mock.puts.find((p) => p.key === ".datastore-index.json"); + assertExists(indexPut); + const synthesized = JSON.parse(new TextDecoder().decode(indexPut.body)); + assertEquals( + Object.keys(synthesized.entries).sort(), + ["data/@org/m/legit.yaml"], + "synthesized index must contain only the legit file", + ); + for ( + const internal of [ + ".datastore.lock", + ".push-queue.json", + "data/_catalog.db", + ] + ) { + assertEquals( + synthesized.entries[internal], + undefined, + `internal cache file must not appear in synthesized index: ${internal}`, + ); + } + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// Test D — empty bucket regression pin: brand-new-bucket fallthrough is +// preserved. No PutObject must fire; in-memory entries must be {}. +Deno.test("pullIndex discovery: empty bucket falls through to empty index (swamp-club #225 regression pin)", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "s3sync-225-D-" }); + try { + const mock = createMockS3Client(); + // Storage is intentionally empty. 
+ const service = new S3CacheSyncService(mock, cachePath); + const fingerprint = await (service as unknown as { + pullIndex: ( + opts: { forceRemote: boolean }, + ) => Promise; + }).pullIndex({ forceRemote: true }); + + assertEquals( + fingerprint, + null, + "empty-bucket fallthrough must return null fingerprint", + ); + const indexPut = mock.puts.find((p) => p.key === ".datastore-index.json"); + assertEquals( + indexPut, + undefined, + "no PutObject must fire on a genuinely empty bucket", + ); + const state = privateState(service); + assertExists(state.index); + assertEquals( + Object.keys(state.index.entries).length, + 0, + "in-memory entries must be empty for a brand-new bucket", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + +// Test E — pushChanged side benefit. Previously, push against an +// unindexed populated bucket would write an index reflecting only LOCAL +// files, dropping existing remote entries from the index even though +// the storage still held them. With discovery in pullIndex, push first +// builds a complete merged view. +Deno.test("pushChanged: against unindexed populated bucket builds a complete index (swamp-club #225 side benefit)", async () => { + const cachePath = await Deno.makeTempDir({ prefix: "s3sync-225-E-" }); + try { + const mock = createMockS3Client(); + // Remote has a pre-existing data file but no index. + mock.storage.set( + "data/@org/m/remote-only.yaml", + new TextEncoder().encode("remote\n"), + ); + // Local cache has a different file ready to push. + await seedFile(cachePath, "data/@org/m/local-only.yaml", "local\n"); + + const service = new S3CacheSyncService(mock, cachePath); + const pushed = await service.pushChanged(); + assertEquals(pushed, 1, "exactly one local file should push"); + + // Final remote index must contain BOTH the remote-only and the + // local-only entry — the pre-existing remote file is NOT lost. 
+ const indexPuts = mock.puts.filter((p) => + p.key === ".datastore-index.json" + ); + assertExists(indexPuts.at(-1), "pushChanged must publish an index"); + const finalIndex = JSON.parse( + new TextDecoder().decode(indexPuts.at(-1)!.body), + ); + assertEquals( + Object.keys(finalIndex.entries).sort(), + [ + "data/@org/m/local-only.yaml", + "data/@org/m/remote-only.yaml", + ], + "merged index must contain both the remote-discovered and local-pushed entries", + ); + } finally { + await Deno.remove(cachePath, { recursive: true }); + } +}); + // -- (g) push merges remote index instead of clobbering it ---------------- // Regression test for swamp-club#30: without the fix, a client whose @@ -768,6 +1019,9 @@ Deno.test("pushChanged: batch failure message includes underlying error details" try { // Mock whose putObject rejects with 403 (non-retryable, so fails fast) // for every file push. Index fetch returns empty so writeback is unused. + // Empty listAllObjects keeps the swamp-club#225 discovery fallback on + // its empty-bucket branch — no synthesized index, behaves like the + // original brand-new-bucket case. const mock = { getObject( key: string, @@ -782,6 +1036,16 @@ Deno.test("pushChanged: batch failure message includes underlying error details" putObject(_key: string, _body: Uint8Array): Promise { return Promise.reject(opError(403)); }, + listAllObjects(): Promise< + Array<{ + key: string; + size: number; + etag?: string; + lastModified?: Date; + }> + > { + return Promise.resolve([]); + }, } as unknown as S3Client; await seedFile(cachePath, "data/a.yaml", "1\n"); @@ -2235,3 +2499,87 @@ Deno.test("pullChanged + pushChanged: fresh-process push after pull against popu await Deno.remove(cachePath, { recursive: true }); } }); + +// swamp-club #225: prove the AWS SDK actually surfaces a missing-index +// 404 with the error name our pullIndex catch branch matches against. 
+// The unit tests above use a hand-built mock that throws Error{name:
+// "NoSuchKey"} — this proves the real SDK produces the same shape going
+// over real HTTP. Without this, an SDK upgrade could rename or
+// reclassify the error and silently break the discovery fallback.
+//
+// `sanitizeResources: false` is the same setting used by every test in
+// the DEF-2 integration block above (see line 1076): the AWS SDK keeps
+// TCP connections alive in its keep-alive agent, which trips Deno's
+// resource-leak detector. The connections are reclaimed when the
+// runtime tears down between test runs, so this is safe.
+Deno.test({
+  sanitizeResources: false,
+  name:
+    "integration: pullIndex discovery handles real SDK 404 on missing index (swamp-club #225)",
+  fn: async () => {
+    const cachePath = await Deno.makeTempDir({
+      prefix: "s3sync-225-integration-",
+    });
+    try {
+      await withProgrammableServer(
+        [
+          // 1. GET .datastore-index.json → 404 NoSuchKey
+          //    (the catch branch our discovery hangs off)
+          () =>
+            new Response(
+              '<Error><Code>NoSuchKey</Code><Message>The specified key does not exist.</Message><Key>.datastore-index.json</Key><RequestId>r1</RequestId></Error>',
+              {
+                status: 404,
+                headers: { "Content-Type": "application/xml" },
+              },
+            ),
+          // 2. ListObjectsV2 → one Contents entry
+          //    (proves the real SDK parses Size + ETag from the listing)
+          () =>
+            new Response(
+              '<ListBucketResult><Name>test-bucket</Name><KeyCount>1</KeyCount><MaxKeys>1000</MaxKeys><IsTruncated>false</IsTruncated><Contents><Key>data/discovered.yaml</Key><Size>6</Size><ETag>"abc123"</ETag><LastModified>2026-05-04T12:00:00.000Z</LastModified></Contents></ListBucketResult>',
+              {
+                status: 200,
+                headers: { "Content-Type": "application/xml" },
+              },
+            ),
+          // 3. PUT .datastore-index.json (synthesized index writeback)
+          () =>
+            new Response(null, {
+              status: 200,
+              headers: { ETag: '"index-etag"' },
+            }),
+        ],
+        async (s3, state) => {
+          const service = new S3CacheSyncService(s3, cachePath);
+          // Call pullIndex directly to scope the test to the discovery
+          // path — pullChanged would also work but adds concurrent
+          // file GETs whose ordering against the programmable server
+          // is non-deterministic.
+          const fingerprint = await (service as unknown as {
+            pullIndex: (
+              opts: { forceRemote: boolean },
+            ) => Promise<string | null>;
+          }).pullIndex({ forceRemote: true });
+          assertEquals(
+            state.requestCount,
+            3,
+            "discovery must produce exactly 3 SDK requests: GET index (404), ListObjectsV2, PUT index",
+          );
+          // The SDK surfaces the PUT response ETag with its surrounding
+          // double-quotes — `normalizeETag()` is what the sidecar
+          // bookkeeping uses for comparisons. The discovery path
+          // returns the raw form, matching the post-fetch path's
+          // contract.
+          assertEquals(
+            fingerprint,
+            '"index-etag"',
+            "discovery must return the synthesized index's ETag from the real PUT response",
+          );
+        },
+      );
+    } finally {
+      await Deno.remove(cachePath, { recursive: true });
+    }
+  },
+});
diff --git a/datastore/s3/extensions/datastores/_lib/s3_client.ts b/datastore/s3/extensions/datastores/_lib/s3_client.ts
index 276a89ff..9d8fea4c 100644
--- a/datastore/s3/extensions/datastores/_lib/s3_client.ts
+++ b/datastore/s3/extensions/datastores/_lib/s3_client.ts
@@ -62,8 +62,15 @@ export interface S3ClientConfig {
 
 const DEFAULT_REQUEST_TIMEOUT_MS = 30_000;
 
+export interface S3ListEntry {
+  key: string;
+  size: number;
+  etag?: string;
+  lastModified?: Date;
+}
+
 export interface S3ListResult {
-  keys: string[];
+  entries: S3ListEntry[];
   truncated: boolean;
   continuationToken?: string;
 }
@@ -661,12 +668,15 @@ export class S3Client {
     );
 
     const prefixLen = this.prefix ? this.prefix.length + 1 : 0;
-    const keys = (response.Contents ?? [])
-      .map((obj) => obj.Key!)
-      .map((key) => key.slice(prefixLen));
+    const entries: S3ListEntry[] = (response.Contents ?? []).map((obj) => ({
+      key: obj.Key!.slice(prefixLen),
+      size: obj.Size ?? 0,
+      etag: obj.ETag,
+      lastModified: obj.LastModified,
+    }));
 
     return {
-      keys,
+      entries,
       truncated: response.IsTruncated ?? false,
       continuationToken: response.NextContinuationToken,
     };
@@ -676,8 +686,8 @@ export class S3Client {
   async listAllObjects(
     subPrefix?: string,
     signal?: AbortSignal,
-  ): Promise<string[]> {
-    const allKeys: string[] = [];
+  ): Promise<S3ListEntry[]> {
+    const all: S3ListEntry[] = [];
     let continuationToken: string | undefined;
 
     do {
@@ -686,12 +696,12 @@ export class S3Client {
         continuationToken,
         signal,
       );
-      allKeys.push(...result.keys);
+      all.push(...result.entries);
       continuationToken = result.truncated
         ? result.continuationToken
         : undefined;
     } while (continuationToken);
 
-    return allKeys;
+    return all;
   }
 }
diff --git a/datastore/s3/manifest.yaml b/datastore/s3/manifest.yaml
index f84c3e61..39dfcfe8 100644
--- a/datastore/s3/manifest.yaml
+++ b/datastore/s3/manifest.yaml
@@ -1,6 +1,6 @@
 manifestVersion: 1
 name: "@swamp/s3-datastore"
-version: "2026.05.04.3"
+version: "2026.05.04.4"
 description: |
   Store data in an Amazon S3 bucket with local cache synchronization.
   Provides distributed locking via S3 conditional writes and bidirectional