Skip to content
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/tetratelabs/wazero v1.10.1
github.com/vektah/gqlparser/v2 v2.5.30
github.com/vikstrous/dataloadgen v0.0.9
golang.org/x/sync v0.16.0
golang.org/x/term v0.33.0
golang.org/x/text v0.27.0
)
Expand Down Expand Up @@ -182,7 +183,6 @@ require (
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
golang.org/x/net v0.42.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.35.0 // indirect
golang.org/x/time v0.8.0 // indirect
google.golang.org/api v0.215.0 // indirect
Expand Down
90 changes: 51 additions & 39 deletions internal/data/native_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,53 +72,67 @@ func (m *NativeBalanceModel) GetByAccount(ctx context.Context, accountAddress st
return &nb, nil
}

// BatchUpsert upserts and deletes native balances in batch.
// BatchUpsert upserts and deletes native balances using UNNEST-based bulk operations.
func (m *NativeBalanceModel) BatchUpsert(ctx context.Context, dbTx pgx.Tx, upserts []NativeBalance, deletes []string) error {
if len(upserts) == 0 && len(deletes) == 0 {
return nil
}

start := time.Now()
batch := &pgx.Batch{}

const upsertQuery = `
INSERT INTO native_balances (account_address, balance, minimum_balance, buying_liabilities, selling_liabilities, last_modified_ledger)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (account_address) DO UPDATE SET
balance = EXCLUDED.balance,
minimum_balance = EXCLUDED.minimum_balance,
buying_liabilities = EXCLUDED.buying_liabilities,
selling_liabilities = EXCLUDED.selling_liabilities,
last_modified_ledger = EXCLUDED.last_modified_ledger`

for _, nb := range upserts {
batch.Queue(upsertQuery, nb.AccountAddress, nb.Balance, nb.MinimumBalance, nb.BuyingLiabilities, nb.SellingLiabilities, nb.LedgerNumber)
}

const deleteQuery = `DELETE FROM native_balances WHERE account_address = $1`
for _, addr := range deletes {
batch.Queue(deleteQuery, addr)
}

if batch.Len() == 0 {
return nil
}
if len(upserts) > 0 {
accountAddresses := make([]string, len(upserts))
balances := make([]int64, len(upserts))
minimumBalances := make([]int64, len(upserts))
buyingLiabilities := make([]int64, len(upserts))
sellingLiabilities := make([]int64, len(upserts))
ledgerNumbers := make([]int64, len(upserts))

for i, nb := range upserts {
accountAddresses[i] = nb.AccountAddress
balances[i] = nb.Balance
minimumBalances[i] = nb.MinimumBalance
buyingLiabilities[i] = nb.BuyingLiabilities
sellingLiabilities[i] = nb.SellingLiabilities
ledgerNumbers[i] = int64(nb.LedgerNumber)
}

br := dbTx.SendBatch(ctx, batch)
for i := 0; i < batch.Len(); i++ {
if _, err := br.Exec(); err != nil {
_ = br.Close() //nolint:errcheck // cleanup on error path
const upsertQuery = `
INSERT INTO native_balances (
account_address, balance, minimum_balance,
buying_liabilities, selling_liabilities, last_modified_ledger
)
SELECT * FROM UNNEST(
$1::text[], $2::bigint[], $3::bigint[],
$4::bigint[], $5::bigint[], $6::bigint[]
)
ON CONFLICT (account_address) DO UPDATE SET
balance = EXCLUDED.balance,
minimum_balance = EXCLUDED.minimum_balance,
buying_liabilities = EXCLUDED.buying_liabilities,
selling_liabilities = EXCLUDED.selling_liabilities,
last_modified_ledger = EXCLUDED.last_modified_ledger`

if _, err := dbTx.Exec(ctx, upsertQuery,
accountAddresses, balances, minimumBalances,
buyingLiabilities, sellingLiabilities, ledgerNumbers,
); err != nil {
m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "native_balances").Observe(time.Since(start).Seconds())
m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "native_balances").Inc()
m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "native_balances", utils.GetDBErrorType(err)).Inc()
return fmt.Errorf("upserting native balances: %w", err)
}
}
if err := br.Close(); err != nil {
m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "native_balances").Observe(time.Since(start).Seconds())
m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "native_balances").Inc()
m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "native_balances", utils.GetDBErrorType(err)).Inc()
return fmt.Errorf("closing native balance batch: %w", err)

if len(deletes) > 0 {
const deleteQuery = `DELETE FROM native_balances WHERE account_address = ANY($1::text[])`

if _, err := dbTx.Exec(ctx, deleteQuery, deletes); err != nil {
m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "native_balances").Observe(time.Since(start).Seconds())
m.Metrics.QueriesTotal.WithLabelValues("BatchUpsert", "native_balances").Inc()
m.Metrics.QueryErrors.WithLabelValues("BatchUpsert", "native_balances", utils.GetDBErrorType(err)).Inc()
return fmt.Errorf("deleting native balances: %w", err)
}
}

m.Metrics.QueryDuration.WithLabelValues("BatchUpsert", "native_balances").Observe(time.Since(start).Seconds())
Expand All @@ -133,6 +147,10 @@ func (m *NativeBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, balance
}

start := time.Now()
defer func() {
m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "native_balances").Observe(time.Since(start).Seconds())
m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "native_balances").Inc()
}()

copyCount, err := dbTx.CopyFrom(
ctx,
Expand All @@ -144,20 +162,14 @@ func (m *NativeBalanceModel) BatchCopy(ctx context.Context, dbTx pgx.Tx, balance
}),
)
if err != nil {
m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "native_balances").Observe(time.Since(start).Seconds())
m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "native_balances").Inc()
m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "native_balances", utils.GetDBErrorType(err)).Inc()
return fmt.Errorf("bulk inserting native balances via COPY: %w", err)
}

if int(copyCount) != len(balances) {
m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "native_balances").Observe(time.Since(start).Seconds())
m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "native_balances").Inc()
m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "native_balances", "row_count_mismatch").Inc()
return fmt.Errorf("expected %d rows copied, got %d", len(balances), copyCount)
}

m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "native_balances").Observe(time.Since(start).Seconds())
m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "native_balances").Inc()
return nil
}
114 changes: 58 additions & 56 deletions internal/data/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,6 @@ func (m *OperationModel) BatchGetByStateChangeIDs(ctx context.Context, scToIDs [
}

// BatchCopy inserts operations using pgx's binary COPY protocol.
// Uses pgx.Tx for binary format which is faster than lib/pq's text format.
// Uses native pgtype types for optimal performance (see https://github.com/jackc/pgx/issues/763).
//
// IMPORTANT: BatchCopy will FAIL if any duplicate records exist. The PostgreSQL COPY
Expand All @@ -333,15 +332,18 @@ func (m *OperationModel) BatchCopy(
ctx context.Context,
pgxTx pgx.Tx,
operations []*types.Operation,
stellarAddressesByOpID map[int64]types.ParticipantSet,
) (int, error) {
if len(operations) == 0 {
return 0, nil
}

start := time.Now()
defer func() {
m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(time.Since(start).Seconds())
m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations)))
m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc()
}()

// COPY operations using pgx binary format with native pgtype types
copyCount, err := pgxTx.CopyFrom(
ctx,
pgx.Identifier{"operations"},
Expand All @@ -360,71 +362,71 @@ func (m *OperationModel) BatchCopy(
}),
)
if err != nil {
duration := time.Since(start).Seconds()
m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(duration)
m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations)))
m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc()
m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "operations", utils.GetDBErrorType(err)).Inc()
return 0, fmt.Errorf("pgx CopyFrom operations: %w", err)
}
if int(copyCount) != len(operations) {
duration := time.Since(start).Seconds()
m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(duration)
m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations)))
m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc()
m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "operations", "row_count_mismatch").Inc()
return 0, fmt.Errorf("expected %d rows copied, got %d", len(operations), copyCount)
}

// COPY operations_accounts using pgx binary format with native pgtype types. Upstream participants handling ensures that
// account address is not NULL here.
if len(stellarAddressesByOpID) > 0 {
// Build OpID -> LedgerCreatedAt lookup from operations
ledgerCreatedAtByOpID := make(map[int64]time.Time, len(operations))
for _, op := range operations {
ledgerCreatedAtByOpID[op.ID] = op.LedgerCreatedAt
}

var oaRows [][]any
for opID, addresses := range stellarAddressesByOpID {
ledgerCreatedAt := ledgerCreatedAtByOpID[opID]
ledgerCreatedAtPgtype := pgtype.Timestamptz{Time: ledgerCreatedAt, Valid: true}
opIDPgtype := pgtype.Int8{Int64: opID, Valid: true}
for _, addr := range addresses.ToSlice() {
addrBytes, addrErr := types.AddressBytea(addr).Value()
if addrErr != nil {
return 0, fmt.Errorf("converting address %s to bytes: %w", addr, addrErr)
}
oaRows = append(oaRows, []any{
ledgerCreatedAtPgtype,
opIDPgtype,
addrBytes,
})
}
}
return len(operations), nil
}

_, err = pgxTx.CopyFrom(
ctx,
pgx.Identifier{"operations_accounts"},
[]string{"ledger_created_at", "operation_id", "account_id"},
pgx.CopyFromRows(oaRows),
)
if err != nil {
duration := time.Since(start).Seconds()
m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations").Observe(duration)
m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations").Observe(float64(len(operations)))
m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations").Inc()
m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "operations_accounts", utils.GetDBErrorType(err)).Inc()
return 0, fmt.Errorf("pgx CopyFrom operations_accounts: %w", err)
}
// BatchCopyAccounts inserts operation participants into operations_accounts using pgx's binary COPY protocol.
//
// One row is written per (operation ID, participant address) pair found in
// stellarAddressesByOpID. The ledger_created_at column of each row is taken
// from the operation with the same ID in operations, so every key of
// stellarAddressesByOpID must have a matching entry in operations.
//
// It returns nil without touching the database when stellarAddressesByOpID is
// empty or when none of its participant sets contains an address. It returns
// an error when:
//   - a participant references an operation ID that is absent from operations,
//   - an address cannot be converted to its bytea representation,
//   - the COPY itself fails, or
//   - the number of rows copied differs from the number of rows built.
//
// Duration, batch size (number of rows built) and query count are always
// recorded under the ("BatchCopy", "operations_accounts") labels once the
// function has started doing work.
func (m *OperationModel) BatchCopyAccounts(
	ctx context.Context,
	pgxTx pgx.Tx,
	operations []*types.Operation,
	stellarAddressesByOpID map[int64]types.ParticipantSet,
) error {
	if len(stellarAddressesByOpID) == 0 {
		return nil
	}

	start := time.Now()
	var rowCount int
	// rowCount is read when the deferred function runs, so it reflects the
	// number of rows built even though it is assigned later.
	defer func() {
		m.Metrics.QueryDuration.WithLabelValues("BatchCopy", "operations_accounts").Observe(time.Since(start).Seconds())
		m.Metrics.BatchSize.WithLabelValues("BatchCopy", "operations_accounts").Observe(float64(rowCount))
		m.Metrics.QueriesTotal.WithLabelValues("BatchCopy", "operations_accounts").Inc()
	}()

	// Build OpID -> LedgerCreatedAt lookup from operations.
	ledgerCreatedAtByOpID := make(map[int64]time.Time, len(operations))
	for _, op := range operations {
		ledgerCreatedAtByOpID[op.ID] = op.LedgerCreatedAt
	}

	// The capacity is only a hint; the real row count depends on how many
	// participants each operation has.
	oaRows := make([][]any, 0, len(stellarAddressesByOpID)*2)
	for opID, addresses := range stellarAddressesByOpID {
		ledgerCreatedAt, found := ledgerCreatedAtByOpID[opID]
		if !found {
			// A missing entry would otherwise yield the zero time.Time and be
			// written as a valid (but meaningless) timestamp.
			return fmt.Errorf("operation %d has participants but is missing from the operations batch", opID)
		}
		ledgerCreatedAtPgtype := pgtype.Timestamptz{Time: ledgerCreatedAt, Valid: true}
		opIDPgtype := pgtype.Int8{Int64: opID, Valid: true}

		for _, addr := range addresses.ToSlice() {
			addrVal, addrErr := types.AddressBytea(addr).Value()
			if addrErr != nil {
				return fmt.Errorf("converting address %s to bytes: %w", addr, addrErr)
			}
			// Checked assertion: report an unexpected value type as an error
			// instead of panicking in the middle of a transaction.
			addrBytes, isBytes := addrVal.([]byte)
			if !isBytes {
				return fmt.Errorf("converting address %s to bytes: unexpected value type %T", addr, addrVal)
			}
			oaRows = append(oaRows, []any{
				ledgerCreatedAtPgtype,
				opIDPgtype,
				addrBytes,
			})
		}
	}
	rowCount = len(oaRows)
	if rowCount == 0 {
		// Every participant set was empty; nothing to copy.
		return nil
	}

	copyCount, err := pgxTx.CopyFrom(
		ctx,
		pgx.Identifier{"operations_accounts"},
		[]string{"ledger_created_at", "operation_id", "account_id"},
		pgx.CopyFromRows(oaRows),
	)
	if err != nil {
		m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "operations_accounts", utils.GetDBErrorType(err)).Inc()
		return fmt.Errorf("pgx CopyFrom operations_accounts: %w", err)
	}
	// Same row-count verification as BatchCopy performs for the operations table.
	if int(copyCount) != rowCount {
		m.Metrics.QueryErrors.WithLabelValues("BatchCopy", "operations_accounts", "row_count_mismatch").Inc()
		return fmt.Errorf("expected %d rows copied, got %d", rowCount, copyCount)
	}

	return nil
}
9 changes: 7 additions & 2 deletions internal/data/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func Test_OperationModel_BatchCopy(t *testing.T) {
pgxTx, err := conn.Begin(ctx)
require.NoError(t, err)

gotCount, err := m.BatchCopy(ctx, pgxTx, tc.operations, tc.stellarAddressesByOpID)
gotCount, err := m.BatchCopy(ctx, pgxTx, tc.operations)

if tc.wantErrContains != "" {
require.Error(t, err)
Expand All @@ -157,6 +157,7 @@ func Test_OperationModel_BatchCopy(t *testing.T) {
}

require.NoError(t, err)
require.NoError(t, m.BatchCopyAccounts(ctx, pgxTx, tc.operations, tc.stellarAddressesByOpID))
require.NoError(t, pgxTx.Commit(ctx))
assert.Equal(t, tc.wantCount, gotCount)

Expand Down Expand Up @@ -726,11 +727,15 @@ func BenchmarkOperationModel_BatchCopy(b *testing.B) {
}
b.StartTimer()

_, err = m.BatchCopy(ctx, pgxTx, ops, addressesByOpID)
_, err = m.BatchCopy(ctx, pgxTx, ops)
if err != nil {
pgxTx.Rollback(ctx)
b.Fatalf("BatchCopy failed: %v", err)
}
if err = m.BatchCopyAccounts(ctx, pgxTx, ops, addressesByOpID); err != nil {
pgxTx.Rollback(ctx)
b.Fatalf("BatchCopyAccounts failed: %v", err)
}

b.StopTimer()
if err := pgxTx.Commit(ctx); err != nil {
Expand Down
Loading
Loading