
Optimize backfill code using pipelines and channels#562

Open
aditya1702 wants to merge 30 commits into ledger-backend-optimize from optimize-backfill-code

Conversation

@aditya1702
Contributor

No description provided.

Covers architecture for high-throughput backfill: single backend per gap,
fan-out processing via bounded channels, concurrent flush workers with
parallel COPY, and watermark-based cursor tracking.
Replace batch-parallel backfill with a 3-stage streaming pipeline and add a backfill watermark utility.

Key changes:
- Add backfillWatermark type (thread-safe) and unit tests to track contiguous flushed ledgers and drive cursor updates.
- Replace legacy BackfillWorkers/BackfillBatchSize with explicit tuning fields: BackfillProcessWorkers, BackfillFlushWorkers, BackfillDBInsertBatchSize (default 100), BackfillLedgerChanSize, BackfillFlushChanSize. Update CLI flag names accordingly (cmd/ingest.go).
- Rework backfill logic to a dispatcher → process workers → flush workers pipeline (runDispatcher, runProcessWorkers, runFlushWorkers). Remove progressiveRecompressor and batch-based parallel processing in favor of streaming flushes and watermark-driven cursor updates.
- Add validation in NewIngestService to ensure DB connection pool size can support flush worker concurrency (each flush worker uses multiple parallel COPYs).
- Introduce flush retry logic and metrics integration; simplify backend preparation per gap and improve error propagation using context cancellation causes.
- Add tests to validate pool validation logic and new watermark behavior; remove obsolete batch-based backfill tests.

Files changed: cmd/ingest.go, internal/ingest/ingest.go, internal/services/* (added backfill_watermark.go and tests, refactored ingest_backfill.go, updated ingest.go and tests).

BackfillFlushWorkers (4), BackfillDBInsertBatchSize (100),
BackfillLedgerChanSize (256), BackfillFlushChanSize (8) are now set
via FlagDefault in cmd/ingest.go. Only BackfillProcessWorkers uses a
zero-check for runtime.NumCPU().
@aditya1702 changed the title from "Optimize backfill code" to "Optimize backfill code using pipelines and channels" on Apr 2, 2026
Defines 8 new Prometheus metrics (14 series) and structured gap
summary logging for the 3-stage backfill pipeline. Covers per-stage
durations, channel wait times, utilization gauges, throughput, batch
sizes, and gap progress tracking.
7 new metrics: channel wait histogram, channel utilization gauge,
ledgers flushed counter, batch size histogram, gap progress gauge,
and gap start/end ledger gauges.
Worker-local stats structs (no mutexes needed) that merge into
per-gap aggregates after pipeline completion.

Adds per-ledger fetch duration to PhaseDuration histogram and
measures time blocked on ledgerCh send via BackfillChannelWait.
Switches from range-over-channel to explicit receive for timing.
Each worker reports stats via statsCh, merged into gapStats after
pipeline completion.

Adds batch size histogram, ledgers flushed counter, gap progress gauge,
flush phase duration, and channel receive wait timing.
1-second ticker samples len/cap ratio for ledger and flush channels.
Gap start/end gauges set at entry, reset to 0 on exit.
Replaces simple completion log with detailed breakdown: fetch/process/flush
totals and averages, channel wait times, and ledgers/sec throughput.
Introduce a sequential processing path for ledger transactions used by the backfill pipeline to avoid intra-ledger parallelism and pond pool contention. Adds Indexer.ProcessLedgerTransactionsSequential and ProcessLedgerSequential helpers, and an ingestService.processLedgerSequential wrapper that records participant metrics. Update backfill worker to call the new sequential method instead of the parallel path so backfill uses inter-ledger parallelism only.
Add utilities to pre-create TimescaleDB chunks and prepare them for bulk backfill: create chunks aligned to hypertable intervals, drop unnecessary chunk indexes, set chunks UNLOGGED and disable per-chunk autovacuum, and add helpers to disable/restore insert-triggered autovacuum on parent hypertables. Includes comprehensive tests (internal/db/timescaledb_chunks_test.go) and a droppableIndexes list to control which parent indexes are removed from chunks.

Integrate pre-creation into ingest backfill (internal/services/ingest_backfill.go): fetch ledger boundary timestamps, disable/restore insert autovacuum, and invoke PreCreateChunks before doing bulk inserts; also add a fetchLedgerCloseTime helper used to resolve ledger timestamps. These changes reduce write overhead during backfill and prepare chunks for later compression and logging restoration.
Introduce a parallel backfill S3 fetcher and integrate it into the ingest pipeline.

- Add internal/ingest/backfill_fetcher.go: a backfillFetcher that downloads ledger files in parallel, decodes XDR batches, fans out individual xdr.LedgerCloseMeta into an external ledgerCh, supports retries, cancellation, and optional timing callbacks for metrics. Returns aggregated stats.
- Add tests internal/ingest/backfill_fetcher_test.go covering full delivery, partial-file filtering, context cancellation, retry behavior, and missing-file cancellation.
- Add a CLI/config flag backfill-fetch-workers and Configs.BackfillFetchWorkers; wire defaults (15) into ingest setup.
- Introduce BackfillFetcherFactory and BackfillFetcherRunner types in services; accept factory and worker count in IngestService config/state.
- Replace the old sequential dispatcher in processGap with the parallel fetcher (fetcher runs in its own goroutine and closes ledgerCh). Aggregate fetch stats into gap summary.
- Refactor datastore initialization: newDatastoreResources creates datastore and schema (shared by ledger backend and fetcher); update ledger_backend to use the new helper and return buffered config.

Behavior notes: fetcher makes no ordering guarantees (designed for backfill), closes ledgerCh on completion, and cancels the gap context on permanent errors (e.g., missing file).
Reduce allocation and GC churn during backfill/ingest by reusing heavy objects:

- Add txResultPool (sync.Pool) and use pooled TransactionResult in processTransaction; fields are set, pushed to buffer, then cleared and returned to the pool to avoid retaining references.
- Pre-allocate stateChanges slice capacity to reduce reallocations.
- Clear batch.LedgerCloseMetas entries after dispatch to allow GC of large XDR structs.
- Add backfillBufferPool (sync.Pool) that provides indexer.IndexerBuffer instances; workers Get() and Clear() buffers before use and Put() them back after flush or on error.
- Adjust sampler lifecycle: sampler is no longer part of pipelineWg and is stopped by closing samplerDone after the pipeline completes.

These changes aim to lower per-transaction/batch allocations and memory pressure during backfill processing.
Add a cached GetChanges() method to TransactionOperationWrapper to compute and store operation ledger-entry changes once per wrapper. Replace repeated calls to Transaction.GetOperationChanges(...) across processors (accounts, contracts/sac, effects, effects_horizon, sac_balances, sac_instances, trustlines) and internal helper getSponsor to use the new cached method. This reduces redundant SDK allocations and keeps behavior unchanged; caching is safe without a mutex because each wrapper is processed by a single goroutine.
Add github.com/klauspost/compress dependency and replace support/compressxdr-based decoding with pooled klauspost zstd decoders. Introduces a sync.Pool (zstdDecoderPool) to reuse zstd.Decoder instances across fetch workers to avoid repeated internal buffer allocations when streaming many S3 files. downloadAndDecode now obtains a decoder from the pool, resets it on the S3 reader, decodes via xdr.Unmarshal, and returns the decoder to the pool; errors include context about reset/decoding failures.
Replace per-row []any allocations (pgx.CopyFromSlice) with pgx.CopyFromFunc that reuses a single []any (27 elements) buffer for all rows. This eliminates many allocations (~61M per gap) and improves bulk insert performance while preserving existing conversions, ID generation and error handling. The change is safe because pgx encodes each row immediately and does not retain the slice.
Introduce a CopyResult-driven parallel COPY flow to enable safe retries without duplicates. Replaces the errgroup-based insert/upsert with a sync.WaitGroup approach that tracks per-table commits (copyTable enum and CopyResult), skips already-done tables on retry, and surfaces the first failure. PersistLedgerData signature updated to accept a CopyResult; callers (live, backfill, loadtest, tests) updated accordingly. Removed UniqueViolation special-casing and related helper.

Also add/adjust TimescaleDB indexes in migrations (transactions, operations, state_changes), update droppableIndexes and tests to match, and make minor comment tweaks and go.mod reordering for golang.org/x/sync (now indirect). These changes enable idempotent, efficient retries for parallel ingestion without requiring PK constraints.
Increase concurrency and reduce connection churn for backfill S3 downloads. Changes include: raise the default backfill fetch workers from 15 to 40; expand the task channel buffer (workers*4); use a 256KB bufio.Reader for zstd decoding; and wire BackfillFetchWorkers directly into the fetcher factory. Introduces newBackfillDataStore, which builds an aws-sdk-go-v2 S3 client with a tuned http.Transport (increased MaxIdleConns/PerHost) to avoid TLS connection churn and loads the datastore schema. go.mod updated to add aws-sdk-go-v2 modules. Error messages adjusted to reflect the new backfill datastore initialization path.