NOTE(review): this patch was reflowed from a whitespace-collapsed copy.
  - Line breaks and blank lines were re-derived from the hunk line counts;
    leading indentation follows Prettier defaults and should be confirmed
    against the target files (or apply with --ignore-whitespace).
  - Runs of spaces inside string literals and comments could not be recovered
    and are kept as single spaces.
  - Type arguments stripped by the copy were restored. `Promise<RefreshResult>`
    follows from how sync.ts reads the refresh result. The `string` key of
    `Record<string, CollectionStats>` and the `Promise<void>`,
    `Promise<string[]>`, `Promise<number>` arguments on context lines are
    assumptions -- TODO confirm against the real contrail.d.ts before applying.
  - Five one-line TSDoc comments were added to the new declarations, so the
    first hunk now adds 37 lines (was 32) and the `index` blob hashes no longer
    describe the new side of contrail.d.ts.

diff --git a/src/contrail/contrail.d.ts b/src/contrail/contrail.d.ts
index 842ae516..308202fa 100644
--- a/src/contrail/contrail.d.ts
+++ b/src/contrail/contrail.d.ts
@@ -74,11 +74,48 @@ declare module '@atmo-dev/contrail' {
     signal?: AbortSignal;
   }
 
+  /** Record counts from a refresh pass: missing locally, stale locally, or already in sync. */
+  export interface CollectionStats {
+    missing: number;
+    staleUpdates: number;
+    inSync: number;
+  }
+
+  /** Progress snapshot handed to `RefreshOptions.onProgress` while a refresh is running. */
+  export interface RefreshProgress {
+    usersComplete: number;
+    usersTotal: number;
+    usersFailed: number;
+    recordsScanned: number;
+  }
+
+  /** Summary resolved by `Contrail.refresh()`: per-collection stats plus an overall total. */
+  export interface RefreshResult {
+    byCollection: Record<string, CollectionStats>;
+    total: CollectionStats;
+    usersScanned: number;
+    usersFailed: number;
+    ignoreWindowMs: number;
+    elapsedMs: number;
+  }
+
+  /** Options accepted by `Contrail.refresh()`; every field is optional. */
+  export interface RefreshOptions {
+    concurrency?: number;
+    ignoreWindowMs?: number;
+    nsids?: string[];
+    onProgress?: (p: RefreshProgress) => void;
+    maxRetries?: number;
+    requestTimeout?: number;
+  }
+
   export class Contrail {
     constructor(options: ContrailOptions);
     init(db?: Database, spacesDb?: Database): Promise<void>;
     discover(db?: Database): Promise<string[]>;
     backfill(options?: BackfillAllOptions, db?: Database): Promise<number>;
+    /** Re-walks already-known DIDs and applies records that are missing or stale locally. */
+    refresh(options?: RefreshOptions, db?: Database): Promise<RefreshResult>;
     runPersistent(options?: RunPersistentOptions): Promise<void>;
   }
 }
diff --git a/src/contrail/sync.ts b/src/contrail/sync.ts
index d526c50f..8c5a4f3a 100644
--- a/src/contrail/sync.ts
+++ b/src/contrail/sync.ts
@@ -1,10 +1,10 @@
 /**
- * One-shot Contrail sync: discover known DIDs from public relays and backfill
- * their records into the Contrail Postgres index.
- *
- * Designed to run as a standalone process (npm script or K8s Job). MUST NOT
- * run inside an API pod — Jetstream ingest belongs in a dedicated process if
- * we ever stand it up (Phase 2 decision; not in this script).
+ * One-shot Contrail sync: discover new DIDs, backfill their history, and
+ * refresh records for already-known DIDs to repair drift from Jetstream gaps
+ * (cursor expiry, extended outages). Safe to run repeatedly — discovery and
+ * backfill are idempotent; refresh skips records inside the lib's ignore
+ * window. Intended to run daily as a CronJob alongside the live-ingest
+ * Deployment that handles the continuous case.
  *
  * Usage:
  * CONTRAIL_DATABASE_URL=postgres://... \
@@ -74,9 +74,37 @@ async function main(): Promise<void> {
     process.stdout.write('\n');
     console.log(` Done: ${total} records in ${elapsed(backfillStart)}\n`);
 
+    // Refresh: re-walk every known DID's PDS and apply any records we're
+    // missing or have stale. Closes drift caused by Jetstream cursor expiry
+    // (multi-day outages) or transient ingest failures the live-ingest pod
+    // didn't recover from. Records inside the lib's ignoreWindowMs (default
+    // 60s) are skipped so this stays cheap.
+    console.log('--- Refresh ---');
+    const refreshStart = Date.now();
+    const refreshResult = await contrail.refresh({
+      onProgress: ({
+        usersComplete,
+        usersTotal,
+        usersFailed,
+        recordsScanned,
+      }) => {
+        const failStr = usersFailed > 0 ? ` | ${usersFailed} failed` : '';
+        process.stdout.write(
+          `\r ${recordsScanned} scanned | ${usersComplete}/${usersTotal} users | ${elapsed(refreshStart)}${failStr} `,
+        );
+      },
+    });
+    process.stdout.write('\n');
+    console.log(
+      ` Done: ${refreshResult.total.missing} missing, ${refreshResult.total.staleUpdates} stale across ${refreshResult.usersScanned} users in ${elapsed(refreshStart)}\n`,
+    );
+
     console.log(`=== Finished in ${elapsed(syncStart)} ===`);
     console.log(` Discovered: ${discovered.length} users`);
     console.log(` Backfilled: ${total} records`);
+    console.log(
+      ` Refreshed: ${refreshResult.total.missing} missing + ${refreshResult.total.staleUpdates} stale (${refreshResult.usersScanned} users scanned)`,
+    );
   } finally {
     await pool.end();
   }