From 36fe96c9382fe11a514e92be17b038c6dccfe187 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Tue, 31 Mar 2026 17:23:08 -0400 Subject: [PATCH 1/9] Split BatchCopy and add parallel COPY for ingest Refactor bulk COPY logic: separate account participant inserts into BatchCopyAccounts for operations and transactions, keep main BatchCopy for table rows only, and record metrics per COPY. Update tests and benchmarks to call new BatchCopyAccounts. In ingest service, perform parallel COPYs for transactions, operations, state_changes and their participant tables using errgroup and copyWithPoolConn (per-connection transactions), treating unique_violation (23505) as idempotent. Also add error helper isUniqueViolation and golang.org/x/sync dependency. --- go.mod | 2 +- internal/data/operations.go | 105 +++++++++++++++------------ internal/data/operations_test.go | 9 ++- internal/data/transactions.go | 105 +++++++++++++++------------ internal/data/transactions_test.go | 9 ++- internal/services/ingest.go | 112 +++++++++++++++++++---------- 6 files changed, 211 insertions(+), 131 deletions(-) diff --git a/go.mod b/go.mod index 26caf22cf..5550af9b3 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( github.com/tetratelabs/wazero v1.10.1 github.com/vektah/gqlparser/v2 v2.5.30 github.com/vikstrous/dataloadgen v0.0.9 + golang.org/x/sync v0.16.0 golang.org/x/term v0.33.0 golang.org/x/text v0.27.0 ) @@ -182,7 +183,6 @@ require ( golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/net v0.42.0 // indirect golang.org/x/oauth2 v0.30.0 // indirect - golang.org/x/sync v0.16.0 // indirect golang.org/x/sys v0.35.0 // indirect golang.org/x/time v0.8.0 // indirect google.golang.org/api v0.215.0 // indirect diff --git a/internal/data/operations.go b/internal/data/operations.go index 32f1b7093..bf7e1f37f 100644 --- a/internal/data/operations.go +++ b/internal/data/operations.go @@ -329,11 +329,16 @@ func (m *OperationModel) BatchGetByStateChangeIDs(ctx 
context.Context, scToIDs [ // IMPORTANT: BatchCopy will FAIL if any duplicate records exist. The PostgreSQL COPY // protocol does not support conflict handling. Callers must ensure no duplicates exist // before calling this method, or handle the unique constraint violation error appropriately. +// BatchCopy inserts operations using pgx's binary COPY protocol. +// Uses native pgtype types for optimal performance. +// +// IMPORTANT: BatchCopy will FAIL if any duplicate records exist. The PostgreSQL COPY +// protocol does not support conflict handling. Callers must ensure no duplicates exist +// before calling this method, or handle the unique constraint violation error appropriately. func (m *OperationModel) BatchCopy( ctx context.Context, pgxTx pgx.Tx, operations []*types.Operation, - stellarAddressesByOpID map[int64]types.ParticipantSet, ) (int, error) { if len(operations) == 0 { return 0, nil @@ -341,7 +346,6 @@ func (m *OperationModel) BatchCopy( start := time.Now() - // COPY operations using pgx binary format with native pgtype types copyCount, err := pgxTx.CopyFrom( ctx, pgx.Identifier{"operations"}, @@ -376,55 +380,68 @@ func (m *OperationModel) BatchCopy( return 0, fmt.Errorf("expected %d rows copied, got %d", len(operations), copyCount) } - // COPY operations_accounts using pgx binary format with native pgtype types. Upstream participants handling ensures that - // account address is not NULL here. 
- if len(stellarAddressesByOpID) > 0 { - // Build OpID -> LedgerCreatedAt lookup from operations - ledgerCreatedAtByOpID := make(map[int64]time.Time, len(operations)) - for _, op := range operations { - ledgerCreatedAtByOpID[op.ID] = op.LedgerCreatedAt - } + duration := time.Since(start).Seconds() + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(duration) + m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations))) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc() - var oaRows [][]any - for opID, addresses := range stellarAddressesByOpID { - ledgerCreatedAt := ledgerCreatedAtByOpID[opID] - ledgerCreatedAtPgtype := pgtype.Timestamptz{Time: ledgerCreatedAt, Valid: true} - opIDPgtype := pgtype.Int8{Int64: opID, Valid: true} - for _, addr := range addresses.ToSlice() { - addrBytes, addrErr := types.AddressBytea(addr).Value() - if addrErr != nil { - return 0, fmt.Errorf("converting address %s to bytes: %w", addr, addrErr) - } - oaRows = append(oaRows, []any{ - ledgerCreatedAtPgtype, - opIDPgtype, - addrBytes, - }) - } - } + return len(operations), nil +} - _, err = pgxTx.CopyFrom( - ctx, - pgx.Identifier{"operations_accounts"}, - []string{"ledger_created_at", "operation_id", "account_id"}, - pgx.CopyFromRows(oaRows), - ) - if err != nil { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc() - m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "operations_accounts", utils.GetDBErrorType(err)).Inc() - return 0, fmt.Errorf("pgx CopyFrom operations_accounts: %w", err) +// BatchCopyAccounts inserts operation participants into operations_accounts using pgx's binary COPY protocol. 
+func (m *OperationModel) BatchCopyAccounts( + ctx context.Context, + pgxTx pgx.Tx, + operations []*types.Operation, + stellarAddressesByOpID map[int64]types.ParticipantSet, +) error { + if len(stellarAddressesByOpID) == 0 { + return nil + } + + start := time.Now() + + // Build OpID -> LedgerCreatedAt lookup from operations + ledgerCreatedAtByOpID := make(map[int64]time.Time, len(operations)) + for _, op := range operations { + ledgerCreatedAtByOpID[op.ID] = op.LedgerCreatedAt + } + + var oaRows [][]any + for opID, addresses := range stellarAddressesByOpID { + ledgerCreatedAt := ledgerCreatedAtByOpID[opID] + ledgerCreatedAtPgtype := pgtype.Timestamptz{Time: ledgerCreatedAt, Valid: true} + opIDPgtype := pgtype.Int8{Int64: opID, Valid: true} + for _, addr := range addresses.ToSlice() { + addrBytes, addrErr := types.AddressBytea(addr).Value() + if addrErr != nil { + return fmt.Errorf("converting address %s to bytes: %w", addr, addrErr) + } + oaRows = append(oaRows, []any{ + ledgerCreatedAtPgtype, + opIDPgtype, + addrBytes, + }) } + } + _, err := pgxTx.CopyFrom( + ctx, + pgx.Identifier{"operations_accounts"}, + []string{"ledger_created_at", "operation_id", "account_id"}, + pgx.CopyFromRows(oaRows), + ) + if err != nil { + duration := time.Since(start).Seconds() + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations_accounts").Observe(duration) m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations_accounts").Inc() + m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "operations_accounts", utils.GetDBErrorType(err)).Inc() + return fmt.Errorf("pgx CopyFrom operations_accounts: %w", err) } duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc() + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", 
"operations_accounts").Observe(duration) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations_accounts").Inc() - return len(operations), nil + return nil } diff --git a/internal/data/operations_test.go b/internal/data/operations_test.go index b73a99418..1dc33eae7 100644 --- a/internal/data/operations_test.go +++ b/internal/data/operations_test.go @@ -147,7 +147,7 @@ func Test_OperationModel_BatchCopy(t *testing.T) { pgxTx, err := conn.Begin(ctx) require.NoError(t, err) - gotCount, err := m.BatchCopy(ctx, pgxTx, tc.operations, tc.stellarAddressesByOpID) + gotCount, err := m.BatchCopy(ctx, pgxTx, tc.operations) if tc.wantErrContains != "" { require.Error(t, err) @@ -157,6 +157,7 @@ func Test_OperationModel_BatchCopy(t *testing.T) { } require.NoError(t, err) + require.NoError(t, m.BatchCopyAccounts(ctx, pgxTx, tc.operations, tc.stellarAddressesByOpID)) require.NoError(t, pgxTx.Commit(ctx)) assert.Equal(t, tc.wantCount, gotCount) @@ -726,11 +727,15 @@ func BenchmarkOperationModel_BatchCopy(b *testing.B) { } b.StartTimer() - _, err = m.BatchCopy(ctx, pgxTx, ops, addressesByOpID) + _, err = m.BatchCopy(ctx, pgxTx, ops) if err != nil { pgxTx.Rollback(ctx) b.Fatalf("BatchCopy failed: %v", err) } + if err = m.BatchCopyAccounts(ctx, pgxTx, ops, addressesByOpID); err != nil { + pgxTx.Rollback(ctx) + b.Fatalf("BatchCopyAccounts failed: %v", err) + } b.StopTimer() if err := pgxTx.Commit(ctx); err != nil { diff --git a/internal/data/transactions.go b/internal/data/transactions.go index 2618019e7..71aeab7a0 100644 --- a/internal/data/transactions.go +++ b/internal/data/transactions.go @@ -240,11 +240,16 @@ func (m *TransactionModel) BatchGetByStateChangeIDs(ctx context.Context, scToIDs // IMPORTANT: BatchCopy will FAIL if any duplicate records exist. The PostgreSQL COPY // protocol does not support conflict handling. Callers must ensure no duplicates exist // before calling this method, or handle the unique constraint violation error appropriately. 
+// BatchCopy inserts transactions using pgx's binary COPY protocol. +// Uses native pgtype types for optimal performance (see https://github.com/jackc/pgx/issues/763). +// +// IMPORTANT: BatchCopy will FAIL if any duplicate records exist. The PostgreSQL COPY +// protocol does not support conflict handling. Callers must ensure no duplicates exist +// before calling this method, or handle the unique constraint violation error appropriately. func (m *TransactionModel) BatchCopy( ctx context.Context, pgxTx pgx.Tx, txs []*types.Transaction, - stellarAddressesByToID map[int64]types.ParticipantSet, ) (int, error) { if len(txs) == 0 { return 0, nil @@ -252,8 +257,6 @@ func (m *TransactionModel) BatchCopy( start := time.Now() - // COPY transactions using pgx binary format with native pgtype types. Upstream participants handling ensures that - // account address is not NULL here. copyCount, err := pgxTx.CopyFrom( ctx, pgx.Identifier{"transactions"}, @@ -292,54 +295,68 @@ func (m *TransactionModel) BatchCopy( return 0, fmt.Errorf("expected %d rows copied, got %d", len(txs), copyCount) } - // COPY transactions_accounts using pgx binary format with native pgtype types - if len(stellarAddressesByToID) > 0 { - // Build ToID -> LedgerCreatedAt lookup from transactions - ledgerCreatedAtByToID := make(map[int64]time.Time, len(txs)) - for _, tx := range txs { - ledgerCreatedAtByToID[tx.ToID] = tx.LedgerCreatedAt - } + duration := time.Since(start).Seconds() + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions").Observe(duration) + m.Metrics.BatchSize.WithLabelValues("BatchCopy", "transactions").Observe(float64(len(txs))) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions").Inc() - var taRows [][]any - for toID, addresses := range stellarAddressesByToID { - ledgerCreatedAt := ledgerCreatedAtByToID[toID] - ledgerCreatedAtPgtype := pgtype.Timestamptz{Time: ledgerCreatedAt, Valid: true} - toIDPgtype := pgtype.Int8{Int64: toID, Valid: true} - for _, addr 
:= range addresses.ToSlice() { - addrBytes, addrErr := types.AddressBytea(addr).Value() - if addrErr != nil { - return 0, fmt.Errorf("converting address %s to bytes: %w", addr, addrErr) - } - taRows = append(taRows, []any{ - ledgerCreatedAtPgtype, - toIDPgtype, - addrBytes, - }) - } - } + return len(txs), nil +} - _, err = pgxTx.CopyFrom( - ctx, - pgx.Identifier{"transactions_accounts"}, - []string{"ledger_created_at", "tx_to_id", "account_id"}, - pgx.CopyFromRows(taRows), - ) - if err != nil { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "transactions").Observe(float64(len(txs))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions").Inc() - m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "transactions_accounts", utils.GetDBErrorType(err)).Inc() - return 0, fmt.Errorf("pgx CopyFrom transactions_accounts: %w", err) +// BatchCopyAccounts inserts transaction participants into transactions_accounts using pgx's binary COPY protocol. 
+func (m *TransactionModel) BatchCopyAccounts( + ctx context.Context, + pgxTx pgx.Tx, + txs []*types.Transaction, + stellarAddressesByToID map[int64]types.ParticipantSet, +) error { + if len(stellarAddressesByToID) == 0 { + return nil + } + + start := time.Now() + + // Build ToID -> LedgerCreatedAt lookup from transactions + ledgerCreatedAtByToID := make(map[int64]time.Time, len(txs)) + for _, tx := range txs { + ledgerCreatedAtByToID[tx.ToID] = tx.LedgerCreatedAt + } + + var taRows [][]any + for toID, addresses := range stellarAddressesByToID { + ledgerCreatedAt := ledgerCreatedAtByToID[toID] + ledgerCreatedAtPgtype := pgtype.Timestamptz{Time: ledgerCreatedAt, Valid: true} + toIDPgtype := pgtype.Int8{Int64: toID, Valid: true} + for _, addr := range addresses.ToSlice() { + addrBytes, addrErr := types.AddressBytea(addr).Value() + if addrErr != nil { + return fmt.Errorf("converting address %s to bytes: %w", addr, addrErr) + } + taRows = append(taRows, []any{ + ledgerCreatedAtPgtype, + toIDPgtype, + addrBytes, + }) } + } + _, err := pgxTx.CopyFrom( + ctx, + pgx.Identifier{"transactions_accounts"}, + []string{"ledger_created_at", "tx_to_id", "account_id"}, + pgx.CopyFromRows(taRows), + ) + if err != nil { + duration := time.Since(start).Seconds() + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions_accounts").Observe(duration) m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions_accounts").Inc() + m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "transactions_accounts", utils.GetDBErrorType(err)).Inc() + return fmt.Errorf("pgx CopyFrom transactions_accounts: %w", err) } duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "transactions").Observe(float64(len(txs))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions").Inc() + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", 
"transactions_accounts").Observe(duration) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions_accounts").Inc() - return len(txs), nil + return nil } diff --git a/internal/data/transactions_test.go b/internal/data/transactions_test.go index dc0f05a0a..a0816dc28 100644 --- a/internal/data/transactions_test.go +++ b/internal/data/transactions_test.go @@ -145,7 +145,7 @@ func Test_TransactionModel_BatchCopy(t *testing.T) { pgxTx, err := conn.Begin(ctx) require.NoError(t, err) - gotCount, err := m.BatchCopy(ctx, pgxTx, tc.txs, tc.stellarAddressesByToID) + gotCount, err := m.BatchCopy(ctx, pgxTx, tc.txs) if tc.wantErrContains != "" { require.Error(t, err) @@ -155,6 +155,7 @@ func Test_TransactionModel_BatchCopy(t *testing.T) { } require.NoError(t, err) + require.NoError(t, m.BatchCopyAccounts(ctx, pgxTx, tc.txs, tc.stellarAddressesByToID)) require.NoError(t, pgxTx.Commit(ctx)) assert.Equal(t, tc.wantCount, gotCount) @@ -493,11 +494,15 @@ func BenchmarkTransactionModel_BatchCopy(b *testing.B) { } b.StartTimer() - _, err = m.BatchCopy(ctx, pgxTx, txs, addressesByToID) + _, err = m.BatchCopy(ctx, pgxTx, txs) if err != nil { pgxTx.Rollback(ctx) b.Fatalf("BatchCopy failed: %v", err) } + if err = m.BatchCopyAccounts(ctx, pgxTx, txs, addressesByToID); err != nil { + pgxTx.Rollback(ctx) + b.Fatalf("BatchCopyAccounts failed: %v", err) + } b.StopTimer() if err := pgxTx.Commit(ctx); err != nil { diff --git a/internal/services/ingest.go b/internal/services/ingest.go index 039c18f2c..6e3600b00 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -2,6 +2,7 @@ package services import ( "context" + "errors" "fmt" "hash/fnv" "runtime" @@ -11,10 +12,12 @@ import ( "github.com/alitto/pond/v2" set "github.com/deckarep/golang-set/v2" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/stellar/go-stellar-sdk/historyarchive" "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" "github.com/stellar/go-stellar-sdk/support/log" 
"github.com/stellar/go-stellar-sdk/xdr" + "golang.org/x/sync/errgroup" "github.com/stellar/wallet-backend/internal/apptracker" "github.com/stellar/wallet-backend/internal/data" @@ -224,65 +227,98 @@ func (m *ingestService) processLedger(ctx context.Context, ledgerMeta xdr.Ledger return nil } -// insertIntoDB persists the processed data from the buffer to the database. -// txs is passed in to avoid a redundant GetTransactions() allocation — the caller -// already has the slice for unlockChannelAccounts. -func (m *ingestService) insertIntoDB(ctx context.Context, dbTx pgx.Tx, txs []*types.Transaction, buffer indexer.IndexerBufferInterface) (int, int, error) { +// insertIntoDB persists transactions, operations, state changes, and their participant +// tables to the database using parallel COPY operations. Each of the 5 tables is inserted +// concurrently on its own pool connection via errgroup. +// On UniqueViolation (from a prior partial insert), the error is treated as success +// since the data is already present from a previous attempt. 
+func (m *ingestService) insertIntoDB(ctx context.Context, _ pgx.Tx, txs []*types.Transaction, buffer indexer.IndexerBufferInterface) (int, int, error) { txParticipants := buffer.GetTransactionsParticipants() ops := buffer.GetOperations() opParticipants := buffer.GetOperationsParticipants() stateChanges := buffer.GetStateChanges() - if err := m.insertTransactions(ctx, dbTx, txs, txParticipants); err != nil { - return 0, 0, err - } - if err := m.insertOperations(ctx, dbTx, ops, opParticipants); err != nil { - return 0, 0, err - } - if err := m.insertStateChanges(ctx, dbTx, stateChanges); err != nil { - return 0, 0, err + g, gCtx := errgroup.WithContext(ctx) + + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + if _, err := m.models.Transactions.BatchCopy(ctx, tx, txs); err != nil { + return fmt.Errorf("copying transactions: %w", err) + } + return nil + }) + }) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + return m.models.Transactions.BatchCopyAccounts(ctx, tx, txs, txParticipants) + }) + }) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + if _, err := m.models.Operations.BatchCopy(ctx, tx, ops); err != nil { + return fmt.Errorf("copying operations: %w", err) + } + return nil + }) + }) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + return m.models.Operations.BatchCopyAccounts(ctx, tx, ops, opParticipants) + }) + }) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + if _, err := m.models.StateChanges.BatchCopy(ctx, tx, stateChanges); err != nil { + return fmt.Errorf("copying state changes: %w", err) + } + return nil + }) + }) + + if err := g.Wait(); err != nil { + return 0, 0, fmt.Errorf("parallel copy: %w", err) } + + m.recordStateChangeMetrics(stateChanges) log.Ctx(ctx).Infof("✅ inserted %d txs, %d 
ops, %d state_changes", len(txs), len(ops), len(stateChanges)) return len(txs), len(ops), nil } -// insertTransactions batch inserts transactions with their participants into the database. -func (m *ingestService) insertTransactions(ctx context.Context, pgxTx pgx.Tx, txs []*types.Transaction, stellarAddressesByToID map[int64]types.ParticipantSet) error { - if len(txs) == 0 { - return nil - } - _, err := m.models.Transactions.BatchCopy(ctx, pgxTx, txs, stellarAddressesByToID) +// copyWithPoolConn acquires a connection from the pool, begins a transaction, runs fn, +// and commits. On UniqueViolation the insert is idempotent (prior partial insert) so we +// return nil. +func (m *ingestService) copyWithPoolConn(ctx context.Context, fn func(context.Context, pgx.Tx) error) error { + conn, err := m.models.DB.Acquire(ctx) if err != nil { - return fmt.Errorf("batch inserting transactions: %w", err) + return fmt.Errorf("acquiring pool connection: %w", err) } - return nil -} + defer conn.Release() -// insertOperations batch inserts operations with their participants into the database. -func (m *ingestService) insertOperations(ctx context.Context, pgxTx pgx.Tx, ops []*types.Operation, stellarAddressesByOpID map[int64]types.ParticipantSet) error { - if len(ops) == 0 { - return nil - } - _, err := m.models.Operations.BatchCopy(ctx, pgxTx, ops, stellarAddressesByOpID) + tx, err := conn.Begin(ctx) if err != nil { - return fmt.Errorf("batch inserting operations: %w", err) + return fmt.Errorf("beginning transaction: %w", err) } - return nil -} + defer tx.Rollback(ctx) //nolint:errcheck -// insertStateChanges batch inserts state changes and records metrics. 
-func (m *ingestService) insertStateChanges(ctx context.Context, pgxTx pgx.Tx, stateChanges []types.StateChange) error { - if len(stateChanges) == 0 { - return nil + if err := fn(ctx, tx); err != nil { + if isUniqueViolation(err) { + return nil + } + return err } - _, err := m.models.StateChanges.BatchCopy(ctx, pgxTx, stateChanges) - if err != nil { - return fmt.Errorf("batch inserting state changes: %w", err) + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("committing copy transaction: %w", err) } - m.recordStateChangeMetrics(stateChanges) return nil } +// isUniqueViolation checks if an error is a PostgreSQL unique_violation (23505). +func isUniqueViolation(err error) bool { + var pgErr *pgconn.PgError + return errors.As(err, &pgErr) && pgErr.Code == "23505" +} + // recordStateChangeMetrics aggregates state changes by reason and category, then records metrics. func (m *ingestService) recordStateChangeMetrics(stateChanges []types.StateChange) { counts := make(map[string]int) // key: "reason|category" From 458465d6135eaae55cee52ad54e09b2d4aee92ca Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Tue, 31 Mar 2026 18:00:16 -0400 Subject: [PATCH 2/9] Parallelize DB inserts and token upserts Refactor ingestion to run parallel COPYs and balance upserts and to split persistence into clear phases. insertIntoDB was renamed to insertAndUpsertParallel and now launches 9 goroutines via errgroup (5 COPYs + 4 token upserts), each on its own pool connection; UniqueViolation is treated as success for idempotent retries. PersistLedgerData was reworked into three phases: persistFKPrerequisites (commit trustline assets and contract tokens so FKs are visible), parallel insert/upsert, and persistFinalize (unlock channel accounts and advance the cursor as the idempotency marker). The TokenIngestionService interface exposes individual processors (trustline, contract, native, SAC) and the concrete service and mocks were updated accordingly. 
Error messages and transaction boundaries were adjusted to improve crash-recovery semantics and clarity. --- internal/services/ingest.go | 36 ++++++++-- internal/services/ingest_backfill.go | 2 +- internal/services/ingest_live.go | 102 +++++++++++++++------------ internal/services/mocks.go | 20 ++++++ internal/services/token_ingestion.go | 30 ++++---- 5 files changed, 126 insertions(+), 64 deletions(-) diff --git a/internal/services/ingest.go b/internal/services/ingest.go index 6e3600b00..6d1787def 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -227,12 +227,11 @@ func (m *ingestService) processLedger(ctx context.Context, ledgerMeta xdr.Ledger return nil } -// insertIntoDB persists transactions, operations, state changes, and their participant -// tables to the database using parallel COPY operations. Each of the 5 tables is inserted -// concurrently on its own pool connection via errgroup. -// On UniqueViolation (from a prior partial insert), the error is treated as success -// since the data is already present from a previous attempt. -func (m *ingestService) insertIntoDB(ctx context.Context, _ pgx.Tx, txs []*types.Transaction, buffer indexer.IndexerBufferInterface) (int, int, error) { +// insertAndUpsertParallel runs 9 goroutines via errgroup: 5 COPY operations (transactions, +// transactions_accounts, operations, operations_accounts, state_changes) and 4 balance +// upserts (trustline, native, SAC, account-contract tokens). Each goroutine acquires its +// own pool connection. UniqueViolation errors are treated as success for idempotent retry. 
+func (m *ingestService) insertAndUpsertParallel(ctx context.Context, txs []*types.Transaction, buffer indexer.IndexerBufferInterface) (int, int, error) { txParticipants := buffer.GetTransactionsParticipants() ops := buffer.GetOperations() opParticipants := buffer.GetOperationsParticipants() @@ -240,6 +239,7 @@ func (m *ingestService) insertIntoDB(ctx context.Context, _ pgx.Tx, txs []*types g, gCtx := errgroup.WithContext(ctx) + // 5 COPY goroutines g.Go(func() error { return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { if _, err := m.models.Transactions.BatchCopy(ctx, tx, txs); err != nil { @@ -275,8 +275,30 @@ func (m *ingestService) insertIntoDB(ctx context.Context, _ pgx.Tx, txs []*types }) }) + // 4 upsert goroutines + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + return m.tokenIngestionService.ProcessTrustlineChanges(ctx, tx, buffer.GetTrustlineChanges()) + }) + }) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + return m.tokenIngestionService.ProcessNativeBalanceChanges(ctx, tx, buffer.GetAccountChanges()) + }) + }) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + return m.tokenIngestionService.ProcessSACBalanceChanges(ctx, tx, buffer.GetSACBalanceChanges()) + }) + }) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + return m.tokenIngestionService.ProcessContractTokenChanges(ctx, tx, buffer.GetContractChanges()) + }) + }) + if err := g.Wait(); err != nil { - return 0, 0, fmt.Errorf("parallel copy: %w", err) + return 0, 0, fmt.Errorf("parallel insert/upsert: %w", err) } m.recordStateChangeMetrics(stateChanges) diff --git a/internal/services/ingest_backfill.go b/internal/services/ingest_backfill.go index de6fbb799..ad019b9fe 100644 --- a/internal/services/ingest_backfill.go +++ b/internal/services/ingest_backfill.go @@ 
-270,7 +270,7 @@ func (m *ingestService) flushBatchBufferWithRetry(ctx context.Context, buffer *i return fmt.Errorf("setting synchronous_commit=off: %w", txErr) } txs := buffer.GetTransactions() - if _, _, err := m.insertIntoDB(ctx, dbTx, txs, buffer); err != nil { + if _, _, err := m.insertAndUpsertParallel(ctx, txs, buffer); err != nil { return fmt.Errorf("inserting processed data into db: %w", err) } // Unlock channel accounts using all transactions (not filtered) diff --git a/internal/services/ingest_live.go b/internal/services/ingest_live.go index 8962c36fd..d7ed45c7a 100644 --- a/internal/services/ingest_live.go +++ b/internal/services/ingest_live.go @@ -23,68 +23,82 @@ const ( oldestLedgerSyncInterval = 100 ) -// PersistLedgerData persists processed ledger data to the database in a single atomic transaction. -// This is the shared core used by both live ingestion and loadtest. -// It handles: trustline assets, contract tokens, filtered data insertion, channel account unlocking, -// token changes, and cursor update. Channel unlock is a no-op when chAccStore is nil. +// PersistLedgerData persists processed ledger data to the database in three phases: +// +// Phase 1: Insert FK prerequisites (trustline assets, contract tokens) and commit, +// making parent rows visible for phase 2's balance upserts. +// +// Phase 2: Parallel COPYs (5 tables) + parallel upserts (4 balance tables) via errgroup, +// each on its own pool connection. UniqueViolation errors are treated as success for +// idempotent crash recovery. +// +// Phase 3: Finalize — unlock channel accounts + advance cursor. The cursor update is the +// idempotency marker; if we crash before it, everything replays safely. func (m *ingestService) PersistLedgerData(ctx context.Context, ledgerSeq uint32, buffer *indexer.IndexerBuffer, cursorName string) (int, int, error) { - var numTxs, numOps int + // Phase 1: FK prerequisites — commit so parent rows are visible to phase 2. 
+ if err := m.persistFKPrerequisites(ctx, ledgerSeq, buffer); err != nil { + return 0, 0, fmt.Errorf("persisting FK prerequisites for ledger %d: %w", ledgerSeq, err) + } + + // Phase 2: Parallel COPYs + upserts on separate pool connections. + txs := buffer.GetTransactions() + numTxs, numOps, err := m.insertAndUpsertParallel(ctx, txs, buffer) + if err != nil { + return 0, 0, fmt.Errorf("parallel insert/upsert for ledger %d: %w", ledgerSeq, err) + } + + // Phase 3: Finalize — unlock channel accounts + advance cursor. + if err := m.persistFinalize(ctx, ledgerSeq, txs, cursorName); err != nil { + return 0, 0, fmt.Errorf("finalizing ledger %d: %w", ledgerSeq, err) + } - err := db.RunInTransaction(ctx, m.models.DB, func(dbTx pgx.Tx) error { - // 1. Insert unique trustline assets (FK prerequisite for trustline balances) + return numTxs, numOps, nil +} + +// persistFKPrerequisites inserts trustline assets and contract tokens in a committed +// transaction so that FK parent rows are visible to parallel balance upserts. +func (m *ingestService) persistFKPrerequisites(ctx context.Context, ledgerSeq uint32, buffer *indexer.IndexerBuffer) error { + if err := db.RunInTransaction(ctx, m.models.DB, func(dbTx pgx.Tx) error { uniqueAssets := buffer.GetUniqueTrustlineAssets() if len(uniqueAssets) > 0 { - if txErr := m.models.TrustlineAsset.BatchInsert(ctx, dbTx, uniqueAssets); txErr != nil { - return fmt.Errorf("inserting trustline assets for ledger %d: %w", ledgerSeq, txErr) + if err := m.models.TrustlineAsset.BatchInsert(ctx, dbTx, uniqueAssets); err != nil { + return fmt.Errorf("inserting trustline assets for ledger %d: %w", ledgerSeq, err) } } - // 2. 
Insert new contract tokens (filter existing, fetch metadata for SEP-41 if available, insert) - contracts, txErr := m.prepareNewContractTokens(ctx, dbTx, buffer.GetUniqueSEP41ContractTokensByID(), buffer.GetSACContracts()) - if txErr != nil { - return fmt.Errorf("preparing contract tokens for ledger %d: %w", ledgerSeq, txErr) + contracts, err := m.prepareNewContractTokens(ctx, dbTx, buffer.GetUniqueSEP41ContractTokensByID(), buffer.GetSACContracts()) + if err != nil { + return fmt.Errorf("preparing contract tokens for ledger %d: %w", ledgerSeq, err) } if len(contracts) > 0 { - if txErr = m.models.Contract.BatchInsert(ctx, dbTx, contracts); txErr != nil { - return fmt.Errorf("inserting contracts for ledger %d: %w", ledgerSeq, txErr) + if err = m.models.Contract.BatchInsert(ctx, dbTx, contracts); err != nil { + return fmt.Errorf("inserting contracts for ledger %d: %w", ledgerSeq, err) } log.Ctx(ctx).Infof("✅ inserted %d contract tokens", len(contracts)) } - // 3. Insert transactions/operations/state_changes - txs := buffer.GetTransactions() - numTxs, numOps, txErr = m.insertIntoDB(ctx, dbTx, txs, buffer) - if txErr != nil { - return fmt.Errorf("inserting processed data into db for ledger %d: %w", ledgerSeq, txErr) - } - - // 4. Unlock channel accounts (no-op when chAccStore is nil, e.g., in loadtest) - if txErr = m.unlockChannelAccounts(ctx, dbTx, txs); txErr != nil { - return fmt.Errorf("unlocking channel accounts for ledger %d: %w", ledgerSeq, txErr) - } + return nil + }); err != nil { + return fmt.Errorf("FK prerequisites transaction: %w", err) + } + return nil +} - // 5. 
Process token changes (trustline add/remove/update, contract token add, native balance, SAC balance) - if txErr = m.tokenIngestionService.ProcessTokenChanges(ctx, dbTx, - buffer.GetTrustlineChanges(), - buffer.GetContractChanges(), - buffer.GetAccountChanges(), - buffer.GetSACBalanceChanges(), - ); txErr != nil { - return fmt.Errorf("processing token changes for ledger %d: %w", ledgerSeq, txErr) +// persistFinalize unlocks channel accounts and advances the cursor in a committed transaction. +// The cursor update is the idempotency marker for crash recovery. +func (m *ingestService) persistFinalize(ctx context.Context, ledgerSeq uint32, txs []*types.Transaction, cursorName string) error { + if err := db.RunInTransaction(ctx, m.models.DB, func(dbTx pgx.Tx) error { + if err := m.unlockChannelAccounts(ctx, dbTx, txs); err != nil { + return fmt.Errorf("unlocking channel accounts for ledger %d: %w", ledgerSeq, err) } - - // 6. Update the specified cursor - if txErr = m.models.IngestStore.Update(ctx, dbTx, cursorName, ledgerSeq); txErr != nil { - return fmt.Errorf("updating cursor for ledger %d: %w", ledgerSeq, txErr) + if err := m.models.IngestStore.Update(ctx, dbTx, cursorName, ledgerSeq); err != nil { + return fmt.Errorf("updating cursor for ledger %d: %w", ledgerSeq, err) } - return nil - }) - if err != nil { - return 0, 0, fmt.Errorf("persisting ledger data for ledger %d: %w", ledgerSeq, err) + }); err != nil { + return fmt.Errorf("finalize transaction: %w", err) } - - return numTxs, numOps, nil + return nil } // startLiveIngestion begins continuous ingestion from the last checkpoint ledger, diff --git a/internal/services/mocks.go b/internal/services/mocks.go index 7be51bdf9..fea981465 100644 --- a/internal/services/mocks.go +++ b/internal/services/mocks.go @@ -135,6 +135,26 @@ func (m *TokenIngestionServiceMock) ProcessTokenChanges(ctx context.Context, dbT return args.Error(0) } +func (m *TokenIngestionServiceMock) ProcessTrustlineChanges(ctx context.Context, dbTx 
pgx.Tx, changesByKey map[indexer.TrustlineChangeKey]types.TrustlineChange) error { + args := m.Called(ctx, dbTx, changesByKey) + return args.Error(0) +} + +func (m *TokenIngestionServiceMock) ProcessContractTokenChanges(ctx context.Context, dbTx pgx.Tx, changes []types.ContractChange) error { + args := m.Called(ctx, dbTx, changes) + return args.Error(0) +} + +func (m *TokenIngestionServiceMock) ProcessNativeBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByAccountID map[string]types.AccountChange) error { + args := m.Called(ctx, dbTx, changesByAccountID) + return args.Error(0) +} + +func (m *TokenIngestionServiceMock) ProcessSACBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByKey map[indexer.SACBalanceChangeKey]types.SACBalanceChange) error { + args := m.Called(ctx, dbTx, changesByKey) + return args.Error(0) +} + // NewTokenIngestionServiceMock creates a new instance of TokenIngestionServiceMock. func NewTokenIngestionServiceMock(t interface { mock.TestingT diff --git a/internal/services/token_ingestion.go b/internal/services/token_ingestion.go index 87c795d27..952007ef1 100644 --- a/internal/services/token_ingestion.go +++ b/internal/services/token_ingestion.go @@ -161,6 +161,12 @@ type TokenIngestionService interface { // // Both trustline and contract IDs are computed using deterministic hash functions (DeterministicAssetID, DeterministicContractID). ProcessTokenChanges(ctx context.Context, dbTx pgx.Tx, trustlineChangesByTrustlineKey map[indexer.TrustlineChangeKey]types.TrustlineChange, contractChanges []types.ContractChange, accountChangesByAccountID map[string]types.AccountChange, sacBalanceChangesByKey map[indexer.SACBalanceChangeKey]types.SACBalanceChange) error + + // Individual token change processors — used by parallel ingestion. 
+ ProcessTrustlineChanges(ctx context.Context, dbTx pgx.Tx, changesByKey map[indexer.TrustlineChangeKey]types.TrustlineChange) error + ProcessContractTokenChanges(ctx context.Context, dbTx pgx.Tx, changes []types.ContractChange) error + ProcessNativeBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByAccountID map[string]types.AccountChange) error + ProcessSACBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByKey map[indexer.SACBalanceChangeKey]types.SACBalanceChange) error } // Verify interface compliance at compile time @@ -353,23 +359,23 @@ func (s *tokenIngestionService) ProcessTokenChanges(ctx context.Context, dbTx pg return nil } - if err := s.processTrustlineChanges(ctx, dbTx, trustlineChangesByTrustlineKey); err != nil { + if err := s.ProcessTrustlineChanges(ctx, dbTx, trustlineChangesByTrustlineKey); err != nil { return err } - if err := s.processContractTokenChanges(ctx, dbTx, contractChanges); err != nil { + if err := s.ProcessContractTokenChanges(ctx, dbTx, contractChanges); err != nil { return err } - if err := s.processNativeBalanceChanges(ctx, dbTx, accountChangesByAccountID); err != nil { + if err := s.ProcessNativeBalanceChanges(ctx, dbTx, accountChangesByAccountID); err != nil { return err } - if err := s.processSACBalanceChanges(ctx, dbTx, sacBalanceChangesByKey); err != nil { + if err := s.ProcessSACBalanceChanges(ctx, dbTx, sacBalanceChangesByKey); err != nil { return err } return nil } -// processTrustlineChanges handles trustline balance upserts and deletes. -func (s *tokenIngestionService) processTrustlineChanges(ctx context.Context, dbTx pgx.Tx, changesByKey map[indexer.TrustlineChangeKey]types.TrustlineChange) error { +// ProcessTrustlineChanges handles trustline balance upserts and deletes. 
+func (s *tokenIngestionService) ProcessTrustlineChanges(ctx context.Context, dbTx pgx.Tx, changesByKey map[indexer.TrustlineChangeKey]types.TrustlineChange) error { if len(changesByKey) == 0 { return nil } @@ -403,8 +409,8 @@ func (s *tokenIngestionService) processTrustlineChanges(ctx context.Context, dbT return nil } -// processContractTokenChanges handles SEP-41 contract token inserts. -func (s *tokenIngestionService) processContractTokenChanges(ctx context.Context, dbTx pgx.Tx, changes []types.ContractChange) error { +// ProcessContractTokenChanges handles SEP-41 contract token inserts. +func (s *tokenIngestionService) ProcessContractTokenChanges(ctx context.Context, dbTx pgx.Tx, changes []types.ContractChange) error { if len(changes) == 0 { return nil } @@ -431,8 +437,8 @@ func (s *tokenIngestionService) processContractTokenChanges(ctx context.Context, return nil } -// processNativeBalanceChanges handles native XLM balance upserts and deletes. -func (s *tokenIngestionService) processNativeBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByAccountID map[string]types.AccountChange) error { +// ProcessNativeBalanceChanges handles native XLM balance upserts and deletes. +func (s *tokenIngestionService) ProcessNativeBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByAccountID map[string]types.AccountChange) error { if len(changesByAccountID) == 0 { return nil } @@ -463,8 +469,8 @@ func (s *tokenIngestionService) processNativeBalanceChanges(ctx context.Context, return nil } -// processSACBalanceChanges handles SAC balance upserts and deletes for contract addresses. -func (s *tokenIngestionService) processSACBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByKey map[indexer.SACBalanceChangeKey]types.SACBalanceChange) error { +// ProcessSACBalanceChanges handles SAC balance upserts and deletes for contract addresses. 
+func (s *tokenIngestionService) ProcessSACBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByKey map[indexer.SACBalanceChangeKey]types.SACBalanceChange) error { if len(changesByKey) == 0 { return nil } From 5bfc085c0e1ec74b3e462051546ce41885d9ac27 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Wed, 1 Apr 2026 11:26:46 -0400 Subject: [PATCH 3/9] set synchronous_commit = off --- internal/services/ingest.go | 6 ++++++ internal/services/ingest_test.go | 7 +++++++ 2 files changed, 13 insertions(+) diff --git a/internal/services/ingest.go b/internal/services/ingest.go index 6d1787def..6a411a762 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -322,6 +322,12 @@ func (m *ingestService) copyWithPoolConn(ctx context.Context, fn func(context.Co } defer tx.Rollback(ctx) //nolint:errcheck + // Ingestion is idempotent (replayed from cursor on crash), so WAL fsync durability + // is unnecessary. This eliminates fsync latency from each of the 9 parallel commits. + if _, err := tx.Exec(ctx, "SET LOCAL synchronous_commit = off"); err != nil { + return fmt.Errorf("setting synchronous_commit=off: %w", err) + } + if err := fn(ctx, tx); err != nil { if isUniqueViolation(err) { return nil diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index 8c52830c5..900794890 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -1088,6 +1088,12 @@ func Test_ingestService_flushBatchBufferWithRetry(t *testing.T) { mockChAccStore.On("UnassignTxAndUnlockChannelAccounts", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(0), nil).Maybe() mockChAccStore.On("UnassignTxAndUnlockChannelAccounts", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(0), nil).Maybe() + mockTokenSvc := NewTokenIngestionServiceMock(t) + mockTokenSvc.On("ProcessTrustlineChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + 
mockTokenSvc.On("ProcessNativeBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + mockTokenSvc.On("ProcessSACBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + mockTokenSvc.On("ProcessContractTokenChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + svc, err := NewIngestService(IngestServiceConfig{ IngestionMode: IngestionModeBackfill, Models: models, @@ -1097,6 +1103,7 @@ func Test_ingestService_flushBatchBufferWithRetry(t *testing.T) { RPCService: mockRPCService, LedgerBackend: &LedgerBackendMock{}, ChannelAccountStore: mockChAccStore, + TokenIngestionService: mockTokenSvc, Metrics: m, GetLedgersLimit: defaultGetLedgersLimit, Network: network.TestNetworkPassphrase, From e5c84dff12a5b32b5edac4b21c86bea1ba093b19 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Wed, 1 Apr 2026 11:52:33 -0400 Subject: [PATCH 4/9] Skip balance upserts during backfill Make ingestion backfill mode skip balance upserts and remove channel unlock logic. insertAndUpsertParallel now conditionally omits the token/balance upsert goroutines when IngestionModeBackfill (balance tables represent current state and should not be updated during backfill). Updated function comment to reflect the behavior. Removed the unlockChannelAccounts call from flushBatchBufferWithRetry in backfill flow. Adjusted tests to remove mocks and expectations for ChannelAccountStore and TokenIngestionService that were only needed for live-mode balance upserts/unlocks. 
--- internal/services/ingest.go | 46 +++++++++++++++------------- internal/services/ingest_backfill.go | 4 --- internal/services/ingest_test.go | 15 --------- 3 files changed, 25 insertions(+), 40 deletions(-) diff --git a/internal/services/ingest.go b/internal/services/ingest.go index 6a411a762..5a81ebe36 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -227,10 +227,12 @@ func (m *ingestService) processLedger(ctx context.Context, ledgerMeta xdr.Ledger return nil } -// insertAndUpsertParallel runs 9 goroutines via errgroup: 5 COPY operations (transactions, -// transactions_accounts, operations, operations_accounts, state_changes) and 4 balance -// upserts (trustline, native, SAC, account-contract tokens). Each goroutine acquires its -// own pool connection. UniqueViolation errors are treated as success for idempotent retry. +// insertAndUpsertParallel runs parallel goroutines via errgroup: 5 COPY operations (transactions, +// transactions_accounts, operations, operations_accounts, state_changes) plus, in live mode only, +// 4 balance upserts (trustline, native, SAC, account-contract tokens). Balance upserts are +// skipped in backfill mode since they represent current state, not historical data. +// Each goroutine acquires its own pool connection. UniqueViolation errors are treated as +// success for idempotent retry. 
func (m *ingestService) insertAndUpsertParallel(ctx context.Context, txs []*types.Transaction, buffer indexer.IndexerBufferInterface) (int, int, error) { txParticipants := buffer.GetTransactionsParticipants() ops := buffer.GetOperations() @@ -275,27 +277,29 @@ func (m *ingestService) insertAndUpsertParallel(ctx context.Context, txs []*type }) }) - // 4 upsert goroutines - g.Go(func() error { - return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { - return m.tokenIngestionService.ProcessTrustlineChanges(ctx, tx, buffer.GetTrustlineChanges()) + // 4 upsert goroutines — skipped in backfill mode (balance tables represent current state) + if m.ingestionMode != IngestionModeBackfill { + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + return m.tokenIngestionService.ProcessTrustlineChanges(ctx, tx, buffer.GetTrustlineChanges()) + }) }) - }) - g.Go(func() error { - return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { - return m.tokenIngestionService.ProcessNativeBalanceChanges(ctx, tx, buffer.GetAccountChanges()) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + return m.tokenIngestionService.ProcessNativeBalanceChanges(ctx, tx, buffer.GetAccountChanges()) + }) }) - }) - g.Go(func() error { - return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { - return m.tokenIngestionService.ProcessSACBalanceChanges(ctx, tx, buffer.GetSACBalanceChanges()) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + return m.tokenIngestionService.ProcessSACBalanceChanges(ctx, tx, buffer.GetSACBalanceChanges()) + }) }) - }) - g.Go(func() error { - return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { - return m.tokenIngestionService.ProcessContractTokenChanges(ctx, tx, buffer.GetContractChanges()) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, 
func(ctx context.Context, tx pgx.Tx) error { + return m.tokenIngestionService.ProcessContractTokenChanges(ctx, tx, buffer.GetContractChanges()) + }) }) - }) + } if err := g.Wait(); err != nil { return 0, 0, fmt.Errorf("parallel insert/upsert: %w", err) diff --git a/internal/services/ingest_backfill.go b/internal/services/ingest_backfill.go index ad019b9fe..62f8c71de 100644 --- a/internal/services/ingest_backfill.go +++ b/internal/services/ingest_backfill.go @@ -273,10 +273,6 @@ func (m *ingestService) flushBatchBufferWithRetry(ctx context.Context, buffer *i if _, _, err := m.insertAndUpsertParallel(ctx, txs, buffer); err != nil { return fmt.Errorf("inserting processed data into db: %w", err) } - // Unlock channel accounts using all transactions (not filtered) - if err := m.unlockChannelAccounts(ctx, dbTx, txs); err != nil { - return fmt.Errorf("unlocking channel accounts: %w", err) - } // Update cursor atomically with data insertion if requested if updateCursorTo != nil { if err := m.models.IngestStore.UpdateMin(ctx, dbTx, m.oldestLedgerCursorName, *updateCursorTo); err != nil { diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index 900794890..a0fbe31f8 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -1081,19 +1081,6 @@ func Test_ingestService_flushBatchBufferWithRetry(t *testing.T) { mockRPCService := &RPCServiceMock{} mockRPCService.On("NetworkPassphrase").Return(network.TestNetworkPassphrase).Maybe() - mockChAccStore := &store.ChannelAccountStoreMock{} - // Use variadic mock.Anything for any number of tx hashes - mockChAccStore.On("UnassignTxAndUnlockChannelAccounts", mock.Anything, mock.Anything).Return(int64(0), nil).Maybe() - mockChAccStore.On("UnassignTxAndUnlockChannelAccounts", mock.Anything, mock.Anything, mock.Anything).Return(int64(0), nil).Maybe() - mockChAccStore.On("UnassignTxAndUnlockChannelAccounts", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(0), 
nil).Maybe() - mockChAccStore.On("UnassignTxAndUnlockChannelAccounts", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(0), nil).Maybe() - - mockTokenSvc := NewTokenIngestionServiceMock(t) - mockTokenSvc.On("ProcessTrustlineChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - mockTokenSvc.On("ProcessNativeBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - mockTokenSvc.On("ProcessSACBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - mockTokenSvc.On("ProcessContractTokenChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - svc, err := NewIngestService(IngestServiceConfig{ IngestionMode: IngestionModeBackfill, Models: models, @@ -1102,8 +1089,6 @@ func Test_ingestService_flushBatchBufferWithRetry(t *testing.T) { AppTracker: &apptracker.MockAppTracker{}, RPCService: mockRPCService, LedgerBackend: &LedgerBackendMock{}, - ChannelAccountStore: mockChAccStore, - TokenIngestionService: mockTokenSvc, Metrics: m, GetLedgersLimit: defaultGetLedgersLimit, Network: network.TestNetworkPassphrase, From c13f9c02cfc074f76eef8615f237a2f728e88082 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Wed, 1 Apr 2026 12:05:44 -0400 Subject: [PATCH 5/9] Centralize BatchCopy metrics and cleanup Consolidate metric recording in BatchCopy methods across models by using deferred functions to observe QueryDuration, BatchSize (where applicable) and increment QueriesTotal. Remove duplicated metrics updates at multiple error/success return sites. Minor optimizations and renames: preallocate rows slices for account copy helpers, track rowCount for metrics, and use local Value() results (e.g. addrVal, accountVal, hashVal) with explicit []byte casts. Also tidy up duplicate/commented docstrings in operations/transactions. 
--- internal/data/native_balances.go | 10 +++--- internal/data/operations.go | 45 +++++++++----------------- internal/data/sac_balances.go | 10 +++--- internal/data/statechanges.go | 22 +++++-------- internal/data/transactions.go | 49 ++++++++++------------------- internal/data/trustline_balances.go | 10 +++--- 6 files changed, 51 insertions(+), 95 deletions(-) diff --git a/internal/data/native_balances.go b/internal/data/native_balances.go index ab56706a2..e1359078b 100644 --- a/internal/data/native_balances.go +++ b/internal/data/native_balances.go @@ -133,6 +133,10 @@ func (m *NativeBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, balance } start := time.Now() + defer func() { + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "native_balances").Observe(time.Since(start).Seconds()) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "native_balances").Inc() + }() copyCount, err := dbTx.CopyFrom( ctx, @@ -144,20 +148,14 @@ func (m *NativeBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, balance }), ) if err != nil { - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "native_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "native_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "native_balances", utils.GetDBErrorType(err)).Inc() return fmt.Errorf("bulk inserting native balances via COPY: %w", err) } if int(copyCount) != len(balances) { - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "native_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "native_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "native_balances", "row_count_mismatch").Inc() return fmt.Errorf("expected %d rows copied, got %d", len(balances), copyCount) } - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "native_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", 
"native_balances").Inc() return nil } diff --git a/internal/data/operations.go b/internal/data/operations.go index bf7e1f37f..c08ab08f9 100644 --- a/internal/data/operations.go +++ b/internal/data/operations.go @@ -323,18 +323,11 @@ func (m *OperationModel) BatchGetByStateChangeIDs(ctx context.Context, scToIDs [ } // BatchCopy inserts operations using pgx's binary COPY protocol. -// Uses pgx.Tx for binary format which is faster than lib/pq's text format. // Uses native pgtype types for optimal performance (see https://github.com/jackc/pgx/issues/763). // // IMPORTANT: BatchCopy will FAIL if any duplicate records exist. The PostgreSQL COPY // protocol does not support conflict handling. Callers must ensure no duplicates exist // before calling this method, or handle the unique constraint violation error appropriately. -// BatchCopy inserts operations using pgx's binary COPY protocol. -// Uses native pgtype types for optimal performance. -// -// IMPORTANT: BatchCopy will FAIL if any duplicate records exist. The PostgreSQL COPY -// protocol does not support conflict handling. Callers must ensure no duplicates exist -// before calling this method, or handle the unique constraint violation error appropriately. 
func (m *OperationModel) BatchCopy( ctx context.Context, pgxTx pgx.Tx, @@ -345,6 +338,11 @@ func (m *OperationModel) BatchCopy( } start := time.Now() + defer func() { + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(time.Since(start).Seconds()) + m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations))) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc() + }() copyCount, err := pgxTx.CopyFrom( ctx, @@ -364,27 +362,14 @@ func (m *OperationModel) BatchCopy( }), ) if err != nil { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "operations", utils.GetDBErrorType(err)).Inc() return 0, fmt.Errorf("pgx CopyFrom operations: %w", err) } if int(copyCount) != len(operations) { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "operations", "row_count_mismatch").Inc() return 0, fmt.Errorf("expected %d rows copied, got %d", len(operations), copyCount) } - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc() - return len(operations), nil } @@ -400,6 +385,12 @@ func (m *OperationModel) BatchCopyAccounts( } start := time.Now() + var rowCount int + defer 
func() { + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations_accounts").Observe(time.Since(start).Seconds()) + m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations_accounts").Observe(float64(rowCount)) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations_accounts").Inc() + }() // Build OpID -> LedgerCreatedAt lookup from operations ledgerCreatedAtByOpID := make(map[int64]time.Time, len(operations)) @@ -407,23 +398,24 @@ func (m *OperationModel) BatchCopyAccounts( ledgerCreatedAtByOpID[op.ID] = op.LedgerCreatedAt } - var oaRows [][]any + oaRows := make([][]any, 0, len(stellarAddressesByOpID)*2) for opID, addresses := range stellarAddressesByOpID { ledgerCreatedAt := ledgerCreatedAtByOpID[opID] ledgerCreatedAtPgtype := pgtype.Timestamptz{Time: ledgerCreatedAt, Valid: true} opIDPgtype := pgtype.Int8{Int64: opID, Valid: true} for _, addr := range addresses.ToSlice() { - addrBytes, addrErr := types.AddressBytea(addr).Value() + addrVal, addrErr := types.AddressBytea(addr).Value() if addrErr != nil { return fmt.Errorf("converting address %s to bytes: %w", addr, addrErr) } oaRows = append(oaRows, []any{ ledgerCreatedAtPgtype, opIDPgtype, - addrBytes, + addrVal.([]byte), }) } } + rowCount = len(oaRows) _, err := pgxTx.CopyFrom( ctx, @@ -432,16 +424,9 @@ func (m *OperationModel) BatchCopyAccounts( pgx.CopyFromRows(oaRows), ) if err != nil { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations_accounts").Observe(duration) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations_accounts").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "operations_accounts", utils.GetDBErrorType(err)).Inc() return fmt.Errorf("pgx CopyFrom operations_accounts: %w", err) } - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations_accounts").Observe(duration) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", 
"operations_accounts").Inc() - return nil } diff --git a/internal/data/sac_balances.go b/internal/data/sac_balances.go index 8089511ab..55f5fa015 100644 --- a/internal/data/sac_balances.go +++ b/internal/data/sac_balances.go @@ -153,6 +153,10 @@ func (m *SACBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, balances [ } start := time.Now() + defer func() { + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "sac_balances").Observe(time.Since(start).Seconds()) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "sac_balances").Inc() + }() copyCount, err := dbTx.CopyFrom( ctx, @@ -178,20 +182,14 @@ func (m *SACBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, balances [ }), ) if err != nil { - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "sac_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "sac_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "sac_balances", utils.GetDBErrorType(err)).Inc() return fmt.Errorf("batch inserting SAC balances via COPY: %w", err) } if int(copyCount) != len(balances) { - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "sac_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "sac_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "sac_balances", "row_count_mismatch").Inc() return fmt.Errorf("expected %d rows copied, got %d", len(balances), copyCount) } - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "sac_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "sac_balances").Inc() return nil } diff --git a/internal/data/statechanges.go b/internal/data/statechanges.go index a74238949..398d0e087 100644 --- a/internal/data/statechanges.go +++ b/internal/data/statechanges.go @@ -190,6 +190,11 @@ func (m *StateChangeModel) BatchCopy( } start := time.Now() + defer func() { + 
m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "state_changes").Observe(time.Since(start).Seconds()) + m.Metrics.BatchSize.WithLabelValues("BatchCopy", "state_changes").Observe(float64(len(stateChanges))) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "state_changes").Inc() + }() // COPY state_changes using pgx binary format with native pgtype types copyCount, err := pgxTx.CopyFrom( @@ -215,7 +220,7 @@ func (m *StateChangeModel) BatchCopy( } // Convert account_id to BYTEA (required field) - accountBytes, err := sc.AccountID.Value() + accountVal, err := sc.AccountID.Value() if err != nil { return nil, fmt.Errorf("converting account_id: %w", err) } @@ -257,7 +262,7 @@ func (m *StateChangeModel) BatchCopy( pgtypeTextFromReason(sc.StateChangeReason), pgtype.Timestamptz{Time: sc.LedgerCreatedAt, Valid: true}, pgtype.Int4{Int32: int32(sc.LedgerNumber), Valid: true}, - accountBytes, + accountVal.([]byte), pgtype.Int8{Int64: sc.OperationID, Valid: true}, tokenBytes, pgtypeTextFromNullString(sc.Amount), @@ -282,27 +287,14 @@ func (m *StateChangeModel) BatchCopy( }), ) if err != nil { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "state_changes").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "state_changes").Observe(float64(len(stateChanges))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "state_changes").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "state_changes", utils.GetDBErrorType(err)).Inc() return 0, fmt.Errorf("pgx CopyFrom state_changes: %w", err) } if int(copyCount) != len(stateChanges) { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "state_changes").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "state_changes").Observe(float64(len(stateChanges))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "state_changes").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", 
"state_changes", "row_count_mismatch").Inc() return 0, fmt.Errorf("expected %d rows copied, got %d", len(stateChanges), copyCount) } - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "state_changes").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "state_changes").Observe(float64(len(stateChanges))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "state_changes").Inc() - return len(stateChanges), nil } diff --git a/internal/data/transactions.go b/internal/data/transactions.go index 71aeab7a0..d9cb7e69d 100644 --- a/internal/data/transactions.go +++ b/internal/data/transactions.go @@ -233,13 +233,6 @@ func (m *TransactionModel) BatchGetByStateChangeIDs(ctx context.Context, scToIDs return transactions, nil } -// BatchCopy inserts transactions using pgx's binary COPY protocol. -// Uses pgx.Tx for binary format which is faster than lib/pq's text format. -// Uses native pgtype types for optimal performance (see https://github.com/jackc/pgx/issues/763). -// -// IMPORTANT: BatchCopy will FAIL if any duplicate records exist. The PostgreSQL COPY -// protocol does not support conflict handling. Callers must ensure no duplicates exist -// before calling this method, or handle the unique constraint violation error appropriately. // BatchCopy inserts transactions using pgx's binary COPY protocol. // Uses native pgtype types for optimal performance (see https://github.com/jackc/pgx/issues/763). 
// @@ -256,6 +249,11 @@ func (m *TransactionModel) BatchCopy( } start := time.Now() + defer func() { + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions").Observe(time.Since(start).Seconds()) + m.Metrics.BatchSize.WithLabelValues("BatchCopy", "transactions").Observe(float64(len(txs))) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions").Inc() + }() copyCount, err := pgxTx.CopyFrom( ctx, @@ -263,12 +261,12 @@ func (m *TransactionModel) BatchCopy( []string{"hash", "to_id", "fee_charged", "result_code", "ledger_number", "ledger_created_at", "is_fee_bump"}, pgx.CopyFromSlice(len(txs), func(i int) ([]any, error) { tx := txs[i] - hashBytes, err := tx.Hash.Value() + hashVal, err := tx.Hash.Value() if err != nil { return nil, fmt.Errorf("converting hash %s to bytes: %w", tx.Hash, err) } return []any{ - hashBytes, + hashVal.([]byte), pgtype.Int8{Int64: tx.ToID, Valid: true}, pgtype.Int8{Int64: tx.FeeCharged, Valid: true}, pgtype.Text{String: tx.ResultCode, Valid: true}, @@ -279,27 +277,14 @@ func (m *TransactionModel) BatchCopy( }), ) if err != nil { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "transactions").Observe(float64(len(txs))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "transactions", utils.GetDBErrorType(err)).Inc() return 0, fmt.Errorf("pgx CopyFrom transactions: %w", err) } if int(copyCount) != len(txs) { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "transactions").Observe(float64(len(txs))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "transactions", "row_count_mismatch").Inc() return 0, 
fmt.Errorf("expected %d rows copied, got %d", len(txs), copyCount) } - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "transactions").Observe(float64(len(txs))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions").Inc() - return len(txs), nil } @@ -315,6 +300,12 @@ func (m *TransactionModel) BatchCopyAccounts( } start := time.Now() + var rowCount int + defer func() { + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions_accounts").Observe(time.Since(start).Seconds()) + m.Metrics.BatchSize.WithLabelValues("BatchCopy", "transactions_accounts").Observe(float64(rowCount)) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions_accounts").Inc() + }() // Build ToID -> LedgerCreatedAt lookup from transactions ledgerCreatedAtByToID := make(map[int64]time.Time, len(txs)) @@ -322,23 +313,24 @@ func (m *TransactionModel) BatchCopyAccounts( ledgerCreatedAtByToID[tx.ToID] = tx.LedgerCreatedAt } - var taRows [][]any + taRows := make([][]any, 0, len(stellarAddressesByToID)*2) for toID, addresses := range stellarAddressesByToID { ledgerCreatedAt := ledgerCreatedAtByToID[toID] ledgerCreatedAtPgtype := pgtype.Timestamptz{Time: ledgerCreatedAt, Valid: true} toIDPgtype := pgtype.Int8{Int64: toID, Valid: true} for _, addr := range addresses.ToSlice() { - addrBytes, addrErr := types.AddressBytea(addr).Value() + addrVal, addrErr := types.AddressBytea(addr).Value() if addrErr != nil { return fmt.Errorf("converting address %s to bytes: %w", addr, addrErr) } taRows = append(taRows, []any{ ledgerCreatedAtPgtype, toIDPgtype, - addrBytes, + addrVal.([]byte), }) } } + rowCount = len(taRows) _, err := pgxTx.CopyFrom( ctx, @@ -347,16 +339,9 @@ func (m *TransactionModel) BatchCopyAccounts( pgx.CopyFromRows(taRows), ) if err != nil { - duration := time.Since(start).Seconds() - 
m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions_accounts").Observe(duration) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions_accounts").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "transactions_accounts", utils.GetDBErrorType(err)).Inc() return fmt.Errorf("pgx CopyFrom transactions_accounts: %w", err) } - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions_accounts").Observe(duration) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions_accounts").Inc() - return nil } diff --git a/internal/data/trustline_balances.go b/internal/data/trustline_balances.go index e205f9291..b958ee8ef 100644 --- a/internal/data/trustline_balances.go +++ b/internal/data/trustline_balances.go @@ -154,6 +154,10 @@ func (m *TrustlineBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, bala } start := time.Now() + defer func() { + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "trustline_balances").Observe(time.Since(start).Seconds()) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "trustline_balances").Inc() + }() copyCount, err := dbTx.CopyFrom( ctx, @@ -183,20 +187,14 @@ func (m *TrustlineBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, bala }), ) if err != nil { - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "trustline_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "trustline_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "trustline_balances", utils.GetDBErrorType(err)).Inc() return fmt.Errorf("batch inserting trustline balances via COPY: %w", err) } if int(copyCount) != len(balances) { - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "trustline_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "trustline_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "trustline_balances", 
"row_count_mismatch").Inc() return fmt.Errorf("expected %d rows copied, got %d", len(balances), copyCount) } - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "trustline_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "trustline_balances").Inc() return nil } From 16ee738498477a6694fbacfdd009cef0b7675f80 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Wed, 1 Apr 2026 12:43:42 -0400 Subject: [PATCH 6/9] Update ingest_test.go --- internal/services/ingest_test.go | 51 ++++++++++---------------------- 1 file changed, 16 insertions(+), 35 deletions(-) diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index a0fbe31f8..81b0b21fd 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -1626,16 +1626,12 @@ func Test_ingestProcessedDataWithRetry(t *testing.T) { mockChAccStore := &store.ChannelAccountStoreMock{} - // Mock AccountTokenService to succeed + // Mock token ingestion methods (called in parallel by insertAndUpsertParallel) mockTokenIngestionService := NewTokenIngestionServiceMock(t) - mockTokenIngestionService.On("ProcessTokenChanges", - mock.Anything, // ctx - mock.Anything, // dbTx - mock.Anything, // trustlineChangesByTrustlineKey - mock.Anything, // contractChanges - mock.Anything, // accountChangesByAccountID - mock.Anything, // sacBalanceChangesByKey - ).Return(nil) + mockTokenIngestionService.On("ProcessTrustlineChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mockTokenIngestionService.On("ProcessNativeBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mockTokenIngestionService.On("ProcessSACBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mockTokenIngestionService.On("ProcessContractTokenChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil) svc, err := NewIngestService(IngestServiceConfig{ IngestionMode: IngestionModeLive, @@ -1710,16 +1706,12 @@ func 
Test_ingestProcessedDataWithRetry(t *testing.T) { mockChAccStore := &store.ChannelAccountStoreMock{} - // Mock AccountTokenService to return error (simulating DB failure) + // Mock token ingestion methods - one fails to simulate DB failure mockTokenIngestionService := NewTokenIngestionServiceMock(t) - mockTokenIngestionService.On("ProcessTokenChanges", - mock.Anything, // ctx - mock.Anything, // dbTx - mock.Anything, // trustlineChangesByTrustlineKey - mock.Anything, // contractChanges - mock.Anything, // accountChangesByAccountID - mock.Anything, // sacBalanceChangesByKey - ).Return(fmt.Errorf("db connection failed")) + mockTokenIngestionService.On("ProcessTrustlineChanges", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("db connection failed")).Maybe() + mockTokenIngestionService.On("ProcessNativeBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + mockTokenIngestionService.On("ProcessSACBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + mockTokenIngestionService.On("ProcessContractTokenChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() svc, err := NewIngestService(IngestServiceConfig{ IngestionMode: IngestionModeLive, @@ -1794,24 +1786,13 @@ func Test_ingestProcessedDataWithRetry(t *testing.T) { mockChAccStore := &store.ChannelAccountStoreMock{} - // Mock AccountTokenService to fail once then succeed + // Mock token ingestion methods - trustline fails once then succeeds on retry mockTokenIngestionService := NewTokenIngestionServiceMock(t) - mockTokenIngestionService.On("ProcessTokenChanges", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - ).Return(fmt.Errorf("transient error")).Once() - mockTokenIngestionService.On("ProcessTokenChanges", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - ).Return(nil).Once() + 
mockTokenIngestionService.On("ProcessTrustlineChanges", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("transient error")).Once() + mockTokenIngestionService.On("ProcessTrustlineChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + mockTokenIngestionService.On("ProcessNativeBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + mockTokenIngestionService.On("ProcessSACBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + mockTokenIngestionService.On("ProcessContractTokenChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() svc, err := NewIngestService(IngestServiceConfig{ IngestionMode: IngestionModeLive, From 0f80c4cc6de19463e7eebe3e1366c8edbc8514fe Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Wed, 1 Apr 2026 13:06:25 -0400 Subject: [PATCH 7/9] Increase histogram buckets from 12 to 15 Increase the number of buckets used by ExponentialBuckets from 12 to 15 for DB batch size and ingestion participants histograms. Update internal/metrics/db.go and internal/metrics/ingestion.go to use ExponentialBuckets(1, 2, 15), and adjust tests in internal/metrics/db_test.go and internal/metrics/ingestion_test.go to expect 15 buckets. This raises the largest finite bucket boundary from 2^11 (2048) to 2^14 (16384), extending the upper range of those histograms; resolution within the existing range is unchanged.
--- internal/metrics/db.go | 2 +- internal/metrics/db_test.go | 2 +- internal/metrics/ingestion.go | 2 +- internal/metrics/ingestion_test.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/metrics/db.go b/internal/metrics/db.go index 19f046dba..093e8c2de 100644 --- a/internal/metrics/db.go +++ b/internal/metrics/db.go @@ -49,7 +49,7 @@ func newDBMetrics(reg prometheus.Registerer) *DBMetrics { BatchSize: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "wallet_db_batch_operation_size", Help: "Size of batch database operations.", - Buckets: prometheus.ExponentialBuckets(1, 2, 12), + Buckets: prometheus.ExponentialBuckets(1, 2, 15), }, []string{"operation", "table"}), } reg.MustRegister( diff --git a/internal/metrics/db_test.go b/internal/metrics/db_test.go index 377085087..03017b7a5 100644 --- a/internal/metrics/db_test.go +++ b/internal/metrics/db_test.go @@ -81,7 +81,7 @@ func TestDBMetrics_BatchSize_Buckets(t *testing.T) { for _, metric := range f.GetMetric() { h := metric.GetHistogram() require.NotNil(t, h) - assert.Len(t, h.GetBucket(), 12) // ExponentialBuckets(1, 2, 12) + assert.Len(t, h.GetBucket(), 15) // ExponentialBuckets(1, 2, 15) } } } diff --git a/internal/metrics/ingestion.go b/internal/metrics/ingestion.go index db22806e7..81ebc1970 100644 --- a/internal/metrics/ingestion.go +++ b/internal/metrics/ingestion.go @@ -83,7 +83,7 @@ func newIngestionMetrics(reg prometheus.Registerer) *IngestionMetrics { ParticipantsCount: prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "wallet_ingestion_participants_per_ledger", Help: "Number of unique participants per ingestion batch.", - Buckets: prometheus.ExponentialBuckets(1, 2, 12), + Buckets: prometheus.ExponentialBuckets(1, 2, 15), }), LagLedgers: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "wallet_ingestion_lag_ledgers", diff --git a/internal/metrics/ingestion_test.go b/internal/metrics/ingestion_test.go index c589d5502..97fa7c60c 100644 --- 
a/internal/metrics/ingestion_test.go +++ b/internal/metrics/ingestion_test.go @@ -98,7 +98,7 @@ func TestIngestionMetrics_ParticipantsCount_Buckets(t *testing.T) { for _, f := range families { if f.GetName() == "wallet_ingestion_participants_per_ledger" { h := f.GetMetric()[0].GetHistogram() - assert.Len(t, h.GetBucket(), 12) // ExponentialBuckets(1, 2, 12) + assert.Len(t, h.GetBucket(), 15) // ExponentialBuckets(1, 2, 15) } } } From 55507e56d633f0043a39d94edd79cb56e40bb98d Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Wed, 1 Apr 2026 13:26:40 -0400 Subject: [PATCH 8/9] Add finer buckets to ingestion phase histogram Add two additional histogram bucket boundaries (0.075 and 0.15) to wallet_ingestion_phase_duration_seconds to provide finer resolution between 0.05-0.1s and 0.1-0.2s. Update the test expectation to reflect the new total of 13 custom buckets. --- internal/metrics/ingestion.go | 2 +- internal/metrics/ingestion_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/metrics/ingestion.go b/internal/metrics/ingestion.go index 81ebc1970..e117ff3e4 100644 --- a/internal/metrics/ingestion.go +++ b/internal/metrics/ingestion.go @@ -66,7 +66,7 @@ func newIngestionMetrics(reg prometheus.Registerer) *IngestionMetrics { PhaseDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "wallet_ingestion_phase_duration_seconds", Help: "Duration of each ingestion phase.", - Buckets: []float64{0.01, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30, 60}, + Buckets: []float64{0.01, 0.05, 0.075, 0.1, 0.15, 0.2, 0.5, 1, 2, 5, 10, 30, 60}, }, []string{"phase"}), LedgersProcessed: prometheus.NewCounter(prometheus.CounterOpts{ Name: "wallet_ingestion_ledgers_total", diff --git a/internal/metrics/ingestion_test.go b/internal/metrics/ingestion_test.go index 97fa7c60c..ff47f6b9f 100644 --- a/internal/metrics/ingestion_test.go +++ b/internal/metrics/ingestion_test.go @@ -82,7 +82,7 @@ func TestIngestionMetrics_PhaseDuration_Buckets(t *testing.T) { for 
_, f := range families { if f.GetName() == "wallet_ingestion_phase_duration_seconds" { h := f.GetMetric()[0].GetHistogram() - assert.Len(t, h.GetBucket(), 11) // 11 custom boundaries + assert.Len(t, h.GetBucket(), 13) // 13 custom boundaries } } } From 340b684ee15a52feb912cf9c4151d1ea80d851cc Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Wed, 1 Apr 2026 14:11:52 -0400 Subject: [PATCH 9/9] Use UNNEST for bulk balance upserts/deletes Replace per-row pgx.Batch queuing with UNNEST-based bulk operations in native_balances, sac_balances, and trustline_balances. Each BatchUpsert now builds column slices and performs a single INSERT ... SELECT FROM UNNEST(... ) ... ON CONFLICT for upserts and a single DELETE using ANY or UNNEST for deletes. Preserves existing metrics/error handling and includes appropriate type casts (UUIDs, int32/int64) and ledger number conversions to reduce DB round-trips and simplify batch logic. --- internal/data/native_balances.go | 80 +++++++++++-------- internal/data/sac_balances.go | 103 ++++++++++++++----------- internal/data/trustline_balances.go | 114 ++++++++++++++++------------ 3 files changed, 171 insertions(+), 126 deletions(-) diff --git a/internal/data/native_balances.go b/internal/data/native_balances.go index e1359078b..cd4f21a16 100644 --- a/internal/data/native_balances.go +++ b/internal/data/native_balances.go @@ -72,53 +72,67 @@ func (m *NativeBalanceModel) GetByAccount(ctx context.Context, accountAddress st return &nb, nil } -// BatchUpsert upserts and deletes native balances in batch. +// BatchUpsert upserts and deletes native balances using UNNEST-based bulk operations. 
func (m *NativeBalanceModel) BatchUpsert(ctx context.Context, dbTx pgx.Tx, upserts []NativeBalance, deletes []string) error { if len(upserts) == 0 && len(deletes) == 0 { return nil } start := time.Now() - batch := &pgx.Batch{} - - const upsertQuery = ` - INSERT INTO native_balances (account_address, balance, minimum_balance, buying_liabilities, selling_liabilities, last_modified_ledger) - VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (account_address) DO UPDATE SET - balance = EXCLUDED.balance, - minimum_balance = EXCLUDED.minimum_balance, - buying_liabilities = EXCLUDED.buying_liabilities, - selling_liabilities = EXCLUDED.selling_liabilities, - last_modified_ledger = EXCLUDED.last_modified_ledger` - - for _, nb := range upserts { - batch.Queue(upsertQuery, nb.AccountAddress, nb.Balance, nb.MinimumBalance, nb.BuyingLiabilities, nb.SellingLiabilities, nb.LedgerNumber) - } - - const deleteQuery = `DELETE FROM native_balances WHERE account_address = $1` - for _, addr := range deletes { - batch.Queue(deleteQuery, addr) - } - if batch.Len() == 0 { - return nil - } + if len(upserts) > 0 { + accountAddresses := make([]string, len(upserts)) + balances := make([]int64, len(upserts)) + minimumBalances := make([]int64, len(upserts)) + buyingLiabilities := make([]int64, len(upserts)) + sellingLiabilities := make([]int64, len(upserts)) + ledgerNumbers := make([]int64, len(upserts)) + + for i, nb := range upserts { + accountAddresses[i] = nb.AccountAddress + balances[i] = nb.Balance + minimumBalances[i] = nb.MinimumBalance + buyingLiabilities[i] = nb.BuyingLiabilities + sellingLiabilities[i] = nb.SellingLiabilities + ledgerNumbers[i] = int64(nb.LedgerNumber) + } - br := dbTx.SendBatch(ctx, batch) - for i := 0; i < batch.Len(); i++ { - if _, err := br.Exec(); err != nil { - _ = br.Close() //nolint:errcheck // cleanup on error path + const upsertQuery = ` + INSERT INTO native_balances ( + account_address, balance, minimum_balance, + buying_liabilities, selling_liabilities, 
last_modified_ledger + ) + SELECT * FROM UNNEST( + $1::text[], $2::bigint[], $3::bigint[], + $4::bigint[], $5::bigint[], $6::bigint[] + ) + ON CONFLICT (account_address) DO UPDATE SET + balance = EXCLUDED.balance, + minimum_balance = EXCLUDED.minimum_balance, + buying_liabilities = EXCLUDED.buying_liabilities, + selling_liabilities = EXCLUDED.selling_liabilities, + last_modified_ledger = EXCLUDED.last_modified_ledger` + + if _, err := dbTx.Exec(ctx, upsertQuery, + accountAddresses, balances, minimumBalances, + buyingLiabilities, sellingLiabilities, ledgerNumbers, + ); err != nil { m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "native_balances").Observe(time.Since(start).Seconds()) m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "native_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "native_balances", utils.GetDBErrorType(err)).Inc() return fmt.Errorf("upserting native balances: %w", err) } } - if err := br.Close(); err != nil { - m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "native_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "native_balances").Inc() - m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "native_balances", utils.GetDBErrorType(err)).Inc() - return fmt.Errorf("closing native balance batch: %w", err) + + if len(deletes) > 0 { + const deleteQuery = `DELETE FROM native_balances WHERE account_address = ANY($1::text[])` + + if _, err := dbTx.Exec(ctx, deleteQuery, deletes); err != nil { + m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "native_balances").Observe(time.Since(start).Seconds()) + m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "native_balances").Inc() + m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "native_balances", utils.GetDBErrorType(err)).Inc() + return fmt.Errorf("deleting native balances: %w", err) + } } m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", 
"native_balances").Observe(time.Since(start).Seconds()) diff --git a/internal/data/sac_balances.go b/internal/data/sac_balances.go index 55f5fa015..4652cf6af 100644 --- a/internal/data/sac_balances.go +++ b/internal/data/sac_balances.go @@ -80,66 +80,81 @@ func (m *SACBalanceModel) GetByAccount(ctx context.Context, accountAddress strin return balances, nil } -// BatchUpsert performs upserts and deletes for SAC balances. -// For upserts (ADD/UPDATE): inserts or updates balance with authorization flags. -// For deletes (REMOVE): removes the balance row. +// BatchUpsert performs upserts and deletes for SAC balances using UNNEST-based bulk operations. +// For upserts (ADD/UPDATE): bulk inserts or updates balance with authorization flags via single UNNEST query. +// For deletes (REMOVE): bulk removes balance rows via single UNNEST query. func (m *SACBalanceModel) BatchUpsert(ctx context.Context, dbTx pgx.Tx, upserts []SACBalance, deletes []SACBalance) error { if len(upserts) == 0 && len(deletes) == 0 { return nil } start := time.Now() - batch := &pgx.Batch{} - - // Upsert query: insert or update all fields - const upsertQuery = ` - INSERT INTO sac_balances ( - account_address, contract_id, balance, is_authorized, is_clawback_enabled, last_modified_ledger - ) VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (account_address, contract_id) DO UPDATE SET - balance = EXCLUDED.balance, - is_authorized = EXCLUDED.is_authorized, - is_clawback_enabled = EXCLUDED.is_clawback_enabled, - last_modified_ledger = EXCLUDED.last_modified_ledger` - - for _, bal := range upserts { - batch.Queue(upsertQuery, - bal.AccountAddress, - bal.ContractID, - bal.Balance, - bal.IsAuthorized, - bal.IsClawbackEnabled, - bal.LedgerNumber, - ) - } - // Delete query - const deleteQuery = `DELETE FROM sac_balances WHERE account_address = $1 AND contract_id = $2` + if len(upserts) > 0 { + accountAddresses := make([]string, len(upserts)) + contractIDs := make([]uuid.UUID, len(upserts)) + balances := 
make([]string, len(upserts)) + isAuthorized := make([]bool, len(upserts)) + isClawbackEnabled := make([]bool, len(upserts)) + ledgerNumbers := make([]int32, len(upserts)) + + for i, bal := range upserts { + accountAddresses[i] = bal.AccountAddress + contractIDs[i] = bal.ContractID + balances[i] = bal.Balance + isAuthorized[i] = bal.IsAuthorized + isClawbackEnabled[i] = bal.IsClawbackEnabled + ledgerNumbers[i] = int32(bal.LedgerNumber) + } - for _, bal := range deletes { - batch.Queue(deleteQuery, bal.AccountAddress, bal.ContractID) + const upsertQuery = ` + INSERT INTO sac_balances ( + account_address, contract_id, balance, + is_authorized, is_clawback_enabled, last_modified_ledger + ) + SELECT * FROM UNNEST( + $1::text[], $2::uuid[], $3::text[], + $4::boolean[], $5::boolean[], $6::int4[] + ) + ON CONFLICT (account_address, contract_id) DO UPDATE SET + balance = EXCLUDED.balance, + is_authorized = EXCLUDED.is_authorized, + is_clawback_enabled = EXCLUDED.is_clawback_enabled, + last_modified_ledger = EXCLUDED.last_modified_ledger` + + if _, err := dbTx.Exec(ctx, upsertQuery, + accountAddresses, contractIDs, balances, + isAuthorized, isClawbackEnabled, ledgerNumbers, + ); err != nil { + m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "sac_balances").Observe(time.Since(start).Seconds()) + m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "sac_balances").Inc() + m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "sac_balances", utils.GetDBErrorType(err)).Inc() + return fmt.Errorf("upserting SAC balances: %w", err) + } } - if batch.Len() == 0 { - return nil - } + if len(deletes) > 0 { + delAccountAddresses := make([]string, len(deletes)) + delContractIDs := make([]uuid.UUID, len(deletes)) + + for i, bal := range deletes { + delAccountAddresses[i] = bal.AccountAddress + delContractIDs[i] = bal.ContractID + } - br := dbTx.SendBatch(ctx, batch) - for i := 0; i < batch.Len(); i++ { - if _, err := br.Exec(); err != nil { - _ = br.Close() //nolint:errcheck // 
cleanup on error path + const deleteQuery = ` + DELETE FROM sac_balances + WHERE (account_address, contract_id) IN ( + SELECT * FROM UNNEST($1::text[], $2::uuid[]) + )` + + if _, err := dbTx.Exec(ctx, deleteQuery, delAccountAddresses, delContractIDs); err != nil { m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "sac_balances").Observe(time.Since(start).Seconds()) m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "sac_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "sac_balances", utils.GetDBErrorType(err)).Inc() - return fmt.Errorf("upserting SAC balances: %w", err) + return fmt.Errorf("deleting SAC balances: %w", err) } } - if err := br.Close(); err != nil { - m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "sac_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "sac_balances").Inc() - m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "sac_balances", utils.GetDBErrorType(err)).Inc() - return fmt.Errorf("closing SAC balance batch: %w", err) - } m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "sac_balances").Observe(time.Since(start).Seconds()) m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "sac_balances").Inc() diff --git a/internal/data/trustline_balances.go b/internal/data/trustline_balances.go index b958ee8ef..010d23e34 100644 --- a/internal/data/trustline_balances.go +++ b/internal/data/trustline_balances.go @@ -76,71 +76,87 @@ func (m *TrustlineBalanceModel) GetByAccount(ctx context.Context, accountAddress return balances, nil } -// BatchUpsert performs upserts and deletes with full XDR fields. -// For upserts (ADD/UPDATE): inserts or updates all trustline fields. -// For deletes (REMOVE): removes the trustline row. +// BatchUpsert performs upserts and deletes using UNNEST-based bulk operations. +// For upserts (ADD/UPDATE): bulk inserts or updates all trustline fields via single UNNEST query. 
+// For deletes (REMOVE): bulk removes trustline rows via single UNNEST query. func (m *TrustlineBalanceModel) BatchUpsert(ctx context.Context, dbTx pgx.Tx, upserts []TrustlineBalance, deletes []TrustlineBalance) error { if len(upserts) == 0 && len(deletes) == 0 { return nil } start := time.Now() - batch := &pgx.Batch{} - - // Upsert query: insert or update all fields - const upsertQuery = ` - INSERT INTO trustline_balances ( - account_address, asset_id, balance, trust_limit, - buying_liabilities, selling_liabilities, flags, last_modified_ledger - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT (account_address, asset_id) DO UPDATE SET - balance = EXCLUDED.balance, - trust_limit = EXCLUDED.trust_limit, - buying_liabilities = EXCLUDED.buying_liabilities, - selling_liabilities = EXCLUDED.selling_liabilities, - flags = EXCLUDED.flags, - last_modified_ledger = EXCLUDED.last_modified_ledger` - - for _, tl := range upserts { - batch.Queue(upsertQuery, - tl.AccountAddress, - tl.AssetID, - tl.Balance, - tl.Limit, - tl.BuyingLiabilities, - tl.SellingLiabilities, - tl.Flags, - tl.LedgerNumber, - ) - } - // Delete query - const deleteQuery = `DELETE FROM trustline_balances WHERE account_address = $1 AND asset_id = $2` + if len(upserts) > 0 { + accountAddresses := make([]string, len(upserts)) + assetIDs := make([]uuid.UUID, len(upserts)) + balances := make([]int64, len(upserts)) + trustLimits := make([]int64, len(upserts)) + buyingLiabilities := make([]int64, len(upserts)) + sellingLiabilities := make([]int64, len(upserts)) + flags := make([]int32, len(upserts)) + ledgerNumbers := make([]int64, len(upserts)) + + for i, tl := range upserts { + accountAddresses[i] = tl.AccountAddress + assetIDs[i] = tl.AssetID + balances[i] = tl.Balance + trustLimits[i] = tl.Limit + buyingLiabilities[i] = tl.BuyingLiabilities + sellingLiabilities[i] = tl.SellingLiabilities + flags[i] = int32(tl.Flags) + ledgerNumbers[i] = int64(tl.LedgerNumber) + } - for _, tl := range deletes { - 
batch.Queue(deleteQuery, tl.AccountAddress, tl.AssetID) + const upsertQuery = ` + INSERT INTO trustline_balances ( + account_address, asset_id, balance, trust_limit, + buying_liabilities, selling_liabilities, flags, last_modified_ledger + ) + SELECT * FROM UNNEST( + $1::text[], $2::uuid[], $3::bigint[], $4::bigint[], + $5::bigint[], $6::bigint[], $7::int4[], $8::bigint[] + ) + ON CONFLICT (account_address, asset_id) DO UPDATE SET + balance = EXCLUDED.balance, + trust_limit = EXCLUDED.trust_limit, + buying_liabilities = EXCLUDED.buying_liabilities, + selling_liabilities = EXCLUDED.selling_liabilities, + flags = EXCLUDED.flags, + last_modified_ledger = EXCLUDED.last_modified_ledger` + + if _, err := dbTx.Exec(ctx, upsertQuery, + accountAddresses, assetIDs, balances, trustLimits, + buyingLiabilities, sellingLiabilities, flags, ledgerNumbers, + ); err != nil { + m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "trustline_balances").Observe(time.Since(start).Seconds()) + m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "trustline_balances").Inc() + m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "trustline_balances", utils.GetDBErrorType(err)).Inc() + return fmt.Errorf("upserting trustline balances: %w", err) + } } - if batch.Len() == 0 { - return nil - } + if len(deletes) > 0 { + delAccountAddresses := make([]string, len(deletes)) + delAssetIDs := make([]uuid.UUID, len(deletes)) + + for i, tl := range deletes { + delAccountAddresses[i] = tl.AccountAddress + delAssetIDs[i] = tl.AssetID + } - br := dbTx.SendBatch(ctx, batch) - for i := 0; i < batch.Len(); i++ { - if _, err := br.Exec(); err != nil { - _ = br.Close() //nolint:errcheck // cleanup on error path + const deleteQuery = ` + DELETE FROM trustline_balances + WHERE (account_address, asset_id) IN ( + SELECT * FROM UNNEST($1::text[], $2::uuid[]) + )` + + if _, err := dbTx.Exec(ctx, deleteQuery, delAccountAddresses, delAssetIDs); err != nil { m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", 
"trustline_balances").Observe(time.Since(start).Seconds()) m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "trustline_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "trustline_balances", utils.GetDBErrorType(err)).Inc() - return fmt.Errorf("upserting trustline balances: %w", err) + return fmt.Errorf("deleting trustline balances: %w", err) } } - if err := br.Close(); err != nil { - m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "trustline_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "trustline_balances").Inc() - m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "trustline_balances", utils.GetDBErrorType(err)).Inc() - return fmt.Errorf("closing trustline balance batch: %w", err) - } m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "trustline_balances").Observe(time.Since(start).Seconds()) m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "trustline_balances").Inc()