diff --git a/backend/README.md b/backend/README.md index 1fdfe763..9f0b137b 100644 --- a/backend/README.md +++ b/backend/README.md @@ -44,6 +44,16 @@ $ npm run start:dev $ npm run start:prod ``` +## Chain replay CLI + +Recover missed historical chain events into DB: + +```bash +$ npm run replay:chain-events -- --start-cursor <cursor> --end-cursor <cursor> --dry-run +``` + +Detailed runbook: `docs/CHAIN_EVENT_REPLAY.md` + ## Run tests ```bash diff --git a/backend/docs/CHAIN_EVENT_REPLAY.md b/backend/docs/CHAIN_EVENT_REPLAY.md new file mode 100644 index 00000000..941263d8 --- /dev/null +++ b/backend/docs/CHAIN_EVENT_REPLAY.md @@ -0,0 +1,58 @@ +# Chain Event Replay CLI + +## Purpose + +Replay historical Stellar chain events into PostgreSQL to recover indexer gaps safely. + +## Command + +```bash +npm run replay:chain-events -- --start-cursor <cursor> [--end-cursor <cursor>] [--dry-run] [--limit 200] +``` + +## Environment + +- `DATABASE_URL`: PostgreSQL connection string (required) +- `HORIZON_URL` (optional): Horizon base URL (defaults to `https://horizon-testnet.stellar.org`) + +## Parameters + +- `--start-cursor` (required): Inclusive starting cursor for replay. +- `--end-cursor` (optional): Inclusive ending cursor; replay stops once this cursor is reached. +- `--dry-run` (optional): Reads and evaluates events but does not write to DB. +- `--limit` (optional): Page size per request (1..200, default `200`). + +## Idempotency and Safety + +- Replayed records are stored in table `chain_event_replay`. +- Each record is keyed by `paging_token` and deduplicated with `ON CONFLICT DO NOTHING`. +- Re-running the same range is safe; duplicates are skipped and reported in summary output. + +## Dry-Run Workflow (Recommended) + +1. Validate scope and expected volume: + +```bash +npm run replay:chain-events -- --start-cursor 123 --end-cursor 999 --dry-run +``` + +2. 
Execute replay: + +```bash +npm run replay:chain-events -- --start-cursor 123 --end-cursor 999 +``` + +## Output + +The CLI prints JSON summary: + +- `startCursor` +- `endCursor` +- `finalCursor` +- `pages` +- `fetched` +- `inserted` +- `duplicates` +- `dryRun` + +Use `finalCursor` as a checkpoint for subsequent replay windows. diff --git a/backend/package-lock.json b/backend/package-lock.json index d1e14968..54e9d4f3 100644 --- a/backend/package-lock.json +++ b/backend/package-lock.json @@ -64,10 +64,6 @@ "tsconfig-paths": "^4.2.0", "typescript": "^5.7.3", "typescript-eslint": "^8.20.0" - }, - "overrides": { - "minimatch": ">=9.0.6", - "multer": ">=2.1.1" } }, "node_modules/@angular-devkit/core": { diff --git a/backend/package.json b/backend/package.json index 20014dba..60dcad7d 100644 --- a/backend/package.json +++ b/backend/package.json @@ -17,7 +17,8 @@ "test:watch": "jest --watch", "test:cov": "jest --coverage", "test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand", - "test:e2e": "jest --config ./test/jest-e2e.json" + "test:e2e": "jest --config ./test/jest-e2e.json", + "replay:chain-events": "ts-node src/cli/replay-chain-events.ts" }, "dependencies": { "@nestjs/common": "^11.1.17", diff --git a/backend/src/cli/replay-chain-events.ts b/backend/src/cli/replay-chain-events.ts new file mode 100644 index 00000000..024dfa5a --- /dev/null +++ b/backend/src/cli/replay-chain-events.ts @@ -0,0 +1,88 @@ +import { ChainEventReplayRunner, ReplayOptions } from '../indexer/replay/chain-event-replay.runner'; +import { HorizonChainEventSource } from '../indexer/replay/horizon-chain-event.source'; +import { PostgresChainEventStore } from '../indexer/replay/postgres-chain-event.store'; + +interface CliArgs { + startCursor: string; + endCursor?: string; + dryRun: boolean; + limit: number; +} + +function readArg(args: string[], key: string): string | undefined { + const index = args.findIndex((arg) => arg === key); + 
if (index === -1) return undefined; + return args[index + 1]; +} + +function parseArgs(argv: string[]): CliArgs { + const startCursor = readArg(argv, '--start-cursor'); + if (!startCursor) { + throw new Error('Missing required argument: --start-cursor <cursor>'); + } + + const endCursor = readArg(argv, '--end-cursor'); + const limitRaw = readArg(argv, '--limit'); + const dryRun = argv.includes('--dry-run'); + + const limit = limitRaw ? Number.parseInt(limitRaw, 10) : 200; + if (!Number.isFinite(limit) || limit < 1 || limit > 200) { + throw new Error('Invalid --limit value. Expected an integer between 1 and 200.'); + } + + return { + startCursor, + endCursor, + dryRun, + limit, + }; +} + +function printUsage(): void { + // Keep this concise for operators running incidents. + console.log( + [ + 'Usage:', + ' npm run replay:chain-events -- --start-cursor <cursor> [--end-cursor <cursor>] [--dry-run] [--limit 200]', + '', + 'Environment:', + ' DATABASE_URL PostgreSQL DSN for replay storage (required)', + ' HORIZON_URL Horizon base URL (default: https://horizon-testnet.stellar.org)', + ].join('\n'), + ); +} + +async function main(): Promise<void> { + try { + const args = parseArgs(process.argv.slice(2)); + const databaseUrl = process.env.DATABASE_URL?.trim(); + if (!databaseUrl) { + throw new Error('DATABASE_URL is required.'); + } + + const horizonUrl = + process.env.HORIZON_URL?.trim() || 'https://horizon-testnet.stellar.org'; + + const source = new HorizonChainEventSource(horizonUrl); + const store = new PostgresChainEventStore(databaseUrl); + const runner = new ChainEventReplayRunner(source, store); + + const replayOptions: ReplayOptions = { + startCursor: args.startCursor, + endCursor: args.endCursor, + dryRun: args.dryRun, + limit: args.limit, + }; + + const summary = await runner.replay(replayOptions); + + console.log(JSON.stringify({ ok: true, summary }, null, 2)); + } catch (error) { + const message = error instanceof Error ? 
error.message : String(error); + console.error(JSON.stringify({ ok: false, error: message }, null, 2)); + printUsage(); + process.exitCode = 1; + } +} + +void main(); diff --git a/backend/src/common/http/body-parser.config.ts b/backend/src/common/http/body-parser.config.ts new file mode 100644 index 00000000..af0c8845 --- /dev/null +++ b/backend/src/common/http/body-parser.config.ts @@ -0,0 +1,55 @@ +import { INestApplication } from '@nestjs/common'; +import { json, Request, Response, NextFunction, urlencoded } from 'express'; + +type RequestWithRawBody = Request & { rawBody?: Buffer }; + +const DEFAULT_JSON_LIMIT = '100kb'; +const DEFAULT_URLENCODED_LIMIT = '100kb'; +const DEFAULT_WEBHOOK_JSON_LIMIT = '1mb'; + +function captureRawBody( + req: Request, + _res: Response, + buffer: Buffer, +): void { + if (buffer.length > 0) { + (req as RequestWithRawBody).rawBody = Buffer.from(buffer); + } +} + +export function configureBodyParserLimits(app: INestApplication): void { + const jsonLimit = process.env.BODY_JSON_LIMIT ?? DEFAULT_JSON_LIMIT; + const urlencodedLimit = + process.env.BODY_URLENCODED_LIMIT ?? DEFAULT_URLENCODED_LIMIT; + const webhookJsonLimit = + process.env.BODY_WEBHOOK_JSON_LIMIT ?? DEFAULT_WEBHOOK_JSON_LIMIT; + + // Keep webhook payload support flexible without widening limits globally. + app.use('/v1/webhook', json({ limit: webhookJsonLimit, verify: captureRawBody })); + app.use(json({ limit: jsonLimit, verify: captureRawBody })); + app.use( + urlencoded({ + extended: true, + limit: urlencodedLimit, + verify: captureRawBody, + }), + ); + + app.use((err: unknown, _req: Request, res: Response, next: NextFunction) => { + const bodyParserError = err as { type?: string; status?: number } | undefined; + + if ( + bodyParserError?.type === 'entity.too.large' || + bodyParserError?.status === 413 + ) { + res.status(413).json({ + statusCode: 413, + error: 'Payload Too Large', + message: `Payload too large. 
Maximum request body size is ${jsonLimit}.`, + }); + return; + } + + next(err); + }); +} diff --git a/backend/src/indexer/replay/chain-event-replay.runner.spec.ts b/backend/src/indexer/replay/chain-event-replay.runner.spec.ts new file mode 100644 index 00000000..d03deb9c --- /dev/null +++ b/backend/src/indexer/replay/chain-event-replay.runner.spec.ts @@ -0,0 +1,104 @@ +import { + ChainEventRecord, + ChainEventReplayRunner, + ChainEventSource, + ChainEventStore, +} from './chain-event-replay.runner'; + +class InMemorySource implements ChainEventSource { + constructor(private readonly pages: ChainEventRecord[][]) {} + private pageIndex = 0; + + async fetchPage(cursor: string, _limit: number) { + const events = this.pages[this.pageIndex] ?? []; + this.pageIndex += 1; + const nextCursor = + events.length > 0 ? String(events[events.length - 1].paging_token) : cursor; + return { events, nextCursor }; + } +} + +class InMemoryStore implements ChainEventStore { + private readonly seen = new Set(); + + async init() {} + + async persistBatch(events: ChainEventRecord[], dryRun: boolean) { + let inserted = 0; + for (const event of events) { + if (!this.seen.has(event.paging_token)) { + if (!dryRun) { + this.seen.add(event.paging_token); + } + inserted += 1; + } + } + return { inserted, duplicates: events.length - inserted }; + } + + async close() {} +} + +function event(cursor: string): ChainEventRecord { + return { + id: `event-${cursor}`, + paging_token: cursor, + type: 'payment', + }; +} + +describe('ChainEventReplayRunner', () => { + it('replays until source is exhausted', async () => { + const source = new InMemorySource([[event('10'), event('11')], [event('12')], []]); + const store = new InMemoryStore(); + const runner = new ChainEventReplayRunner(source, store); + + const summary = await runner.replay({ + startCursor: '9', + limit: 200, + dryRun: false, + }); + + expect(summary.fetched).toBe(3); + expect(summary.inserted).toBe(3); + expect(summary.duplicates).toBe(0); + 
expect(summary.finalCursor).toBe('12'); + }); + + it('stops at end cursor and only processes <= end cursor', async () => { + const source = new InMemorySource([[event('10'), event('11')], [event('12')], []]); + const store = new InMemoryStore(); + const runner = new ChainEventReplayRunner(source, store); + + const summary = await runner.replay({ + startCursor: '9', + endCursor: '11', + limit: 200, + dryRun: false, + }); + + expect(summary.fetched).toBe(2); + expect(summary.inserted).toBe(2); + expect(summary.finalCursor).toBe('11'); + }); + + it('reports dry-run inserts without mutating state', async () => { + const source = new InMemorySource([[event('10'), event('11')], []]); + const store = new InMemoryStore(); + const runner = new ChainEventReplayRunner(source, store); + + const dryRunSummary = await runner.replay({ + startCursor: '9', + limit: 200, + dryRun: true, + }); + const replaySummary = await runner.replay({ + startCursor: '9', + limit: 200, + dryRun: false, + }); + + expect(dryRunSummary.inserted).toBe(2); + expect(replaySummary.inserted).toBe(2); + }); +}); diff --git a/backend/src/indexer/replay/chain-event-replay.runner.ts b/backend/src/indexer/replay/chain-event-replay.runner.ts new file mode 100644 index 00000000..a5e5b795 --- /dev/null +++ b/backend/src/indexer/replay/chain-event-replay.runner.ts @@ -0,0 +1,127 @@ +export interface ChainEventRecord { + id: string; + paging_token: string; + type: string; + transaction_hash?: string; + source_account?: string; + created_at?: string; + [key: string]: unknown; +} + +export interface FetchPageResult { + events: ChainEventRecord[]; + nextCursor: string; +} + +export interface ChainEventSource { + fetchPage(cursor: string, limit: number): Promise<FetchPageResult>; +} + +export interface PersistResult { + inserted: number; + duplicates: number; +} + +export interface ChainEventStore { + init(): Promise<void>; + persistBatch(events: ChainEventRecord[], dryRun: boolean): Promise<PersistResult>; + close(): Promise<void>; +} + +export interface 
ReplayOptions { + startCursor: string; + endCursor?: string; + limit: number; + dryRun: boolean; +} + +export interface ReplaySummary { + startCursor: string; + endCursor?: string; + finalCursor: string; + pages: number; + fetched: number; + inserted: number; + duplicates: number; + dryRun: boolean; +} + +function compareCursor(a: string, b: string): number { + try { + const left = BigInt(a); + const right = BigInt(b); + if (left === right) return 0; + return left < right ? -1 : 1; + } catch { + if (a === b) return 0; + return a < b ? -1 : 1; + } +} + +export class ChainEventReplayRunner { + constructor( + private readonly source: ChainEventSource, + private readonly store: ChainEventStore, + ) {} + + async replay(options: ReplayOptions): Promise<ReplaySummary> { + await this.store.init(); + + let cursor = options.startCursor; + let pages = 0; + let fetched = 0; + let inserted = 0; + let duplicates = 0; + let done = false; + + while (!done) { + const page = await this.source.fetchPage(cursor, options.limit); + pages += 1; + + if (page.events.length === 0) { + done = true; + break; + } + + const selectedEvents = + options.endCursor === undefined + ? 
page.events + : page.events.filter( + (event) => compareCursor(event.paging_token, options.endCursor as string) <= 0, + ); + + fetched += selectedEvents.length; + + if (selectedEvents.length > 0) { + const result = await this.store.persistBatch(selectedEvents, options.dryRun); + inserted += result.inserted; + duplicates += result.duplicates; + } + + const pageLastCursor = page.nextCursor; + cursor = pageLastCursor; + + if ( + options.endCursor !== undefined && + compareCursor(pageLastCursor, options.endCursor) >= 0 + ) { + done = true; + } else if (page.events.length < options.limit) { + done = true; + } + } + + await this.store.close(); + + return { + startCursor: options.startCursor, + endCursor: options.endCursor, + finalCursor: cursor, + pages, + fetched, + inserted, + duplicates, + dryRun: options.dryRun, + }; + } +} diff --git a/backend/src/indexer/replay/horizon-chain-event.source.ts b/backend/src/indexer/replay/horizon-chain-event.source.ts new file mode 100644 index 00000000..a375ece3 --- /dev/null +++ b/backend/src/indexer/replay/horizon-chain-event.source.ts @@ -0,0 +1,33 @@ +import { ChainEventRecord, ChainEventSource, FetchPageResult } from './chain-event-replay.runner'; + +interface HorizonOperationResponse { + _embedded?: { + records?: ChainEventRecord[]; + }; +} + +export class HorizonChainEventSource implements ChainEventSource { + constructor(private readonly horizonBaseUrl: string) {} + + async fetchPage(cursor: string, limit: number): Promise<FetchPageResult> { + const url = new URL('/operations', this.horizonBaseUrl); + url.searchParams.set('order', 'asc'); + url.searchParams.set('cursor', cursor); + url.searchParams.set('limit', String(limit)); + + const response = await fetch(url.toString(), { + headers: { Accept: 'application/json' }, + }); + + if (!response.ok) { + throw new Error(`Horizon request failed (${response.status} ${response.statusText})`); + } + + const payload = (await response.json()) as HorizonOperationResponse; + const events = 
payload._embedded?.records ?? []; + const nextCursor = + events.length > 0 ? String(events[events.length - 1].paging_token) : cursor; + + return { events, nextCursor }; + } +} diff --git a/backend/src/indexer/replay/postgres-chain-event.store.ts b/backend/src/indexer/replay/postgres-chain-event.store.ts new file mode 100644 index 00000000..d1746fc2 --- /dev/null +++ b/backend/src/indexer/replay/postgres-chain-event.store.ts @@ -0,0 +1,93 @@ +import { Pool } from 'pg'; +import { + ChainEventRecord, + ChainEventStore, + PersistResult, +} from './chain-event-replay.runner'; + +export class PostgresChainEventStore implements ChainEventStore { + private readonly pool: Pool; + + constructor(databaseUrl: string) { + this.pool = new Pool({ connectionString: databaseUrl }); + } + + async init(): Promise<void> { + await this.pool.query(` + CREATE TABLE IF NOT EXISTS chain_event_replay ( + paging_token TEXT PRIMARY KEY, + event_id TEXT NOT NULL UNIQUE, + event_type TEXT NOT NULL, + tx_hash TEXT, + source_account TEXT, + ledger_closed_at TIMESTAMPTZ, + payload JSONB NOT NULL, + replayed_at TIMESTAMPTZ NOT NULL DEFAULT NOW() + ) + `); + } + + async persistBatch( + events: ChainEventRecord[], + dryRun: boolean, + ): Promise<PersistResult> { + if (events.length === 0) { + return { inserted: 0, duplicates: 0 }; + } + + if (dryRun) { + const duplicates = await this.countExisting(events.map((event) => event.paging_token)); + return { inserted: events.length - duplicates, duplicates }; + } + + let inserted = 0; + for (const event of events) { + const result = await this.pool.query( + ` + INSERT INTO chain_event_replay ( + paging_token, + event_id, + event_type, + tx_hash, + source_account, + ledger_closed_at, + payload + ) + VALUES ($1, $2, $3, $4, $5, $6, $7::jsonb) + ON CONFLICT (paging_token) DO NOTHING + `, + [ + String(event.paging_token), + String(event.id), + String(event.type), + event.transaction_hash ? String(event.transaction_hash) : null, + event.source_account ? 
String(event.source_account) : null, + event.created_at ? new Date(String(event.created_at)) : null, + JSON.stringify(event), + ], + ); + + inserted += result.rowCount ?? 0; + } + + return { inserted, duplicates: events.length - inserted }; + } + + async close(): Promise<void> { + await this.pool.end(); + } + + private async countExisting(pagingTokens: string[]): Promise<number> { + const uniqueTokens = Array.from(new Set(pagingTokens)); + const result = await this.pool.query<{ total: string }>( + ` + SELECT COUNT(*)::text AS total + FROM chain_event_replay + WHERE paging_token = ANY($1::text[]) + `, + [uniqueTokens], + ); + const total = result.rows[0]?.total ?? '0'; + return Number.parseInt(total, 10); + } +} diff --git a/backend/src/main.ts b/backend/src/main.ts index 0ec10b27..d156660a 100644 --- a/backend/src/main.ts +++ b/backend/src/main.ts @@ -5,12 +5,13 @@ import { StartupProbeService } from './health/startup-probe.service'; import { getDataSourceToken } from '@nestjs/typeorm'; import { DataSource } from 'typeorm'; import { validateRequiredSecrets } from './common/secrets-validation'; +import { configureBodyParserLimits } from './common/http/body-parser.config'; async function bootstrap() { // Fail fast if any required secret is absent — before the app is created. validateRequiredSecrets(); - const app = await NestFactory.create(AppModule); + const app = await NestFactory.create(AppModule, { bodyParser: false }); // Enable versioning (URI versioning like /v1/...) 
app.enableVersioning({ @@ -20,6 +21,7 @@ async function bootstrap() { // Global validation pipe app.useGlobalPipes(new ValidationPipe({ whitelist: true, transform: true })); + configureBodyParserLimits(app); const probeService = app.get(StartupProbeService); diff --git a/backend/test/body-parser-limits.e2e-spec.ts b/backend/test/body-parser-limits.e2e-spec.ts new file mode 100644 index 00000000..86ccfa35 --- /dev/null +++ b/backend/test/body-parser-limits.e2e-spec.ts @@ -0,0 +1,75 @@ +import { Body, Controller, INestApplication, Module, Post } from '@nestjs/common'; +import { Test, TestingModule } from '@nestjs/testing'; +import request from 'supertest'; +import { configureBodyParserLimits } from '../src/common/http/body-parser.config'; + +@Controller() +class PayloadController { + @Post('payload') + handlePayload(@Body() body: { data: string }) { + return { received: body.data.length }; + } + + @Post('v1/webhook') + handleWebhook(@Body() body: { data: string }) { + return { received: body.data.length }; + } +} + +@Module({ + controllers: [PayloadController], +}) +class PayloadTestModule {} + +describe('Body parser limits (e2e)', () => { + let app: INestApplication; + + beforeEach(async () => { + const moduleFixture: TestingModule = await Test.createTestingModule({ + imports: [PayloadTestModule], + }).compile(); + + app = moduleFixture.createNestApplication({ bodyParser: false }); + configureBodyParserLimits(app); + await app.init(); + }); + + afterEach(async () => { + await app.close(); + }); + + it('accepts normal payloads', async () => { + const response = await request(app.getHttpServer()) + .post('/payload') + .send({ data: 'safe-body' }) + .expect(201); + + expect(response.body).toEqual({ received: 9 }); + }); + + it('rejects oversized payloads with explicit 413 error', async () => { + const oversized = 'a'.repeat(120 * 1024); + + const response = await request(app.getHttpServer()) + .post('/payload') + .send({ data: oversized }) + .expect(413); + + 
expect(response.body).toEqual({ + statusCode: 413, + error: 'Payload Too Large', + message: 'Payload too large. Maximum request body size is 100kb.', + }); + }); + + it('allows larger payloads on webhook override endpoint', async () => { + const webhookSizedPayload = 'a'.repeat(200 * 1024); + + const response = await request(app.getHttpServer()) + .post('/v1/webhook') + .send({ data: webhookSizedPayload }) + .expect(201); + + expect(response.body).toEqual({ received: 200 * 1024 }); + }); +});