30 commits
8bb1bf1
Add design doc for backfill pipeline 3-stage refactor
aditya1702 Apr 2, 2026
2308deb
Add implementation plan for backfill pipeline refactor
aditya1702 Apr 2, 2026
d8e427a
Refactor backfill pipeline; add watermark
aditya1702 Apr 2, 2026
d57bad0
Merge branch 'ledger-backend-optimize' into optimize-backfill-code
aditya1702 Apr 2, 2026
b7dcb7e
Move static backfill defaults to CLI flags, keep only runtime default…
aditya1702 Apr 2, 2026
272e9ae
Update storage_backend.go
aditya1702 Apr 2, 2026
e13c445
Add backfill pipeline observability design doc
aditya1702 Apr 2, 2026
20077b7
Add backfill observability implementation plan
aditya1702 Apr 2, 2026
5cfafc5
Add backfill pipeline Prometheus metrics to IngestionMetrics
aditya1702 Apr 2, 2026
b23cc49
Add backfill stats accumulator types for gap summary logging
aditya1702 Apr 2, 2026
00da54f
Instrument backfill dispatcher with fetch timing and channel wait met…
aditya1702 Apr 2, 2026
95e53ee
Instrument backfill process workers with timing and channel wait metrics
aditya1702 Apr 2, 2026
ef775ca
Instrument backfill flush workers with timing, throughput, and gap pr…
aditya1702 Apr 2, 2026
a664969
Add channel utilization sampler and gap boundary gauges
aditya1702 Apr 2, 2026
8990fed
change defaults
aditya1702 Apr 2, 2026
52e8e4d
Add structured gap summary log with per-stage timing breakdown
aditya1702 Apr 2, 2026
e3f269d
Sequential ledger processing for backfill
aditya1702 Apr 2, 2026
6c25c2e
Pre-create TimescaleDB chunks for backfill
aditya1702 Apr 2, 2026
34835f3
Update ingest_backfill.go
aditya1702 Apr 3, 2026
6452f29
Update datastore-pubnet.toml
aditya1702 Apr 3, 2026
72c8d6a
Add parallel backfill S3 fetcher and wiring
aditya1702 Apr 3, 2026
cb1d994
Introduce sync.Pool for buffers & tx results
aditya1702 Apr 3, 2026
5ddbe66
Cache operation changes in wrapper
aditya1702 Apr 3, 2026
62fbee3
Use pooled zstd decoders for backfill fetcher
aditya1702 Apr 3, 2026
ba787b6
Use reusable row buffer for state_changes COPY
aditya1702 Apr 3, 2026
efa8e2f
temp changes (revert later)
aditya1702 Apr 3, 2026
7202efa
Add CopyResult to track per-table COPY progress
aditya1702 Apr 3, 2026
c52661b
Update ingest.go
aditya1702 Apr 3, 2026
143aeec
Update ingest.go
aditya1702 Apr 3, 2026
2b872a6
Optimize backfill S3 downloads & datastore init
aditya1702 Apr 4, 2026
42 changes: 33 additions & 9 deletions cmd/ingest.go
```diff
@@ -59,29 +59,53 @@ func (c *ingestCmd) Command() *cobra.Command {
 			Required: true,
 		},
 		{
-			Name: "backfill-workers",
-			Usage: "Maximum concurrent workers for backfill processing. Defaults to number of CPUs. Lower values reduce RAM usage at cost of throughput.",
+			Name: "backfill-process-workers",
+			Usage: "Number of Stage 2 process workers in the backfill pipeline. Defaults to number of CPUs.",
 			OptType: types.Int,
-			ConfigKey: &cfg.BackfillWorkers,
-			FlagDefault: 0,
+			ConfigKey: &cfg.BackfillProcessWorkers,
+			FlagDefault: 2,
 			Required: false,
 		},
 		{
-			Name: "backfill-batch-size",
-			Usage: "Number of ledgers per batch during backfill. Defaults to 250. Lower values reduce RAM usage at cost of more DB transactions.",
+			Name: "backfill-flush-workers",
+			Usage: "Number of Stage 3 flush workers in the backfill pipeline. Each uses 5 parallel DB connections.",
 			OptType: types.Int,
-			ConfigKey: &cfg.BackfillBatchSize,
-			FlagDefault: 250,
+			ConfigKey: &cfg.BackfillFlushWorkers,
+			FlagDefault: 2,
 			Required: false,
 		},
 		{
 			Name: "backfill-db-insert-batch-size",
-			Usage: "Number of ledgers to process before flushing buffer to DB during backfill. Defaults to 100. Lower values reduce RAM usage at cost of more DB transactions.",
+			Usage: "Number of ledgers to process before flushing buffer to DB during backfill. Lower values reduce RAM usage at cost of more DB transactions.",
 			OptType: types.Int,
 			ConfigKey: &cfg.BackfillDBInsertBatchSize,
 			FlagDefault: 100,
 			Required: false,
 		},
+		{
+			Name: "backfill-ledger-chan-size",
+			Usage: "Bounded channel size between the dispatcher and process workers in the backfill pipeline.",
+			OptType: types.Int,
+			ConfigKey: &cfg.BackfillLedgerChanSize,
+			FlagDefault: 200,
+			Required: false,
+		},
+		{
+			Name: "backfill-flush-chan-size",
+			Usage: "Bounded channel size between process workers and flush workers in the backfill pipeline.",
+			OptType: types.Int,
+			ConfigKey: &cfg.BackfillFlushChanSize,
+			FlagDefault: 200,
+			Required: false,
+		},
+		{
+			Name: "backfill-fetch-workers",
+			Usage: "Number of parallel S3 download workers in the backfill fetcher. Each worker downloads and decodes one file at a time.",
+			OptType: types.Int,
+			ConfigKey: &cfg.BackfillFetchWorkers,
+			FlagDefault: 40,
+			Required: false,
+		},
 		{
 			Name: "archive-url",
 			Usage: "Archive URL for history archives",
```
4 changes: 2 additions & 2 deletions config/datastore-pubnet.toml
```diff
@@ -13,9 +13,9 @@ region = "us-east-2"
 
 [buffered_storage_backend_config]
 # Buffer size for reading ledgers
-buffer_size = 100
+buffer_size = 200
 # Number of concurrent workers for reading
-num_workers = 10
+num_workers = 15
 # Number of retries for failed operations
 retry_limit = 3
 # Wait time between retries
```
19 changes: 18 additions & 1 deletion docker-compose.yaml
```diff
@@ -8,7 +8,24 @@ services:
   db:
     container_name: db
     image: timescale/timescaledb:2.25.0-pg17
-    command: ["postgres", "-c", "timescaledb.enable_chunk_skipping=on", "-c", "timescaledb.enable_sparse_index_bloom=on"]
+    command: [
+      "postgres",
+      "-c", "timescaledb.enable_chunk_skipping=on",
+      "-c", "timescaledb.enable_sparse_index_bloom=on",
+      "-c", "shared_buffers=8GB",
+      "-c", "work_mem=256MB",
+      "-c", "maintenance_work_mem=2GB",
+      "-c", "wal_buffers=64MB",
+      "-c", "max_wal_size=32GB",
+      "-c", "wal_compression=lz4",
+      "-c", "checkpoint_completion_target=0.9",
+      "-c", "checkpoint_timeout=30min",
+      "-c", "max_connections=100",
+      "-c", "effective_io_concurrency=200",
+      "-c", "synchronous_commit=off",
+      "-c", "max_parallel_maintenance_workers=4",
+      "-c", "autovacuum=off"
+    ]
     healthcheck:
       test: ["CMD-SHELL", "pg_isready -U postgres -d wallet-backend"]
       interval: 10s
```
168 changes: 168 additions & 0 deletions docs/plans/2026-04-02-backfill-observability-design.md
@@ -0,0 +1,168 @@
# Backfill Pipeline Observability Design

**Date:** 2026-04-02
**Status:** Approved
**Goal:** Add per-stage timing, bottleneck identification, and throughput metrics to the 3-stage backfill pipeline (dispatcher -> process -> flush).

## Problem

The backfill pipeline in `internal/services/ingest_backfill.go` has minimal observability. Only `RetriesTotal` and `RetryExhaustionsTotal` with `"batch_flush"` labels exist. There is no visibility into:

- How much time each pipeline stage takes
- Whether workers are blocked waiting on channels (backpressure)
- What the throughput rate is
- Whether batches are full or partial

Without this, optimization is guesswork.

## Approach

**Approach B: Prometheus metrics + structured gap summary logs.** Uses the existing `IngestionMetrics` struct and custom registry. No new dependencies.

## New Prometheus Metrics (8 total, 14 series)

### 1. Per-Stage Durations — Reuse existing `PhaseDuration` HistogramVec

Add 3 new phase labels to the existing `wallet_ingestion_phase_duration_seconds`:

| Phase Label | Stage | Measures |
|---|---|---|
| `backfill_fetch` | Stage 1 | Time for a single successful ledger fetch (excludes retry overhead) |
| `backfill_process` | Stage 2 | Time to process one ledger into an IndexerBuffer |
| `backfill_flush` | Stage 3 | Time for one `flushBufferWithRetry` call |

### 2. Channel Wait Time Histogram (1 new metric, 4 series)

```
wallet_ingestion_backfill_channel_wait_seconds {channel, direction}
```

- **Type:** Histogram
- **Labels:** `channel` (`"ledger"`, `"flush"`) x `direction` (`"send"`, `"receive"`)
- **Buckets:** `[0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10, 30]`
- **Diagnostic:** High send wait = downstream bottleneck. High receive wait = upstream bottleneck.

### 3. Channel Utilization Gauges (1 new metric, 2 series)

```
wallet_ingestion_backfill_channel_utilization_ratio {channel}
```

- **Type:** GaugeVec
- **Labels:** `channel` (`"ledger"`, `"flush"`)
- **Sampled:** Every 1 second by a goroutine scoped to the gap's context
- **Range:** 0.0 (empty) to 1.0 (full)
- **Diagnostic:** Sustained 1.0 = downstream can't keep up. Sustained 0.0 = upstream is slow.

### 4. Backfill Throughput Counter (1 new metric, 1 series)

```
wallet_ingestion_backfill_ledgers_flushed_total
```

- **Type:** Counter
- **Incremented:** By batch size when flush worker completes
- **Query:** `rate(...[5m])` for ledgers/sec

### 5. Backfill Batch Size Histogram (1 new metric, 1 series)

```
wallet_ingestion_backfill_batch_size
```

- **Type:** Histogram
- **Buckets:** `LinearBuckets(10, 10, 10)` -> 10, 20, ..., 100
- **Diagnostic:** Most batches at 100 = healthy. Many partial batches = process workers starved.

### 6. Gap Progress Gauge (1 new metric, 1 series)

```
wallet_ingestion_backfill_gap_progress_ratio
```

- **Type:** Gauge
- **Updated:** When watermark advances, set to `(cursor - start) / (end - start)`
- **Diagnostic:** Steadily climbing = healthy. Plateau = stalled pipeline.
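The ratio update is simple arithmetic, but worth guarding against a zero-width gap. A sketch (the function name and `uint32` signature are assumptions):

```go
package main

import "fmt"

// gapProgress returns the completed fraction of a gap, clamped so a
// zero-width gap reports as done instead of dividing by zero.
func gapProgress(start, cursor, end uint32) float64 {
	if end <= start {
		return 1.0
	}
	return float64(cursor-start) / float64(end-start)
}

func main() {
	// Halfway through the gap [1000, 50000].
	fmt.Printf("%.2f\n", gapProgress(1000, 25500, 50000))
	// -> 0.50
}
```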

### 7. Gap Boundary Gauges (2 new metrics, 2 series)

```
wallet_ingestion_backfill_gap_start_ledger
wallet_ingestion_backfill_gap_end_ledger
```

- **Type:** Gauge
- **Set:** When gap processing begins; reset to 0 when done
- **Diagnostic:** Shows which ledger range is active for dashboard correlation.

### Cardinality Summary

| Metric | Series |
|---|---|
| PhaseDuration (3 new labels) | 3 |
| channel_wait_seconds | 4 |
| channel_utilization_ratio | 2 |
| backfill_ledgers_flushed_total | 1 |
| backfill_batch_size | 1 |
| backfill_gap_progress_ratio | 1 |
| gap_start/end_ledger | 2 |
| **Total** | **14** |

## Structured Logging

### Gap Summary Log

Emitted at gap completion by `processGap`. Each pipeline stage accumulates its stats locally (no contention) and reports them when the stage shuts down:

```
Gap [1000-50000] complete in 2m34s:
fetch: 1m12s total (avg 1.4ms/ledger)
process: 48s total (avg 0.98ms/ledger)
  flush: 34s total (avg 69ms/batch, 490 batches)
channel_wait: ledger_send=2.1s ledger_recv=45s flush_send=12s flush_recv=0.3s
throughput: 318 ledgers/sec
```

### Implementation Pattern

A `backfillStats` struct with per-worker accumulators:

- Each worker goroutine maintains a local `backfillWorkerStats` (no mutex needed)
- Workers report stats through a channel or at function return
- `processGap` aggregates all worker stats into the summary log

## Channel Utilization Sampler

A lightweight goroutine started per gap:

- Ticks every 1 second
- Reads `len(ledgerCh)/cap(ledgerCh)` and `len(flushCh)/cap(flushCh)`
- Sets the gauge values
- Exits when the gap context is cancelled

Since Go channels expose `len()` and `cap()` as non-blocking reads, this has negligible overhead.

## Bottleneck Identification Cheat Sheet

| Symptom | Bottleneck | Action |
|---|---|---|
| `ledger/send` wait high, `ledger/receive` wait low | Stage 2 (process) | Increase `backfillProcessWorkers` |
| `flush/send` wait high, `flush/receive` wait low | Stage 3 (flush/DB) | Increase `backfillFlushWorkers` or tune DB |
| `ledger/receive` high, `flush/receive` high | Stage 1 (fetch/RPC) | RPC is slow; nothing to tune internally |
| `ledger` utilization ~1.0 | Stage 2 can't keep up | More process workers or increase `backfillLedgerChanSize` |
| `flush` utilization ~1.0 | Stage 3 can't keep up | More flush workers or increase `backfillFlushChanSize` |
| Batch sizes mostly partial | Process workers starved | Stage 1 fetch is the bottleneck |

## Files Modified

| File | Changes |
|---|---|
| `internal/metrics/ingestion.go` | Add 8 new metric fields + registration |
| `internal/services/ingest_backfill.go` | Instrument all 3 stages, add channel sampler, add stats aggregation, add gap summary log |
| `internal/services/backfill_stats.go` | New file: `backfillStats` and `backfillWorkerStats` types |

## Non-Goals

- OpenTelemetry tracing
- Per-ledger log lines (too noisy)
- Alerting rules (separate concern, can be added to Grafana later)