Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions cmd/protocol_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func runMigration(
ctx context.Context,
dbPool db.ConnectionPool,
ledgerBackend ledgerbackend.LedgerBackend,
ledgerBackendFactory func() ledgerbackend.LedgerBackend,
models *data.Models,
processors []services.ProtocolProcessor,
) error,
Expand Down Expand Up @@ -161,18 +162,16 @@ func runMigration(
return fmt.Errorf("creating models: %w", err)
}

// Create ledger backend
ledgerBackend := ledgerbackend.NewRPCLedgerBackend(ledgerbackend.RPCLedgerBackendOptions{
RPCServerURL: opts.rpcURL,
BufferSize: 10,
})
defer func() {
if closeErr := ledgerBackend.Close(); closeErr != nil {
log.Ctx(ctx).Errorf("error closing ledger backend: %v", closeErr)
}
}()
// Create ledger backend factory for re-creating backends between range preparations.
// RPCLedgerBackend does not support calling PrepareRange more than once.
newBackend := func() ledgerbackend.LedgerBackend {
return ledgerbackend.NewRPCLedgerBackend(ledgerbackend.RPCLedgerBackendOptions{
RPCServerURL: opts.rpcURL,
BufferSize: 10,
})
}

return createAndRun(ctx, dbPool, ledgerBackend, models, processors)
return createAndRun(ctx, dbPool, newBackend(), newBackend, models, processors)
}

func (c *protocolMigrateCmd) historyCommand() *cobra.Command {
Expand All @@ -187,10 +186,11 @@ func (c *protocolMigrateCmd) historyCommand() *cobra.Command {
},
nil,
func(opts *migrationCommandOpts) error {
return runMigration("history", opts, func(ctx context.Context, dbPool db.ConnectionPool, ledgerBackend ledgerbackend.LedgerBackend, models *data.Models, processors []services.ProtocolProcessor) error {
return runMigration("history", opts, func(ctx context.Context, dbPool db.ConnectionPool, ledgerBackend ledgerbackend.LedgerBackend, ledgerBackendFactory func() ledgerbackend.LedgerBackend, models *data.Models, processors []services.ProtocolProcessor) error {
service, err := services.NewProtocolMigrateHistoryService(services.ProtocolMigrateHistoryConfig{
DB: dbPool,
LedgerBackend: ledgerBackend,
LedgerBackendFactory: ledgerBackendFactory,
ProtocolsModel: models.Protocols,
ProtocolContractsModel: models.ProtocolContracts,
IngestStore: models.IngestStore,
Expand Down Expand Up @@ -228,10 +228,11 @@ func (c *protocolMigrateCmd) currentStateCommand() *cobra.Command {
return nil
},
func(opts *migrationCommandOpts) error {
return runMigration("current-state", opts, func(ctx context.Context, dbPool db.ConnectionPool, ledgerBackend ledgerbackend.LedgerBackend, models *data.Models, processors []services.ProtocolProcessor) error {
return runMigration("current-state", opts, func(ctx context.Context, dbPool db.ConnectionPool, ledgerBackend ledgerbackend.LedgerBackend, ledgerBackendFactory func() ledgerbackend.LedgerBackend, models *data.Models, processors []services.ProtocolProcessor) error {
service, err := services.NewProtocolMigrateCurrentStateService(services.ProtocolMigrateCurrentStateConfig{
DB: dbPool,
LedgerBackend: ledgerBackend,
LedgerBackendFactory: ledgerBackendFactory,
ProtocolsModel: models.Protocols,
ProtocolContractsModel: models.ProtocolContracts,
IngestStore: models.IngestStore,
Expand Down
1 change: 1 addition & 0 deletions internal/data/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func (m *ProtocolContractsModelMock) GetByProtocolID(ctx context.Context, protoc
return args.Get(0).([]ProtocolContracts), args.Error(1)
}

// KnownWasmModelMock is a mock implementation of KnownWasmModelInterface.
// It embeds mock.Mock so tests can register call expectations (On/Return)
// and verify them after exercising code that depends on the interface.
type KnownWasmModelMock struct {
	mock.Mock
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ CREATE TABLE protocol_contracts (
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Index wasms by owning protocol to support per-protocol wasm lookups.
CREATE INDEX idx_protocol_wasms_protocol_id ON protocol_wasms (protocol_id);
-- Index contracts by wasm hash to support lookups of contracts sharing a wasm.
CREATE INDEX idx_protocol_contracts_wasm_hash ON protocol_contracts (wasm_hash);


-- +migrate Down
-- Drop in reverse creation order (protocol_contracts presumably references
-- protocol_wasms via wasm_hash — confirm against the table definitions above).
DROP TABLE IF EXISTS protocol_contracts;
DROP TABLE IF EXISTS protocol_wasms;
4 changes: 4 additions & 0 deletions internal/integrationtests/data_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,10 @@ func (p *integrationTestProcessor) PersistCurrentState(ctx context.Context, dbTx
return p.ingestStore.Update(ctx, dbTx, fmt.Sprintf("test_%s_current_state_written", p.id), p.processedLedger)
}

// LoadCurrentState is a no-op stub so integrationTestProcessor satisfies the
// processor interface whose LoadCurrentState is invoked by the ingestion
// service on current-state handoff. The test processor has no DB-backed
// current state to load, so both arguments are ignored and nil is returned.
func (p *integrationTestProcessor) LoadCurrentState(_ context.Context, _ pgx.Tx) error {
	return nil
}

func (s *DataMigrationTestSuite) mustLedgerCloseMeta() xdr.LedgerCloseMeta {
var ledgerMeta xdr.LedgerCloseMeta
err := xdr.SafeUnmarshalBase64(protocolStateProductionLedgerMetaWith0Tx, &ledgerMeta)
Expand Down
20 changes: 11 additions & 9 deletions internal/serve/graphql/resolvers/statechange.resolvers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 37 additions & 28 deletions internal/services/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,22 @@ type ingestService struct {
knownContractIDs set.Set[string]
protocolProcessors map[string]ProtocolProcessor
protocolContractCache *protocolContractCache
// eligibleProtocolProcessors is set by ingestLiveLedgers before each call
// to PersistLedgerData, scoping the CAS loop to only processors that had
// ProcessLedger called. Only accessed from the single-threaded live ingestion loop.
// eligibleProtocolProcessors is set by ingestLiveLedgers before each retry
// sequence, scoping protocol processing and the CAS loop to processors that
// may persist the current ledger. Only accessed from the single-threaded live
// ingestion loop.
eligibleProtocolProcessors map[string]ProtocolProcessor
// protocolCurrentStateLoaded tracks which protocols have had their current
// state loaded into processor memory via LoadCurrentState. On the first
// successful CAS advance of the current_state_cursor (handoff from migration),
// LoadCurrentState reads state from DB; subsequent ledgers use the in-memory
// state maintained by PersistCurrentState. Reset on transaction failure to
// force a reload. Only accessed from the single-threaded live ingestion loop.
protocolCurrentStateLoaded map[string]bool
}

// SetEligibleProtocolProcessorsForTest sets the eligible protocol processors for testing.
// In production, this is set by ingestLiveLedgers before each retry sequence.
// Like the field it writes, this is intended for single-threaded access only;
// it performs no synchronization.
func (m *ingestService) SetEligibleProtocolProcessorsForTest(processors map[string]ProtocolProcessor) {
	m.eligibleProtocolProcessors = processors
}
Expand Down Expand Up @@ -172,30 +180,31 @@ func NewIngestService(cfg IngestServiceConfig) (*ingestService, error) {
}

return &ingestService{
ingestionMode: cfg.IngestionMode,
models: cfg.Models,
latestLedgerCursorName: cfg.LatestLedgerCursorName,
oldestLedgerCursorName: cfg.OldestLedgerCursorName,
advisoryLockID: generateAdvisoryLockID(cfg.Network),
appTracker: cfg.AppTracker,
rpcService: cfg.RPCService,
ledgerBackend: cfg.LedgerBackend,
ledgerBackendFactory: cfg.LedgerBackendFactory,
chAccStore: cfg.ChannelAccountStore,
tokenIngestionService: cfg.TokenIngestionService,
checkpointService: cfg.CheckpointService,
metricsService: cfg.MetricsService,
networkPassphrase: cfg.NetworkPassphrase,
getLedgersLimit: cfg.GetLedgersLimit,
ledgerIndexer: indexer.NewIndexer(cfg.NetworkPassphrase, ledgerIndexerPool, cfg.MetricsService, cfg.SkipTxMeta, cfg.SkipTxEnvelope),
archive: cfg.Archive,
backfillPool: backfillPool,
backfillBatchSize: uint32(cfg.BackfillBatchSize),
backfillDBInsertBatchSize: uint32(cfg.BackfillDBInsertBatchSize),
catchupThreshold: uint32(cfg.CatchupThreshold),
knownContractIDs: set.NewSet[string](),
protocolProcessors: ppMap,
protocolContractCache: ppCache,
ingestionMode: cfg.IngestionMode,
models: cfg.Models,
latestLedgerCursorName: cfg.LatestLedgerCursorName,
oldestLedgerCursorName: cfg.OldestLedgerCursorName,
advisoryLockID: generateAdvisoryLockID(cfg.Network),
appTracker: cfg.AppTracker,
rpcService: cfg.RPCService,
ledgerBackend: cfg.LedgerBackend,
ledgerBackendFactory: cfg.LedgerBackendFactory,
chAccStore: cfg.ChannelAccountStore,
tokenIngestionService: cfg.TokenIngestionService,
checkpointService: cfg.CheckpointService,
metricsService: cfg.MetricsService,
networkPassphrase: cfg.NetworkPassphrase,
getLedgersLimit: cfg.GetLedgersLimit,
ledgerIndexer: indexer.NewIndexer(cfg.NetworkPassphrase, ledgerIndexerPool, cfg.MetricsService, cfg.SkipTxMeta, cfg.SkipTxEnvelope),
archive: cfg.Archive,
backfillPool: backfillPool,
backfillBatchSize: uint32(cfg.BackfillBatchSize),
backfillDBInsertBatchSize: uint32(cfg.BackfillDBInsertBatchSize),
catchupThreshold: uint32(cfg.CatchupThreshold),
knownContractIDs: set.NewSet[string](),
protocolProcessors: ppMap,
protocolContractCache: ppCache,
protocolCurrentStateLoaded: make(map[string]bool),
}, nil
}

Expand Down
3 changes: 3 additions & 0 deletions internal/services/ingest_backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,9 @@ func (m *ingestService) flushBatchBufferWithRetry(ctx context.Context, buffer *i
return nil
}
lastErr = err
if attempt == maxIngestProcessedDataRetries-1 {
break
}

backoff := time.Duration(1<<attempt) * time.Second
if backoff > maxIngestProcessedDataRetryBackoff {
Expand Down
48 changes: 40 additions & 8 deletions internal/services/ingest_live.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,13 @@ func (m *ingestService) protocolProcessorsEligibleForProduction(ctx context.Cont
// token changes, and cursor update. Channel unlock is a no-op when chAccStore is nil.
func (m *ingestService) PersistLedgerData(ctx context.Context, ledgerSeq uint32, buffer *indexer.IndexerBuffer, cursorName string) (int, int, error) {
var numTxs, numOps int
// Track protocols that persisted current state in this transaction attempt
// so we can reset their cache-loaded flag on rollback.
var currentStatePersistedProtocols []string

err := db.RunInPgxTransaction(ctx, m.models.DB, func(dbTx pgx.Tx) error {
currentStatePersistedProtocols = currentStatePersistedProtocols[:0]

// 1. Insert unique trustline assets (FK prerequisite for trustline balances)
uniqueAssets := buffer.GetUniqueTrustlineAssets()
if len(uniqueAssets) > 0 {
Expand Down Expand Up @@ -179,6 +184,24 @@ func (m *ingestService) PersistLedgerData(ctx context.Context, ledgerSeq uint32,
return fmt.Errorf("CAS current state cursor for %s: %w", protocolID, casErr)
}
if swapped {
// On first CAS success (handoff from migration), load current state
// from DB into processor memory. Subsequent ledgers use the in-memory
// state maintained by PersistCurrentState (write-through cache).
if !m.protocolCurrentStateLoaded[protocolID] {
loadStart := time.Now()
if loadErr := processor.LoadCurrentState(ctx, dbTx); loadErr != nil {
return fmt.Errorf("loading current state for %s at ledger %d: %w", protocolID, ledgerSeq, loadErr)
}
m.metricsService.ObserveProtocolStateProcessingDuration(protocolID, "load_current_state", time.Since(loadStart).Seconds())
m.protocolCurrentStateLoaded[protocolID] = true
}

// Any rollback after a successful current-state CAS can leave the
// processor's in-memory cache ahead of committed DB state, either
// because we just loaded it for handoff or because PersistCurrentState
// mutates the write-through cache before a later transactional failure.
currentStatePersistedProtocols = append(currentStatePersistedProtocols, protocolID)

start := time.Now()
persistErr := processor.PersistCurrentState(ctx, dbTx)
m.metricsService.ObserveProtocolStateProcessingDuration(protocolID, "persist_current_state", time.Since(start).Seconds())
Expand All @@ -197,6 +220,12 @@ func (m *ingestService) PersistLedgerData(ctx context.Context, ledgerSeq uint32,
return nil
})
if err != nil {
// Transaction rolled back — processor in-memory state loaded inside the
// rolled-back transaction may not match the committed DB state. Reset
// loaded flags to force a DB reload on the next successful CAS attempt.
for _, pid := range currentStatePersistedProtocols {
m.protocolCurrentStateLoaded[pid] = false
}
return 0, 0, fmt.Errorf("persisting ledger data for ledger %d: %w", ledgerSeq, err)
}

Expand Down Expand Up @@ -318,15 +347,9 @@ func (m *ingestService) ingestLiveLedgers(ctx context.Context, startLedger uint3
}
m.eligibleProtocolProcessors = eligibleProcessors

// Run protocol state production (in-memory analysis before DB transaction) only
// for processors that may actually persist this ledger.
if produceErr := m.produceProtocolStateForProcessors(ctx, ledgerMeta, currentLedger, eligibleProcessors); produceErr != nil {
return fmt.Errorf("producing protocol state for ledger %d: %w", currentLedger, produceErr)
}

// All DB operations in a single atomic transaction with retry
dbStart := time.Now()
numTransactionProcessed, numOperationProcessed, err := m.ingestProcessedDataWithRetry(ctx, currentLedger, buffer)
numTransactionProcessed, numOperationProcessed, err := m.ingestProcessedDataWithRetry(ctx, ledgerMeta, currentLedger, buffer)
if err != nil {
return fmt.Errorf("processing ledger %d: %w", currentLedger, err)
}
Expand Down Expand Up @@ -448,7 +471,9 @@ func (m *ingestService) refreshProtocolContractCache(ctx context.Context, curren
}

// ingestProcessedDataWithRetry wraps PersistLedgerData with retry logic.
func (m *ingestService) ingestProcessedDataWithRetry(ctx context.Context, currentLedger uint32, buffer *indexer.IndexerBuffer) (int, int, error) {
// Protocol state is re-produced on every attempt so processor-owned staged state
// is rebuilt after rolled-back transactions.
func (m *ingestService) ingestProcessedDataWithRetry(ctx context.Context, ledgerMeta xdr.LedgerCloseMeta, currentLedger uint32, buffer *indexer.IndexerBuffer) (int, int, error) {
var lastErr error
for attempt := 0; attempt < maxIngestProcessedDataRetries; attempt++ {
select {
Expand All @@ -457,11 +482,18 @@ func (m *ingestService) ingestProcessedDataWithRetry(ctx context.Context, curren
default:
}

if produceErr := m.produceProtocolStateForProcessors(ctx, ledgerMeta, currentLedger, m.eligibleProtocolProcessors); produceErr != nil {
return 0, 0, fmt.Errorf("producing protocol state for ledger %d: %w", currentLedger, produceErr)
}

numTxs, numOps, err := m.PersistLedgerData(ctx, currentLedger, buffer, m.latestLedgerCursorName)
if err == nil {
return numTxs, numOps, nil
}
lastErr = err
if attempt == maxIngestProcessedDataRetries-1 {
break
}

backoff := time.Duration(1<<attempt) * time.Second
if backoff > maxIngestProcessedDataRetryBackoff {
Expand Down
Loading
Loading