diff --git a/go.mod b/go.mod index 26caf22c..5550af9b 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( github.com/tetratelabs/wazero v1.10.1 github.com/vektah/gqlparser/v2 v2.5.30 github.com/vikstrous/dataloadgen v0.0.9 + golang.org/x/sync v0.16.0 golang.org/x/term v0.33.0 golang.org/x/text v0.27.0 ) @@ -182,7 +183,6 @@ require ( golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect golang.org/x/net v0.42.0 // indirect golang.org/x/oauth2 v0.30.0 // indirect - golang.org/x/sync v0.16.0 // indirect golang.org/x/sys v0.35.0 // indirect golang.org/x/time v0.8.0 // indirect google.golang.org/api v0.215.0 // indirect diff --git a/internal/data/native_balances.go b/internal/data/native_balances.go index ab56706a..cd4f21a1 100644 --- a/internal/data/native_balances.go +++ b/internal/data/native_balances.go @@ -72,53 +72,67 @@ func (m *NativeBalanceModel) GetByAccount(ctx context.Context, accountAddress st return &nb, nil } -// BatchUpsert upserts and deletes native balances in batch. +// BatchUpsert upserts and deletes native balances using UNNEST-based bulk operations. func (m *NativeBalanceModel) BatchUpsert(ctx context.Context, dbTx pgx.Tx, upserts []NativeBalance, deletes []string) error { if len(upserts) == 0 && len(deletes) == 0 { return nil } start := time.Now() - batch := &pgx.Batch{} - - const upsertQuery = ` - INSERT INTO native_balances (account_address, balance, minimum_balance, buying_liabilities, selling_liabilities, last_modified_ledger) - VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (account_address) DO UPDATE SET - balance = EXCLUDED.balance, - minimum_balance = EXCLUDED.minimum_balance, - buying_liabilities = EXCLUDED.buying_liabilities, - selling_liabilities = EXCLUDED.selling_liabilities, - last_modified_ledger = EXCLUDED.last_modified_ledger` - - for _, nb := range upserts { - batch.Queue(upsertQuery, nb.AccountAddress, nb.Balance, nb.MinimumBalance, nb.BuyingLiabilities, nb.SellingLiabilities, nb.LedgerNumber) - } - - const deleteQuery = `DELETE FROM native_balances WHERE account_address = $1` - for _, addr := range deletes { - batch.Queue(deleteQuery, addr) - } - if batch.Len() == 0 { - return nil - } + if len(upserts) > 0 { + accountAddresses := make([]string, len(upserts)) + balances := make([]int64, len(upserts)) + minimumBalances := make([]int64, len(upserts)) + buyingLiabilities := make([]int64, len(upserts)) + sellingLiabilities := make([]int64, len(upserts)) + ledgerNumbers := make([]int64, len(upserts)) + + for i, nb := range upserts { + accountAddresses[i] = nb.AccountAddress + balances[i] = nb.Balance + minimumBalances[i] = nb.MinimumBalance + buyingLiabilities[i] = nb.BuyingLiabilities + sellingLiabilities[i] = nb.SellingLiabilities + ledgerNumbers[i] = int64(nb.LedgerNumber) + } - br := dbTx.SendBatch(ctx, batch) - for i := 0; i < batch.Len(); i++ { - if _, err := br.Exec(); err != nil { - _ = br.Close() //nolint:errcheck // cleanup on error path + const upsertQuery = ` + INSERT INTO native_balances ( + account_address, balance, minimum_balance, + buying_liabilities, selling_liabilities, last_modified_ledger + ) + SELECT * FROM UNNEST( + $1::text[], $2::bigint[], $3::bigint[], + $4::bigint[], $5::bigint[], $6::bigint[] + ) + ON CONFLICT (account_address) DO UPDATE SET + balance = EXCLUDED.balance, + minimum_balance = EXCLUDED.minimum_balance, + buying_liabilities = EXCLUDED.buying_liabilities, + selling_liabilities = EXCLUDED.selling_liabilities, + last_modified_ledger = EXCLUDED.last_modified_ledger` + + if _, err := dbTx.Exec(ctx, upsertQuery, + accountAddresses, balances, minimumBalances, + buyingLiabilities, sellingLiabilities, ledgerNumbers, + ); err != nil { m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "native_balances").Observe(time.Since(start).Seconds()) m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "native_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "native_balances", utils.GetDBErrorType(err)).Inc() return fmt.Errorf("upserting native balances: %w", err) } } - if err := br.Close(); err != nil { - m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "native_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "native_balances").Inc() - m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "native_balances", utils.GetDBErrorType(err)).Inc() - return fmt.Errorf("closing native balance batch: %w", err) + + if len(deletes) > 0 { + const deleteQuery = `DELETE FROM native_balances WHERE account_address = ANY($1::text[])` + + if _, err := dbTx.Exec(ctx, deleteQuery, deletes); err != nil { + m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "native_balances").Observe(time.Since(start).Seconds()) + m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "native_balances").Inc() + m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "native_balances", utils.GetDBErrorType(err)).Inc() + return fmt.Errorf("deleting native balances: %w", err) + } } m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "native_balances").Observe(time.Since(start).Seconds()) @@ -133,6 +147,10 @@ func (m *NativeBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, balance } start := time.Now() + defer func() { + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "native_balances").Observe(time.Since(start).Seconds()) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "native_balances").Inc() + }() copyCount, err := dbTx.CopyFrom( ctx, @@ -144,20 +162,14 @@ func (m *NativeBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, balance }), ) if err != nil { - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "native_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "native_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "native_balances", utils.GetDBErrorType(err)).Inc() return fmt.Errorf("bulk inserting native balances via COPY: %w", err) } if int(copyCount) != len(balances) { - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "native_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "native_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "native_balances", "row_count_mismatch").Inc() return fmt.Errorf("expected %d rows copied, got %d", len(balances), copyCount) } - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "native_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "native_balances").Inc() return nil } diff --git a/internal/data/operations.go b/internal/data/operations.go index 32f1b709..c08ab08f 100644 --- a/internal/data/operations.go +++ b/internal/data/operations.go @@ -323,7 +323,6 @@ func (m *OperationModel) BatchGetByStateChangeIDs(ctx context.Context, scToIDs [ } // BatchCopy inserts operations using pgx's binary COPY protocol. -// Uses pgx.Tx for binary format which is faster than lib/pq's text format. // Uses native pgtype types for optimal performance (see https://github.com/jackc/pgx/issues/763). // // IMPORTANT: BatchCopy will FAIL if any duplicate records exist. The PostgreSQL COPY @@ -333,15 +332,18 @@ func (m *OperationModel) BatchCopy( ctx context.Context, pgxTx pgx.Tx, operations []*types.Operation, - stellarAddressesByOpID map[int64]types.ParticipantSet, ) (int, error) { if len(operations) == 0 { return 0, nil } start := time.Now() + defer func() { + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(time.Since(start).Seconds()) + m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations))) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc() + }() - // COPY operations using pgx binary format with native pgtype types copyCount, err := pgxTx.CopyFrom( ctx, pgx.Identifier{"operations"}, @@ -360,71 +362,71 @@ func (m *OperationModel) BatchCopy( }), ) if err != nil { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "operations", utils.GetDBErrorType(err)).Inc() return 0, fmt.Errorf("pgx CopyFrom operations: %w", err) } if int(copyCount) != len(operations) { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "operations", "row_count_mismatch").Inc() return 0, fmt.Errorf("expected %d rows copied, got %d", len(operations), copyCount) } - // COPY operations_accounts using pgx binary format with native pgtype types. Upstream participants handling ensures that - // account address is not NULL here. - if len(stellarAddressesByOpID) > 0 { - // Build OpID -> LedgerCreatedAt lookup from operations - ledgerCreatedAtByOpID := make(map[int64]time.Time, len(operations)) - for _, op := range operations { - ledgerCreatedAtByOpID[op.ID] = op.LedgerCreatedAt - } - - var oaRows [][]any - for opID, addresses := range stellarAddressesByOpID { - ledgerCreatedAt := ledgerCreatedAtByOpID[opID] - ledgerCreatedAtPgtype := pgtype.Timestamptz{Time: ledgerCreatedAt, Valid: true} - opIDPgtype := pgtype.Int8{Int64: opID, Valid: true} - for _, addr := range addresses.ToSlice() { - addrBytes, addrErr := types.AddressBytea(addr).Value() - if addrErr != nil { - return 0, fmt.Errorf("converting address %s to bytes: %w", addr, addrErr) - } - oaRows = append(oaRows, []any{ - ledgerCreatedAtPgtype, - opIDPgtype, - addrBytes, - }) - } - } + return len(operations), nil +} - _, err = pgxTx.CopyFrom( - ctx, - pgx.Identifier{"operations_accounts"}, - []string{"ledger_created_at", "operation_id", "account_id"}, - pgx.CopyFromRows(oaRows), - ) - if err != nil { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc() - m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "operations_accounts", utils.GetDBErrorType(err)).Inc() - return 0, fmt.Errorf("pgx CopyFrom operations_accounts: %w", err) - } +// BatchCopyAccounts inserts operation participants into operations_accounts using pgx's binary COPY protocol. +func (m *OperationModel) BatchCopyAccounts( + ctx context.Context, + pgxTx pgx.Tx, + operations []*types.Operation, + stellarAddressesByOpID map[int64]types.ParticipantSet, +) error { + if len(stellarAddressesByOpID) == 0 { + return nil + } + start := time.Now() + var rowCount int + defer func() { + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations_accounts").Observe(time.Since(start).Seconds()) + m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations_accounts").Observe(float64(rowCount)) m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations_accounts").Inc() + }() + + // Build OpID -> LedgerCreatedAt lookup from operations + ledgerCreatedAtByOpID := make(map[int64]time.Time, len(operations)) + for _, op := range operations { + ledgerCreatedAtByOpID[op.ID] = op.LedgerCreatedAt } - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc() + oaRows := make([][]any, 0, len(stellarAddressesByOpID)*2) + for opID, addresses := range stellarAddressesByOpID { + ledgerCreatedAt := ledgerCreatedAtByOpID[opID] + ledgerCreatedAtPgtype := pgtype.Timestamptz{Time: ledgerCreatedAt, Valid: true} + opIDPgtype := pgtype.Int8{Int64: opID, Valid: true} + for _, addr := range addresses.ToSlice() { + addrVal, addrErr := types.AddressBytea(addr).Value() + if addrErr != nil { + return fmt.Errorf("converting address %s to bytes: %w", addr, addrErr) + } + oaRows = append(oaRows, []any{ + ledgerCreatedAtPgtype, + opIDPgtype, + addrVal.([]byte), + }) + } + } + rowCount = len(oaRows) - return len(operations), nil + _, err := pgxTx.CopyFrom( + ctx, + pgx.Identifier{"operations_accounts"}, + []string{"ledger_created_at", "operation_id", "account_id"}, + pgx.CopyFromRows(oaRows), + ) + if err != nil { + m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "operations_accounts", utils.GetDBErrorType(err)).Inc() + return fmt.Errorf("pgx CopyFrom operations_accounts: %w", err) + } + + return nil } diff --git a/internal/data/operations_test.go b/internal/data/operations_test.go index b73a9941..1dc33eae 100644 --- a/internal/data/operations_test.go +++ b/internal/data/operations_test.go @@ -147,7 +147,7 @@ func Test_OperationModel_BatchCopy(t *testing.T) { pgxTx, err := conn.Begin(ctx) require.NoError(t, err) - gotCount, err := m.BatchCopy(ctx, pgxTx, tc.operations, tc.stellarAddressesByOpID) + gotCount, err := m.BatchCopy(ctx, pgxTx, tc.operations) if tc.wantErrContains != "" { require.Error(t, err) @@ -157,6 +157,7 @@ func Test_OperationModel_BatchCopy(t *testing.T) { } require.NoError(t, err) + require.NoError(t, m.BatchCopyAccounts(ctx, pgxTx, tc.operations, tc.stellarAddressesByOpID)) require.NoError(t, pgxTx.Commit(ctx)) assert.Equal(t, tc.wantCount, gotCount) @@ -726,11 +727,15 @@ func BenchmarkOperationModel_BatchCopy(b *testing.B) { } b.StartTimer() - _, err = m.BatchCopy(ctx, pgxTx, ops, addressesByOpID) + _, err = m.BatchCopy(ctx, pgxTx, ops) if err != nil { pgxTx.Rollback(ctx) b.Fatalf("BatchCopy failed: %v", err) } + if err = m.BatchCopyAccounts(ctx, pgxTx, ops, addressesByOpID); err != nil { + pgxTx.Rollback(ctx) + b.Fatalf("BatchCopyAccounts failed: %v", err) + } b.StopTimer() if err := pgxTx.Commit(ctx); err != nil { diff --git a/internal/data/sac_balances.go b/internal/data/sac_balances.go index 8089511a..4652cf6a 100644 --- a/internal/data/sac_balances.go +++ b/internal/data/sac_balances.go @@ -80,66 +80,81 @@ func (m *SACBalanceModel) GetByAccount(ctx context.Context, accountAddress strin return balances, nil } -// BatchUpsert performs upserts and deletes for SAC balances. -// For upserts (ADD/UPDATE): inserts or updates balance with authorization flags. -// For deletes (REMOVE): removes the balance row. +// BatchUpsert performs upserts and deletes for SAC balances using UNNEST-based bulk operations. +// For upserts (ADD/UPDATE): bulk inserts or updates balance with authorization flags via single UNNEST query. +// For deletes (REMOVE): bulk removes balance rows via single UNNEST query. func (m *SACBalanceModel) BatchUpsert(ctx context.Context, dbTx pgx.Tx, upserts []SACBalance, deletes []SACBalance) error { if len(upserts) == 0 && len(deletes) == 0 { return nil } start := time.Now() - batch := &pgx.Batch{} - - // Upsert query: insert or update all fields - const upsertQuery = ` - INSERT INTO sac_balances ( - account_address, contract_id, balance, is_authorized, is_clawback_enabled, last_modified_ledger - ) VALUES ($1, $2, $3, $4, $5, $6) - ON CONFLICT (account_address, contract_id) DO UPDATE SET - balance = EXCLUDED.balance, - is_authorized = EXCLUDED.is_authorized, - is_clawback_enabled = EXCLUDED.is_clawback_enabled, - last_modified_ledger = EXCLUDED.last_modified_ledger` - - for _, bal := range upserts { - batch.Queue(upsertQuery, - bal.AccountAddress, - bal.ContractID, - bal.Balance, - bal.IsAuthorized, - bal.IsClawbackEnabled, - bal.LedgerNumber, - ) - } - // Delete query - const deleteQuery = `DELETE FROM sac_balances WHERE account_address = $1 AND contract_id = $2` + if len(upserts) > 0 { + accountAddresses := make([]string, len(upserts)) + contractIDs := make([]uuid.UUID, len(upserts)) + balances := make([]string, len(upserts)) + isAuthorized := make([]bool, len(upserts)) + isClawbackEnabled := make([]bool, len(upserts)) + ledgerNumbers := make([]int32, len(upserts)) + + for i, bal := range upserts { + accountAddresses[i] = bal.AccountAddress + contractIDs[i] = bal.ContractID + balances[i] = bal.Balance + isAuthorized[i] = bal.IsAuthorized + isClawbackEnabled[i] = bal.IsClawbackEnabled + ledgerNumbers[i] = int32(bal.LedgerNumber) + } - for _, bal := range deletes { - batch.Queue(deleteQuery, bal.AccountAddress, bal.ContractID) + const upsertQuery = ` + INSERT INTO sac_balances ( + account_address, contract_id, balance, + is_authorized, is_clawback_enabled, last_modified_ledger + ) + SELECT * FROM UNNEST( + $1::text[], $2::uuid[], $3::text[], + $4::boolean[], $5::boolean[], $6::int4[] + ) + ON CONFLICT (account_address, contract_id) DO UPDATE SET + balance = EXCLUDED.balance, + is_authorized = EXCLUDED.is_authorized, + is_clawback_enabled = EXCLUDED.is_clawback_enabled, + last_modified_ledger = EXCLUDED.last_modified_ledger` + + if _, err := dbTx.Exec(ctx, upsertQuery, + accountAddresses, contractIDs, balances, + isAuthorized, isClawbackEnabled, ledgerNumbers, + ); err != nil { + m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "sac_balances").Observe(time.Since(start).Seconds()) + m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "sac_balances").Inc() + m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "sac_balances", utils.GetDBErrorType(err)).Inc() + return fmt.Errorf("upserting SAC balances: %w", err) + } } - if batch.Len() == 0 { - return nil - } + if len(deletes) > 0 { + delAccountAddresses := make([]string, len(deletes)) + delContractIDs := make([]uuid.UUID, len(deletes)) + + for i, bal := range deletes { + delAccountAddresses[i] = bal.AccountAddress + delContractIDs[i] = bal.ContractID + } + + const deleteQuery = ` + DELETE FROM sac_balances + WHERE (account_address, contract_id) IN ( + SELECT * FROM UNNEST($1::text[], $2::uuid[]) + )` - br := dbTx.SendBatch(ctx, batch) - for i := 0; i < batch.Len(); i++ { - if _, err := br.Exec(); err != nil { - _ = br.Close() //nolint:errcheck // cleanup on error path + if _, err := dbTx.Exec(ctx, deleteQuery, delAccountAddresses, delContractIDs); err != nil { m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "sac_balances").Observe(time.Since(start).Seconds()) m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "sac_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "sac_balances", utils.GetDBErrorType(err)).Inc() - return fmt.Errorf("upserting SAC balances: %w", err) + return fmt.Errorf("deleting SAC balances: %w", err) } } - if err := br.Close(); err != nil { - m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "sac_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "sac_balances").Inc() - m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "sac_balances", utils.GetDBErrorType(err)).Inc() - return fmt.Errorf("closing SAC balance batch: %w", err) - } m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "sac_balances").Observe(time.Since(start).Seconds()) m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "sac_balances").Inc() @@ -153,6 +168,10 @@ func (m *SACBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, balances [ } start := time.Now() + defer func() { + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "sac_balances").Observe(time.Since(start).Seconds()) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "sac_balances").Inc() + }() copyCount, err := dbTx.CopyFrom( ctx, @@ -178,20 +197,14 @@ func (m *SACBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, balances [ }), ) if err != nil { - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "sac_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "sac_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "sac_balances", utils.GetDBErrorType(err)).Inc() return fmt.Errorf("batch inserting SAC balances via COPY: %w", err) } if int(copyCount) != len(balances) { - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "sac_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "sac_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "sac_balances", "row_count_mismatch").Inc() return fmt.Errorf("expected %d rows copied, got %d", len(balances), copyCount) } - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "sac_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "sac_balances").Inc() return nil } diff --git a/internal/data/statechanges.go b/internal/data/statechanges.go index a7423894..398d0e08 100644 --- a/internal/data/statechanges.go +++ b/internal/data/statechanges.go @@ -190,6 +190,11 @@ func (m *StateChangeModel) BatchCopy( } start := time.Now() + defer func() { + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "state_changes").Observe(time.Since(start).Seconds()) + m.Metrics.BatchSize.WithLabelValues("BatchCopy", "state_changes").Observe(float64(len(stateChanges))) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "state_changes").Inc() + }() // COPY state_changes using pgx binary format with native pgtype types copyCount, err := pgxTx.CopyFrom( @@ -215,7 +220,7 @@ func (m *StateChangeModel) BatchCopy( } // Convert account_id to BYTEA (required field) - accountBytes, err := sc.AccountID.Value() + accountVal, err := sc.AccountID.Value() if err != nil { return nil, fmt.Errorf("converting account_id: %w", err) } @@ -257,7 +262,7 @@ func (m *StateChangeModel) BatchCopy( pgtypeTextFromReason(sc.StateChangeReason), pgtype.Timestamptz{Time: sc.LedgerCreatedAt, Valid: true}, pgtype.Int4{Int32: int32(sc.LedgerNumber), Valid: true}, - accountBytes, + accountVal.([]byte), pgtype.Int8{Int64: sc.OperationID, Valid: true}, tokenBytes, pgtypeTextFromNullString(sc.Amount), @@ -282,27 +287,14 @@ func (m *StateChangeModel) BatchCopy( }), ) if err != nil { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "state_changes").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "state_changes").Observe(float64(len(stateChanges))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "state_changes").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "state_changes", utils.GetDBErrorType(err)).Inc() return 0, fmt.Errorf("pgx CopyFrom state_changes: %w", err) } if int(copyCount) != len(stateChanges) { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "state_changes").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "state_changes").Observe(float64(len(stateChanges))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "state_changes").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "state_changes", "row_count_mismatch").Inc() return 0, fmt.Errorf("expected %d rows copied, got %d", len(stateChanges), copyCount) } - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "state_changes").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "state_changes").Observe(float64(len(stateChanges))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "state_changes").Inc() - return len(stateChanges), nil } diff --git a/internal/data/transactions.go b/internal/data/transactions.go index 2618019e..d9cb7e69 100644 --- a/internal/data/transactions.go +++ b/internal/data/transactions.go @@ -234,7 +234,6 @@ func (m *TransactionModel) BatchGetByStateChangeIDs(ctx context.Context, scToIDs } // BatchCopy inserts transactions using pgx's binary COPY protocol. -// Uses pgx.Tx for binary format which is faster than lib/pq's text format. // Uses native pgtype types for optimal performance (see https://github.com/jackc/pgx/issues/763). // // IMPORTANT: BatchCopy will FAIL if any duplicate records exist. The PostgreSQL COPY @@ -244,28 +243,30 @@ func (m *TransactionModel) BatchCopy( ctx context.Context, pgxTx pgx.Tx, txs []*types.Transaction, - stellarAddressesByToID map[int64]types.ParticipantSet, ) (int, error) { if len(txs) == 0 { return 0, nil } start := time.Now() + defer func() { + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions").Observe(time.Since(start).Seconds()) + m.Metrics.BatchSize.WithLabelValues("BatchCopy", "transactions").Observe(float64(len(txs))) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions").Inc() + }() - // COPY transactions using pgx binary format with native pgtype types. Upstream participants handling ensures that - // account address is not NULL here. copyCount, err := pgxTx.CopyFrom( ctx, pgx.Identifier{"transactions"}, []string{"hash", "to_id", "fee_charged", "result_code", "ledger_number", "ledger_created_at", "is_fee_bump"}, pgx.CopyFromSlice(len(txs), func(i int) ([]any, error) { tx := txs[i] - hashBytes, err := tx.Hash.Value() + hashVal, err := tx.Hash.Value() if err != nil { return nil, fmt.Errorf("converting hash %s to bytes: %w", tx.Hash, err) } return []any{ - hashBytes, + hashVal.([]byte), pgtype.Int8{Int64: tx.ToID, Valid: true}, pgtype.Int8{Int64: tx.FeeCharged, Valid: true}, pgtype.Text{String: tx.ResultCode, Valid: true}, @@ -276,70 +277,71 @@ func (m *TransactionModel) BatchCopy( }), ) if err != nil { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "transactions").Observe(float64(len(txs))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "transactions", utils.GetDBErrorType(err)).Inc() return 0, fmt.Errorf("pgx CopyFrom transactions: %w", err) } if int(copyCount) != len(txs) { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "transactions").Observe(float64(len(txs))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "transactions", "row_count_mismatch").Inc() return 0, fmt.Errorf("expected %d rows copied, got %d", len(txs), copyCount) } - // COPY transactions_accounts using pgx binary format with native pgtype types - if len(stellarAddressesByToID) > 0 { - // Build ToID -> LedgerCreatedAt lookup from transactions - ledgerCreatedAtByToID := make(map[int64]time.Time, len(txs)) - for _, tx := range txs { - ledgerCreatedAtByToID[tx.ToID] = tx.LedgerCreatedAt - } - - var taRows [][]any - for toID, addresses := range stellarAddressesByToID { - ledgerCreatedAt := ledgerCreatedAtByToID[toID] - ledgerCreatedAtPgtype := pgtype.Timestamptz{Time: ledgerCreatedAt, Valid: true} - toIDPgtype := pgtype.Int8{Int64: toID, Valid: true} - for _, addr := range addresses.ToSlice() { - addrBytes, addrErr := types.AddressBytea(addr).Value() - if addrErr != nil { - return 0, fmt.Errorf("converting address %s to bytes: %w", addr, addrErr) - } - taRows = append(taRows, []any{ - ledgerCreatedAtPgtype, - toIDPgtype, - addrBytes, - }) - } - } + return len(txs), nil +} - _, err = pgxTx.CopyFrom( - ctx, - pgx.Identifier{"transactions_accounts"}, - []string{"ledger_created_at", "tx_to_id", "account_id"}, - pgx.CopyFromRows(taRows), - ) - if err != nil { - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "transactions").Observe(float64(len(txs))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions").Inc() - m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "transactions_accounts", utils.GetDBErrorType(err)).Inc() - return 0, fmt.Errorf("pgx CopyFrom transactions_accounts: %w", err) - } +// BatchCopyAccounts inserts transaction participants into transactions_accounts using pgx's binary COPY protocol. +func (m *TransactionModel) BatchCopyAccounts( + ctx context.Context, + pgxTx pgx.Tx, + txs []*types.Transaction, + stellarAddressesByToID map[int64]types.ParticipantSet, +) error { + if len(stellarAddressesByToID) == 0 { + return nil + } + start := time.Now() + var rowCount int + defer func() { + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions_accounts").Observe(time.Since(start).Seconds()) + m.Metrics.BatchSize.WithLabelValues("BatchCopy", "transactions_accounts").Observe(float64(rowCount)) m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions_accounts").Inc() + }() + + // Build ToID -> LedgerCreatedAt lookup from transactions + ledgerCreatedAtByToID := make(map[int64]time.Time, len(txs)) + for _, tx := range txs { + ledgerCreatedAtByToID[tx.ToID] = tx.LedgerCreatedAt } - duration := time.Since(start).Seconds() - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "transactions").Observe(duration) - m.Metrics.BatchSize.WithLabelValues("BatchCopy", "transactions").Observe(float64(len(txs))) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "transactions").Inc() + taRows := make([][]any, 0, len(stellarAddressesByToID)*2) + for toID, addresses := range stellarAddressesByToID { + ledgerCreatedAt := ledgerCreatedAtByToID[toID] + ledgerCreatedAtPgtype := pgtype.Timestamptz{Time: ledgerCreatedAt, Valid: true} + toIDPgtype := pgtype.Int8{Int64: toID, Valid: true} + for _, addr := range addresses.ToSlice() { + addrVal, addrErr := types.AddressBytea(addr).Value() + if addrErr != nil { + return fmt.Errorf("converting address %s to bytes: %w", addr, addrErr) + } + taRows = append(taRows, []any{ + ledgerCreatedAtPgtype, + toIDPgtype, + addrVal.([]byte), + }) + } + } + rowCount = len(taRows) - return len(txs), nil + _, err := pgxTx.CopyFrom( + ctx, + pgx.Identifier{"transactions_accounts"}, + []string{"ledger_created_at", "tx_to_id", "account_id"}, + pgx.CopyFromRows(taRows), + ) + if err != nil { + m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "transactions_accounts", utils.GetDBErrorType(err)).Inc() + return fmt.Errorf("pgx CopyFrom transactions_accounts: %w", err) + } + + return nil } diff --git a/internal/data/transactions_test.go b/internal/data/transactions_test.go index dc0f05a0..a0816dc2 100644 --- a/internal/data/transactions_test.go +++ b/internal/data/transactions_test.go @@ -145,7 +145,7 @@ func Test_TransactionModel_BatchCopy(t *testing.T) { pgxTx, err := conn.Begin(ctx) require.NoError(t, err) - gotCount, err := m.BatchCopy(ctx, pgxTx, tc.txs, tc.stellarAddressesByToID) + gotCount, err := m.BatchCopy(ctx, pgxTx, tc.txs) if tc.wantErrContains != "" { require.Error(t, err) @@ -155,6 +155,7 @@ func Test_TransactionModel_BatchCopy(t *testing.T) { } require.NoError(t, err) + require.NoError(t, m.BatchCopyAccounts(ctx, pgxTx, tc.txs, tc.stellarAddressesByToID)) require.NoError(t, pgxTx.Commit(ctx)) assert.Equal(t, tc.wantCount, gotCount) @@ -493,11 +494,15 @@ func BenchmarkTransactionModel_BatchCopy(b *testing.B) { } b.StartTimer() - _, err = m.BatchCopy(ctx, pgxTx, txs, addressesByToID) + _, err = m.BatchCopy(ctx, pgxTx, txs) if err != nil { pgxTx.Rollback(ctx) b.Fatalf("BatchCopy failed: %v", err) } + if err = m.BatchCopyAccounts(ctx, pgxTx, txs, addressesByToID); err != nil { + pgxTx.Rollback(ctx) + b.Fatalf("BatchCopyAccounts failed: %v", err) + } b.StopTimer() if err := pgxTx.Commit(ctx); err != nil { diff --git a/internal/data/trustline_balances.go b/internal/data/trustline_balances.go index e205f929..010d23e3 100644 --- a/internal/data/trustline_balances.go +++ b/internal/data/trustline_balances.go @@ -76,71 +76,87 @@ func (m *TrustlineBalanceModel) GetByAccount(ctx context.Context, accountAddress return balances, nil } -// BatchUpsert performs upserts and deletes with full XDR fields. -// For upserts (ADD/UPDATE): inserts or updates all trustline fields. -// For deletes (REMOVE): removes the trustline row. +// BatchUpsert performs upserts and deletes using UNNEST-based bulk operations. +// For upserts (ADD/UPDATE): bulk inserts or updates all trustline fields via single UNNEST query. +// For deletes (REMOVE): bulk removes trustline rows via single UNNEST query. func (m *TrustlineBalanceModel) BatchUpsert(ctx context.Context, dbTx pgx.Tx, upserts []TrustlineBalance, deletes []TrustlineBalance) error { if len(upserts) == 0 && len(deletes) == 0 { return nil } start := time.Now() - batch := &pgx.Batch{} - - // Upsert query: insert or update all fields - const upsertQuery = ` - INSERT INTO trustline_balances ( - account_address, asset_id, balance, trust_limit, - buying_liabilities, selling_liabilities, flags, last_modified_ledger - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT (account_address, asset_id) DO UPDATE SET - balance = EXCLUDED.balance, - trust_limit = EXCLUDED.trust_limit, - buying_liabilities = EXCLUDED.buying_liabilities, - selling_liabilities = EXCLUDED.selling_liabilities, - flags = EXCLUDED.flags, - last_modified_ledger = EXCLUDED.last_modified_ledger` - - for _, tl := range upserts { - batch.Queue(upsertQuery, - tl.AccountAddress, - tl.AssetID, - tl.Balance, - tl.Limit, - tl.BuyingLiabilities, - tl.SellingLiabilities, - tl.Flags, - tl.LedgerNumber, - ) - } - // Delete query - const deleteQuery = `DELETE FROM trustline_balances WHERE account_address = $1 AND asset_id = $2` + if len(upserts) > 0 { + accountAddresses := make([]string, len(upserts)) + assetIDs := make([]uuid.UUID, len(upserts)) + balances := make([]int64, len(upserts)) + trustLimits := make([]int64, len(upserts)) + buyingLiabilities := make([]int64, len(upserts)) + sellingLiabilities := make([]int64, len(upserts)) + flags := make([]int32, len(upserts)) + ledgerNumbers := make([]int64, len(upserts)) + + for i, tl := range upserts { + accountAddresses[i] = tl.AccountAddress + assetIDs[i] = tl.AssetID + balances[i] = tl.Balance + trustLimits[i] = tl.Limit + buyingLiabilities[i] = tl.BuyingLiabilities + sellingLiabilities[i] = tl.SellingLiabilities + flags[i] = int32(tl.Flags) + ledgerNumbers[i] = int64(tl.LedgerNumber) + } - for _, tl := range deletes { - batch.Queue(deleteQuery, tl.AccountAddress, tl.AssetID) + const upsertQuery = ` + INSERT INTO trustline_balances ( + account_address, asset_id, balance, trust_limit, + buying_liabilities, selling_liabilities, flags, last_modified_ledger + ) + SELECT * FROM UNNEST( + $1::text[], $2::uuid[], $3::bigint[], $4::bigint[], + $5::bigint[], $6::bigint[], $7::int4[], $8::bigint[] + ) + ON CONFLICT (account_address, asset_id) DO UPDATE SET + balance = EXCLUDED.balance, + trust_limit = EXCLUDED.trust_limit, + buying_liabilities = EXCLUDED.buying_liabilities, + selling_liabilities = EXCLUDED.selling_liabilities, + flags = EXCLUDED.flags, + last_modified_ledger = EXCLUDED.last_modified_ledger` + + if _, err := dbTx.Exec(ctx, upsertQuery, + accountAddresses, assetIDs, balances, trustLimits, + buyingLiabilities, sellingLiabilities, flags, ledgerNumbers, + ); err != nil { + m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "trustline_balances").Observe(time.Since(start).Seconds()) + m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "trustline_balances").Inc() + m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "trustline_balances", utils.GetDBErrorType(err)).Inc() + return fmt.Errorf("upserting trustline balances: %w", err) + } } - if batch.Len() == 0 { - return nil - } + if len(deletes) > 0 { + delAccountAddresses := make([]string, len(deletes)) + delAssetIDs := make([]uuid.UUID, len(deletes)) + + for i, tl := range deletes { + delAccountAddresses[i] = tl.AccountAddress + delAssetIDs[i] = tl.AssetID + } + + const deleteQuery = ` + DELETE FROM trustline_balances + WHERE (account_address, asset_id) IN ( + SELECT * FROM UNNEST($1::text[], $2::uuid[]) + )` - br := dbTx.SendBatch(ctx, batch) - for i := 0; i < batch.Len(); i++ { - if _, err := br.Exec(); err != nil { - _ = br.Close() //nolint:errcheck // cleanup on error path + if _, err := dbTx.Exec(ctx, deleteQuery, delAccountAddresses, delAssetIDs); err != nil { m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "trustline_balances").Observe(time.Since(start).Seconds()) m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "trustline_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "trustline_balances", utils.GetDBErrorType(err)).Inc() - return fmt.Errorf("upserting trustline balances: %w", err) + return fmt.Errorf("deleting trustline balances: %w", err) } } - if err := br.Close(); err != nil { - m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "trustline_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "trustline_balances").Inc() - m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "trustline_balances", utils.GetDBErrorType(err)).Inc() - return fmt.Errorf("closing trustline balance batch: %w", err) - } m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "trustline_balances").Observe(time.Since(start).Seconds()) m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "trustline_balances").Inc() @@ -154,6 +170,10 @@ func (m *TrustlineBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, bala } start := time.Now() + defer func() { + m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "trustline_balances").Observe(time.Since(start).Seconds()) + m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "trustline_balances").Inc() + }() copyCount, err := dbTx.CopyFrom( ctx, @@ -183,20 +203,14 @@ func (m *TrustlineBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, bala }), ) if err != nil { - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "trustline_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "trustline_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "trustline_balances", utils.GetDBErrorType(err)).Inc() return fmt.Errorf("batch inserting trustline balances via COPY: %w", err) } if int(copyCount) != len(balances) { - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "trustline_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "trustline_balances").Inc() m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "trustline_balances", "row_count_mismatch").Inc() return fmt.Errorf("expected %d rows copied, got %d", len(balances), copyCount) } - m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "trustline_balances").Observe(time.Since(start).Seconds()) - m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "trustline_balances").Inc() return nil } diff --git a/internal/metrics/db.go b/internal/metrics/db.go index 19f046db..093e8c2d 100644 --- a/internal/metrics/db.go +++ b/internal/metrics/db.go @@ -49,7 +49,7 @@ func newDBMetrics(reg prometheus.Registerer) *DBMetrics { BatchSize: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "wallet_db_batch_operation_size", Help: "Size of batch database operations.", - Buckets: prometheus.ExponentialBuckets(1, 2, 12), + Buckets: prometheus.ExponentialBuckets(1, 2, 15), }, []string{"operation", "table"}), } reg.MustRegister( diff --git a/internal/metrics/db_test.go b/internal/metrics/db_test.go index 37708508..03017b7a 100644 --- a/internal/metrics/db_test.go +++ b/internal/metrics/db_test.go @@ -81,7 +81,7 @@ func TestDBMetrics_BatchSize_Buckets(t *testing.T) { for _, metric := range f.GetMetric() { h := metric.GetHistogram() require.NotNil(t, h) - assert.Len(t, h.GetBucket(), 12) // ExponentialBuckets(1, 2, 12) + assert.Len(t, h.GetBucket(), 15) // ExponentialBuckets(1, 2, 15) } } } diff --git a/internal/metrics/ingestion.go b/internal/metrics/ingestion.go index db22806e..e117ff3e 100644 --- a/internal/metrics/ingestion.go +++ b/internal/metrics/ingestion.go @@ -66,7 +66,7 @@ func newIngestionMetrics(reg prometheus.Registerer) *IngestionMetrics { PhaseDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "wallet_ingestion_phase_duration_seconds", Help: "Duration of each ingestion phase.", - Buckets: []float64{0.01, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30, 60}, + Buckets: []float64{0.01, 0.05, 0.075, 0.1, 0.15, 0.2, 0.5, 1, 2, 5, 10, 30, 60}, }, []string{"phase"}), LedgersProcessed: prometheus.NewCounter(prometheus.CounterOpts{ Name: "wallet_ingestion_ledgers_total", @@ -83,7 +83,7 @@ func newIngestionMetrics(reg prometheus.Registerer) *IngestionMetrics { ParticipantsCount: prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "wallet_ingestion_participants_per_ledger", Help: "Number of unique participants per ingestion batch.", - Buckets: prometheus.ExponentialBuckets(1, 2, 12), + Buckets: prometheus.ExponentialBuckets(1, 2, 15), }), LagLedgers: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "wallet_ingestion_lag_ledgers", diff --git a/internal/metrics/ingestion_test.go b/internal/metrics/ingestion_test.go index c589d550..ff47f6b9 100644 --- a/internal/metrics/ingestion_test.go +++ b/internal/metrics/ingestion_test.go @@ -82,7 +82,7 @@ func TestIngestionMetrics_PhaseDuration_Buckets(t *testing.T) { for _, f := range families { if f.GetName() == "wallet_ingestion_phase_duration_seconds" { h := f.GetMetric()[0].GetHistogram() - assert.Len(t, h.GetBucket(), 11) // 11 custom boundaries + assert.Len(t, h.GetBucket(), 13) // 13 custom boundaries } } } @@ -98,7 +98,7 @@ func TestIngestionMetrics_ParticipantsCount_Buckets(t *testing.T) { for _, f := range families { if f.GetName() == "wallet_ingestion_participants_per_ledger" { h := f.GetMetric()[0].GetHistogram() - assert.Len(t, h.GetBucket(), 12) // ExponentialBuckets(1, 2, 12) + assert.Len(t, h.GetBucket(), 15) // ExponentialBuckets(1, 2, 15) } } } diff --git a/internal/services/ingest.go b/internal/services/ingest.go index 039c18f2..5a81ebe3 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -2,6 +2,7 @@ package services import ( "context" + "errors" "fmt" "hash/fnv" "runtime" @@ -11,10 +12,12 @@ import ( "github.com/alitto/pond/v2" set "github.com/deckarep/golang-set/v2" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/stellar/go-stellar-sdk/historyarchive" "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" "github.com/stellar/go-stellar-sdk/support/log" "github.com/stellar/go-stellar-sdk/xdr" + "golang.org/x/sync/errgroup" "github.com/stellar/wallet-backend/internal/apptracker" "github.com/stellar/wallet-backend/internal/data" @@ -224,65 +227,130 @@ func (m *ingestService) processLedger(ctx context.Context, ledgerMeta xdr.Ledger return nil } -// insertIntoDB persists the processed data from the buffer to the database. -// txs is passed in to avoid a redundant GetTransactions() allocation — the caller -// already has the slice for unlockChannelAccounts. -func (m *ingestService) insertIntoDB(ctx context.Context, dbTx pgx.Tx, txs []*types.Transaction, buffer indexer.IndexerBufferInterface) (int, int, error) { +// insertAndUpsertParallel runs parallel goroutines via errgroup: 5 COPY operations (transactions, +// transactions_accounts, operations, operations_accounts, state_changes) plus, in live mode only, +// 4 balance upserts (trustline, native, SAC, account-contract tokens). Balance upserts are +// skipped in backfill mode since they represent current state, not historical data. +// Each goroutine acquires its own pool connection. UniqueViolation errors are treated as +// success for idempotent retry. +func (m *ingestService) insertAndUpsertParallel(ctx context.Context, txs []*types.Transaction, buffer indexer.IndexerBufferInterface) (int, int, error) { txParticipants := buffer.GetTransactionsParticipants() ops := buffer.GetOperations() opParticipants := buffer.GetOperationsParticipants() stateChanges := buffer.GetStateChanges() - if err := m.insertTransactions(ctx, dbTx, txs, txParticipants); err != nil { - return 0, 0, err + g, gCtx := errgroup.WithContext(ctx) + + // 5 COPY goroutines + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + if _, err := m.models.Transactions.BatchCopy(ctx, tx, txs); err != nil { + return fmt.Errorf("copying transactions: %w", err) + } + return nil + }) + }) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + return m.models.Transactions.BatchCopyAccounts(ctx, tx, txs, txParticipants) + }) + }) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + if _, err := m.models.Operations.BatchCopy(ctx, tx, ops); err != nil { + return fmt.Errorf("copying operations: %w", err) + } + return nil + }) + }) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + return m.models.Operations.BatchCopyAccounts(ctx, tx, ops, opParticipants) + }) + }) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + if _, err := m.models.StateChanges.BatchCopy(ctx, tx, stateChanges); err != nil { + return fmt.Errorf("copying state changes: %w", err) + } + return nil + }) + }) + + // 4 upsert goroutines — skipped in backfill mode (balance tables represent current state) + if m.ingestionMode != IngestionModeBackfill { + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + return m.tokenIngestionService.ProcessTrustlineChanges(ctx, tx, buffer.GetTrustlineChanges()) + }) + }) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + return m.tokenIngestionService.ProcessNativeBalanceChanges(ctx, tx, buffer.GetAccountChanges()) + }) + }) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + return m.tokenIngestionService.ProcessSACBalanceChanges(ctx, tx, buffer.GetSACBalanceChanges()) + }) + }) + g.Go(func() error { + return m.copyWithPoolConn(gCtx, func(ctx context.Context, tx pgx.Tx) error { + return m.tokenIngestionService.ProcessContractTokenChanges(ctx, tx, buffer.GetContractChanges()) + }) + }) } - if err := m.insertOperations(ctx, dbTx, ops, opParticipants); err != nil { - return 0, 0, err - } - if err := m.insertStateChanges(ctx, dbTx, stateChanges); err != nil { - return 0, 0, err + + if err := g.Wait(); err != nil { + return 0, 0, fmt.Errorf("parallel insert/upsert: %w", err) } + + m.recordStateChangeMetrics(stateChanges) log.Ctx(ctx).Infof("✅ inserted %d txs, %d ops, %d state_changes", len(txs), len(ops), len(stateChanges)) return len(txs), len(ops), nil } -// insertTransactions batch inserts transactions with their participants into the database. -func (m *ingestService) insertTransactions(ctx context.Context, pgxTx pgx.Tx, txs []*types.Transaction, stellarAddressesByToID map[int64]types.ParticipantSet) error { - if len(txs) == 0 { - return nil - } - _, err := m.models.Transactions.BatchCopy(ctx, pgxTx, txs, stellarAddressesByToID) +// copyWithPoolConn acquires a connection from the pool, begins a transaction, runs fn, +// and commits. On UniqueViolation the insert is idempotent (prior partial insert) so we +// return nil. +func (m *ingestService) copyWithPoolConn(ctx context.Context, fn func(context.Context, pgx.Tx) error) error { + conn, err := m.models.DB.Acquire(ctx) if err != nil { - return fmt.Errorf("batch inserting transactions: %w", err) + return fmt.Errorf("acquiring pool connection: %w", err) } - return nil -} + defer conn.Release() -// insertOperations batch inserts operations with their participants into the database. -func (m *ingestService) insertOperations(ctx context.Context, pgxTx pgx.Tx, ops []*types.Operation, stellarAddressesByOpID map[int64]types.ParticipantSet) error { - if len(ops) == 0 { - return nil - } - _, err := m.models.Operations.BatchCopy(ctx, pgxTx, ops, stellarAddressesByOpID) + tx, err := conn.Begin(ctx) if err != nil { - return fmt.Errorf("batch inserting operations: %w", err) + return fmt.Errorf("beginning transaction: %w", err) } - return nil -} + defer tx.Rollback(ctx) //nolint:errcheck -// insertStateChanges batch inserts state changes and records metrics. -func (m *ingestService) insertStateChanges(ctx context.Context, pgxTx pgx.Tx, stateChanges []types.StateChange) error { - if len(stateChanges) == 0 { - return nil + // Ingestion is idempotent (replayed from cursor on crash), so WAL fsync durability + // is unnecessary. This eliminates fsync latency from each of the 9 parallel commits. + if _, err := tx.Exec(ctx, "SET LOCAL synchronous_commit = off"); err != nil { + return fmt.Errorf("setting synchronous_commit=off: %w", err) } - _, err := m.models.StateChanges.BatchCopy(ctx, pgxTx, stateChanges) - if err != nil { - return fmt.Errorf("batch inserting state changes: %w", err) + + if err := fn(ctx, tx); err != nil { + if isUniqueViolation(err) { + return nil + } + return err + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("committing copy transaction: %w", err) } - m.recordStateChangeMetrics(stateChanges) return nil } +// isUniqueViolation checks if an error is a PostgreSQL unique_violation (23505). +func isUniqueViolation(err error) bool { + var pgErr *pgconn.PgError + return errors.As(err, &pgErr) && pgErr.Code == "23505" +} + // recordStateChangeMetrics aggregates state changes by reason and category, then records metrics. func (m *ingestService) recordStateChangeMetrics(stateChanges []types.StateChange) { counts := make(map[string]int) // key: "reason|category" diff --git a/internal/services/ingest_backfill.go b/internal/services/ingest_backfill.go index de6fbb79..62f8c71d 100644 --- a/internal/services/ingest_backfill.go +++ b/internal/services/ingest_backfill.go @@ -270,13 +270,9 @@ func (m *ingestService) flushBatchBufferWithRetry(ctx context.Context, buffer *i return fmt.Errorf("setting synchronous_commit=off: %w", txErr) } txs := buffer.GetTransactions() - if _, _, err := m.insertIntoDB(ctx, dbTx, txs, buffer); err != nil { + if _, _, err := m.insertAndUpsertParallel(ctx, txs, buffer); err != nil { return fmt.Errorf("inserting processed data into db: %w", err) } - // Unlock channel accounts using all transactions (not filtered) - if err := m.unlockChannelAccounts(ctx, dbTx, txs); err != nil { - return fmt.Errorf("unlocking channel accounts: %w", err) - } // Update cursor atomically with data insertion if requested if updateCursorTo != nil { if err := m.models.IngestStore.UpdateMin(ctx, dbTx, m.oldestLedgerCursorName, *updateCursorTo); err != nil { diff --git a/internal/services/ingest_live.go b/internal/services/ingest_live.go index 8962c36f..d7ed45c7 100644 --- a/internal/services/ingest_live.go +++ b/internal/services/ingest_live.go @@ -23,68 +23,82 @@ const ( oldestLedgerSyncInterval = 100 ) -// PersistLedgerData persists processed ledger data to the database in a single atomic transaction. -// This is the shared core used by both live ingestion and loadtest. -// It handles: trustline assets, contract tokens, filtered data insertion, channel account unlocking, -// token changes, and cursor update. Channel unlock is a no-op when chAccStore is nil. +// PersistLedgerData persists processed ledger data to the database in three phases: +// +// Phase 1: Insert FK prerequisites (trustline assets, contract tokens) and commit, +// making parent rows visible for phase 2's balance upserts. +// +// Phase 2: Parallel COPYs (5 tables) + parallel upserts (4 balance tables) via errgroup, +// each on its own pool connection. UniqueViolation errors are treated as success for +// idempotent crash recovery. +// +// Phase 3: Finalize — unlock channel accounts + advance cursor. The cursor update is the +// idempotency marker; if we crash before it, everything replays safely. func (m *ingestService) PersistLedgerData(ctx context.Context, ledgerSeq uint32, buffer *indexer.IndexerBuffer, cursorName string) (int, int, error) { - var numTxs, numOps int + // Phase 1: FK prerequisites — commit so parent rows are visible to phase 2. + if err := m.persistFKPrerequisites(ctx, ledgerSeq, buffer); err != nil { + return 0, 0, fmt.Errorf("persisting FK prerequisites for ledger %d: %w", ledgerSeq, err) + } + + // Phase 2: Parallel COPYs + upserts on separate pool connections. + txs := buffer.GetTransactions() + numTxs, numOps, err := m.insertAndUpsertParallel(ctx, txs, buffer) + if err != nil { + return 0, 0, fmt.Errorf("parallel insert/upsert for ledger %d: %w", ledgerSeq, err) + } + + // Phase 3: Finalize — unlock channel accounts + advance cursor. + if err := m.persistFinalize(ctx, ledgerSeq, txs, cursorName); err != nil { + return 0, 0, fmt.Errorf("finalizing ledger %d: %w", ledgerSeq, err) + } - err := db.RunInTransaction(ctx, m.models.DB, func(dbTx pgx.Tx) error { - // 1. Insert unique trustline assets (FK prerequisite for trustline balances) + return numTxs, numOps, nil +} + +// persistFKPrerequisites inserts trustline assets and contract tokens in a committed +// transaction so that FK parent rows are visible to parallel balance upserts. +func (m *ingestService) persistFKPrerequisites(ctx context.Context, ledgerSeq uint32, buffer *indexer.IndexerBuffer) error { + if err := db.RunInTransaction(ctx, m.models.DB, func(dbTx pgx.Tx) error { uniqueAssets := buffer.GetUniqueTrustlineAssets() if len(uniqueAssets) > 0 { - if txErr := m.models.TrustlineAsset.BatchInsert(ctx, dbTx, uniqueAssets); txErr != nil { - return fmt.Errorf("inserting trustline assets for ledger %d: %w", ledgerSeq, txErr) + if err := m.models.TrustlineAsset.BatchInsert(ctx, dbTx, uniqueAssets); err != nil { + return fmt.Errorf("inserting trustline assets for ledger %d: %w", ledgerSeq, err) } } - // 2. Insert new contract tokens (filter existing, fetch metadata for SEP-41 if available, insert) - contracts, txErr := m.prepareNewContractTokens(ctx, dbTx, buffer.GetUniqueSEP41ContractTokensByID(), buffer.GetSACContracts()) - if txErr != nil { - return fmt.Errorf("preparing contract tokens for ledger %d: %w", ledgerSeq, txErr) + contracts, err := m.prepareNewContractTokens(ctx, dbTx, buffer.GetUniqueSEP41ContractTokensByID(), buffer.GetSACContracts()) + if err != nil { + return fmt.Errorf("preparing contract tokens for ledger %d: %w", ledgerSeq, err) } if len(contracts) > 0 { - if txErr = m.models.Contract.BatchInsert(ctx, dbTx, contracts); txErr != nil { - return fmt.Errorf("inserting contracts for ledger %d: %w", ledgerSeq, txErr) + if err = m.models.Contract.BatchInsert(ctx, dbTx, contracts); err != nil { + return fmt.Errorf("inserting contracts for ledger %d: %w", ledgerSeq, err) } log.Ctx(ctx).Infof("✅ inserted %d contract tokens", len(contracts)) } - // 3. Insert transactions/operations/state_changes - txs := buffer.GetTransactions() - numTxs, numOps, txErr = m.insertIntoDB(ctx, dbTx, txs, buffer) - if txErr != nil { - return fmt.Errorf("inserting processed data into db for ledger %d: %w", ledgerSeq, txErr) - } - - // 4. Unlock channel accounts (no-op when chAccStore is nil, e.g., in loadtest) - if txErr = m.unlockChannelAccounts(ctx, dbTx, txs); txErr != nil { - return fmt.Errorf("unlocking channel accounts for ledger %d: %w", ledgerSeq, txErr) - } + return nil + }); err != nil { + return fmt.Errorf("FK prerequisites transaction: %w", err) + } + return nil +} - // 5. Process token changes (trustline add/remove/update, contract token add, native balance, SAC balance) - if txErr = m.tokenIngestionService.ProcessTokenChanges(ctx, dbTx, - buffer.GetTrustlineChanges(), - buffer.GetContractChanges(), - buffer.GetAccountChanges(), - buffer.GetSACBalanceChanges(), - ); txErr != nil { - return fmt.Errorf("processing token changes for ledger %d: %w", ledgerSeq, txErr) +// persistFinalize unlocks channel accounts and advances the cursor in a committed transaction. +// The cursor update is the idempotency marker for crash recovery. +func (m *ingestService) persistFinalize(ctx context.Context, ledgerSeq uint32, txs []*types.Transaction, cursorName string) error { + if err := db.RunInTransaction(ctx, m.models.DB, func(dbTx pgx.Tx) error { + if err := m.unlockChannelAccounts(ctx, dbTx, txs); err != nil { + return fmt.Errorf("unlocking channel accounts for ledger %d: %w", ledgerSeq, err) } - - // 6. Update the specified cursor - if txErr = m.models.IngestStore.Update(ctx, dbTx, cursorName, ledgerSeq); txErr != nil { - return fmt.Errorf("updating cursor for ledger %d: %w", ledgerSeq, txErr) + if err := m.models.IngestStore.Update(ctx, dbTx, cursorName, ledgerSeq); err != nil { + return fmt.Errorf("updating cursor for ledger %d: %w", ledgerSeq, err) } - return nil - }) - if err != nil { - return 0, 0, fmt.Errorf("persisting ledger data for ledger %d: %w", ledgerSeq, err) + }); err != nil { + return fmt.Errorf("finalize transaction: %w", err) } - - return numTxs, numOps, nil + return nil } // startLiveIngestion begins continuous ingestion from the last checkpoint ledger, diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index 8c52830c..81b0b21f 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -1081,13 +1081,6 @@ func Test_ingestService_flushBatchBufferWithRetry(t *testing.T) { mockRPCService := &RPCServiceMock{} mockRPCService.On("NetworkPassphrase").Return(network.TestNetworkPassphrase).Maybe() - mockChAccStore := &store.ChannelAccountStoreMock{} - // Use variadic mock.Anything for any number of tx hashes - mockChAccStore.On("UnassignTxAndUnlockChannelAccounts", mock.Anything, mock.Anything).Return(int64(0), nil).Maybe() - mockChAccStore.On("UnassignTxAndUnlockChannelAccounts", mock.Anything, mock.Anything, mock.Anything).Return(int64(0), nil).Maybe() - mockChAccStore.On("UnassignTxAndUnlockChannelAccounts", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(0), nil).Maybe() - mockChAccStore.On("UnassignTxAndUnlockChannelAccounts", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(0), nil).Maybe() - svc, err := NewIngestService(IngestServiceConfig{ IngestionMode: IngestionModeBackfill, Models: models, @@ -1096,7 +1089,6 @@ func Test_ingestService_flushBatchBufferWithRetry(t *testing.T) { AppTracker: &apptracker.MockAppTracker{}, RPCService: mockRPCService, LedgerBackend: &LedgerBackendMock{}, - ChannelAccountStore: mockChAccStore, Metrics: m, GetLedgersLimit: defaultGetLedgersLimit, Network: network.TestNetworkPassphrase, @@ -1634,16 +1626,12 @@ func Test_ingestProcessedDataWithRetry(t *testing.T) { mockChAccStore := &store.ChannelAccountStoreMock{} - // Mock AccountTokenService to succeed + // Mock token ingestion methods (called in parallel by insertAndUpsertParallel) mockTokenIngestionService := NewTokenIngestionServiceMock(t) - mockTokenIngestionService.On("ProcessTokenChanges", - mock.Anything, // ctx - mock.Anything, // dbTx - mock.Anything, // trustlineChangesByTrustlineKey - mock.Anything, // contractChanges - mock.Anything, // accountChangesByAccountID - mock.Anything, // sacBalanceChangesByKey - ).Return(nil) + mockTokenIngestionService.On("ProcessTrustlineChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mockTokenIngestionService.On("ProcessNativeBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mockTokenIngestionService.On("ProcessSACBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil) + mockTokenIngestionService.On("ProcessContractTokenChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil) svc, err := NewIngestService(IngestServiceConfig{ IngestionMode: IngestionModeLive, @@ -1718,16 +1706,12 @@ func Test_ingestProcessedDataWithRetry(t *testing.T) { mockChAccStore := &store.ChannelAccountStoreMock{} - // Mock AccountTokenService to return error (simulating DB failure) + // Mock token ingestion methods - one fails to simulate DB failure mockTokenIngestionService := NewTokenIngestionServiceMock(t) - mockTokenIngestionService.On("ProcessTokenChanges", - mock.Anything, // ctx - mock.Anything, // dbTx - mock.Anything, // trustlineChangesByTrustlineKey - mock.Anything, // contractChanges - mock.Anything, // accountChangesByAccountID - mock.Anything, // sacBalanceChangesByKey - ).Return(fmt.Errorf("db connection failed")) + mockTokenIngestionService.On("ProcessTrustlineChanges", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("db connection failed")).Maybe() + mockTokenIngestionService.On("ProcessNativeBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + mockTokenIngestionService.On("ProcessSACBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + mockTokenIngestionService.On("ProcessContractTokenChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() svc, err := NewIngestService(IngestServiceConfig{ IngestionMode: IngestionModeLive, @@ -1802,24 +1786,13 @@ func Test_ingestProcessedDataWithRetry(t *testing.T) { mockChAccStore := &store.ChannelAccountStoreMock{} - // Mock AccountTokenService to fail once then succeed + // Mock token ingestion methods - trustline fails once then succeeds on retry mockTokenIngestionService := NewTokenIngestionServiceMock(t) - mockTokenIngestionService.On("ProcessTokenChanges", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - ).Return(fmt.Errorf("transient error")).Once() - mockTokenIngestionService.On("ProcessTokenChanges", - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - mock.Anything, - ).Return(nil).Once() + mockTokenIngestionService.On("ProcessTrustlineChanges", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("transient error")).Once() + mockTokenIngestionService.On("ProcessTrustlineChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + mockTokenIngestionService.On("ProcessNativeBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + mockTokenIngestionService.On("ProcessSACBalanceChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() + mockTokenIngestionService.On("ProcessContractTokenChanges", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() svc, err := NewIngestService(IngestServiceConfig{ IngestionMode: IngestionModeLive, diff --git a/internal/services/mocks.go b/internal/services/mocks.go index 7be51bdf..fea98146 100644 --- a/internal/services/mocks.go +++ b/internal/services/mocks.go @@ -135,6 +135,26 @@ func (m *TokenIngestionServiceMock) ProcessTokenChanges(ctx context.Context, dbT return args.Error(0) } +func (m *TokenIngestionServiceMock) ProcessTrustlineChanges(ctx context.Context, dbTx pgx.Tx, changesByKey map[indexer.TrustlineChangeKey]types.TrustlineChange) error { + args := m.Called(ctx, dbTx, changesByKey) + return args.Error(0) +} + +func (m *TokenIngestionServiceMock) ProcessContractTokenChanges(ctx context.Context, dbTx pgx.Tx, changes []types.ContractChange) error { + args := m.Called(ctx, dbTx, changes) + return args.Error(0) +} + +func (m *TokenIngestionServiceMock) ProcessNativeBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByAccountID map[string]types.AccountChange) error { + args := m.Called(ctx, dbTx, changesByAccountID) + return args.Error(0) +} + +func (m *TokenIngestionServiceMock) ProcessSACBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByKey map[indexer.SACBalanceChangeKey]types.SACBalanceChange) error { + args := m.Called(ctx, dbTx, changesByKey) + return args.Error(0) +} + // NewTokenIngestionServiceMock creates a new instance of TokenIngestionServiceMock. func NewTokenIngestionServiceMock(t interface { mock.TestingT diff --git a/internal/services/token_ingestion.go b/internal/services/token_ingestion.go index 87c795d2..952007ef 100644 --- a/internal/services/token_ingestion.go +++ b/internal/services/token_ingestion.go @@ -161,6 +161,12 @@ type TokenIngestionService interface { // // Both trustline and contract IDs are computed using deterministic hash functions (DeterministicAssetID, DeterministicContractID). ProcessTokenChanges(ctx context.Context, dbTx pgx.Tx, trustlineChangesByTrustlineKey map[indexer.TrustlineChangeKey]types.TrustlineChange, contractChanges []types.ContractChange, accountChangesByAccountID map[string]types.AccountChange, sacBalanceChangesByKey map[indexer.SACBalanceChangeKey]types.SACBalanceChange) error + + // Individual token change processors — used by parallel ingestion. + ProcessTrustlineChanges(ctx context.Context, dbTx pgx.Tx, changesByKey map[indexer.TrustlineChangeKey]types.TrustlineChange) error + ProcessContractTokenChanges(ctx context.Context, dbTx pgx.Tx, changes []types.ContractChange) error + ProcessNativeBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByAccountID map[string]types.AccountChange) error + ProcessSACBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByKey map[indexer.SACBalanceChangeKey]types.SACBalanceChange) error } // Verify interface compliance at compile time @@ -353,23 +359,23 @@ func (s *tokenIngestionService) ProcessTokenChanges(ctx context.Context, dbTx pg return nil } - if err := s.processTrustlineChanges(ctx, dbTx, trustlineChangesByTrustlineKey); err != nil { + if err := s.ProcessTrustlineChanges(ctx, dbTx, trustlineChangesByTrustlineKey); err != nil { return err } - if err := s.processContractTokenChanges(ctx, dbTx, contractChanges); err != nil { + if err := s.ProcessContractTokenChanges(ctx, dbTx, contractChanges); err != nil { return err } - if err := s.processNativeBalanceChanges(ctx, dbTx, accountChangesByAccountID); err != nil { + if err := s.ProcessNativeBalanceChanges(ctx, dbTx, accountChangesByAccountID); err != nil { return err } - if err := s.processSACBalanceChanges(ctx, dbTx, sacBalanceChangesByKey); err != nil { + if err := s.ProcessSACBalanceChanges(ctx, dbTx, sacBalanceChangesByKey); err != nil { return err } return nil } -// processTrustlineChanges handles trustline balance upserts and deletes. -func (s *tokenIngestionService) processTrustlineChanges(ctx context.Context, dbTx pgx.Tx, changesByKey map[indexer.TrustlineChangeKey]types.TrustlineChange) error { +// ProcessTrustlineChanges handles trustline balance upserts and deletes. +func (s *tokenIngestionService) ProcessTrustlineChanges(ctx context.Context, dbTx pgx.Tx, changesByKey map[indexer.TrustlineChangeKey]types.TrustlineChange) error { if len(changesByKey) == 0 { return nil } @@ -403,8 +409,8 @@ func (s *tokenIngestionService) processTrustlineChanges(ctx context.Context, dbT return nil } -// processContractTokenChanges handles SEP-41 contract token inserts. -func (s *tokenIngestionService) processContractTokenChanges(ctx context.Context, dbTx pgx.Tx, changes []types.ContractChange) error { +// ProcessContractTokenChanges handles SEP-41 contract token inserts. +func (s *tokenIngestionService) ProcessContractTokenChanges(ctx context.Context, dbTx pgx.Tx, changes []types.ContractChange) error { if len(changes) == 0 { return nil } @@ -431,8 +437,8 @@ func (s *tokenIngestionService) processContractTokenChanges(ctx context.Context, return nil } -// processNativeBalanceChanges handles native XLM balance upserts and deletes. -func (s *tokenIngestionService) processNativeBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByAccountID map[string]types.AccountChange) error { +// ProcessNativeBalanceChanges handles native XLM balance upserts and deletes. +func (s *tokenIngestionService) ProcessNativeBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByAccountID map[string]types.AccountChange) error { if len(changesByAccountID) == 0 { return nil } @@ -463,8 +469,8 @@ func (s *tokenIngestionService) processNativeBalanceChanges(ctx context.Context, return nil } -// processSACBalanceChanges handles SAC balance upserts and deletes for contract addresses. -func (s *tokenIngestionService) processSACBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByKey map[indexer.SACBalanceChangeKey]types.SACBalanceChange) error { +// ProcessSACBalanceChanges handles SAC balance upserts and deletes for contract addresses. +func (s *tokenIngestionService) ProcessSACBalanceChanges(ctx context.Context, dbTx pgx.Tx, changesByKey map[indexer.SACBalanceChangeKey]types.SACBalanceChange) error { if len(changesByKey) == 0 { return nil }