diff --git a/internal/data/operations.go b/internal/data/operations.go index 003bb33d..32f1b709 100644 --- a/internal/data/operations.go +++ b/internal/data/operations.go @@ -6,7 +6,6 @@ import ( "strings" "time" - set "github.com/deckarep/golang-set/v2" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" @@ -334,7 +333,7 @@ func (m *OperationModel) BatchCopy( ctx context.Context, pgxTx pgx.Tx, operations []*types.Operation, - stellarAddressesByOpID map[int64]set.Set[string], + stellarAddressesByOpID map[int64]types.ParticipantSet, ) (int, error) { if len(operations) == 0 { return 0, nil diff --git a/internal/data/operations_test.go b/internal/data/operations_test.go index 438ec018..b73a9941 100644 --- a/internal/data/operations_test.go +++ b/internal/data/operations_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - set "github.com/deckarep/golang-set/v2" "github.com/jackc/pgx/v5" "github.com/prometheus/client_golang/prometheus" "github.com/stellar/go-stellar-sdk/keypair" @@ -21,9 +20,9 @@ import ( // generateTestOperations creates n test operations for benchmarking. // It returns a map of operation IDs to addresses. -func generateTestOperations(n int, startID int64) ([]*types.Operation, map[int64]set.Set[string]) { +func generateTestOperations(n int, startID int64) ([]*types.Operation, map[int64]types.ParticipantSet) { ops := make([]*types.Operation, n) - addressesByOpID := make(map[int64]set.Set[string]) + addressesByOpID := make(map[int64]types.ParticipantSet) now := time.Now() for i := 0; i < n; i++ { @@ -37,7 +36,7 @@ func generateTestOperations(n int, startID int64) ([]*types.Operation, map[int64 LedgerNumber: uint32(i + 1), LedgerCreatedAt: now, } - addressesByOpID[opID] = set.NewSet(address) + addressesByOpID[opID] = types.NewParticipantSet(address) } return ops, addressesByOpID @@ -101,26 +100,26 @@ func Test_OperationModel_BatchCopy(t *testing.T) { testCases := []struct { name string operations []*types.Operation - stellarAddressesByOpID map[int64]set.Set[string] + stellarAddressesByOpID map[int64]types.ParticipantSet wantCount int wantErrContains string }{ { name: "🟢successful_insert_multiple", operations: []*types.Operation{&op1, &op2}, - stellarAddressesByOpID: map[int64]set.Set[string]{op1.ID: set.NewSet(kp1.Address()), op2.ID: set.NewSet(kp2.Address())}, + stellarAddressesByOpID: map[int64]types.ParticipantSet{op1.ID: types.NewParticipantSet(kp1.Address()), op2.ID: types.NewParticipantSet(kp2.Address())}, wantCount: 2, }, { name: "🟢empty_input", operations: []*types.Operation{}, - stellarAddressesByOpID: map[int64]set.Set[string]{}, + stellarAddressesByOpID: map[int64]types.ParticipantSet{}, wantCount: 0, }, { name: "🟢no_participants", operations: []*types.Operation{&op1}, - stellarAddressesByOpID: map[int64]set.Set[string]{}, + stellarAddressesByOpID: map[int64]types.ParticipantSet{}, wantCount: 1, }, } diff --git a/internal/data/transactions.go b/internal/data/transactions.go index bb04c5fc..2618019e 100644 --- a/internal/data/transactions.go +++ b/internal/data/transactions.go @@ -6,7 +6,6 @@ import ( "strings" "time" - set "github.com/deckarep/golang-set/v2" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" @@ -245,7 +244,7 @@ func (m *TransactionModel) BatchCopy( ctx context.Context, pgxTx pgx.Tx, txs []*types.Transaction, - stellarAddressesByToID map[int64]set.Set[string], + stellarAddressesByToID map[int64]types.ParticipantSet, ) (int, error) { if len(txs) == 0 { return 0, nil diff --git a/internal/data/transactions_test.go b/internal/data/transactions_test.go index de94141f..dc0f05a0 100644 --- a/internal/data/transactions_test.go +++ b/internal/data/transactions_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - set "github.com/deckarep/golang-set/v2" "github.com/jackc/pgx/v5" "github.com/prometheus/client_golang/prometheus" "github.com/stellar/go-stellar-sdk/keypair" @@ -22,9 +21,9 @@ import ( // generateTestTransactions creates n test transactions for benchmarking. // Uses toid.New to generate realistic ToIDs based on ledger sequence and transaction index. -func generateTestTransactions(n int, startLedger int32) ([]*types.Transaction, map[int64]set.Set[string]) { +func generateTestTransactions(n int, startLedger int32) ([]*types.Transaction, map[int64]types.ParticipantSet) { txs := make([]*types.Transaction, n) - addressesByToID := make(map[int64]set.Set[string]) + addressesByToID := make(map[int64]types.ParticipantSet) now := time.Now() for i := 0; i < n; i++ { @@ -43,7 +42,7 @@ func generateTestTransactions(n int, startLedger int32) ([]*types.Transaction, m LedgerCreatedAt: now, IsFeeBump: false, } - addressesByToID[toID] = set.NewSet(address) + addressesByToID[toID] = types.NewParticipantSet(address) } return txs, addressesByToID @@ -93,32 +92,32 @@ func Test_TransactionModel_BatchCopy(t *testing.T) { testCases := []struct { name string txs []*types.Transaction - stellarAddressesByToID map[int64]set.Set[string] + stellarAddressesByToID map[int64]types.ParticipantSet wantCount int wantErrContains string }{ { name: "🟢successful_insert_multiple", txs: []*types.Transaction{&txCopy1, &txCopy2}, - stellarAddressesByToID: map[int64]set.Set[string]{txCopy1.ToID: set.NewSet(kp1.Address()), txCopy2.ToID: set.NewSet(kp2.Address())}, + stellarAddressesByToID: map[int64]types.ParticipantSet{txCopy1.ToID: types.NewParticipantSet(kp1.Address()), txCopy2.ToID: types.NewParticipantSet(kp2.Address())}, wantCount: 2, }, { name: "🟢empty_input", txs: []*types.Transaction{}, - stellarAddressesByToID: map[int64]set.Set[string]{}, + stellarAddressesByToID: map[int64]types.ParticipantSet{}, wantCount: 0, }, { name: "🟢single_transaction", txs: []*types.Transaction{&txCopy3}, - stellarAddressesByToID: map[int64]set.Set[string]{txCopy3.ToID: set.NewSet(kp1.Address())}, + stellarAddressesByToID: map[int64]types.ParticipantSet{txCopy3.ToID: types.NewParticipantSet(kp1.Address())}, wantCount: 1, }, { name: "🟢no_participants", txs: []*types.Transaction{&txCopy1}, - stellarAddressesByToID: map[int64]set.Set[string]{}, + stellarAddressesByToID: map[int64]types.ParticipantSet{}, wantCount: 1, }, } diff --git a/internal/indexer/indexer.go b/internal/indexer/indexer.go index 53e396e1..7d78544b 100644 --- a/internal/indexer/indexer.go +++ b/internal/indexer/indexer.go @@ -22,11 +22,10 @@ import ( ) type IndexerBufferInterface interface { - PushTransaction(participant string, transaction types.Transaction) - PushOperation(participant string, operation types.Operation, transaction types.Transaction) - PushStateChange(transaction types.Transaction, operation types.Operation, stateChange types.StateChange) - GetTransactionsParticipants() map[int64]set.Set[string] - GetOperationsParticipants() map[int64]set.Set[string] + BatchPushTransactionResult(result *TransactionResult) + BatchPushChanges(trustlines []types.TrustlineChange, accounts []types.AccountChange, sacBalances []types.SACBalanceChange, sacContracts []*data.Contract) + GetTransactionsParticipants() map[int64]types.ParticipantSet + GetOperationsParticipants() map[int64]types.ParticipantSet GetNumberOfTransactions() int GetNumberOfOperations() int GetTransactions() []*types.Transaction @@ -36,15 +35,9 @@ type IndexerBufferInterface interface { GetContractChanges() []types.ContractChange GetAccountChanges() map[string]types.AccountChange GetSACBalanceChanges() map[SACBalanceChangeKey]types.SACBalanceChange - PushContractChange(contractChange types.ContractChange) - PushTrustlineChange(trustlineChange types.TrustlineChange) - PushAccountChange(accountChange types.AccountChange) - PushSACBalanceChange(sacBalanceChange types.SACBalanceChange) - PushSACContract(c *data.Contract) GetUniqueTrustlineAssets() []data.TrustlineAsset GetUniqueSEP41ContractTokensByID() map[string]types.ContractType GetSACContracts() map[string]*data.Contract - Merge(other IndexerBufferInterface) Clear() } @@ -101,30 +94,26 @@ func NewIndexer(networkPassphrase string, pool pond.Pool, ingestionMetrics *metr } // ProcessLedgerTransactions processes all transactions in a ledger in parallel. -// It collects transaction data (participants, operations, state changes) and populates the buffer in a single pass. +// Each goroutine pushes directly into the shared ledger buffer (protected by mutex). +// The heavy XDR parsing work runs without the lock; only the brief map inserts lock. // Returns the total participant count for metrics. func (i *Indexer) ProcessLedgerTransactions(ctx context.Context, transactions []ingest.LedgerTransaction, ledgerBuffer IndexerBufferInterface) (int, error) { group := i.pool.NewGroupContext(ctx) - txnBuffers := make([]*IndexerBuffer, len(transactions)) participantCounts := make([]int, len(transactions)) var errs []error errMu := sync.Mutex{} for idx, tx := range transactions { - index := idx - tx := tx group.Submit(func() { - buffer := NewIndexerBuffer() - count, err := i.processTransaction(ctx, tx, buffer) + count, err := i.processTransaction(ctx, tx, ledgerBuffer) if err != nil { errMu.Lock() errs = append(errs, fmt.Errorf("processing transaction at ledger=%d tx=%d: %w", tx.Ledger.LedgerSequence(), tx.Index, err)) errMu.Unlock() return } - txnBuffers[index] = buffer - participantCounts[index] = count + participantCounts[idx] = count }) } @@ -135,19 +124,18 @@ func (i *Indexer) ProcessLedgerTransactions(ctx context.Context, transactions [] return 0, fmt.Errorf("processing transactions: %w", errors.Join(errs...)) } - // Merge buffers and count participants totalParticipants := 0 - for idx, buffer := range txnBuffers { - ledgerBuffer.Merge(buffer) - totalParticipants += participantCounts[idx] + for _, count := range participantCounts { + totalParticipants += count } return totalParticipants, nil } // processTransaction processes a single transaction - collects data and populates buffer. +// All buffer writes are batched to minimize mutex contention during parallel processing. // Returns participant count for metrics. -func (i *Indexer) processTransaction(ctx context.Context, tx ingest.LedgerTransaction, buffer *IndexerBuffer) (int, error) { +func (i *Indexer) processTransaction(ctx context.Context, tx ingest.LedgerTransaction, buffer IndexerBufferInterface) (int, error) { // Get transaction participants txParticipants, err := i.participantsProcessor.GetTransactionParticipants(tx) if err != nil { @@ -172,113 +160,107 @@ func (i *Indexer) processTransaction(ctx context.Context, tx ingest.LedgerTransa return 0, fmt.Errorf("creating data transaction: %w", err) } - // Count all unique participants for metrics - allParticipants := set.NewSet[string]() - allParticipants = allParticipants.Union(txParticipants) - for _, opParticipants := range opsParticipants { - allParticipants = allParticipants.Union(opParticipants.Participants) - } - for _, stateChange := range stateChanges { - allParticipants.Add(string(stateChange.AccountID)) - } - - // Insert transaction participants - for participant := range txParticipants.Iter() { - buffer.PushTransaction(participant, *dataTx) - } - // Get operation results for extracting result codes opResults, _ := tx.Result.OperationResults() - // Insert operations participants - operationsMap := make(map[int64]*types.Operation) - for opID, opParticipants := range opsParticipants { - dataOp, opErr := processors.ConvertOperation(&tx, &opParticipants.OpWrapper.Operation, opID, opParticipants.OpWrapper.Index, opResults) + // Build operations map and collect participants per operation + operationsMap := make(map[int64]*types.Operation, len(opsParticipants)) + opParticipantMap := make(map[int64][]string, len(opsParticipants)) + for opID, opP := range opsParticipants { + dataOp, opErr := processors.ConvertOperation(&tx, &opP.OpWrapper.Operation, opID, opP.OpWrapper.Index, opResults) if opErr != nil { return 0, fmt.Errorf("creating data operation: %w", opErr) } operationsMap[opID] = dataOp - for participant := range opParticipants.Participants.Iter() { - buffer.PushOperation(participant, *dataOp, *dataTx) + participants := make([]string, 0, opP.Participants.Cardinality()) + for p := range opP.Participants.Iter() { + participants = append(participants, p) } + opParticipantMap[opID] = participants } - // Process trustline, account, and SAC balance changes from ledger changes - for _, opParticipants := range opsParticipants { - trustlineChanges, tlErr := i.trustlinesProcessor.ProcessOperation(ctx, opParticipants.OpWrapper) + // Build contract changes from state changes + var contractChanges []types.ContractChange + for _, sc := range stateChanges { + if sc.StateChangeCategory == types.StateChangeCategoryBalance && sc.ContractType == types.ContractTypeSEP41 { + contractChanges = append(contractChanges, types.ContractChange{ + AccountID: string(sc.AccountID), + OperationID: sc.OperationID, + ContractID: sc.TokenID.String(), + LedgerNumber: tx.Ledger.LedgerSequence(), + ContractType: sc.ContractType, + }) + } + } + + // Validate state change operation IDs (log warnings for mismatches) + for _, sc := range stateChanges { + if sc.AccountID == "" || sc.OperationID == 0 { + continue + } + if operationsMap[sc.OperationID] == nil { + log.Ctx(ctx).Errorf("operation ID %d not found in operations map for state change (to_id=%d, category=%s)", sc.OperationID, sc.ToID, sc.StateChangeCategory) + } + } + + // Collect tx participant strings + txParticipantList := make([]string, 0, txParticipants.Cardinality()) + for p := range txParticipants.Iter() { + txParticipantList = append(txParticipantList, p) + } + + // Push all transaction data in a single lock acquisition + buffer.BatchPushTransactionResult(&TransactionResult{ + Transaction: dataTx, + TxParticipants: txParticipantList, + Operations: operationsMap, + OpParticipants: opParticipantMap, + ContractChanges: contractChanges, + StateChanges: stateChanges, + StateChangeOpMap: operationsMap, + }) + + // Process trustline, account, and SAC balance changes from ledger changes. + // Each operation's changes are pushed in a single lock acquisition. + for _, opP := range opsParticipants { + trustlineChanges, tlErr := i.trustlinesProcessor.ProcessOperation(ctx, opP.OpWrapper) if tlErr != nil { return 0, fmt.Errorf("processing trustline changes: %w", tlErr) } - for _, tlChange := range trustlineChanges { - buffer.PushTrustlineChange(tlChange) - } - accountChanges, accErr := i.accountsProcessor.ProcessOperation(ctx, opParticipants.OpWrapper) + accountChanges, accErr := i.accountsProcessor.ProcessOperation(ctx, opP.OpWrapper) if accErr != nil { return 0, fmt.Errorf("processing account changes: %w", accErr) } - for _, accChange := range accountChanges { - buffer.PushAccountChange(accChange) - } - sacBalanceChanges, sacErr := i.sacBalancesProcessor.ProcessOperation(ctx, opParticipants.OpWrapper) + sacBalanceChanges, sacErr := i.sacBalancesProcessor.ProcessOperation(ctx, opP.OpWrapper) if sacErr != nil { return 0, fmt.Errorf("processing SAC balance changes: %w", sacErr) } - for _, sacChange := range sacBalanceChanges { - buffer.PushSACBalanceChange(sacChange) - } - sacContracts, sacInstanceErr := i.sacInstancesProcessor.ProcessOperation(ctx, opParticipants.OpWrapper) + sacContracts, sacInstanceErr := i.sacInstancesProcessor.ProcessOperation(ctx, opP.OpWrapper) if sacInstanceErr != nil { return 0, fmt.Errorf("processing SAC instances: %w", sacInstanceErr) } - for _, c := range sacContracts { - buffer.PushSACContract(c) - } - } - // Process state changes to extract contract changes - for _, stateChange := range stateChanges { - //exhaustive:ignore - switch stateChange.StateChangeCategory { - case types.StateChangeCategoryBalance: - // Only store contract changes when contract token is SEP41 - if stateChange.ContractType == types.ContractTypeSEP41 { - contractChange := types.ContractChange{ - AccountID: string(stateChange.AccountID), - OperationID: stateChange.OperationID, - ContractID: stateChange.TokenID.String(), - LedgerNumber: tx.Ledger.LedgerSequence(), - ContractType: stateChange.ContractType, - } - buffer.PushContractChange(contractChange) - } - } + buffer.BatchPushChanges(trustlineChanges, accountChanges, sacBalanceChanges, sacContracts) } - // Insert state changes - for _, stateChange := range stateChanges { - // Skip empty state changes (no account to associate with) - if stateChange.AccountID == "" { - continue - } - - // Get the correct operation for this state change - var operation types.Operation - if stateChange.OperationID != 0 { - correctOp := operationsMap[stateChange.OperationID] - if correctOp == nil { - log.Ctx(ctx).Errorf("operation ID %d not found in operations map for state change (to_id=%d, category=%s)", stateChange.OperationID, stateChange.ToID, stateChange.StateChangeCategory) - continue - } - operation = *correctOp + // Count all unique participants for metrics + allParticipants := make(map[string]struct{}, txParticipants.Cardinality()) + for _, p := range txParticipantList { + allParticipants[p] = struct{}{} + } + for _, participants := range opParticipantMap { + for _, p := range participants { + allParticipants[p] = struct{}{} } - // For fee state changes (OperationID == 0), operation remains zero value - buffer.PushStateChange(*dataTx, operation, stateChange) + } + for _, sc := range stateChanges { + allParticipants[string(sc.AccountID)] = struct{}{} } - return allParticipants.Cardinality(), nil + return len(allParticipants), nil } // getTransactionStateChanges processes operations of a transaction and calculates all state changes diff --git a/internal/indexer/indexer_buffer.go b/internal/indexer/indexer_buffer.go index ef901e64..e06fd907 100644 --- a/internal/indexer/indexer_buffer.go +++ b/internal/indexer/indexer_buffer.go @@ -5,11 +5,9 @@ package indexer import ( "fmt" - "maps" "strings" "sync" - set "github.com/deckarep/golang-set/v2" "github.com/google/uuid" "github.com/stellar/go-stellar-sdk/txnbuild" @@ -17,8 +15,8 @@ import ( "github.com/stellar/wallet-backend/internal/indexer/types" ) -// IndexerBuffer is a thread-safe, memory-efficient buffer for collecting blockchain data -// during ledger ingestion. It uses a two-level storage architecture: +// IndexerBuffer is a memory-efficient buffer for collecting blockchain data during ledger +// ingestion. It uses a two-level storage architecture: // // ARCHITECTURE: // 1. Canonical Storage Layer: @@ -27,23 +25,14 @@ import ( // - This layer owns the actual data and ensures only ONE copy exists in memory // // 2. Transaction/Operation to Participants Mapping Layer: -// - participantsByTxHash: Maps each transaction hash to a SET of participant IDs +// - participantsByToID: Maps each transaction ToID to a SET of participant IDs // - participantsByOpID: Maps each operation ID to a SET of participant IDs // - Efficiently tracks which participants interacted with each tx/op // -// MEMORY OPTIMIZATION: -// Transaction structs contain large XDR fields (10-50+ KB each). When multiple participants -// interact with the same transaction, they all point to the SAME canonical pointer instead -// of storing duplicate copies. -// -// PERFORMANCE: -// - Push operations: O(1) via set.Add() with automatic deduplication -// - No manual duplicate checking: Sets handle uniqueness automatically -// - MergeBuffer: O(n) with zero temporary map allocations -// // THREAD SAFETY: -// All public methods use RWMutex for concurrent read/exclusive write access. -// Callers can safely use multiple buffers in parallel goroutines. +// Push methods are protected by a sync.Mutex for concurrent writes during parallel +// transaction processing. Get methods are NOT locked — they are only called after +// all parallel processing completes (after group.Wait()). type TrustlineChangeKey struct { AccountID string @@ -57,11 +46,11 @@ type SACBalanceChangeKey struct { } type IndexerBuffer struct { - mu sync.RWMutex + mu sync.Mutex txByHash map[string]*types.Transaction - participantsByToID map[int64]set.Set[string] + participantsByToID map[int64]types.ParticipantSet opByID map[int64]*types.Operation - participantsByOpID map[int64]set.Set[string] + participantsByOpID map[int64]types.ParticipantSet stateChanges []types.StateChange trustlineChangesByTrustlineKey map[TrustlineChangeKey]types.TrustlineChange contractChanges []types.ContractChange @@ -73,13 +62,13 @@ type IndexerBuffer struct { } // NewIndexerBuffer creates a new IndexerBuffer with initialized data structures. -// All maps and sets are pre-allocated to avoid nil pointer issues during concurrent access. +// All maps and sets are pre-allocated to avoid nil pointer issues. func NewIndexerBuffer() *IndexerBuffer { return &IndexerBuffer{ txByHash: make(map[string]*types.Transaction), - participantsByToID: make(map[int64]set.Set[string]), + participantsByToID: make(map[int64]types.ParticipantSet), opByID: make(map[int64]*types.Operation), - participantsByOpID: make(map[int64]set.Set[string]), + participantsByOpID: make(map[int64]types.ParticipantSet), stateChanges: make([]types.StateChange, 0), trustlineChangesByTrustlineKey: make(map[TrustlineChangeKey]types.TrustlineChange), contractChanges: make([]types.ContractChange, 0), @@ -93,138 +82,64 @@ func NewIndexerBuffer() *IndexerBuffer { // PushTransaction adds a transaction and associates it with a participant. // Uses canonical pointer pattern: stores one copy of each transaction (by hash) and tracks -// which participants interacted with it. Multiple participants can reference the same transaction. -// Thread-safe: acquires write lock. -func (b *IndexerBuffer) PushTransaction(participant string, transaction types.Transaction) { +// which participants interacted with it. Thread-safe. +func (b *IndexerBuffer) PushTransaction(participant string, transaction *types.Transaction) { b.mu.Lock() defer b.mu.Unlock() - b.pushTransactionUnsafe(participant, &transaction) -} - -// pushTransactionUnsafe is the internal implementation that assumes the caller -// already holds the write lock. This method implements the following pattern: -// -// 1. Check if transaction already exists in txByHash -// 2. If not, store the transaction pointer -// 3. Add participant to the global participants set -// 4. Add participant to this transaction's participant set in participantsByToID -// -// Caller must hold write lock. -func (b *IndexerBuffer) pushTransactionUnsafe(participant string, transaction *types.Transaction) { txHash := transaction.Hash.String() if _, exists := b.txByHash[txHash]; !exists { b.txByHash[txHash] = transaction } - // Track this participant by ToID toID := transaction.ToID if _, exists := b.participantsByToID[toID]; !exists { - b.participantsByToID[toID] = set.NewSet[string]() + b.participantsByToID[toID] = make(types.ParticipantSet) } - - // Add participant - O(1) with automatic deduplication b.participantsByToID[toID].Add(participant) } -// GetNumberOfTransactions returns the count of unique transactions in the buffer. -// Thread-safe: uses read lock. -func (b *IndexerBuffer) GetNumberOfTransactions() int { - b.mu.RLock() - defer b.mu.RUnlock() - - return len(b.txByHash) -} - -// GetNumberOfOperations returns the count of unique operations in the buffer. -// Thread-safe: uses read lock. -func (b *IndexerBuffer) GetNumberOfOperations() int { - b.mu.RLock() - defer b.mu.RUnlock() +// PushOperation adds an operation and its parent transaction, associating both with a participant. +// Thread-safe. +func (b *IndexerBuffer) PushOperation(participant string, operation *types.Operation, transaction *types.Transaction) { + b.mu.Lock() + defer b.mu.Unlock() - return len(b.opByID) + b.pushOperation(participant, operation) + b.pushTransaction(participant, transaction) } -// GetTransactions returns all unique transactions. -// Thread-safe: uses read lock. -func (b *IndexerBuffer) GetTransactions() []*types.Transaction { - b.mu.RLock() - defer b.mu.RUnlock() +// PushStateChange adds a state change along with its associated transaction and operation. +// Thread-safe. +func (b *IndexerBuffer) PushStateChange(transaction *types.Transaction, operation *types.Operation, stateChange types.StateChange) { + b.mu.Lock() + defer b.mu.Unlock() - txs := make([]*types.Transaction, 0, len(b.txByHash)) - for _, txPtr := range b.txByHash { - txs = append(txs, txPtr) + b.stateChanges = append(b.stateChanges, stateChange) + b.pushTransaction(string(stateChange.AccountID), transaction) + // Fee changes dont have an operation ID associated with them + if stateChange.OperationID != 0 { + b.pushOperation(string(stateChange.AccountID), operation) } - - return txs -} - -// GetTransactionsParticipants returns a map of transaction ToIDs to its participants. -func (b *IndexerBuffer) GetTransactionsParticipants() map[int64]set.Set[string] { - b.mu.RLock() - defer b.mu.RUnlock() - - return maps.Clone(b.participantsByToID) } // PushTrustlineChange adds a trustline change to the buffer and tracks unique assets. -// Thread-safe: acquires write lock. +// Thread-safe. func (b *IndexerBuffer) PushTrustlineChange(trustlineChange types.TrustlineChange) { b.mu.Lock() defer b.mu.Unlock() - code, issuer, err := ParseAssetString(trustlineChange.Asset) - if err != nil { - return // Skip invalid assets - } - trustlineID := data.DeterministicAssetID(code, issuer) - - // Track unique asset with pre-computed deterministic ID - if _, exists := b.uniqueTrustlineAssets[trustlineID]; !exists { - b.uniqueTrustlineAssets[trustlineID] = data.TrustlineAsset{ - ID: trustlineID, - Code: code, - Issuer: issuer, - } - } - - changeKey := TrustlineChangeKey{ - AccountID: trustlineChange.AccountID, - TrustlineID: trustlineID, - } - prevChange, exists := b.trustlineChangesByTrustlineKey[changeKey] - if exists && prevChange.OperationID > trustlineChange.OperationID { - return - } - - // Handle ADD→REMOVE no-op case: if this is a remove operation and we have an add operation for the same trustline from previous operation, - // it is a no-op for current ledger. - if exists && trustlineChange.Operation == types.TrustlineOpRemove && prevChange.Operation == types.TrustlineOpAdd { - delete(b.trustlineChangesByTrustlineKey, changeKey) - return - } - - b.trustlineChangesByTrustlineKey[changeKey] = trustlineChange -} - -// GetTrustlineChanges returns all trustline changes stored in the buffer. -// Thread-safe: uses read lock. -func (b *IndexerBuffer) GetTrustlineChanges() map[TrustlineChangeKey]types.TrustlineChange { - b.mu.RLock() - defer b.mu.RUnlock() - - return b.trustlineChangesByTrustlineKey + b.pushTrustlineChange(trustlineChange) } // PushContractChange adds a contract change to the buffer and tracks unique SEP-41 contracts. -// Thread-safe: acquires write lock. +// Thread-safe. func (b *IndexerBuffer) PushContractChange(contractChange types.ContractChange) { b.mu.Lock() defer b.mu.Unlock() b.contractChanges = append(b.contractChanges, contractChange) - // Only track SEP-41 contracts for DB insertion if contractChange.ContractType != types.ContractTypeSEP41 { return } @@ -236,102 +151,146 @@ func (b *IndexerBuffer) PushContractChange(contractChange types.ContractChange) } } -// GetContractChanges returns all contract changes stored in the buffer. -// Thread-safe: uses read lock. -func (b *IndexerBuffer) GetContractChanges() []types.ContractChange { - b.mu.RLock() - defer b.mu.RUnlock() - - return b.contractChanges -} - -// PushAccountChange adds an account change to the buffer with deduplication. -// Keeps the change with highest OperationID per account. Handles CREATE→REMOVE no-op case. -// Thread-safe: acquires write lock. -func (b *IndexerBuffer) PushAccountChange(accountChange types.AccountChange) { +// BatchPushChanges pushes trustline, account, SAC balance, and SAC contract changes +// in a single lock acquisition. Thread-safe. +func (b *IndexerBuffer) BatchPushChanges( + trustlines []types.TrustlineChange, + accounts []types.AccountChange, + sacBalances []types.SACBalanceChange, + sacContracts []*data.Contract, +) { b.mu.Lock() defer b.mu.Unlock() - accountID := accountChange.AccountID - existing, exists := b.accountChangesByAccountID[accountID] - - // Keep the change with highest OperationID - if exists && existing.OperationID > accountChange.OperationID { - return + for i := range trustlines { + b.pushTrustlineChange(trustlines[i]) } - - // Handle CREATE→REMOVE no-op case: account created and removed in same batch - // Note: UPDATE→REMOVE is NOT a no-op (account existed before, needs deletion) - if exists && accountChange.Operation == types.AccountOpRemove && existing.Operation == types.AccountOpCreate { - delete(b.accountChangesByAccountID, accountID) - return + for i := range accounts { + b.pushAccountChange(accounts[i]) + } + for i := range sacBalances { + b.pushSACBalanceChange(sacBalances[i]) + } + for i := range sacContracts { + b.pushSACContract(sacContracts[i]) } - - b.accountChangesByAccountID[accountID] = accountChange } -// GetAccountChanges returns all account changes stored in the buffer. -// Thread-safe: uses read lock. -func (b *IndexerBuffer) GetAccountChanges() map[string]types.AccountChange { - b.mu.RLock() - defer b.mu.RUnlock() - - return b.accountChangesByAccountID +// TransactionResult holds all the data collected from processing a single transaction, +// ready to be pushed into the buffer in a single lock acquisition. +type TransactionResult struct { + Transaction *types.Transaction + TxParticipants []string + Operations map[int64]*types.Operation // opID → operation + OpParticipants map[int64][]string // opID → participant list + ContractChanges []types.ContractChange + StateChanges []types.StateChange + StateChangeOpMap map[int64]*types.Operation // opID → operation for state change association } -// PushSACBalanceChange adds a SAC balance change to the buffer with deduplication. -// Keeps the change with highest OperationID per (AccountID, ContractID). Handles ADD→REMOVE no-op case. -// Thread-safe: acquires write lock. -func (b *IndexerBuffer) PushSACBalanceChange(sacBalanceChange types.SACBalanceChange) { +// BatchPushTransactionResult pushes an entire transaction's worth of data into the buffer +// in a single lock acquisition. This reduces mutex contention from ~5-15 acquisitions +// per transaction down to 1. Thread-safe. +func (b *IndexerBuffer) BatchPushTransactionResult(result *TransactionResult) { b.mu.Lock() defer b.mu.Unlock() - key := SACBalanceChangeKey{ - AccountID: sacBalanceChange.AccountID, - ContractID: sacBalanceChange.ContractID, + // Push transaction participants + for _, participant := range result.TxParticipants { + b.pushTransaction(participant, result.Transaction) } - existing, exists := b.sacBalanceChangesByKey[key] - // Keep the change with highest OperationID - if exists && existing.OperationID > sacBalanceChange.OperationID { - return + // Push operations and their participants + for opID, participants := range result.OpParticipants { + op := result.Operations[opID] + for _, participant := range participants { + b.pushOperation(participant, op) + b.pushTransaction(participant, result.Transaction) + } } - // Handle ADD→REMOVE no-op case: balance created and removed in same batch - if exists && sacBalanceChange.Operation == types.SACBalanceOpRemove && existing.Operation == types.SACBalanceOpAdd { - delete(b.sacBalanceChangesByKey, key) - return + // Push contract changes + for _, cc := range result.ContractChanges { + b.contractChanges = append(b.contractChanges, cc) + if cc.ContractType == types.ContractTypeSEP41 && cc.ContractID != "" { + if _, exists := b.uniqueSEP41ContractTokensByID[cc.ContractID]; !exists { + b.uniqueSEP41ContractTokensByID[cc.ContractID] = cc.ContractType + } + } } - b.sacBalanceChangesByKey[key] = sacBalanceChange + // Push state changes + for _, sc := range result.StateChanges { + if sc.AccountID == "" { + continue + } + b.stateChanges = append(b.stateChanges, sc) + b.pushTransaction(string(sc.AccountID), result.Transaction) + if sc.OperationID != 0 { + if op := result.StateChangeOpMap[sc.OperationID]; op != nil { + b.pushOperation(string(sc.AccountID), op) + } + } + } } -// GetSACBalanceChanges returns all SAC balance changes stored in the buffer. -// Thread-safe: uses read lock. -func (b *IndexerBuffer) GetSACBalanceChanges() map[SACBalanceChangeKey]types.SACBalanceChange { - b.mu.RLock() - defer b.mu.RUnlock() +// Clear resets the buffer to its initial empty state while preserving allocated capacity. +// Used by backfill to reuse the buffer after flushing data to the database. +func (b *IndexerBuffer) Clear() { + clear(b.txByHash) + clear(b.participantsByToID) + clear(b.opByID) + clear(b.participantsByOpID) + clear(b.uniqueTrustlineAssets) + clear(b.uniqueSEP41ContractTokensByID) + clear(b.trustlineChangesByTrustlineKey) + clear(b.sacContractsByID) + b.stateChanges = b.stateChanges[:0] + b.contractChanges = b.contractChanges[:0] + clear(b.accountChangesByAccountID) + clear(b.sacBalanceChangesByKey) +} +// --- Unlocked getters (called only after parallel processing completes) --- + +func (b *IndexerBuffer) GetNumberOfTransactions() int { return len(b.txByHash) } +func (b *IndexerBuffer) GetNumberOfOperations() int { return len(b.opByID) } +func (b *IndexerBuffer) GetStateChanges() []types.StateChange { return b.stateChanges } +func (b *IndexerBuffer) GetContractChanges() []types.ContractChange { return b.contractChanges } +func (b *IndexerBuffer) GetAccountChanges() map[string]types.AccountChange { + return b.accountChangesByAccountID +} + +func (b *IndexerBuffer) GetSACBalanceChanges() map[SACBalanceChangeKey]types.SACBalanceChange { return b.sacBalanceChangesByKey } -// PushOperation adds an operation and its parent transaction, associating both with a participant. -// Uses canonical pointer pattern for both operations and transactions to avoid memory duplication. -// Thread-safe: acquires write lock. -func (b *IndexerBuffer) PushOperation(participant string, operation types.Operation, transaction types.Transaction) { - b.mu.Lock() - defer b.mu.Unlock() +func (b *IndexerBuffer) GetTransactionsParticipants() map[int64]types.ParticipantSet { + return b.participantsByToID +} - b.pushOperationUnsafe(participant, &operation) - b.pushTransactionUnsafe(participant, &transaction) +func (b *IndexerBuffer) GetOperationsParticipants() map[int64]types.ParticipantSet { + return b.participantsByOpID } -// GetOperations returns all unique operations from the canonical storage. -// Thread-safe: uses read lock. -func (b *IndexerBuffer) GetOperations() []*types.Operation { - b.mu.RLock() - defer b.mu.RUnlock() +func (b *IndexerBuffer) GetTrustlineChanges() map[TrustlineChangeKey]types.TrustlineChange { + return b.trustlineChangesByTrustlineKey +} +func (b *IndexerBuffer) GetUniqueSEP41ContractTokensByID() map[string]types.ContractType { + return b.uniqueSEP41ContractTokensByID +} +func (b *IndexerBuffer) GetSACContracts() map[string]*data.Contract { return b.sacContractsByID } + +func (b *IndexerBuffer) GetTransactions() []*types.Transaction { + txs := make([]*types.Transaction, 0, len(b.txByHash)) + for _, txPtr := range b.txByHash { + txs = append(txs, txPtr) + } + return txs +} + +func (b *IndexerBuffer) GetOperations() []*types.Operation { ops := make([]*types.Operation, 0, len(b.opByID)) for _, opPtr := range b.opByID { ops = append(ops, opPtr) @@ -339,253 +298,112 @@ func (b *IndexerBuffer) GetOperations() []*types.Operation { return ops } -// GetOperationsParticipants returns a map of operation IDs to its participants. -func (b *IndexerBuffer) GetOperationsParticipants() map[int64]set.Set[string] { - b.mu.RLock() - defer b.mu.RUnlock() +func (b *IndexerBuffer) GetUniqueTrustlineAssets() []data.TrustlineAsset { + assets := make([]data.TrustlineAsset, 0, len(b.uniqueTrustlineAssets)) + for _, asset := range b.uniqueTrustlineAssets { + assets = append(assets, asset) + } + return assets +} - return maps.Clone(b.participantsByOpID) +// --- Internal helpers (caller must hold the lock) --- + +func (b *IndexerBuffer) pushTransaction(participant string, transaction *types.Transaction) { + txHash := transaction.Hash.String() + if _, exists := b.txByHash[txHash]; !exists { + b.txByHash[txHash] = transaction + } + toID := transaction.ToID + if _, exists := b.participantsByToID[toID]; !exists { + b.participantsByToID[toID] = make(types.ParticipantSet) + } + b.participantsByToID[toID].Add(participant) } -// pushOperationUnsafe is the internal implementation for operation storage. -// Stores one copy of each operation (by ID) and tracks which participants interacted with it. -// Caller must hold write lock. -func (b *IndexerBuffer) pushOperationUnsafe(participant string, operation *types.Operation) { +func (b *IndexerBuffer) pushOperation(participant string, operation *types.Operation) { opID := operation.ID if _, exists := b.opByID[opID]; !exists { b.opByID[opID] = operation } - - // Track this participant globally if _, exists := b.participantsByOpID[opID]; !exists { - b.participantsByOpID[opID] = set.NewSet[string]() + b.participantsByOpID[opID] = make(types.ParticipantSet) } b.participantsByOpID[opID].Add(participant) } -// PushStateChange adds a state change along with its associated transaction and operation. -// Thread-safe: acquires write lock. -func (b *IndexerBuffer) PushStateChange(transaction types.Transaction, operation types.Operation, stateChange types.StateChange) { - b.mu.Lock() - defer b.mu.Unlock() - - b.stateChanges = append(b.stateChanges, stateChange) - b.pushTransactionUnsafe(string(stateChange.AccountID), &transaction) - // Fee changes dont have an operation ID associated with them - if stateChange.OperationID != 0 { - b.pushOperationUnsafe(string(stateChange.AccountID), &operation) - } -} - -// GetStateChanges returns a copy of all state changes stored in the buffer. -// Thread-safe: uses read lock. -func (b *IndexerBuffer) GetStateChanges() []types.StateChange { - b.mu.RLock() - defer b.mu.RUnlock() - - return b.stateChanges -} - -// Merge merges another IndexerBuffer into this buffer. This is used to combine -// per-ledger or per-transaction buffers into a single buffer for batch DB insertion. -// -// MERGE STRATEGY: -// 1. Union global participant sets (O(m) set operation) -// 2. Copy storage maps (txByHash, opByID) using maps.Copy -// 3. For each transaction hash in other.participantsByTxHash: -// - Merge other's participant set into our participant set for that tx hash -// - Creates new set if tx doesn't exist in our mapping yet -// -// 4. For each operation ID in other.participantsByOpID: -// - Merge other's participant set into our participant set for that op ID -// - Creates new set if op doesn't exist in our mapping yet -// -// 5. Append other's state changes to ours -// -// MEMORY EFFICIENCY: -// Zero temporary allocations - uses direct map/set manipulation. -// -// Thread-safe: acquires write lock on this buffer, read lock on other buffer. -func (b *IndexerBuffer) Merge(other IndexerBufferInterface) { - b.mu.Lock() - defer b.mu.Unlock() - - // Type assert to get concrete buffer for efficient merging - otherBuffer, ok := other.(*IndexerBuffer) - if !ok { +func (b *IndexerBuffer) pushTrustlineChange(trustlineChange types.TrustlineChange) { + code, issuer, err := ParseAssetString(trustlineChange.Asset) + if err != nil { return } + trustlineID := data.DeterministicAssetID(code, issuer) - otherBuffer.mu.RLock() - defer otherBuffer.mu.RUnlock() - - // Merge transactions (canonical storage) - this establishes our canonical pointers - maps.Copy(b.txByHash, otherBuffer.txByHash) - for toID, otherParticipants := range otherBuffer.participantsByToID { - if existing, exists := b.participantsByToID[toID]; exists { - // Merge into existing set - iterate and add (Union creates new set) - for participant := range otherParticipants.Iter() { - existing.Add(participant) - } - } else { - // Clone the set instead of creating empty + iterating - b.participantsByToID[toID] = otherParticipants.Clone() + if _, exists := b.uniqueTrustlineAssets[trustlineID]; !exists { + b.uniqueTrustlineAssets[trustlineID] = data.TrustlineAsset{ + ID: trustlineID, + Code: code, + Issuer: issuer, } } - // Merge operations (canonical storage) - maps.Copy(b.opByID, otherBuffer.opByID) - for opID, otherParticipants := range otherBuffer.participantsByOpID { - if existing, exists := b.participantsByOpID[opID]; exists { - // Merge into existing set - iterate and add (Union creates new set) - for participant := range otherParticipants.Iter() { - existing.Add(participant) - } - } else { - // Clone the set instead of creating empty + iterating - b.participantsByOpID[opID] = otherParticipants.Clone() - } + changeKey := TrustlineChangeKey{ + AccountID: trustlineChange.AccountID, + TrustlineID: trustlineID, } - - // Merge state changes - b.stateChanges = append(b.stateChanges, otherBuffer.stateChanges...) - - // Merge trustline changes - for key, change := range otherBuffer.trustlineChangesByTrustlineKey { - existing, exists := b.trustlineChangesByTrustlineKey[key] - - if exists && existing.OperationID > change.OperationID { - continue - } - - // Handle ADD→REMOVE no-op case - if exists && change.Operation == types.TrustlineOpRemove && existing.Operation == types.TrustlineOpAdd { - delete(b.trustlineChangesByTrustlineKey, key) - continue - } - - b.trustlineChangesByTrustlineKey[key] = change + prevChange, exists := b.trustlineChangesByTrustlineKey[changeKey] + if exists && prevChange.OperationID > trustlineChange.OperationID { + return } - // Merge contract changes - b.contractChanges = append(b.contractChanges, otherBuffer.contractChanges...) - - // Merge account changes with deduplication (same logic as PushAccountChange) - for accountID, change := range otherBuffer.accountChangesByAccountID { - existing, exists := b.accountChangesByAccountID[accountID] - - if exists && existing.OperationID > change.OperationID { - continue - } - - // Handle CREATE→REMOVE no-op case - if exists && change.Operation == types.AccountOpRemove && existing.Operation == types.AccountOpCreate { - delete(b.accountChangesByAccountID, accountID) - continue - } - - b.accountChangesByAccountID[accountID] = change + if exists && trustlineChange.Operation == types.TrustlineOpRemove && prevChange.Operation == types.TrustlineOpAdd { + delete(b.trustlineChangesByTrustlineKey, changeKey) + return } - // Merge SAC balance changes with deduplication (same logic as PushSACBalanceChange) - for key, change := range otherBuffer.sacBalanceChangesByKey { - existing, exists := b.sacBalanceChangesByKey[key] - - if exists && existing.OperationID > change.OperationID { - continue - } + b.trustlineChangesByTrustlineKey[changeKey] = trustlineChange +} - // Handle ADD→REMOVE no-op case - if exists && change.Operation == types.SACBalanceOpRemove && existing.Operation == types.SACBalanceOpAdd { - delete(b.sacBalanceChangesByKey, key) - continue - } +func (b *IndexerBuffer) pushAccountChange(accountChange types.AccountChange) { + accountID := accountChange.AccountID + existing, exists := b.accountChangesByAccountID[accountID] - b.sacBalanceChangesByKey[key] = change + if exists && existing.OperationID > accountChange.OperationID { + return } - // Merge unique trustline assets - maps.Copy(b.uniqueTrustlineAssets, otherBuffer.uniqueTrustlineAssets) - - // Merge unique contracts - maps.Copy(b.uniqueSEP41ContractTokensByID, otherBuffer.uniqueSEP41ContractTokensByID) - - // Merge SAC contracts (first-write wins for deduplication) - for id, contract := range otherBuffer.sacContractsByID { - if _, exists := b.sacContractsByID[id]; !exists { - b.sacContractsByID[id] = contract - } + if exists && accountChange.Operation == types.AccountOpRemove && existing.Operation == types.AccountOpCreate { + delete(b.accountChangesByAccountID, accountID) + return } -} -// Clear resets the buffer to its initial empty state while preserving allocated capacity. -// Use this to reuse the buffer after flushing data to the database during backfill. -// Thread-safe: acquires write lock. -func (b *IndexerBuffer) Clear() { - b.mu.Lock() - defer b.mu.Unlock() - - // Clear maps (keep allocated backing arrays) - clear(b.txByHash) - clear(b.participantsByToID) - clear(b.opByID) - clear(b.participantsByOpID) - clear(b.uniqueTrustlineAssets) - clear(b.uniqueSEP41ContractTokensByID) - clear(b.trustlineChangesByTrustlineKey) - clear(b.sacContractsByID) - - // Reset slices (reuse underlying arrays by slicing to zero) - b.stateChanges = b.stateChanges[:0] - b.contractChanges = b.contractChanges[:0] - - // Clear account and SAC balance changes maps - clear(b.accountChangesByAccountID) - clear(b.sacBalanceChangesByKey) + b.accountChangesByAccountID[accountID] = accountChange } -// GetUniqueTrustlineAssets returns all unique trustline assets with pre-computed IDs. -// Thread-safe: uses read lock. -func (b *IndexerBuffer) GetUniqueTrustlineAssets() []data.TrustlineAsset { - b.mu.RLock() - defer b.mu.RUnlock() +func (b *IndexerBuffer) pushSACBalanceChange(sacBalanceChange types.SACBalanceChange) { + key := SACBalanceChangeKey{ + AccountID: sacBalanceChange.AccountID, + ContractID: sacBalanceChange.ContractID, + } + existing, exists := b.sacBalanceChangesByKey[key] - assets := make([]data.TrustlineAsset, 0, len(b.uniqueTrustlineAssets)) - for _, asset := range b.uniqueTrustlineAssets { - assets = append(assets, asset) + if exists && existing.OperationID > sacBalanceChange.OperationID { + return } - return assets -} -// GetUniqueSEP41ContractTokensByID returns a map of unique SEP-41 contract IDs to their types. -// Thread-safe: uses read lock. -func (b *IndexerBuffer) GetUniqueSEP41ContractTokensByID() map[string]types.ContractType { - b.mu.RLock() - defer b.mu.RUnlock() + if exists && sacBalanceChange.Operation == types.SACBalanceOpRemove && existing.Operation == types.SACBalanceOpAdd { + delete(b.sacBalanceChangesByKey, key) + return + } - return maps.Clone(b.uniqueSEP41ContractTokensByID) + b.sacBalanceChangesByKey[key] = sacBalanceChange } -// PushSACContract adds a SAC contract with extracted metadata to the buffer. -// Thread-safe: acquires write lock. -func (b *IndexerBuffer) PushSACContract(c *data.Contract) { - b.mu.Lock() - defer b.mu.Unlock() - +func (b *IndexerBuffer) pushSACContract(c *data.Contract) { if _, exists := b.sacContractsByID[c.ContractID]; !exists { b.sacContractsByID[c.ContractID] = c } } -// GetSACContracts returns a map of SAC contract IDs to their metadata. -// Thread-safe: uses read lock. -func (b *IndexerBuffer) GetSACContracts() map[string]*data.Contract { - b.mu.RLock() - defer b.mu.RUnlock() - - return maps.Clone(b.sacContractsByID) -} - // ParseAssetString parses a "CODE:ISSUER" formatted asset string into its components. func ParseAssetString(asset string) (code, issuer string, err error) { parts := strings.SplitN(asset, ":", 2) @@ -594,7 +412,6 @@ func ParseAssetString(asset string) (code, issuer string, err error) { } code, issuer = parts[0], parts[1] - // Validate using txnbuild creditAsset := txnbuild.CreditAsset{Code: code, Issuer: issuer} if _, err := creditAsset.ToXDR(); err != nil { return "", "", fmt.Errorf("invalid asset %s: %w", asset, err) diff --git a/internal/indexer/indexer_buffer_bench_test.go b/internal/indexer/indexer_buffer_bench_test.go new file mode 100644 index 00000000..4c937d85 --- /dev/null +++ b/internal/indexer/indexer_buffer_bench_test.go @@ -0,0 +1,159 @@ +package indexer + +import ( + "fmt" + "testing" + + "github.com/stellar/wallet-backend/internal/indexer/types" +) + +const ( + benchParticipants = 100 + benchTxCount = 200 +) + +// Pre-allocated test data to isolate buffer performance from allocation noise. +var ( + benchTxs [benchTxCount]*types.Transaction + benchOps [benchTxCount]*types.Operation + benchStateChanges [benchTxCount]types.StateChange + benchParticipantStrs [benchParticipants]string + benchTrustlines [benchTxCount][]types.TrustlineChange + benchAccounts [benchTxCount][]types.AccountChange + benchSACBalances [benchTxCount][]types.SACBalanceChange + benchTxResults [benchTxCount]*TransactionResult +) + +func init() { + for i := range benchParticipants { + benchParticipantStrs[i] = fmt.Sprintf("participant-%d", i) + } + for i := range benchTxCount { + benchTxs[i] = &types.Transaction{Hash: types.HashBytea(fmt.Sprintf("hash-%d", i)), ToID: int64(i)} + benchOps[i] = &types.Operation{ID: int64(i)} + benchStateChanges[i] = types.StateChange{ + ToID: int64(i), + AccountID: types.AddressBytea(fmt.Sprintf("acct-%d", i%100)), + OperationID: int64(i), + } + benchTrustlines[i] = []types.TrustlineChange{ + {AccountID: fmt.Sprintf("acct-%d", i), Asset: "USD:GISSUER", OperationID: int64(i), Operation: types.TrustlineOpAdd}, + } + benchAccounts[i] = []types.AccountChange{ + {AccountID: fmt.Sprintf("acct-%d", i), OperationID: int64(i), Operation: types.AccountOpCreate}, + } + benchSACBalances[i] = []types.SACBalanceChange{ + {AccountID: fmt.Sprintf("acct-%d", i), ContractID: "CCONTRACT", OperationID: int64(i), Operation: types.SACBalanceOpAdd}, + } + + // Build a realistic TransactionResult with 2 participants, 1 operation, 1 state change + opID := int64(i*10 + 1) + benchTxResults[i] = &TransactionResult{ + Transaction: benchTxs[i], + TxParticipants: []string{benchParticipantStrs[i%benchParticipants], benchParticipantStrs[(i+1)%benchParticipants]}, + Operations: map[int64]*types.Operation{opID: benchOps[i]}, + OpParticipants: map[int64][]string{opID: {benchParticipantStrs[i%benchParticipants]}}, + StateChanges: []types.StateChange{benchStateChanges[i]}, + StateChangeOpMap: map[int64]*types.Operation{ + benchStateChanges[i].OperationID: benchOps[i], + }, + } + } +} + +func BenchmarkPushTransaction(b *testing.B) { + buf := NewIndexerBuffer() + b.ResetTimer() + for i := 0; i < b.N; i++ { + buf.PushTransaction(benchParticipantStrs[i%benchParticipants], benchTxs[i%benchTxCount]) + } +} + +func BenchmarkPushOperation(b *testing.B) { + buf := NewIndexerBuffer() + b.ResetTimer() + for i := 0; i < b.N; i++ { + idx := i % benchTxCount + buf.PushOperation(benchParticipantStrs[i%benchParticipants], benchOps[idx], benchTxs[idx]) + } +} + +func BenchmarkBatchPushChanges(b *testing.B) { + buf := NewIndexerBuffer() + b.ResetTimer() + for i := 0; i < b.N; i++ { + idx := i % benchTxCount + buf.BatchPushChanges(benchTrustlines[idx], benchAccounts[idx], benchSACBalances[idx], nil) + } +} + +func BenchmarkConcurrentPushTransaction(b *testing.B) { + buf := NewIndexerBuffer() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + buf.PushTransaction(benchParticipantStrs[i%benchParticipants], benchTxs[i%benchTxCount]) + i++ + } + }) +} + +func BenchmarkConcurrentPushOperation(b *testing.B) { + buf := NewIndexerBuffer() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + idx := i % benchTxCount + buf.PushOperation(benchParticipantStrs[i%benchParticipants], benchOps[idx], benchTxs[idx]) + i++ + } + }) +} + +func BenchmarkConcurrentBatchPushChanges(b *testing.B) { + buf := NewIndexerBuffer() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + idx := i % benchTxCount + buf.BatchPushChanges(benchTrustlines[idx], benchAccounts[idx], benchSACBalances[idx], nil) + i++ + } + }) +} + +func BenchmarkConcurrentPushStateChange(b *testing.B) { + buf := NewIndexerBuffer() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + idx := i % benchTxCount + buf.PushStateChange(benchTxs[idx], benchOps[idx], benchStateChanges[idx]) + i++ + } + }) +} + +func BenchmarkBatchPushTransactionResult(b *testing.B) { + buf := NewIndexerBuffer() + b.ResetTimer() + for i := 0; i < b.N; i++ { + buf.BatchPushTransactionResult(benchTxResults[i%benchTxCount]) + } +} + +func BenchmarkConcurrentBatchPushTransactionResult(b *testing.B) { + buf := NewIndexerBuffer() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + buf.BatchPushTransactionResult(benchTxResults[i%benchTxCount]) + i++ + } + }) +} diff --git a/internal/indexer/indexer_buffer_test.go b/internal/indexer/indexer_buffer_test.go index a6f4f161..11a12dfe 100644 --- a/internal/indexer/indexer_buffer_test.go +++ b/internal/indexer/indexer_buffer_test.go @@ -1,10 +1,8 @@ package indexer import ( - "sync" "testing" - set "github.com/deckarep/golang-set/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -28,15 +26,15 @@ func TestIndexerBuffer_PushTransaction(t *testing.T) { tx1 := types.Transaction{Hash: "e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760", ToID: 1} tx2 := types.Transaction{Hash: "a76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48761", ToID: 2} - indexerBuffer.PushTransaction("alice", tx1) - indexerBuffer.PushTransaction("alice", tx2) - indexerBuffer.PushTransaction("bob", tx2) - indexerBuffer.PushTransaction("bob", tx2) // duplicate is a no-op + indexerBuffer.PushTransaction("alice", &tx1) + indexerBuffer.PushTransaction("alice", &tx2) + indexerBuffer.PushTransaction("bob", &tx2) + indexerBuffer.PushTransaction("bob", &tx2) // duplicate is a no-op // Assert participants by transaction txParticipants := indexerBuffer.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("alice"), txParticipants[tx1.ToID]) - assert.Equal(t, set.NewSet("alice", "bob"), txParticipants[tx2.ToID]) + assert.Equal(t, types.NewParticipantSet("alice"), txParticipants[tx1.ToID]) + assert.Equal(t, types.NewParticipantSet("alice", "bob"), txParticipants[tx2.ToID]) // Assert GetNumberOfTransactions assert.Equal(t, 2, indexerBuffer.GetNumberOfTransactions()) @@ -44,41 +42,6 @@ func TestIndexerBuffer_PushTransaction(t *testing.T) { // Assert GetAllTransactions assert.ElementsMatch(t, []*types.Transaction{&tx1, &tx2}, indexerBuffer.GetTransactions()) }) - - t.Run("🟢 concurrent pushes", func(t *testing.T) { - indexerBuffer := NewIndexerBuffer() - - tx1 := types.Transaction{Hash: "e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760", ToID: 1} - tx2 := types.Transaction{Hash: "a76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48761", ToID: 2} - - wg := sync.WaitGroup{} - wg.Add(4) - go func() { - indexerBuffer.PushTransaction("alice", tx1) - wg.Done() - }() - go func() { - indexerBuffer.PushTransaction("alice", tx2) - wg.Done() - }() - go func() { - indexerBuffer.PushTransaction("bob", tx2) - wg.Done() - }() - go func() { - indexerBuffer.PushTransaction("bob", tx2) // duplicate is a no-op - wg.Done() - }() - wg.Wait() - - // Assert participants by transaction - txParticipants := indexerBuffer.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("alice"), txParticipants[tx1.ToID]) - assert.Equal(t, set.NewSet("alice", "bob"), txParticipants[tx2.ToID]) - - // Assert GetNumberOfTransactions - assert.Equal(t, 2, indexerBuffer.GetNumberOfTransactions()) - }) } func TestIndexerBuffer_PushOperation(t *testing.T) { @@ -90,54 +53,20 @@ func TestIndexerBuffer_PushOperation(t *testing.T) { op1 := types.Operation{ID: 1} op2 := types.Operation{ID: 2} - indexerBuffer.PushOperation("alice", op1, tx1) - indexerBuffer.PushOperation("bob", op2, tx2) - indexerBuffer.PushOperation("chuck", op2, tx2) - indexerBuffer.PushOperation("chuck", op2, tx2) // duplicate operation ID is a no-op + indexerBuffer.PushOperation("alice", &op1, &tx1) + indexerBuffer.PushOperation("bob", &op2, &tx2) + indexerBuffer.PushOperation("chuck", &op2, &tx2) + indexerBuffer.PushOperation("chuck", &op2, &tx2) // duplicate operation ID is a no-op // Assert participants by operation opParticipants := indexerBuffer.GetOperationsParticipants() - assert.Equal(t, set.NewSet("alice"), opParticipants[int64(1)]) - assert.Equal(t, set.NewSet("bob", "chuck"), opParticipants[int64(2)]) + assert.Equal(t, types.NewParticipantSet("alice"), opParticipants[int64(1)]) + assert.Equal(t, types.NewParticipantSet("bob", "chuck"), opParticipants[int64(2)]) // Assert transactions were also added txParticipants := indexerBuffer.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("alice"), txParticipants[tx1.ToID]) - assert.Equal(t, set.NewSet("bob", "chuck"), txParticipants[tx2.ToID]) - }) - - t.Run("🟢 concurrent pushes", func(t *testing.T) { - indexerBuffer := NewIndexerBuffer() - - tx1 := types.Transaction{Hash: "e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760", ToID: 1} - tx2 := types.Transaction{Hash: "a76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48761", ToID: 2} - op1 := types.Operation{ID: 1} - op2 := types.Operation{ID: 2} - - wg := sync.WaitGroup{} - wg.Add(4) - go func() { - indexerBuffer.PushOperation("alice", op1, tx1) - wg.Done() - }() - go func() { - indexerBuffer.PushOperation("bob", op2, tx2) - wg.Done() - }() - go func() { - indexerBuffer.PushOperation("chuck", op2, tx2) - wg.Done() - }() - go func() { - indexerBuffer.PushOperation("chuck", op2, tx2) // duplicate operation ID is a no-op - wg.Done() - }() - wg.Wait() - - // Assert participants by operation - opParticipants := indexerBuffer.GetOperationsParticipants() - assert.Equal(t, set.NewSet("alice"), opParticipants[int64(1)]) - assert.Equal(t, set.NewSet("bob", "chuck"), opParticipants[int64(2)]) + assert.Equal(t, types.NewParticipantSet("alice"), txParticipants[tx1.ToID]) + assert.Equal(t, types.NewParticipantSet("bob", "chuck"), txParticipants[tx2.ToID]) }) } @@ -152,45 +81,14 @@ func TestIndexerBuffer_PushStateChange(t *testing.T) { sc2 := types.StateChange{ToID: 2, StateChangeID: 1} sc3 := types.StateChange{ToID: 3, StateChangeID: 1} - indexerBuffer.PushStateChange(tx, op, sc1) - indexerBuffer.PushStateChange(tx, op, sc2) - indexerBuffer.PushStateChange(tx, op, sc3) + indexerBuffer.PushStateChange(&tx, &op, sc1) + indexerBuffer.PushStateChange(&tx, &op, sc2) + indexerBuffer.PushStateChange(&tx, &op, sc3) allStateChanges := indexerBuffer.GetStateChanges() assert.Equal(t, []types.StateChange{sc1, sc2, sc3}, allStateChanges) }) - t.Run("🟢 concurrent pushes", func(t *testing.T) { - indexerBuffer := NewIndexerBuffer() - - tx := types.Transaction{Hash: "c76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48763", ToID: 1} - op := types.Operation{ID: 1} - - sc1 := types.StateChange{ToID: 1, StateChangeID: 1} - sc2 := types.StateChange{ToID: 2, StateChangeID: 1} - sc3 := types.StateChange{ToID: 3, StateChangeID: 1} - - wg := sync.WaitGroup{} - wg.Add(2) - - go func() { - defer wg.Done() - indexerBuffer.PushStateChange(tx, op, sc1) - }() - - go func() { - defer wg.Done() - indexerBuffer.PushStateChange(tx, op, sc2) - indexerBuffer.PushStateChange(tx, op, sc3) - }() - - wg.Wait() - - allStateChanges := indexerBuffer.GetStateChanges() - assert.Len(t, allStateChanges, 3) - assert.ElementsMatch(t, []types.StateChange{sc1, sc2, sc3}, allStateChanges) - }) - t.Run("🟢 with operations and transactions", func(t *testing.T) { indexerBuffer := NewIndexerBuffer() @@ -199,8 +97,8 @@ func TestIndexerBuffer_PushStateChange(t *testing.T) { op1 := types.Operation{ID: 3} op2 := types.Operation{ID: 4} op3 := types.Operation{ID: 5} - indexerBuffer.PushOperation("someone", op1, tx1) - indexerBuffer.PushOperation("someone", op2, tx2) + indexerBuffer.PushOperation("someone", &op1, &tx1) + indexerBuffer.PushOperation("someone", &op2, &tx2) sc1 := buildStateChange(3, types.StateChangeReasonCredit, "alice", op1.ID) sc2 := buildStateChange(4, types.StateChangeReasonDebit, "alice", op2.ID) @@ -209,25 +107,25 @@ func TestIndexerBuffer_PushStateChange(t *testing.T) { sc4 := buildStateChange(1, types.StateChangeReasonDebit, "bob", 0) sc5 := buildStateChange(2, types.StateChangeReasonDebit, "bob", 0) - indexerBuffer.PushStateChange(tx1, op1, sc1) - indexerBuffer.PushStateChange(tx2, op2, sc2) - indexerBuffer.PushStateChange(tx2, op3, sc3) // This operation should be added - indexerBuffer.PushStateChange(tx2, types.Operation{}, sc4) // Fee state changes don't have an operation - indexerBuffer.PushStateChange(tx2, types.Operation{}, sc5) // Fee state changes don't have an operation + indexerBuffer.PushStateChange(&tx1, &op1, sc1) + indexerBuffer.PushStateChange(&tx2, &op2, sc2) + indexerBuffer.PushStateChange(&tx2, &op3, sc3) // This operation should be added + indexerBuffer.PushStateChange(&tx2, nil, sc4) // Fee state changes don't have an operation + indexerBuffer.PushStateChange(&tx2, nil, sc5) // Fee state changes don't have an operation allStateChanges := indexerBuffer.GetStateChanges() assert.Equal(t, []types.StateChange{sc1, sc2, sc3, sc4, sc5}, allStateChanges) // Assert transaction participants txParticipants := indexerBuffer.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("someone", "alice"), txParticipants[tx1.ToID]) - assert.Equal(t, set.NewSet("someone", "alice", "eve", "bob"), txParticipants[tx2.ToID]) + assert.Equal(t, types.NewParticipantSet("someone", "alice"), txParticipants[tx1.ToID]) + assert.Equal(t, types.NewParticipantSet("someone", "alice", "eve", "bob"), txParticipants[tx2.ToID]) // Assert operation participants opParticipants := indexerBuffer.GetOperationsParticipants() - assert.Equal(t, set.NewSet("someone", "alice"), opParticipants[int64(3)]) - assert.Equal(t, set.NewSet("someone", "alice"), opParticipants[int64(4)]) - assert.Equal(t, set.NewSet("eve"), opParticipants[int64(5)]) + assert.Equal(t, types.NewParticipantSet("someone", "alice"), opParticipants[int64(3)]) + assert.Equal(t, types.NewParticipantSet("someone", "alice"), opParticipants[int64(4)]) + assert.Equal(t, types.NewParticipantSet("eve"), opParticipants[int64(5)]) }) } @@ -240,14 +138,14 @@ func TestIndexerBuffer_GetNumberOfTransactions(t *testing.T) { tx1 := types.Transaction{Hash: "e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760", ToID: 1} tx2 := types.Transaction{Hash: "a76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48761", ToID: 2} - indexerBuffer.PushTransaction("alice", tx1) + indexerBuffer.PushTransaction("alice", &tx1) assert.Equal(t, 1, indexerBuffer.GetNumberOfTransactions()) - indexerBuffer.PushTransaction("bob", tx2) + indexerBuffer.PushTransaction("bob", &tx2) assert.Equal(t, 2, indexerBuffer.GetNumberOfTransactions()) // Duplicate should not increase count - indexerBuffer.PushTransaction("charlie", tx2) + indexerBuffer.PushTransaction("charlie", &tx2) assert.Equal(t, 2, indexerBuffer.GetNumberOfTransactions()) }) } @@ -259,9 +157,9 @@ func TestIndexerBuffer_GetAllTransactions(t *testing.T) { tx1 := types.Transaction{Hash: "e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760", ToID: 1, LedgerNumber: 100} tx2 := types.Transaction{Hash: "a76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48761", ToID: 2, LedgerNumber: 101} - indexerBuffer.PushTransaction("alice", tx1) - indexerBuffer.PushTransaction("bob", tx2) - indexerBuffer.PushTransaction("charlie", tx2) // duplicate + indexerBuffer.PushTransaction("alice", &tx1) + indexerBuffer.PushTransaction("bob", &tx2) + indexerBuffer.PushTransaction("charlie", &tx2) // duplicate allTxs := indexerBuffer.GetTransactions() require.Len(t, allTxs, 2) @@ -276,13 +174,13 @@ func TestIndexerBuffer_GetAllTransactionsParticipants(t *testing.T) { tx1 := types.Transaction{Hash: "e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760", ToID: 1} tx2 := types.Transaction{Hash: "a76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48761", ToID: 2} - indexerBuffer.PushTransaction("alice", tx1) - indexerBuffer.PushTransaction("bob", tx1) - indexerBuffer.PushTransaction("alice", tx2) + indexerBuffer.PushTransaction("alice", &tx1) + indexerBuffer.PushTransaction("bob", &tx1) + indexerBuffer.PushTransaction("alice", &tx2) txParticipants := indexerBuffer.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("alice", "bob"), txParticipants[tx1.ToID]) - assert.Equal(t, set.NewSet("alice"), txParticipants[tx2.ToID]) + assert.Equal(t, types.NewParticipantSet("alice", "bob"), txParticipants[tx1.ToID]) + assert.Equal(t, types.NewParticipantSet("alice"), txParticipants[tx2.ToID]) }) } @@ -294,9 +192,9 @@ func TestIndexerBuffer_GetAllOperations(t *testing.T) { op1 := types.Operation{ID: 1} op2 := types.Operation{ID: 2} - indexerBuffer.PushOperation("alice", op1, tx1) - indexerBuffer.PushOperation("bob", op2, tx1) - indexerBuffer.PushOperation("charlie", op2, tx1) // duplicate + indexerBuffer.PushOperation("alice", &op1, &tx1) + indexerBuffer.PushOperation("bob", &op2, &tx1) + indexerBuffer.PushOperation("charlie", &op2, &tx1) // duplicate allOps := indexerBuffer.GetOperations() require.Len(t, allOps, 2) @@ -312,13 +210,13 @@ func TestIndexerBuffer_GetAllOperationsParticipants(t *testing.T) { op1 := types.Operation{ID: 1} op2 := types.Operation{ID: 2} - indexerBuffer.PushOperation("alice", op1, tx1) - indexerBuffer.PushOperation("bob", op1, tx1) - indexerBuffer.PushOperation("alice", op2, tx1) + indexerBuffer.PushOperation("alice", &op1, &tx1) + indexerBuffer.PushOperation("bob", &op1, &tx1) + indexerBuffer.PushOperation("alice", &op2, &tx1) opParticipants := indexerBuffer.GetOperationsParticipants() - assert.Equal(t, set.NewSet("alice", "bob"), opParticipants[int64(1)]) - assert.Equal(t, set.NewSet("alice"), opParticipants[int64(2)]) + assert.Equal(t, types.NewParticipantSet("alice", "bob"), opParticipants[int64(1)]) + assert.Equal(t, types.NewParticipantSet("alice"), opParticipants[int64(2)]) }) } @@ -333,238 +231,15 @@ func TestIndexerBuffer_GetAllStateChanges(t *testing.T) { sc2 := types.StateChange{ToID: 2, StateChangeID: 1, AccountID: "bob"} sc3 := types.StateChange{ToID: 3, StateChangeID: 1, AccountID: "charlie"} - indexerBuffer.PushStateChange(tx, op, sc1) - indexerBuffer.PushStateChange(tx, op, sc2) - indexerBuffer.PushStateChange(tx, op, sc3) + indexerBuffer.PushStateChange(&tx, &op, sc1) + indexerBuffer.PushStateChange(&tx, &op, sc2) + indexerBuffer.PushStateChange(&tx, &op, sc3) allStateChanges := indexerBuffer.GetStateChanges() assert.Equal(t, []types.StateChange{sc1, sc2, sc3}, allStateChanges) }) } -func TestIndexerBuffer_Merge(t *testing.T) { - t.Run("🟢 merge empty buffers", func(t *testing.T) { - buffer1 := NewIndexerBuffer() - buffer2 := NewIndexerBuffer() - - buffer1.Merge(buffer2) - assert.Equal(t, 0, buffer1.GetNumberOfTransactions()) - assert.Len(t, buffer1.GetStateChanges(), 0) - }) - - t.Run("🟢 merge transactions only", func(t *testing.T) { - buffer1 := NewIndexerBuffer() - buffer2 := NewIndexerBuffer() - - tx1 := types.Transaction{Hash: "e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760", ToID: 1} - tx2 := types.Transaction{Hash: "a76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48761", ToID: 2} - - buffer1.PushTransaction("alice", tx1) - buffer2.PushTransaction("bob", tx2) - - buffer1.Merge(buffer2) - - // Verify transactions - allTxs := buffer1.GetTransactions() - assert.Len(t, allTxs, 2) - assert.ElementsMatch(t, []*types.Transaction{&tx1, &tx2}, allTxs) - - // Verify transaction participants - txParticipants := buffer1.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("alice"), txParticipants[tx1.ToID]) - assert.Equal(t, set.NewSet("bob"), txParticipants[tx2.ToID]) - }) - - t.Run("🟢 merge operations only", func(t *testing.T) { - buffer1 := NewIndexerBuffer() - buffer2 := NewIndexerBuffer() - - tx1 := types.Transaction{Hash: "e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760", ToID: 1} - op1 := types.Operation{ID: 1} - op2 := types.Operation{ID: 2} - - buffer1.PushOperation("alice", op1, tx1) - buffer2.PushOperation("bob", op2, tx1) - - buffer1.Merge(buffer2) - - // Verify operations - allOps := buffer1.GetOperations() - assert.Len(t, allOps, 2) - assert.ElementsMatch(t, []*types.Operation{&op1, &op2}, allOps) - - // Verify operation participants - opParticipants := buffer1.GetOperationsParticipants() - assert.Equal(t, set.NewSet("alice"), opParticipants[int64(1)]) - assert.Equal(t, set.NewSet("bob"), opParticipants[int64(2)]) - }) - - t.Run("🟢 merge state changes only", func(t *testing.T) { - buffer1 := NewIndexerBuffer() - buffer2 := NewIndexerBuffer() - - tx := types.Transaction{Hash: "c76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48763", ToID: 1} - op := types.Operation{ID: 1} - - sc1 := types.StateChange{ToID: 1, StateChangeID: 1, AccountID: "alice"} - sc2 := types.StateChange{ToID: 2, StateChangeID: 1, AccountID: "bob"} - - buffer1.PushStateChange(tx, op, sc1) - buffer2.PushStateChange(tx, op, sc2) - - buffer1.Merge(buffer2) - - // Verify state changes - allStateChanges := buffer1.GetStateChanges() - assert.Len(t, allStateChanges, 2) - assert.Equal(t, sc1, allStateChanges[0]) - assert.Equal(t, sc2, allStateChanges[1]) - }) - - t.Run("🟢 merge with overlapping data", func(t *testing.T) { - buffer1 := NewIndexerBuffer() - buffer2 := NewIndexerBuffer() - - tx1 := types.Transaction{Hash: "e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760", ToID: 1} - tx2 := types.Transaction{Hash: "a76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48761", ToID: 2} - op1 := types.Operation{ID: 1} - - // Buffer1 has tx1 with alice - buffer1.PushTransaction("alice", tx1) - buffer1.PushOperation("alice", op1, tx1) - - // Buffer2 has tx1 with bob (overlapping tx) and tx2 with charlie - buffer2.PushTransaction("bob", tx1) - buffer2.PushTransaction("charlie", tx2) - buffer2.PushOperation("bob", op1, tx1) - - buffer1.Merge(buffer2) - - // Verify transactions - allTxs := buffer1.GetTransactions() - assert.Len(t, allTxs, 2) - - // Verify tx1 has both alice and bob as participants - txParticipants := buffer1.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("alice", "bob"), txParticipants[tx1.ToID]) - assert.Equal(t, set.NewSet("charlie"), txParticipants[tx2.ToID]) - - // Verify operation participants merged - opParticipants := buffer1.GetOperationsParticipants() - assert.Equal(t, set.NewSet("alice", "bob"), opParticipants[int64(1)]) - }) - - t.Run("🟢 merge into empty buffer", func(t *testing.T) { - buffer1 := NewIndexerBuffer() - buffer2 := NewIndexerBuffer() - - tx1 := types.Transaction{Hash: "e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760", ToID: 1} - op1 := types.Operation{ID: 1} - sc1 := types.StateChange{ToID: 1, StateChangeID: 1, AccountID: "alice"} - - buffer2.PushTransaction("alice", tx1) - buffer2.PushOperation("bob", op1, tx1) - buffer2.PushStateChange(tx1, op1, sc1) - - buffer1.Merge(buffer2) - - assert.Equal(t, 1, buffer1.GetNumberOfTransactions()) - assert.Len(t, buffer1.GetOperations(), 1) - assert.Len(t, buffer1.GetStateChanges(), 1) - }) - - t.Run("🟢 merge empty buffer into populated", func(t *testing.T) { - buffer1 := NewIndexerBuffer() - buffer2 := NewIndexerBuffer() - - tx1 := types.Transaction{Hash: "e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760", ToID: 1} - buffer1.PushTransaction("alice", tx1) - - buffer1.Merge(buffer2) - - assert.Equal(t, 1, buffer1.GetNumberOfTransactions()) - }) - - t.Run("🟢 concurrent merges", func(t *testing.T) { - buffer1 := NewIndexerBuffer() - buffer2 := NewIndexerBuffer() - buffer3 := NewIndexerBuffer() - - tx1 := types.Transaction{Hash: "e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760", ToID: 1} - tx2 := types.Transaction{Hash: "a76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48761", ToID: 2} - tx3 := types.Transaction{Hash: "b76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48762", ToID: 3} - - buffer1.PushTransaction("alice", tx1) - buffer2.PushTransaction("bob", tx2) - buffer3.PushTransaction("charlie", tx3) - - wg := sync.WaitGroup{} - wg.Add(2) - - go func() { - defer wg.Done() - buffer1.Merge(buffer2) - }() - - go func() { - defer wg.Done() - buffer1.Merge(buffer3) - }() - - wg.Wait() - - // Verify all data merged correctly - assert.Equal(t, 3, buffer1.GetNumberOfTransactions()) - }) - - t.Run("🟢 merge complete buffers with all data types", func(t *testing.T) { - buffer1 := NewIndexerBuffer() - buffer2 := NewIndexerBuffer() - - tx1 := types.Transaction{Hash: "e76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48760", ToID: 1} - tx2 := types.Transaction{Hash: "a76b7b0133690fbfb2de8fa9ca2273cb4f2e29447e0cf0e14a5f82d0daa48761", ToID: 2} - op1 := types.Operation{ID: 1} - op2 := types.Operation{ID: 2} - sc1 := types.StateChange{ToID: 1, StateChangeID: 1, AccountID: "alice", OperationID: 1} - sc2 := types.StateChange{ToID: 2, StateChangeID: 1, AccountID: "bob", OperationID: 2} - - // Buffer1 - buffer1.PushTransaction("alice", tx1) - buffer1.PushOperation("alice", op1, tx1) - buffer1.PushStateChange(tx1, op1, sc1) - - // Buffer2 - buffer2.PushTransaction("bob", tx2) - buffer2.PushOperation("bob", op2, tx2) - buffer2.PushStateChange(tx2, op2, sc2) - - buffer1.Merge(buffer2) - - // Verify transactions - allTxs := buffer1.GetTransactions() - assert.Len(t, allTxs, 2) - - // Verify operations - allOps := buffer1.GetOperations() - assert.Len(t, allOps, 2) - - // Verify state changes - allStateChanges := buffer1.GetStateChanges() - assert.Len(t, allStateChanges, 2) - assert.Equal(t, sc1, allStateChanges[0]) - assert.Equal(t, sc2, allStateChanges[1]) - - // Verify participants mappings - txParticipants := buffer1.GetTransactionsParticipants() - assert.Equal(t, set.NewSet("alice"), txParticipants[tx1.ToID]) - assert.Equal(t, set.NewSet("bob"), txParticipants[tx2.ToID]) - - opParticipants := buffer1.GetOperationsParticipants() - assert.Equal(t, set.NewSet("alice"), opParticipants[int64(1)]) - assert.Equal(t, set.NewSet("bob"), opParticipants[int64(2)]) - }) -} - func TestIndexerBuffer_PushSACBalanceChange(t *testing.T) { t.Run("🟢 stores SAC balance changes", func(t *testing.T) { buffer := NewIndexerBuffer() @@ -577,7 +252,7 @@ func TestIndexerBuffer_PushSACBalanceChange(t *testing.T) { OperationID: 100, } - buffer.PushSACBalanceChange(change1) + buffer.BatchPushChanges(nil, nil, []types.SACBalanceChange{change1}, nil) changes := buffer.GetSACBalanceChanges() assert.Len(t, changes, 1) @@ -614,9 +289,9 @@ func TestIndexerBuffer_PushSACBalanceChange(t *testing.T) { OperationID: 50, // Lower operation ID - should be ignored } - buffer.PushSACBalanceChange(change1) - buffer.PushSACBalanceChange(change2) - buffer.PushSACBalanceChange(change3) // Should be ignored (lower opID) + buffer.BatchPushChanges(nil, nil, []types.SACBalanceChange{change1}, nil) + buffer.BatchPushChanges(nil, nil, []types.SACBalanceChange{change2}, nil) + buffer.BatchPushChanges(nil, nil, []types.SACBalanceChange{change3}, nil) // Should be ignored (lower opID) changes := buffer.GetSACBalanceChanges() assert.Len(t, changes, 1) @@ -647,8 +322,8 @@ func TestIndexerBuffer_PushSACBalanceChange(t *testing.T) { OperationID: 200, } - buffer.PushSACBalanceChange(addChange) - buffer.PushSACBalanceChange(removeChange) + buffer.BatchPushChanges(nil, nil, []types.SACBalanceChange{addChange}, nil) + buffer.BatchPushChanges(nil, nil, []types.SACBalanceChange{removeChange}, nil) // ADD→REMOVE in same batch is a no-op - entry should be removed changes := buffer.GetSACBalanceChanges() @@ -676,8 +351,8 @@ func TestIndexerBuffer_PushSACBalanceChange(t *testing.T) { OperationID: 200, } - buffer.PushSACBalanceChange(updateChange) - buffer.PushSACBalanceChange(removeChange) + buffer.BatchPushChanges(nil, nil, []types.SACBalanceChange{updateChange}, nil) + buffer.BatchPushChanges(nil, nil, []types.SACBalanceChange{removeChange}, nil) // UPDATE→REMOVE is NOT a no-op - the balance existed before and needs deletion changes := buffer.GetSACBalanceChanges() @@ -702,7 +377,7 @@ func TestIndexerBuffer_PushSACBalanceChange(t *testing.T) { } for _, change := range changes { - buffer.PushSACBalanceChange(change) + buffer.BatchPushChanges(nil, nil, []types.SACBalanceChange{change}, nil) } result := buffer.GetSACBalanceChanges() @@ -719,134 +394,177 @@ func TestIndexerBuffer_PushSACBalanceChange(t *testing.T) { }) } -func TestIndexerBuffer_MergeSACBalanceChanges(t *testing.T) { - t.Run("🟢 merge SAC balance changes from two buffers", func(t *testing.T) { - buffer1 := NewIndexerBuffer() - buffer2 := NewIndexerBuffer() - - account1 := "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC" - account2 := "CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAD2KM" - contract1 := "CCWAMYJME4H5CKG7OLXGC2T4M6FL52XCZ3OQOAV6LL3GLA4RO4WH3ASP" - - buffer1.PushSACBalanceChange(types.SACBalanceChange{ - AccountID: account1, - ContractID: contract1, - Balance: "100", - Operation: types.SACBalanceOpAdd, - OperationID: 1, - }) - - buffer2.PushSACBalanceChange(types.SACBalanceChange{ - AccountID: account2, - ContractID: contract1, - Balance: "200", - Operation: types.SACBalanceOpAdd, - OperationID: 2, - }) - - buffer1.Merge(buffer2) +func TestIndexerBuffer_BatchPushTransactionResult(t *testing.T) { + t.Run("🟢 pushes all data in single call", func(t *testing.T) { + buffer := NewIndexerBuffer() - changes := buffer1.GetSACBalanceChanges() - assert.Len(t, changes, 2) - }) + tx := types.Transaction{Hash: "txhash1", ToID: 1} + op1 := types.Operation{ID: 100} + op2 := types.Operation{ID: 200} - t.Run("🟢 merge keeps higher OperationID during merge", func(t *testing.T) { - buffer1 := NewIndexerBuffer() - buffer2 := NewIndexerBuffer() + sc1 := buildStateChange(10, types.StateChangeReasonDebit, "alice", 100) + sc2 := buildStateChange(11, types.StateChangeReasonCredit, "dave", 200) + scFee := buildStateChange(12, types.StateChangeReasonDebit, "alice", 0) // fee: no operation - accountID := "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC" - contractID := "CCWAMYJME4H5CKG7OLXGC2T4M6FL52XCZ3OQOAV6LL3GLA4RO4WH3ASP" + cc := types.ContractChange{ + AccountID: "alice", + OperationID: 100, + ContractID: "CCONTRACT", + LedgerNumber: 42, + ContractType: types.ContractTypeSEP41, + } - buffer1.PushSACBalanceChange(types.SACBalanceChange{ - AccountID: accountID, - ContractID: contractID, - Balance: "100", - Operation: types.SACBalanceOpAdd, - OperationID: 50, + buffer.BatchPushTransactionResult(&TransactionResult{ + Transaction: &tx, + TxParticipants: []string{"alice", "bob"}, + Operations: map[int64]*types.Operation{100: &op1, 200: &op2}, + OpParticipants: map[int64][]string{ + 100: {"alice"}, + 200: {"bob", "charlie"}, + }, + ContractChanges: []types.ContractChange{cc}, + StateChanges: []types.StateChange{sc1, sc2, scFee}, + StateChangeOpMap: map[int64]*types.Operation{100: &op1, 200: &op2}, }) - buffer2.PushSACBalanceChange(types.SACBalanceChange{ - AccountID: accountID, - ContractID: contractID, - Balance: "200", - Operation: types.SACBalanceOpUpdate, - OperationID: 100, // Higher - }) + // Verify transaction stored + assert.Equal(t, 1, buffer.GetNumberOfTransactions()) + + // Verify operations stored + assert.Equal(t, 2, buffer.GetNumberOfOperations()) + + // Verify tx participants: alice, bob (from TxParticipants) + charlie (from OpParticipants) + // + dave (from state change) = all mapped to tx ToID=1 + txParts := buffer.GetTransactionsParticipants() + require.Contains(t, txParts, int64(1)) + _, hasAlice := txParts[1]["alice"] + _, hasBob := txParts[1]["bob"] + _, hasCharlie := txParts[1]["charlie"] + _, hasDave := txParts[1]["dave"] + assert.True(t, hasAlice) + assert.True(t, hasBob) + assert.True(t, hasCharlie) + assert.True(t, hasDave) + + // Verify op participants + opParts := buffer.GetOperationsParticipants() + require.Contains(t, opParts, int64(100)) + _, aliceInOp100 := opParts[100]["alice"] + assert.True(t, aliceInOp100) + require.Contains(t, opParts, int64(200)) + _, bobInOp200 := opParts[200]["bob"] + _, charlieInOp200 := opParts[200]["charlie"] + _, daveInOp200 := opParts[200]["dave"] + assert.True(t, bobInOp200) + assert.True(t, charlieInOp200) + // dave's state change has operationID=200, so dave should be an op participant for op 200 + assert.True(t, daveInOp200) - buffer1.Merge(buffer2) + // Verify state changes + assert.Len(t, buffer.GetStateChanges(), 3) - changes := buffer1.GetSACBalanceChanges() - assert.Len(t, changes, 1) + // Verify contract changes + ccs := buffer.GetContractChanges() + require.Len(t, ccs, 1) + assert.Equal(t, "CCONTRACT", ccs[0].ContractID) - key := SACBalanceChangeKey{AccountID: accountID, ContractID: contractID} - assert.Equal(t, "200", changes[key].Balance) - assert.Equal(t, int64(100), changes[key].OperationID) + // Verify SEP-41 tracking + assert.Contains(t, buffer.GetUniqueSEP41ContractTokensByID(), "CCONTRACT") }) - t.Run("🟢 merge handles ADD→REMOVE no-op across buffers", func(t *testing.T) { - buffer1 := NewIndexerBuffer() - buffer2 := NewIndexerBuffer() - - accountID := "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC" - contractID := "CCWAMYJME4H5CKG7OLXGC2T4M6FL52XCZ3OQOAV6LL3GLA4RO4WH3ASP" - - // Buffer1 has ADD - buffer1.PushSACBalanceChange(types.SACBalanceChange{ - AccountID: accountID, - ContractID: contractID, - Balance: "100", - Operation: types.SACBalanceOpAdd, - OperationID: 50, - }) - - // Buffer2 has REMOVE (higher opID) - buffer2.PushSACBalanceChange(types.SACBalanceChange{ - AccountID: accountID, - ContractID: contractID, - Balance: "0", - Operation: types.SACBalanceOpRemove, - OperationID: 100, + t.Run("🟢 skips state changes with empty AccountID", func(t *testing.T) { + buffer := NewIndexerBuffer() + tx := types.Transaction{Hash: "txhash2", ToID: 2} + + scEmpty := types.StateChange{ToID: 20, AccountID: ""} + scValid := buildStateChange(21, types.StateChangeReasonCredit, "alice", 0) + + buffer.BatchPushTransactionResult(&TransactionResult{ + Transaction: &tx, + TxParticipants: []string{"alice"}, + Operations: map[int64]*types.Operation{}, + OpParticipants: map[int64][]string{}, + StateChanges: []types.StateChange{scEmpty, scValid}, + StateChangeOpMap: map[int64]*types.Operation{}, }) - buffer1.Merge(buffer2) - - // ADD→REMOVE across merge is a no-op - changes := buffer1.GetSACBalanceChanges() - assert.Len(t, changes, 0) + assert.Len(t, buffer.GetStateChanges(), 1) }) - t.Run("🟢 merge ignores lower OperationID from other buffer", func(t *testing.T) { - buffer1 := NewIndexerBuffer() - buffer2 := NewIndexerBuffer() + t.Run("🟢 non-SEP41 contract changes are stored but not tracked", func(t *testing.T) { + buffer := NewIndexerBuffer() + tx := types.Transaction{Hash: "txhash3", ToID: 3} + + buffer.BatchPushTransactionResult(&TransactionResult{ + Transaction: &tx, + TxParticipants: []string{"alice"}, + Operations: map[int64]*types.Operation{}, + OpParticipants: map[int64][]string{}, + ContractChanges: []types.ContractChange{ + {AccountID: "alice", ContractID: "COTHER", ContractType: types.ContractType("OTHER")}, + }, + StateChangeOpMap: map[int64]*types.Operation{}, + }) - accountID := "CDLZFC3SYJYDZT7K67VZ75HPJVIEUVNIXF47ZG2FB2RMQQVU2HHGCYSC" - contractID := "CCWAMYJME4H5CKG7OLXGC2T4M6FL52XCZ3OQOAV6LL3GLA4RO4WH3ASP" + assert.Len(t, buffer.GetContractChanges(), 1) + assert.Empty(t, buffer.GetUniqueSEP41ContractTokensByID()) + }) - // Buffer1 has higher opID - buffer1.PushSACBalanceChange(types.SACBalanceChange{ - AccountID: accountID, - ContractID: contractID, - Balance: "200", - Operation: types.SACBalanceOpUpdate, - OperationID: 100, + t.Run("🟢 produces same result as individual Push calls", func(t *testing.T) { + // Set up identical data via individual pushes + individual := NewIndexerBuffer() + tx := types.Transaction{Hash: "txhash4", ToID: 4} + op := types.Operation{ID: 300} + sc := buildStateChange(30, types.StateChangeReasonCredit, "alice", 300) + + individual.PushTransaction("alice", &tx) + individual.PushTransaction("bob", &tx) + individual.PushOperation("alice", &op, &tx) + individual.PushStateChange(&tx, &op, sc) + individual.PushContractChange(types.ContractChange{ + AccountID: "alice", ContractID: "CSEP41", ContractType: types.ContractTypeSEP41, }) - // Buffer2 has lower opID - should be ignored - buffer2.PushSACBalanceChange(types.SACBalanceChange{ - AccountID: accountID, - ContractID: contractID, - Balance: "50", - Operation: types.SACBalanceOpAdd, - OperationID: 50, + // Set up identical data via batch push + batched := NewIndexerBuffer() + batched.BatchPushTransactionResult(&TransactionResult{ + Transaction: &tx, + TxParticipants: []string{"alice", "bob"}, + Operations: map[int64]*types.Operation{300: &op}, + OpParticipants: map[int64][]string{300: {"alice"}}, + ContractChanges: []types.ContractChange{ + {AccountID: "alice", ContractID: "CSEP41", ContractType: types.ContractTypeSEP41}, + }, + StateChanges: []types.StateChange{sc}, + StateChangeOpMap: map[int64]*types.Operation{300: &op}, }) - buffer1.Merge(buffer2) - - changes := buffer1.GetSACBalanceChanges() - assert.Len(t, changes, 1) + // Compare buffer state + assert.Equal(t, individual.GetNumberOfTransactions(), batched.GetNumberOfTransactions()) + assert.Equal(t, individual.GetNumberOfOperations(), batched.GetNumberOfOperations()) + assert.Equal(t, len(individual.GetStateChanges()), len(batched.GetStateChanges())) + assert.Equal(t, len(individual.GetContractChanges()), len(batched.GetContractChanges())) + assert.Equal(t, individual.GetUniqueSEP41ContractTokensByID(), batched.GetUniqueSEP41ContractTokensByID()) + + // Compare participants + indTxParts := individual.GetTransactionsParticipants() + batTxParts := batched.GetTransactionsParticipants() + for toID, indSet := range indTxParts { + require.Contains(t, batTxParts, toID) + for p := range indSet { + _, found := batTxParts[toID][p] + assert.True(t, found, "missing tx participant %s for toID %d", p, toID) + } + } - key := SACBalanceChangeKey{AccountID: accountID, ContractID: contractID} - assert.Equal(t, "200", changes[key].Balance) - assert.Equal(t, int64(100), changes[key].OperationID) + indOpParts := individual.GetOperationsParticipants() + batOpParts := batched.GetOperationsParticipants() + for opID, indSet := range indOpParts { + require.Contains(t, batOpParts, opID) + for p := range indSet { + _, found := batOpParts[opID][p] + assert.True(t, found, "missing op participant %s for opID %d", p, opID) + } + } }) } diff --git a/internal/indexer/indexer_test.go b/internal/indexer/indexer_test.go index fd8c8b9f..5081d1c4 100644 --- a/internal/indexer/indexer_test.go +++ b/internal/indexer/indexer_test.go @@ -225,8 +225,10 @@ func TestIndexer_ProcessLedgerTransactions(t *testing.T) { // Verify transaction participants txParticipantsMap := buffer.GetTransactionsParticipants() toID := allTxs[0].ToID - assert.True(t, txParticipantsMap[toID].Contains("alice"), "alice should be in tx participants") - assert.True(t, txParticipantsMap[toID].Contains("bob"), "bob should be in tx participants") + _, aliceOk := txParticipantsMap[toID]["alice"] + assert.True(t, aliceOk, "alice should be in tx participants") + _, bobOk := txParticipantsMap[toID]["bob"] + assert.True(t, bobOk, "bob should be in tx participants") // Verify operations allOps := buffer.GetOperations() diff --git a/internal/indexer/types/participant_set.go b/internal/indexer/types/participant_set.go new file mode 100644 index 00000000..aaa81601 --- /dev/null +++ b/internal/indexer/types/participant_set.go @@ -0,0 +1,43 @@ +package types + +// ParticipantSet is a set of participant account IDs implemented as a plain map. +// It replaces golang-set/v2 to eliminate the library's internal mutex overhead, +// since the IndexerBuffer already guarantees single-goroutine access. +type ParticipantSet map[string]struct{} + +// NewParticipantSet creates a ParticipantSet initialized with the given members. +func NewParticipantSet(members ...string) ParticipantSet { + s := make(ParticipantSet, len(members)) + for _, m := range members { + s[m] = struct{}{} + } + return s +} + +// Add inserts a participant into the set. +func (s ParticipantSet) Add(participant string) { + s[participant] = struct{}{} +} + +// Cardinality returns the number of elements in the set. +func (s ParticipantSet) Cardinality() int { + return len(s) +} + +// ToSlice returns all participants as a slice. +func (s ParticipantSet) ToSlice() []string { + result := make([]string, 0, len(s)) + for p := range s { + result = append(result, p) + } + return result +} + +// Clone returns a deep copy of the set. +func (s ParticipantSet) Clone() ParticipantSet { + clone := make(ParticipantSet, len(s)) + for k := range s { + clone[k] = struct{}{} + } + return clone +} diff --git a/internal/services/ingest.go b/internal/services/ingest.go index 49e0530b..039c18f2 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -118,8 +118,9 @@ type ingestService struct { } func NewIngestService(cfg IngestServiceConfig) (*ingestService, error) { - // Create worker pool for the ledger indexer (parallel transaction processing within a ledger) - ledgerIndexerPool := pond.NewPool(0) + // Create worker pool for the ledger indexer (parallel transaction processing within a ledger). + // Bounded to NumCPU to avoid goroutine oversubscription on busy ledgers. + ledgerIndexerPool := pond.NewPool(runtime.NumCPU()) cfg.Metrics.RegisterPoolMetrics("ledger_indexer", ledgerIndexerPool) // Create backfill pool with bounded size to control memory usage. @@ -224,8 +225,9 @@ func (m *ingestService) processLedger(ctx context.Context, ledgerMeta xdr.Ledger } // insertIntoDB persists the processed data from the buffer to the database. -func (m *ingestService) insertIntoDB(ctx context.Context, dbTx pgx.Tx, buffer indexer.IndexerBufferInterface) (int, int, error) { - txs := buffer.GetTransactions() +// txs is passed in to avoid a redundant GetTransactions() allocation — the caller +// already has the slice for unlockChannelAccounts. +func (m *ingestService) insertIntoDB(ctx context.Context, dbTx pgx.Tx, txs []*types.Transaction, buffer indexer.IndexerBufferInterface) (int, int, error) { txParticipants := buffer.GetTransactionsParticipants() ops := buffer.GetOperations() opParticipants := buffer.GetOperationsParticipants() @@ -245,7 +247,7 @@ func (m *ingestService) insertIntoDB(ctx context.Context, dbTx pgx.Tx, buffer in } // insertTransactions batch inserts transactions with their participants into the database. -func (m *ingestService) insertTransactions(ctx context.Context, pgxTx pgx.Tx, txs []*types.Transaction, stellarAddressesByToID map[int64]set.Set[string]) error { +func (m *ingestService) insertTransactions(ctx context.Context, pgxTx pgx.Tx, txs []*types.Transaction, stellarAddressesByToID map[int64]types.ParticipantSet) error { if len(txs) == 0 { return nil } @@ -257,7 +259,7 @@ func (m *ingestService) insertTransactions(ctx context.Context, pgxTx pgx.Tx, tx } // insertOperations batch inserts operations with their participants into the database. -func (m *ingestService) insertOperations(ctx context.Context, pgxTx pgx.Tx, ops []*types.Operation, stellarAddressesByOpID map[int64]set.Set[string]) error { +func (m *ingestService) insertOperations(ctx context.Context, pgxTx pgx.Tx, ops []*types.Operation, stellarAddressesByOpID map[int64]types.ParticipantSet) error { if len(ops) == 0 { return nil } diff --git a/internal/services/ingest_backfill.go b/internal/services/ingest_backfill.go index d40f2558..de6fbb79 100644 --- a/internal/services/ingest_backfill.go +++ b/internal/services/ingest_backfill.go @@ -269,11 +269,12 @@ func (m *ingestService) flushBatchBufferWithRetry(ctx context.Context, buffer *i if _, txErr := dbTx.Exec(ctx, "SET LOCAL synchronous_commit = off"); txErr != nil { return fmt.Errorf("setting synchronous_commit=off: %w", txErr) } - if _, _, err := m.insertIntoDB(ctx, dbTx, buffer); err != nil { + txs := buffer.GetTransactions() + if _, _, err := m.insertIntoDB(ctx, dbTx, txs, buffer); err != nil { return fmt.Errorf("inserting processed data into db: %w", err) } // Unlock channel accounts using all transactions (not filtered) - if err := m.unlockChannelAccounts(ctx, dbTx, buffer.GetTransactions()); err != nil { + if err := m.unlockChannelAccounts(ctx, dbTx, txs); err != nil { return fmt.Errorf("unlocking channel accounts: %w", err) } // Update cursor atomically with data insertion if requested diff --git a/internal/services/ingest_live.go b/internal/services/ingest_live.go index 75407559..8962c36f 100644 --- a/internal/services/ingest_live.go +++ b/internal/services/ingest_live.go @@ -52,13 +52,14 @@ func (m *ingestService) PersistLedgerData(ctx context.Context, ledgerSeq uint32, } // 3. Insert transactions/operations/state_changes - numTxs, numOps, txErr = m.insertIntoDB(ctx, dbTx, buffer) + txs := buffer.GetTransactions() + numTxs, numOps, txErr = m.insertIntoDB(ctx, dbTx, txs, buffer) if txErr != nil { return fmt.Errorf("inserting processed data into db for ledger %d: %w", ledgerSeq, txErr) } // 4. Unlock channel accounts (no-op when chAccStore is nil, e.g., in loadtest) - if txErr = m.unlockChannelAccounts(ctx, dbTx, buffer.GetTransactions()); txErr != nil { + if txErr = m.unlockChannelAccounts(ctx, dbTx, txs); txErr != nil { return fmt.Errorf("unlocking channel accounts for ledger %d: %w", ledgerSeq, txErr) } @@ -168,6 +169,7 @@ func (m *ingestService) initializeCursors(ctx context.Context, dbTx pgx.Tx, ledg func (m *ingestService) ingestLiveLedgers(ctx context.Context, startLedger uint32) error { currentLedger := startLedger log.Ctx(ctx).Infof("Starting ingestion from ledger: %d", currentLedger) + buffer := indexer.NewIndexerBuffer() for { ledgerMeta, ledgerErr := m.getLedgerWithRetry(ctx, m.ledgerBackend, currentLedger) if ledgerErr != nil { @@ -177,7 +179,7 @@ func (m *ingestService) ingestLiveLedgers(ctx context.Context, startLedger uint3 totalStart := time.Now() processStart := time.Now() - buffer := indexer.NewIndexerBuffer() + buffer.Clear() err := m.processLedger(ctx, ledgerMeta, buffer) if err != nil { m.appMetrics.Ingestion.ErrorsTotal.WithLabelValues("ingest_live").Inc() diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index f24d5bef..8c52830c 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -1007,12 +1007,12 @@ func Test_ingestService_flushBatchBufferWithRetry(t *testing.T) { sc1 := createTestStateChange(1, testAddr1, 200) sc2 := createTestStateChange(2, testAddr2, 201) - buf.PushTransaction(testAddr1, tx1) - buf.PushTransaction(testAddr2, tx2) - buf.PushOperation(testAddr1, op1, tx1) - buf.PushOperation(testAddr2, op2, tx2) - buf.PushStateChange(tx1, op1, sc1) - buf.PushStateChange(tx2, op2, sc2) + buf.PushTransaction(testAddr1, &tx1) + buf.PushTransaction(testAddr2, &tx2) + buf.PushOperation(testAddr1, &op1, &tx1) + buf.PushOperation(testAddr2, &op2, &tx2) + buf.PushStateChange(&tx1, &op1, sc1) + buf.PushStateChange(&tx2, &op2, sc2) return buf }, updateCursorTo: nil, @@ -1028,7 +1028,7 @@ func Test_ingestService_flushBatchBufferWithRetry(t *testing.T) { setupBuffer: func() *indexer.IndexerBuffer { buf := indexer.NewIndexerBuffer() tx1 := createTestTransaction(flushTxHash3, 3) - buf.PushTransaction(testAddr1, tx1) + buf.PushTransaction(testAddr1, &tx1) return buf }, updateCursorTo: ptrUint32(50), @@ -1044,7 +1044,7 @@ func Test_ingestService_flushBatchBufferWithRetry(t *testing.T) { setupBuffer: func() *indexer.IndexerBuffer { buf := indexer.NewIndexerBuffer() tx1 := createTestTransaction(flushTxHash4, 4) - buf.PushTransaction(testAddr1, tx1) + buf.PushTransaction(testAddr1, &tx1) return buf }, updateCursorTo: ptrUint32(150),