Optimize backfill code using pipelines and channels #562
Open
aditya1702 wants to merge 30 commits into ledger-backend-optimize from
Conversation
Covers architecture for high-throughput backfill: single backend per gap, fan-out processing via bounded channels, concurrent flush workers with parallel COPY, and watermark-based cursor tracking.
Replace batch-parallel backfill with a 3-stage streaming pipeline and add a backfill watermark utility. Key changes:
- Add a thread-safe backfillWatermark type (with unit tests) to track contiguous flushed ledgers and drive cursor updates.
- Replace legacy BackfillWorkers/BackfillBatchSize with explicit tuning fields: BackfillProcessWorkers, BackfillFlushWorkers, BackfillDBInsertBatchSize (default 100), BackfillLedgerChanSize, BackfillFlushChanSize. Update CLI flag names accordingly (cmd/ingest.go).
- Rework backfill logic into a dispatcher → process workers → flush workers pipeline (runDispatcher, runProcessWorkers, runFlushWorkers). Remove progressiveRecompressor and batch-based parallel processing in favor of streaming flushes and watermark-driven cursor updates.
- Add validation in NewIngestService to ensure the DB connection pool is large enough for flush worker concurrency (each flush worker uses multiple parallel COPYs).
- Introduce flush retry logic and metrics integration; simplify backend preparation per gap and improve error propagation using context cancellation causes.
- Add tests for the pool validation logic and new watermark behavior; remove obsolete batch-based backfill tests.

Files changed: cmd/ingest.go, internal/ingest/ingest.go, internal/services/* (added backfill_watermark.go and tests, refactored ingest_backfill.go, updated ingest.go and tests).
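The watermark described above only advances the cursor once every earlier ledger has also been flushed, so out-of-order flushes from parallel workers never skip a gap. A minimal sketch of the idea — the type name comes from the PR, but the fields and method signature here are assumptions, not the actual API:

```go
package main

import (
	"fmt"
	"sync"
)

// backfillWatermark tracks the highest contiguous ledger sequence that has
// been flushed, so the cursor only advances past a gap once it is filled.
type backfillWatermark struct {
	mu      sync.Mutex
	next    uint32              // next ledger we are waiting for
	pending map[uint32]struct{} // flushed ledgers above the watermark
}

func newBackfillWatermark(start uint32) *backfillWatermark {
	return &backfillWatermark{next: start, pending: make(map[uint32]struct{})}
}

// MarkFlushed records a flushed ledger and returns the new contiguous
// watermark (the last ledger that is safe to persist as the cursor).
func (w *backfillWatermark) MarkFlushed(seq uint32) uint32 {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.pending[seq] = struct{}{}
	for {
		if _, ok := w.pending[w.next]; !ok {
			break
		}
		delete(w.pending, w.next)
		w.next++
	}
	return w.next - 1
}

func main() {
	w := newBackfillWatermark(100)
	fmt.Println(w.MarkFlushed(101)) // out of order: watermark stays at 99
	fmt.Println(w.MarkFlushed(100)) // fills the gap: watermark jumps to 101
}
```

Flush workers call MarkFlushed as ledgers complete; the cursor update uses the returned value, which makes the cursor monotone even though flushes land out of order.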
… in code BackfillFlushWorkers (4), BackfillDBInsertBatchSize (100), BackfillLedgerChanSize (256), BackfillFlushChanSize (8) are now set via FlagDefault in cmd/ingest.go. Only BackfillProcessWorkers uses a zero-check for runtime.NumCPU().
Defines 8 new Prometheus metrics (14 series) and structured gap summary logging for the 3-stage backfill pipeline. Covers per-stage durations, channel wait times, utilization gauges, throughput, batch sizes, and gap progress tracking.
7 new metrics: channel wait histogram, channel utilization gauge, ledgers flushed counter, batch size histogram, gap progress gauge, and gap start/end ledger gauges.
Worker-local stats structs (no mutexes needed) that merge into per-gap aggregates after pipeline completion.
…rics Adds per-ledger fetch duration to PhaseDuration histogram and measures time blocked on ledgerCh send via BackfillChannelWait.
Switches from range-over-channel to explicit receive for timing. Each worker reports stats via statsCh, merged into gapStats after pipeline completion.
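The range-to-explicit-receive switch exists because `for v := range ch` gives no hook to measure how long the worker sat blocked waiting for the next item. A sketch of the pattern (function and variable names are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// drainWithWaitTiming consumes a channel with an explicit receive so the
// time spent blocked on the channel can be measured separately from the
// time spent processing each item.
func drainWithWaitTiming(ch <-chan int) (processed int, waited time.Duration) {
	for {
		start := time.Now()
		v, ok := <-ch // explicit receive: the blocking happens here
		waited += time.Since(start)
		if !ok {
			return processed, waited // channel closed: worker exits
		}
		_ = v // ... process v, timed separately ...
		processed++
	}
}

func main() {
	ch := make(chan int, 8)
	for i := 0; i < 5; i++ {
		ch <- i
	}
	close(ch)
	n, _ := drainWithWaitTiming(ch)
	fmt.Println(n) // 5
}
```

The accumulated `waited` is what feeds a channel-wait histogram; high values indicate the upstream stage is the bottleneck.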
…ogress metrics Adds batch size histogram, ledgers flushed counter, gap progress gauge, flush phase duration, and channel receive wait timing.
A 1-second ticker samples the len/cap ratio of the ledger and flush channels. Gap start/end gauges are set at entry and reset to 0 on exit.
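The len/cap ratio is cheap to read and is what the sampler exports as a utilization gauge. A sketch of the sampling function, with the ticker loop and Prometheus wiring indicated in comments (names are assumptions):

```go
package main

import "fmt"

// channelUtilization returns len/cap for a buffered channel — the ratio a
// periodic sampler would report as a gauge (1.0 means the buffer is full
// and the producer is likely blocking).
func channelUtilization[T any](ch chan T) float64 {
	if cap(ch) == 0 {
		return 0
	}
	return float64(len(ch)) / float64(cap(ch))
}

func main() {
	ledgerCh := make(chan int, 4)
	ledgerCh <- 1
	ledgerCh <- 2
	fmt.Println(channelUtilization(ledgerCh)) // 0.5

	// Sampler loop (sketch): runs until a done channel is closed after the
	// pipeline completes, setting a gauge once per tick.
	//
	// go func() {
	// 	t := time.NewTicker(time.Second)
	// 	defer t.Stop()
	// 	for {
	// 		select {
	// 		case <-done:
	// 			return
	// 		case <-t.C:
	// 			gauge.Set(channelUtilization(ledgerCh))
	// 		}
	// 	}
	// }()
}
```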
Replaces the simple completion log with a detailed breakdown: fetch/process/flush totals and averages, channel wait times, and ledgers/sec throughput.
Introduce a sequential processing path for ledger transactions, used by the backfill pipeline to avoid intra-ledger parallelism and pond pool contention. Adds Indexer.ProcessLedgerTransactionsSequential and ProcessLedgerSequential helpers, plus an ingestService.processLedgerSequential wrapper that records participant metrics. Updates the backfill worker to call the new sequential method instead of the parallel path, so backfill uses inter-ledger parallelism only.
Add utilities to pre-create TimescaleDB chunks and prepare them for bulk backfill:
- create chunks aligned to hypertable intervals;
- drop unnecessary chunk indexes;
- set chunks UNLOGGED and disable per-chunk autovacuum;
- add helpers to disable/restore insert-triggered autovacuum on parent hypertables.

Includes comprehensive tests (internal/db/timescaledb_chunks_test.go) and a droppableIndexes list to control which parent indexes are removed from chunks.

Integrate pre-creation into ingest backfill (internal/services/ingest_backfill.go): fetch ledger boundary timestamps, disable/restore insert autovacuum, and invoke PreCreateChunks before doing bulk inserts; also add a fetchLedgerCloseTime helper used to resolve ledger timestamps.

These changes reduce write overhead during backfill and prepare chunks for later compression and logging restoration.
Introduce a parallel backfill S3 fetcher and integrate it into the ingest pipeline.
- Add internal/ingest/backfill_fetcher.go: a backfillFetcher that downloads ledger files in parallel, decodes XDR batches, fans out individual xdr.LedgerCloseMeta into an external ledgerCh, supports retries, cancellation, and optional timing callbacks for metrics. Returns aggregated stats.
- Add tests (internal/ingest/backfill_fetcher_test.go) covering full delivery, partial-file filtering, context cancellation, retry behavior, and missing-file cancellation.
- Add a CLI/config flag backfill-fetch-workers and Configs.BackfillFetchWorkers; wire the default (15) into ingest setup.
- Introduce BackfillFetcherFactory and BackfillFetcherRunner types in services; accept the factory and worker count in IngestService config/state.
- Replace the old sequential dispatcher in processGap with the parallel fetcher (the fetcher runs in its own goroutine and closes ledgerCh). Aggregate fetch stats into the gap summary.
- Refactor datastore initialization: newDatastoreResources creates the datastore and schema (shared by the ledger backend and fetcher); update ledger_backend to use the new helper and return buffered config.

Behavior notes: the fetcher makes no ordering guarantees (it is designed for backfill), closes ledgerCh on completion, and cancels the gap context on permanent errors (e.g., a missing file).
Reduce allocation and GC churn during backfill/ingest by reusing heavy objects:
- Add txResultPool (sync.Pool) and use a pooled TransactionResult in processTransaction; fields are set, pushed to the buffer, then cleared and returned to the pool to avoid retaining references.
- Pre-allocate stateChanges slice capacity to reduce reallocations.
- Clear batch.LedgerCloseMetas entries after dispatch to allow GC of large XDR structs.
- Add backfillBufferPool (sync.Pool) that provides indexer.IndexerBuffer instances; workers Get() and Clear() buffers before use and Put() them back after flush or on error.
- Adjust the sampler lifecycle: the sampler is no longer part of pipelineWg and is stopped by closing samplerDone after the pipeline completes.

These changes aim to lower per-transaction/batch allocations and memory pressure during backfill processing.
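The Get/Clear/Put lifecycle described for backfillBufferPool follows the standard sync.Pool pattern. A minimal sketch, with indexerBuffer standing in for indexer.IndexerBuffer (the real type and its fields are not shown in the PR text):

```go
package main

import (
	"fmt"
	"sync"
)

// indexerBuffer is a stand-in for indexer.IndexerBuffer: a large, reusable
// accumulation buffer that is expensive to allocate per ledger.
type indexerBuffer struct {
	rows []string
}

// Clear truncates the buffer while keeping its capacity. A real
// implementation should also nil out any pointer fields so the pool does
// not retain references to large objects.
func (b *indexerBuffer) Clear() { b.rows = b.rows[:0] }

var bufferPool = sync.Pool{
	New: func() any { return &indexerBuffer{rows: make([]string, 0, 1024)} },
}

// flushLedger shows the worker lifecycle: Get, Clear, use, then Put back —
// including on error paths, via defer.
func flushLedger(rows []string) int {
	buf := bufferPool.Get().(*indexerBuffer)
	buf.Clear()
	defer bufferPool.Put(buf) // returned to the pool even on early exit
	buf.rows = append(buf.rows, rows...)
	return len(buf.rows) // stand-in for the actual flush/COPY
}

func main() {
	fmt.Println(flushLedger([]string{"a", "b", "c"})) // 3
	fmt.Println(flushLedger([]string{"d"}))           // 1 — reuses the buffer
}
```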
Add a cached GetChanges() method to TransactionOperationWrapper to compute and store operation ledger-entry changes once per wrapper. Replace repeated calls to Transaction.GetOperationChanges(...) across processors (accounts, contracts/sac, effects, effects_horizon, sac_balances, sac_instances, trustlines) and internal helper getSponsor to use the new cached method. This reduces redundant SDK allocations and keeps behavior unchanged; caching is safe without a mutex because each wrapper is processed by a single goroutine.
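Compute-once caching without a mutex is valid here precisely because one goroutine owns each wrapper. A sketch of the pattern — the method name GetChanges matches the PR, but the struct fields and change type are illustrative:

```go
package main

import "fmt"

// operationWrapper caches its ledger-entry changes after the first call.
// No mutex is needed: each wrapper is processed by a single goroutine.
type operationWrapper struct {
	computeCount int      // only here to demonstrate single computation
	changes      []string // cached result
	cached       bool
}

func (w *operationWrapper) GetChanges() []string {
	if !w.cached {
		w.computeCount++
		// In the real code this is the expensive, allocation-heavy
		// Transaction.GetOperationChanges(...) SDK call.
		w.changes = []string{"change-1", "change-2"}
		w.cached = true
	}
	return w.changes
}

func main() {
	w := &operationWrapper{}
	w.GetChanges() // computes
	w.GetChanges() // returns cached slice
	fmt.Println(w.computeCount) // 1
}
```

Every processor (accounts, effects, trustlines, …) that previously called the SDK directly now hits the cached slice, so the work happens once per wrapper instead of once per processor.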
Add github.com/klauspost/compress dependency and replace support/compressxdr-based decoding with pooled klauspost zstd decoders. Introduces a sync.Pool (zstdDecoderPool) to reuse zstd.Decoder instances across fetch workers to avoid repeated internal buffer allocations when streaming many S3 files. downloadAndDecode now obtains a decoder from the pool, resets it on the S3 reader, decodes via xdr.Unmarshal, and returns the decoder to the pool; errors include context about reset/decoding failures.
Replace per-row []any allocations (pgx.CopyFromSlice) with pgx.CopyFromFunc, reusing a single 27-element []any buffer for all rows. This eliminates many allocations (~61M per gap) and improves bulk insert performance while preserving the existing conversions, ID generation, and error handling. The change is safe because pgx encodes each row immediately and does not retain the slice.
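The reuse is only safe under the stated contract: the consumer finishes with the slice before the next row overwrites it. A dependency-free sketch of that buffer-reuse shape (copyRows and txRow are hypothetical stand-ins for the pgx.CopyFromFunc callback; the 27-column row is shrunk to 3):

```go
package main

import "fmt"

// txRow is a toy stand-in for a transactions row.
type txRow struct{ id, ledger, hash string }

// copyRows feeds every row through one reused []any buffer instead of
// allocating a fresh slice per row. This is only safe because encode must
// fully consume the row before returning and must never retain buf —
// exactly the guarantee pgx gives for CopyFromFunc sources.
func copyRows(rows []txRow, encode func([]any)) int {
	buf := make([]any, 3) // allocated once, reused for all rows
	for _, r := range rows {
		buf[0], buf[1], buf[2] = r.id, r.ledger, r.hash
		encode(buf) // consumer encodes immediately; buf is overwritten next
	}
	return len(rows)
}

func main() {
	rows := []txRow{{"1", "100", "aa"}, {"2", "101", "bb"}}
	n := copyRows(rows, func(row []any) { _ = row })
	fmt.Println(n) // 2
}
```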
Introduce a CopyResult-driven parallel COPY flow to enable safe retries without duplicates. Replaces the errgroup-based insert/upsert with a sync.WaitGroup approach that tracks per-table commits (a copyTable enum and CopyResult), skips already-committed tables on retry, and surfaces the first failure. The PersistLedgerData signature now accepts a CopyResult; callers (live, backfill, loadtest, tests) are updated accordingly. Removed the UniqueViolation special-casing and its helper.

Also add/adjust TimescaleDB indexes in migrations (transactions, operations, state_changes), update droppableIndexes and tests to match, and make minor comment tweaks and a go.mod reordering for golang.org/x/sync (now indirect).

These changes enable idempotent, efficient retries for parallel ingestion without requiring PK constraints.
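The retry-skip mechanism can be sketched with per-table done flags: a retry re-runs only the tables that did not commit, so no table is copied twice. The names copyTable/CopyResult/PersistLedgerData mirror the PR, but the shapes are assumptions, and the sketch runs the copies sequentially where the real code parallelizes them with a sync.WaitGroup:

```go
package main

import (
	"errors"
	"fmt"
)

type copyTable int

const (
	tableTransactions copyTable = iota
	tableOperations
	tableStateChanges
	numCopyTables
)

// CopyResult records which tables committed; passing the same value to a
// retry makes the overall flush idempotent without PK constraints.
type CopyResult struct {
	done [numCopyTables]bool
}

// persistLedgerData runs one COPY per table, skipping tables that already
// committed on a previous attempt, and surfaces the first failure.
func persistLedgerData(res *CopyResult, copyFn func(copyTable) error) error {
	var firstErr error
	for t := copyTable(0); t < numCopyTables; t++ {
		if res.done[t] {
			continue // committed earlier: skip to avoid duplicate rows
		}
		if err := copyFn(t); err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}
		res.done[t] = true
	}
	return firstErr
}

func main() {
	var res CopyResult
	attempt := 0
	fail := func(t copyTable) error { // operations fails only on attempt 0
		if t == tableOperations && attempt == 0 {
			return errors.New("copy failed")
		}
		return nil
	}
	_ = persistLedgerData(&res, fail) // first attempt: two tables commit
	attempt++
	err := persistLedgerData(&res, fail) // retry copies only operations
	fmt.Println(err == nil, res.done)    // true [true true true]
}
```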
Increase concurrency and reduce connection churn for backfill S3 downloads. Changes: raise the default backfill fetch workers from 15 to 40; expand the task channel buffer to workers*4; use a 256KB bufio.Reader for zstd decoding; and wire BackfillFetchWorkers directly into the fetcher factory. Introduces newBackfillDataStore, which builds an aws-sdk-go-v2 S3 client with a tuned http.Transport (increased MaxIdleConns/PerHost) to avoid TLS connection churn, and loads the datastore schema. go.mod updated to add aws-sdk-go-v2 modules. Error messages adjusted to reflect the new backfill datastore initialization path.