
Optimize backfilling pipeline #539

Draft
aditya1702 wants to merge 68 commits into pgx-migration from refactor-catchup-code

Conversation

@aditya1702
Contributor

What

[TODO: Short statement about what is changing.]

Why

[TODO: Why this change is being made. Include any context required to understand the why.]

Known limitations

[TODO or N/A]

Issue that this PR addresses

[TODO: Attach the link to the GitHub issue or task. Include the priority of the task here in addition to the link.]

Checklist

PR Structure

  • It is not possible to break this PR down into smaller PRs.
  • This PR does not mix refactoring changes with feature changes.
  • This PR's title starts with the name of the package that is most changed in the PR, or all if the changes are broad or impact many packages.

Thoroughness

  • This PR adds tests for the new functionality or fixes.
  • All updated queries have been tested (refer to this check if the data set returned by the updated query is expected to be the same as the original one).

Release

  • This is not a breaking change.
  • This is ready to be tested in development.
  • The new functionality is gated with a feature flag if this is not ready for production.

Replace hardcoded pool constants with Default* named constants and add
a PoolConfig struct + DefaultPoolConfig() constructor. Update
OpenDBConnectionPool and OpenDBConnectionPoolForBackfill to accept
variadic ...PoolConfig so all existing call sites continue to compile
without changes.
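The variadic-parameter trick described above can be sketched as follows. Field names and default values here are illustrative assumptions, not the PR's actual constants; only `PoolConfig`, `DefaultPoolConfig`, and `OpenDBConnectionPool` come from the commit message.

```go
package main

import (
	"fmt"
	"time"
)

// PoolConfig groups the pool tuning knobs behind the Default* named
// constants. The concrete values below are assumptions for the sketch.
type PoolConfig struct {
	MaxConns        int32
	MinConns        int32
	MaxConnLifetime time.Duration
	MaxConnIdleTime time.Duration
}

const (
	DefaultMaxConns        = int32(20)
	DefaultMinConns        = int32(5)
	DefaultMaxConnLifetime = time.Hour
	DefaultMaxConnIdleTime = 30 * time.Minute
)

// DefaultPoolConfig returns a config populated from the Default* constants.
func DefaultPoolConfig() PoolConfig {
	return PoolConfig{
		MaxConns:        DefaultMaxConns,
		MinConns:        DefaultMinConns,
		MaxConnLifetime: DefaultMaxConnLifetime,
		MaxConnIdleTime: DefaultMaxConnIdleTime,
	}
}

// OpenDBConnectionPool takes a variadic ...PoolConfig so existing call
// sites that pass no config keep compiling unchanged and get defaults.
func OpenDBConnectionPool(dsn string, cfg ...PoolConfig) PoolConfig {
	pc := DefaultPoolConfig()
	if len(cfg) > 0 {
		pc = cfg[0]
	}
	// ...the real function would open the pgx pool using pc here...
	return pc
}

func main() {
	fmt.Println(OpenDBConnectionPool("postgres://localhost").MaxConns)
}
```

The variadic signature is a common Go pattern for adding an optional parameter without breaking API compatibility.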
Parses Go duration strings (e.g. "5m", "10s") from CLI flags or
environment variables into *time.Duration ConfigKey. Follows the same
pattern as SetConfigOptionLogLevel. Includes table-driven tests covering
empty input, invalid strings, CLI args, and env vars.
Exposes --db-max-conns, --db-min-conns, --db-max-conn-lifetime, and
--db-max-conn-idle-time flags (with matching DB_* env vars) that can
be appended to any command's cfgOpts. Duration flags use the new
SetConfigOptionDuration custom setter. Defaults mirror the existing
Default* pool constants.
Adds DBMaxConns, DBMinConns, DBMaxConnLifetime, DBMaxConnIdleTime fields
to ingest.Configs. buildPoolConfig() merges non-zero values over the
defaults so zero values (unset flags) leave current behaviour unchanged.
setupDeps now passes the pool config to both OpenDBConnectionPool and
OpenDBConnectionPoolForBackfill.
Adds DBMaxConns, DBMinConns, DBMaxConnLifetime, DBMaxConnIdleTime to
serve.Configs with a BuildPoolConfig() method (exported so cmd/serve.go
can call it). initHandlerDeps and cmd/serve.go PersistentPreRunE now
pass the pool config to OpenDBConnectionPool. Same buildPoolConfig logic
as ingest: zero values leave defaults intact.
Move progressiveRecompressor struct and all its methods (newProgressiveRecompressor,
MarkDone, Wait, runCompression, compressTableChunks) to a dedicated file.
Rename ingest_backfill_test.go → ingest_recompressor_test.go since it only
tests recompressor watermark logic.

Zero behavior change: pure file reorganization.
Move BatchChanges struct, mergeTrustlineChanges, mergeAccountChanges,
mergeSACBalanceChanges, and processBatchChanges to dedicated catchup file.

Zero behavior change: pure file reorganization.
Historical backfill processes ledgers from the distant past. Channel
accounts are only relevant for recently-submitted transactions (catchup
and live modes). The unlock call was unnecessary and semantically wrong.

Also updated flushHistoricalBatch test to assert unlock is never called.
- ingest_helpers_test.go: shared vars, consts, helper functions
- ingest_backfill_test.go: 10 historical backfill tests
- ingest_catchup_test.go: 7 catchup mode tests
- ingest_test.go: 8 shared/core tests (down from 2972 to 831 lines)

Also removed unused txHash1/txHash2 constants (deadcode).
Fetch chunk-boundary-aligned start/end timestamps for the backfill and pass the boundary start into the progressive recompressor.

- Change progressiveRecompressor to accept a globalStart at construction (a chunk-boundary-aligned lower bound) instead of deriving it from batch 0 in MarkDone.
- Update compressTableChunks to query chunks with range_start >= globalStart and range_end <= safeEnd so recompression is scoped to the backfill range.
- Adjust tests and error handling accordingly.
Add pgx binary COPY helpers and a parallel backfill path:

- Introduce CopyTransactions/CopyTransactionsAccounts and CopyOperations/CopyOperationsAccounts (with preallocated row buffers and metrics tracking) to split BatchCopy work into table-specific COPYs.
- Add setLocalBackfillOpts to set transaction-local options (synchronous_commit=off, session_replication_role=replica).
- Update insertIntoDBParallel to run five independent transactions/goroutines (transactions, transactions_accounts, operations, operations_accounts, state_changes) to maximize COPY throughput.
- Update the docker-compose Postgres command with DB tuning flags (shared_buffers=4GB, checkpoint_completion_target=0.9, checkpoint_timeout=30min).
- Include error handling and metrics reporting for the new copy paths.
Cache operation changes and trim allocations in hot paths:

- Add cachedChanges + GetCachedOperationChanges to TransactionOperationWrapper and replace repeated Transaction.GetOperationChanges calls with the cached accessor to avoid re-parsing XDR.
- Replace set.Union uses with set.Append(...ToSlice()) to reduce allocations when merging participant sets, and swap reflect.DeepEqual for maps.Equal where appropriate.
- Simplify transaction hashing: remove the txnbuild dependency, compute the inner transaction hash directly from XDR via network.HashTransactionInEnvelope, and only marshal envelope/meta when needed.
- Update imports accordingly.
Introduce several concurrency-safe caches and micro-optimizations to reduce allocations and expensive strkey/XDR work.

Changes:
- types: add addressByteaCache and addressEncodeCache and expose CachedAccountAddress to cache AccountId -> G-address and cache AddressBytea Value() results.
- processors: replace repeated .Address() calls with CachedAccountAddress in effects, participants and utils to avoid repeated strkey encoding.
- token_transfer: add contractTypeCache to memoize contract classification (SAC/SEP41) by contract address.
- utils: add contractIDCache to memoize asset->contractID encoding; simplify isLiquidityPool/isClaimableBalance to prefix checks; use CachedAccountAddress for operation source resolution.
- state_change_builder: avoid calling json.Marshal for the common KeyValue==nil case and replace fmt.Sprintf with strings.Builder + strconv to eliminate reflection/boxing and reduce allocations.

Motivation: these changes target hot code paths (state changes, participant/execution processing) to improve throughput and reduce CPU/GC overhead by caching repeated computations and minimizing expensive encoding/decoding operations.
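Two of the micro-optimizations above can be sketched together: a `sync.Map`-backed address cache and a `strings.Builder` + `strconv` key builder replacing `fmt.Sprintf`. The function names follow the commit; the encode placeholder and key format are assumptions for illustration, not the real strkey or state-change logic.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"sync"
)

// addressEncodeCache memoizes the expensive account-ID -> G-address
// encoding. Concurrent goroutines may race to encode the same key,
// but the result is identical, so last-write-wins is harmless.
var addressEncodeCache sync.Map

// expensiveEncode stands in for strkey encoding in this sketch.
func expensiveEncode(raw string) string { return "G" + raw }

func CachedAccountAddress(raw string) string {
	if v, ok := addressEncodeCache.Load(raw); ok {
		return v.(string)
	}
	addr := expensiveEncode(raw)
	addressEncodeCache.Store(raw, addr)
	return addr
}

// buildStateChangeKey shows the fmt.Sprintf replacement: Builder +
// strconv avoids the reflection and interface boxing that Sprintf
// incurs for each argument. The key format here is hypothetical.
func buildStateChangeKey(reason string, id int64) string {
	var b strings.Builder
	b.Grow(len(reason) + 21) // reason + '-' + max int64 digits
	b.WriteString(reason)
	b.WriteByte('-')
	b.WriteString(strconv.FormatInt(id, 10))
	return b.String()
}

func main() {
	fmt.Println(CachedAccountAddress("ABC"), buildStateChangeKey("credit", 42))
}
```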
# Conflicts:
#	internal/data/operations.go
#	internal/db/db.go
#	internal/serve/serve.go
#	internal/services/ingest_backfill.go
#	internal/services/ingest_backfill_test.go
@aditya1702 aditya1702 changed the title Refactor catchup code Optimize backfilling pipeline Mar 12, 2026
@socket-security

socket-security bot commented Mar 12, 2026

Review the following changes in direct dependencies. Learn more about Socket for GitHub.

Added: github.com/jackc/pgerrcode@v0.0.0-20250907135507-afb5586c32a6
Scores: Supply Chain Security 100, Vulnerability 100, Quality 100, Maintenance 100, License 50

View full report

Refactor TimescaleDB chunk handling and backfill pipeline to support progressive compression.

- Introduce db.Chunk (with atomic NumWriters) and change PreCreateChunks to return created chunks (with Name, Start, End).
- Update create/drop logic: create_chunk now returns table name/created flag; dropChunkIndexes now accepts a qualified chunk name and splits schema/table before dropping indexes.
- Add DisableInsertAutovacuum and RestoreInsertAutovacuum helpers to suppress/restore insert-triggered autovacuum on hypertables during backfill.
- Backfill changes: replace fetchBoundaryTimestamps with fetchLedgerCloseTime, pre-create chunks, disable insert autovacuum during backfill, map chunks to BackfillBatch and track writers, and wire a compressor channel into the pipeline.
- Replace the old progressive recompressor with a new progressiveCompressor (new file ingest_compressor.go) that listens for safe-window triggers and compresses chunks as they become safe; remove ingest_recompressor.go.
- Adapt ingest pipeline to use LedgerBuffer (wraps IndexerBuffer + ledgerSeq + closeTime), propagate LedgerBuffer through workers/flush, and emit CompressBatch triggers when contiguous ledgers are flushed (watermarking).
- Misc: remove local statement_timeout in setLocalBackfillOpts and adjust related function signatures and logging.

These changes enable chunks to be progressively compressed while backfill tasks run, reduce write/compaction contention by suppressing autovacuum during bulk loads, and streamline chunk/index handling.
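The watermarking idea (emit a CompressBatch trigger only when a contiguous run of ledgers has been flushed) can be sketched as a small tracker. This is a simplified, single-goroutine assumption of the mechanism; the PR's version is wired into the flush path and a compressor channel.

```go
package main

import "fmt"

// watermark tracks flushed ledger sequences and reports the highest
// contiguous sequence, so compression is only triggered for ranges
// where every ledger up to that point has been flushed.
type watermark struct {
	next    uint32          // lowest sequence not yet contiguous
	pending map[uint32]bool // flushed out-of-order sequences
}

func newWatermark(start uint32) *watermark {
	return &watermark{next: start, pending: map[uint32]bool{}}
}

// MarkFlushed records seq and returns the new contiguous high-water
// mark (exclusive) plus whether it advanced. An advance is the point
// where the real pipeline would emit a CompressBatch trigger.
func (w *watermark) MarkFlushed(seq uint32) (uint32, bool) {
	w.pending[seq] = true
	advanced := false
	for w.pending[w.next] {
		delete(w.pending, w.next)
		w.next++
		advanced = true
	}
	return w.next, advanced
}

func main() {
	w := newWatermark(100)
	w.MarkFlushed(101) // out of order: watermark cannot advance yet
	hw, ok := w.MarkFlushed(100)
	fmt.Println(hw, ok) // now 100 and 101 are both contiguous
}
```

Keeping the out-of-order flushes in a set means batches can complete in any order while compression still only ever covers fully flushed ranges.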
Base automatically changed from pgx-migration to main March 16, 2026 14:55
@aditya1702 aditya1702 changed the base branch from main to pgx-migration March 31, 2026 13:33
