Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/contrail/contrail.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,43 @@ declare module '@atmo-dev/contrail' {
signal?: AbortSignal;
}

export interface CollectionStats {
missing: number;
staleUpdates: number;
inSync: number;
}

export interface RefreshProgress {
usersComplete: number;
usersTotal: number;
usersFailed: number;
recordsScanned: number;
}

export interface RefreshResult {
byCollection: Record<string, CollectionStats>;
total: CollectionStats;
usersScanned: number;
usersFailed: number;
ignoreWindowMs: number;
elapsedMs: number;
}

export interface RefreshOptions {
concurrency?: number;
ignoreWindowMs?: number;
nsids?: string[];
onProgress?: (p: RefreshProgress) => void;
maxRetries?: number;
requestTimeout?: number;
}

export class Contrail {
constructor(options: ContrailOptions);
init(db?: Database, spacesDb?: Database): Promise<void>;
discover(db?: Database): Promise<string[]>;
backfill(options?: BackfillAllOptions, db?: Database): Promise<number>;
refresh(options?: RefreshOptions, db?: Database): Promise<RefreshResult>;
runPersistent(options?: RunPersistentOptions): Promise<void>;
}
}
Expand Down
40 changes: 34 additions & 6 deletions src/contrail/sync.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
/**
* One-shot Contrail sync: discover known DIDs from public relays and backfill
* their records into the Contrail Postgres index.
*
* Designed to run as a standalone process (npm script or K8s Job). MUST NOT
* run inside an API pod — Jetstream ingest belongs in a dedicated process if
* we ever stand it up (Phase 2 decision; not in this script).
* One-shot Contrail sync: discover new DIDs, backfill their history, and
* refresh records for already-known DIDs to repair drift from Jetstream gaps
* (cursor expiry, extended outages). Safe to run repeatedly — discovery and
* backfill are idempotent; refresh skips records inside the lib's ignore
* window. Intended to run daily as a CronJob alongside the live-ingest
* Deployment that handles the continuous case.
*
* Usage:
* CONTRAIL_DATABASE_URL=postgres://... \
Expand Down Expand Up @@ -74,9 +74,37 @@ async function main(): Promise<void> {
process.stdout.write('\n');
console.log(` Done: ${total} records in ${elapsed(backfillStart)}\n`);

// Refresh: re-walk every known DID's PDS and apply any records we're
// missing or have stale. Closes drift caused by Jetstream cursor expiry
// (multi-day outages) or transient ingest failures the live-ingest pod
// didn't recover from. Records inside the lib's ignoreWindowMs (default
// 60s) are skipped so this stays cheap.
console.log('--- Refresh ---');
const refreshStart = Date.now();
const refreshResult = await contrail.refresh({
onProgress: ({
usersComplete,
usersTotal,
usersFailed,
recordsScanned,
}) => {
const failStr = usersFailed > 0 ? ` | ${usersFailed} failed` : '';
process.stdout.write(
`\r ${recordsScanned} scanned | ${usersComplete}/${usersTotal} users | ${elapsed(refreshStart)}${failStr} `,
);
},
});
process.stdout.write('\n');
console.log(
` Done: ${refreshResult.total.missing} missing, ${refreshResult.total.staleUpdates} stale across ${refreshResult.usersScanned} users in ${elapsed(refreshStart)}\n`,
);

console.log(`=== Finished in ${elapsed(syncStart)} ===`);
console.log(` Discovered: ${discovered.length} users`);
console.log(` Backfilled: ${total} records`);
console.log(
` Refreshed: ${refreshResult.total.missing} missing + ${refreshResult.total.staleUpdates} stale (${refreshResult.usersScanned} users scanned)`,
);
} finally {
await pool.end();
}
Expand Down
Loading