From 0d01b589cfd5251eddc010d5c5c72f91d340166d Mon Sep 17 00:00:00 2001 From: Aristides Staffieri Date: Fri, 20 Feb 2026 16:54:12 -0700 Subject: [PATCH 01/12] Extract checkpoint population into dedicated services, add known_wasms tracking - Add known_wasms table (migration, model, mock, and data layer tests) for tracking WASM hashes during checkpoint population - Add KnownWasm field to Models struct - Create WasmIngestionService (wasm_ingestion.go) that runs protocol validators against WASM bytecode and batch-persists hashes to known_wasms - Create CheckpointService (checkpoint.go) that orchestrates single-pass checkpoint population, delegating ContractCode entries to both WasmIngestionService and TokenProcessor, and all other entries to TokenProcessor - Extract readerFactory on checkpointService for injectable checkpoint reader creation - Extract TokenProcessor interface and NewTokenProcessor from TokenIngestionService, moving checkpoint iteration logic out of token_ingestion.go into checkpoint.go - Remove db, archive, and PopulateAccountTokens from TokenIngestionService interface and struct - Remove dbPool parameter from NewTokenIngestionServiceForLoadtest - Wire CheckpointService into IngestServiceConfig and ingestService - Update ingest_live.go to call checkpointService.PopulateFromCheckpoint instead of tokenIngestionService.PopulateAccountTokens - Update ingest.go setupDeps to construct WasmIngestionService and CheckpointService - Add ContractValidatorMock, ProtocolValidatorMock, ChangeReaderMock, CheckpointServiceMock, WasmIngestionServiceMock, TokenProcessorMock, and TokenIngestionServiceMock updates to mocks.go - Add unit tests for WasmIngestionService (10 cases covering ProcessContractCode and PersistKnownWasms) - Add unit tests for CheckpointService (16 cases covering entry routing, error propagation, and context cancellation) --- internal/data/mocks.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/data/mocks.go b/internal/data/mocks.go index 
4718ce4c..e1113fd7 100644 --- a/internal/data/mocks.go +++ b/internal/data/mocks.go @@ -318,6 +318,7 @@ func (m *ProtocolContractsModelMock) GetByProtocolID(ctx context.Context, protoc return args.Get(0).([]ProtocolContracts), args.Error(1) } +// KnownWasmModelMock is a mock implementation of KnownWasmModelInterface. type KnownWasmModelMock struct { mock.Mock } From 6142d0197c3d2a527359e34d0c630292894ff316 Mon Sep 17 00:00:00 2001 From: Aristides Staffieri Date: Tue, 17 Mar 2026 10:32:48 -0600 Subject: [PATCH 02/12] Add live protocol state production pipeline with dual CAS gating Introduces the infrastructure for protocol processors to produce and persist protocol-specific state during live ledger ingestion, gated by per-protocol compare-and-swap cursors that coordinate with concurrent migration processes. Key changes: - ProtocolProcessor interface and ProtocolProcessorInput for protocol- specific ledger analysis and state persistence - Processor registry (RegisterProcessor/GetAllProcessors) for protocol processor discovery at startup - Dual CAS gating in PersistLedgerData (step 5.5): per-protocol history and current_state cursors ensure exactly-once writes even when live ingestion and migration run concurrently - Protocol contract cache with periodic refresh to avoid per-ledger DB queries for classified contracts - Data layer additions: IngestStoreModel.GetTx, CompareAndSwap, ProtocolContractsModel.GetByProtocolID, ProtocolsModel.GetClassified Tests: - Unit tests for processor registry (concurrent safety, overwrite, etc.) 
- 5 subtests for PersistLedgerData CAS gating (win, lose, behind, no cursor, no processors) using a real test DB and sentinel-writing testProtocolProcessor - Docker integration test (ProtocolStateProductionTestSuite) exercising CAS gating against a live ingest container's DB in three phases --- internal/integrationtests/main_test.go | 5 + .../protocol_state_production_test.go | 261 ++++++++++++++++++ 2 files changed, 266 insertions(+) create mode 100644 internal/integrationtests/protocol_state_production_test.go diff --git a/internal/integrationtests/main_test.go b/internal/integrationtests/main_test.go index c1119e79..3620de85 100644 --- a/internal/integrationtests/main_test.go +++ b/internal/integrationtests/main_test.go @@ -66,6 +66,11 @@ func TestIntegrationTests(t *testing.T) { suite.Run(t, &DataMigrationTestSuite{testEnv: testEnv}) }) + // Protocol state production tests — dual CAS gating during live ingestion + t.Run("ProtocolStateProductionTestSuite", func(t *testing.T) { + suite.Run(t, &ProtocolStateProductionTestSuite{testEnv: testEnv}) + }) + t.Run("BuildAndSubmitTransactionsTestSuite", func(t *testing.T) { suite.Run(t, &BuildAndSubmitTransactionsTestSuite{ testEnv: testEnv, diff --git a/internal/integrationtests/protocol_state_production_test.go b/internal/integrationtests/protocol_state_production_test.go new file mode 100644 index 00000000..fe9ac1ab --- /dev/null +++ b/internal/integrationtests/protocol_state_production_test.go @@ -0,0 +1,261 @@ +package integrationtests + +import ( + "context" + "database/sql" + "fmt" + "testing" + + "github.com/jackc/pgx/v5" + _ "github.com/lib/pq" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/stellar/wallet-backend/internal/apptracker" + "github.com/stellar/wallet-backend/internal/data" + "github.com/stellar/wallet-backend/internal/db" + "github.com/stellar/wallet-backend/internal/indexer" + "github.com/stellar/wallet-backend/internal/integrationtests/infrastructure" + 
"github.com/stellar/wallet-backend/internal/metrics" + "github.com/stellar/wallet-backend/internal/services" + "github.com/stellar/wallet-backend/internal/signing/store" +) + +// --- ProtocolStateProductionTestSuite (requires Docker DB + live ingest) --- + +type ProtocolStateProductionTestSuite struct { + suite.Suite + testEnv *infrastructure.TestEnvironment +} + +func (s *ProtocolStateProductionTestSuite) setupDB() (db.ConnectionPool, func()) { + ctx := context.Background() + dbURL, err := s.testEnv.Containers.GetWalletDBConnectionString(ctx) + s.Require().NoError(err) + pool, err := db.OpenDBConnectionPool(dbURL) + s.Require().NoError(err) + return pool, func() { pool.Close() } +} + +func (s *ProtocolStateProductionTestSuite) setupModels(pool db.ConnectionPool) *data.Models { + mockMetrics := metrics.NewMockMetricsService() + mockMetrics.On("ObserveDBQueryDuration", mock.Anything, mock.Anything, mock.Anything).Return() + mockMetrics.On("ObserveDBBatchSize", mock.Anything, mock.Anything, mock.Anything).Return() + mockMetrics.On("IncDBQuery", mock.Anything, mock.Anything).Return() + mockMetrics.On("IncDBQueryError", mock.Anything, mock.Anything, mock.Anything).Return() + mockMetrics.On("RegisterPoolMetrics", mock.Anything, mock.Anything).Return() + models, err := data.NewModels(pool, mockMetrics) + s.Require().NoError(err) + return models +} + +func (s *ProtocolStateProductionTestSuite) cleanupTestKeys(ctx context.Context, dbURL string) { + sqlDB, err := sql.Open("postgres", dbURL) + s.Require().NoError(err) + defer sqlDB.Close() + + _, err = sqlDB.ExecContext(ctx, `DELETE FROM ingest_store WHERE key LIKE 'test_%' OR key LIKE 'protocol_testproto%'`) + s.Require().NoError(err) +} + +// integrationTestProcessor implements services.ProtocolProcessor using the real +// IngestStoreModel to write sentinel keys within the DB transaction. 
+type integrationTestProcessor struct { + id string + processedLedger uint32 + ingestStore *data.IngestStoreModel +} + +func (p *integrationTestProcessor) ProtocolID() string { return p.id } + +func (p *integrationTestProcessor) ProcessLedger(_ context.Context, input services.ProtocolProcessorInput) error { + p.processedLedger = input.LedgerSequence + return nil +} + +func (p *integrationTestProcessor) PersistHistory(ctx context.Context, dbTx pgx.Tx) error { + return p.ingestStore.Update(ctx, dbTx, fmt.Sprintf("test_%s_history_written", p.id), p.processedLedger) +} + +func (p *integrationTestProcessor) PersistCurrentState(ctx context.Context, dbTx pgx.Tx) error { + return p.ingestStore.Update(ctx, dbTx, fmt.Sprintf("test_%s_current_state_written", p.id), p.processedLedger) +} + +// TestDualCASGatingDuringLiveIngestion proves CAS gating works against the +// Docker DB that has been populated by the real ingest container. +func (s *ProtocolStateProductionTestSuite) TestDualCASGatingDuringLiveIngestion() { + ctx := context.Background() + pool, cleanup := s.setupDB() + defer cleanup() + models := s.setupModels(pool) + + dbURL, err := s.testEnv.Containers.GetWalletDBConnectionString(ctx) + s.Require().NoError(err) + defer s.cleanupTestKeys(ctx, dbURL) + + // Read current latest_ingest_ledger to know where the live container is + latestLedger, err := models.IngestStore.Get(ctx, "latest_ingest_ledger") + s.Require().NoError(err) + s.Require().Greater(latestLedger, uint32(0), "live ingest should have populated latest_ingest_ledger") + s.T().Logf("Live ingest container is at ledger %d", latestLedger) + + // Pick a test ledger well beyond the live ingest tip to avoid collision + testLedger := latestLedger + 1000 + + // Insert protocol cursors at testLedger-1 (ready for CAS win) + sqlDB, err := sql.Open("postgres", dbURL) + s.Require().NoError(err) + defer sqlDB.Close() + + _, err = sqlDB.ExecContext(ctx, + `INSERT INTO ingest_store (key, value) VALUES ($1, $2)`, + 
"protocol_testproto_history_cursor", testLedger-1) + s.Require().NoError(err) + _, err = sqlDB.ExecContext(ctx, + `INSERT INTO ingest_store (key, value) VALUES ($1, $2)`, + "protocol_testproto_current_state_cursor", testLedger-1) + s.Require().NoError(err) + + // Insert a test-specific main cursor (avoid interfering with real ingest) + _, err = sqlDB.ExecContext(ctx, + `INSERT INTO ingest_store (key, value) VALUES ($1, $2)`, + "test_cursor", testLedger-1) + s.Require().NoError(err) + + processor := &integrationTestProcessor{id: "testproto", ingestStore: models.IngestStore, processedLedger: testLedger} + + mockTokenIngestionService := services.NewTokenIngestionServiceMock(s.T()) + mockTokenIngestionService.On("ProcessTokenChanges", + mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, + ).Return(nil).Maybe() + + mockMetrics := metrics.NewMockMetricsService() + mockMetrics.On("RegisterPoolMetrics", mock.Anything, mock.Anything).Return() + mockMetrics.On("ObserveDBQueryDuration", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + mockMetrics.On("IncDBQuery", mock.Anything, mock.Anything).Return().Maybe() + mockMetrics.On("IncDBQueryError", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + + svc, err := services.NewIngestService(services.IngestServiceConfig{ + IngestionMode: services.IngestionModeLive, + Models: models, + LatestLedgerCursorName: "test_cursor", + OldestLedgerCursorName: "test_cursor", + AppTracker: &apptracker.MockAppTracker{}, + RPCService: s.testEnv.RPCService, + LedgerBackend: &services.LedgerBackendMock{}, + ChannelAccountStore: &store.ChannelAccountStoreMock{}, + TokenIngestionService: mockTokenIngestionService, + MetricsService: mockMetrics, + Network: "Test SDF Network ; September 2015", + NetworkPassphrase: "Test SDF Network ; September 2015", + Archive: &services.HistoryArchiveMock{}, + ProtocolProcessors: []services.ProtocolProcessor{processor}, + }) + s.Require().NoError(err) + 
+ buffer := indexer.NewIndexerBuffer() + + // Phase 1: CAS Win — cursors at testLedger-1, persisting testLedger + s.T().Log("Phase 1: CAS win") + _, _, err = svc.PersistLedgerData(ctx, testLedger, buffer, "test_cursor") + s.Require().NoError(err) + + histCursor, err := models.IngestStore.Get(ctx, "protocol_testproto_history_cursor") + s.Require().NoError(err) + s.Assert().Equal(testLedger, histCursor, "history cursor should advance to testLedger") + + csCursor, err := models.IngestStore.Get(ctx, "protocol_testproto_current_state_cursor") + s.Require().NoError(err) + s.Assert().Equal(testLedger, csCursor, "current state cursor should advance to testLedger") + + histSentinel, err := models.IngestStore.Get(ctx, "test_testproto_history_written") + s.Require().NoError(err) + s.Assert().Equal(testLedger, histSentinel, "history sentinel should be testLedger") + + csSentinel, err := models.IngestStore.Get(ctx, "test_testproto_current_state_written") + s.Require().NoError(err) + s.Assert().Equal(testLedger, csSentinel, "current state sentinel should be testLedger") + + // Phase 2: CAS Lose — same ledger again, CAS expects testLedger-1 but finds testLedger + s.T().Log("Phase 2: CAS lose (same ledger again)") + _, _, err = svc.PersistLedgerData(ctx, testLedger, buffer, "test_cursor") + s.Require().NoError(err) + + // Cursors should still be at testLedger + histCursor, err = models.IngestStore.Get(ctx, "protocol_testproto_history_cursor") + s.Require().NoError(err) + s.Assert().Equal(testLedger, histCursor, "history cursor should remain at testLedger after CAS lose") + + csCursor, err = models.IngestStore.Get(ctx, "protocol_testproto_current_state_cursor") + s.Require().NoError(err) + s.Assert().Equal(testLedger, csCursor, "current state cursor should remain at testLedger after CAS lose") + + // Delete sentinels, re-run, and verify they are NOT re-written + _, err = sqlDB.ExecContext(ctx, `DELETE FROM ingest_store WHERE key LIKE 'test_testproto_%_written'`) + 
s.Require().NoError(err) + + _, _, err = svc.PersistLedgerData(ctx, testLedger, buffer, "test_cursor") + s.Require().NoError(err) + + histSentinel, err = models.IngestStore.Get(ctx, "test_testproto_history_written") + s.Require().NoError(err) + s.Assert().Equal(uint32(0), histSentinel, "sentinels should NOT be re-written after CAS lose") + + csSentinel, err = models.IngestStore.Get(ctx, "test_testproto_current_state_written") + s.Require().NoError(err) + s.Assert().Equal(uint32(0), csSentinel, "sentinels should NOT be re-written after CAS lose") + + // Phase 3: Cursor behind — second protocol at testLedger-2 + s.T().Log("Phase 3: Cursor behind (second protocol)") + _, err = sqlDB.ExecContext(ctx, + `INSERT INTO ingest_store (key, value) VALUES ($1, $2)`, + "protocol_testproto2_history_cursor", testLedger-2) + s.Require().NoError(err) + _, err = sqlDB.ExecContext(ctx, + `INSERT INTO ingest_store (key, value) VALUES ($1, $2)`, + "protocol_testproto2_current_state_cursor", testLedger-2) + s.Require().NoError(err) + + processor2 := &integrationTestProcessor{id: "testproto2", ingestStore: models.IngestStore, processedLedger: testLedger} + + svc2, err := services.NewIngestService(services.IngestServiceConfig{ + IngestionMode: services.IngestionModeLive, + Models: models, + LatestLedgerCursorName: "test_cursor", + OldestLedgerCursorName: "test_cursor", + AppTracker: &apptracker.MockAppTracker{}, + RPCService: s.testEnv.RPCService, + LedgerBackend: &services.LedgerBackendMock{}, + ChannelAccountStore: &store.ChannelAccountStoreMock{}, + TokenIngestionService: mockTokenIngestionService, + MetricsService: mockMetrics, + Network: "Test SDF Network ; September 2015", + NetworkPassphrase: "Test SDF Network ; September 2015", + Archive: &services.HistoryArchiveMock{}, + ProtocolProcessors: []services.ProtocolProcessor{processor2}, + }) + s.Require().NoError(err) + + _, _, err = svc2.PersistLedgerData(ctx, testLedger, buffer, "test_cursor") + s.Require().NoError(err) + + hist2, 
err := models.IngestStore.Get(ctx, "protocol_testproto2_history_cursor") + s.Require().NoError(err) + s.Assert().Equal(testLedger-2, hist2, "testproto2 history cursor should stay behind") + + cs2, err := models.IngestStore.Get(ctx, "protocol_testproto2_current_state_cursor") + s.Require().NoError(err) + s.Assert().Equal(testLedger-2, cs2, "testproto2 current state cursor should stay behind") + + histSentinel2, err := models.IngestStore.Get(ctx, "test_testproto2_history_written") + s.Require().NoError(err) + s.Assert().Equal(uint32(0), histSentinel2, "no sentinels for behind protocol") + + csSentinel2, err := models.IngestStore.Get(ctx, "test_testproto2_current_state_written") + s.Require().NoError(err) + s.Assert().Equal(uint32(0), csSentinel2, "no sentinels for behind protocol") +} + +func TestProtocolStateProductionTestSuiteStandalone(t *testing.T) { + t.Skip("Run via TestIntegrationTests") +} From 510d6da3024d7182fb8f00bce554eafcdfc962c0 Mon Sep 17 00:00:00 2001 From: Aristides Staffieri Date: Tue, 17 Mar 2026 16:09:20 -0600 Subject: [PATCH 03/12] test: consolidate data migration integration coverage Combine protocol setup and protocol state tests into a shared DataMigrationTestSuite. Use real SEP41 setup classification plus manual cursor seeding to verify live ingestion produces protocol history/current state only when the protocol cursors are ready, and stays inert when they are absent. 
--- internal/integrationtests/main_test.go | 5 - .../protocol_state_production_test.go | 261 ------------------ 2 files changed, 266 deletions(-) delete mode 100644 internal/integrationtests/protocol_state_production_test.go diff --git a/internal/integrationtests/main_test.go b/internal/integrationtests/main_test.go index 3620de85..c1119e79 100644 --- a/internal/integrationtests/main_test.go +++ b/internal/integrationtests/main_test.go @@ -66,11 +66,6 @@ func TestIntegrationTests(t *testing.T) { suite.Run(t, &DataMigrationTestSuite{testEnv: testEnv}) }) - // Protocol state production tests — dual CAS gating during live ingestion - t.Run("ProtocolStateProductionTestSuite", func(t *testing.T) { - suite.Run(t, &ProtocolStateProductionTestSuite{testEnv: testEnv}) - }) - t.Run("BuildAndSubmitTransactionsTestSuite", func(t *testing.T) { suite.Run(t, &BuildAndSubmitTransactionsTestSuite{ testEnv: testEnv, diff --git a/internal/integrationtests/protocol_state_production_test.go b/internal/integrationtests/protocol_state_production_test.go deleted file mode 100644 index fe9ac1ab..00000000 --- a/internal/integrationtests/protocol_state_production_test.go +++ /dev/null @@ -1,261 +0,0 @@ -package integrationtests - -import ( - "context" - "database/sql" - "fmt" - "testing" - - "github.com/jackc/pgx/v5" - _ "github.com/lib/pq" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/suite" - - "github.com/stellar/wallet-backend/internal/apptracker" - "github.com/stellar/wallet-backend/internal/data" - "github.com/stellar/wallet-backend/internal/db" - "github.com/stellar/wallet-backend/internal/indexer" - "github.com/stellar/wallet-backend/internal/integrationtests/infrastructure" - "github.com/stellar/wallet-backend/internal/metrics" - "github.com/stellar/wallet-backend/internal/services" - "github.com/stellar/wallet-backend/internal/signing/store" -) - -// --- ProtocolStateProductionTestSuite (requires Docker DB + live ingest) --- - -type 
ProtocolStateProductionTestSuite struct { - suite.Suite - testEnv *infrastructure.TestEnvironment -} - -func (s *ProtocolStateProductionTestSuite) setupDB() (db.ConnectionPool, func()) { - ctx := context.Background() - dbURL, err := s.testEnv.Containers.GetWalletDBConnectionString(ctx) - s.Require().NoError(err) - pool, err := db.OpenDBConnectionPool(dbURL) - s.Require().NoError(err) - return pool, func() { pool.Close() } -} - -func (s *ProtocolStateProductionTestSuite) setupModels(pool db.ConnectionPool) *data.Models { - mockMetrics := metrics.NewMockMetricsService() - mockMetrics.On("ObserveDBQueryDuration", mock.Anything, mock.Anything, mock.Anything).Return() - mockMetrics.On("ObserveDBBatchSize", mock.Anything, mock.Anything, mock.Anything).Return() - mockMetrics.On("IncDBQuery", mock.Anything, mock.Anything).Return() - mockMetrics.On("IncDBQueryError", mock.Anything, mock.Anything, mock.Anything).Return() - mockMetrics.On("RegisterPoolMetrics", mock.Anything, mock.Anything).Return() - models, err := data.NewModels(pool, mockMetrics) - s.Require().NoError(err) - return models -} - -func (s *ProtocolStateProductionTestSuite) cleanupTestKeys(ctx context.Context, dbURL string) { - sqlDB, err := sql.Open("postgres", dbURL) - s.Require().NoError(err) - defer sqlDB.Close() - - _, err = sqlDB.ExecContext(ctx, `DELETE FROM ingest_store WHERE key LIKE 'test_%' OR key LIKE 'protocol_testproto%'`) - s.Require().NoError(err) -} - -// integrationTestProcessor implements services.ProtocolProcessor using the real -// IngestStoreModel to write sentinel keys within the DB transaction. 
-type integrationTestProcessor struct { - id string - processedLedger uint32 - ingestStore *data.IngestStoreModel -} - -func (p *integrationTestProcessor) ProtocolID() string { return p.id } - -func (p *integrationTestProcessor) ProcessLedger(_ context.Context, input services.ProtocolProcessorInput) error { - p.processedLedger = input.LedgerSequence - return nil -} - -func (p *integrationTestProcessor) PersistHistory(ctx context.Context, dbTx pgx.Tx) error { - return p.ingestStore.Update(ctx, dbTx, fmt.Sprintf("test_%s_history_written", p.id), p.processedLedger) -} - -func (p *integrationTestProcessor) PersistCurrentState(ctx context.Context, dbTx pgx.Tx) error { - return p.ingestStore.Update(ctx, dbTx, fmt.Sprintf("test_%s_current_state_written", p.id), p.processedLedger) -} - -// TestDualCASGatingDuringLiveIngestion proves CAS gating works against the -// Docker DB that has been populated by the real ingest container. -func (s *ProtocolStateProductionTestSuite) TestDualCASGatingDuringLiveIngestion() { - ctx := context.Background() - pool, cleanup := s.setupDB() - defer cleanup() - models := s.setupModels(pool) - - dbURL, err := s.testEnv.Containers.GetWalletDBConnectionString(ctx) - s.Require().NoError(err) - defer s.cleanupTestKeys(ctx, dbURL) - - // Read current latest_ingest_ledger to know where the live container is - latestLedger, err := models.IngestStore.Get(ctx, "latest_ingest_ledger") - s.Require().NoError(err) - s.Require().Greater(latestLedger, uint32(0), "live ingest should have populated latest_ingest_ledger") - s.T().Logf("Live ingest container is at ledger %d", latestLedger) - - // Pick a test ledger well beyond the live ingest tip to avoid collision - testLedger := latestLedger + 1000 - - // Insert protocol cursors at testLedger-1 (ready for CAS win) - sqlDB, err := sql.Open("postgres", dbURL) - s.Require().NoError(err) - defer sqlDB.Close() - - _, err = sqlDB.ExecContext(ctx, - `INSERT INTO ingest_store (key, value) VALUES ($1, $2)`, - 
"protocol_testproto_history_cursor", testLedger-1) - s.Require().NoError(err) - _, err = sqlDB.ExecContext(ctx, - `INSERT INTO ingest_store (key, value) VALUES ($1, $2)`, - "protocol_testproto_current_state_cursor", testLedger-1) - s.Require().NoError(err) - - // Insert a test-specific main cursor (avoid interfering with real ingest) - _, err = sqlDB.ExecContext(ctx, - `INSERT INTO ingest_store (key, value) VALUES ($1, $2)`, - "test_cursor", testLedger-1) - s.Require().NoError(err) - - processor := &integrationTestProcessor{id: "testproto", ingestStore: models.IngestStore, processedLedger: testLedger} - - mockTokenIngestionService := services.NewTokenIngestionServiceMock(s.T()) - mockTokenIngestionService.On("ProcessTokenChanges", - mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, - ).Return(nil).Maybe() - - mockMetrics := metrics.NewMockMetricsService() - mockMetrics.On("RegisterPoolMetrics", mock.Anything, mock.Anything).Return() - mockMetrics.On("ObserveDBQueryDuration", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() - mockMetrics.On("IncDBQuery", mock.Anything, mock.Anything).Return().Maybe() - mockMetrics.On("IncDBQueryError", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() - - svc, err := services.NewIngestService(services.IngestServiceConfig{ - IngestionMode: services.IngestionModeLive, - Models: models, - LatestLedgerCursorName: "test_cursor", - OldestLedgerCursorName: "test_cursor", - AppTracker: &apptracker.MockAppTracker{}, - RPCService: s.testEnv.RPCService, - LedgerBackend: &services.LedgerBackendMock{}, - ChannelAccountStore: &store.ChannelAccountStoreMock{}, - TokenIngestionService: mockTokenIngestionService, - MetricsService: mockMetrics, - Network: "Test SDF Network ; September 2015", - NetworkPassphrase: "Test SDF Network ; September 2015", - Archive: &services.HistoryArchiveMock{}, - ProtocolProcessors: []services.ProtocolProcessor{processor}, - }) - s.Require().NoError(err) - 
- buffer := indexer.NewIndexerBuffer() - - // Phase 1: CAS Win — cursors at testLedger-1, persisting testLedger - s.T().Log("Phase 1: CAS win") - _, _, err = svc.PersistLedgerData(ctx, testLedger, buffer, "test_cursor") - s.Require().NoError(err) - - histCursor, err := models.IngestStore.Get(ctx, "protocol_testproto_history_cursor") - s.Require().NoError(err) - s.Assert().Equal(testLedger, histCursor, "history cursor should advance to testLedger") - - csCursor, err := models.IngestStore.Get(ctx, "protocol_testproto_current_state_cursor") - s.Require().NoError(err) - s.Assert().Equal(testLedger, csCursor, "current state cursor should advance to testLedger") - - histSentinel, err := models.IngestStore.Get(ctx, "test_testproto_history_written") - s.Require().NoError(err) - s.Assert().Equal(testLedger, histSentinel, "history sentinel should be testLedger") - - csSentinel, err := models.IngestStore.Get(ctx, "test_testproto_current_state_written") - s.Require().NoError(err) - s.Assert().Equal(testLedger, csSentinel, "current state sentinel should be testLedger") - - // Phase 2: CAS Lose — same ledger again, CAS expects testLedger-1 but finds testLedger - s.T().Log("Phase 2: CAS lose (same ledger again)") - _, _, err = svc.PersistLedgerData(ctx, testLedger, buffer, "test_cursor") - s.Require().NoError(err) - - // Cursors should still be at testLedger - histCursor, err = models.IngestStore.Get(ctx, "protocol_testproto_history_cursor") - s.Require().NoError(err) - s.Assert().Equal(testLedger, histCursor, "history cursor should remain at testLedger after CAS lose") - - csCursor, err = models.IngestStore.Get(ctx, "protocol_testproto_current_state_cursor") - s.Require().NoError(err) - s.Assert().Equal(testLedger, csCursor, "current state cursor should remain at testLedger after CAS lose") - - // Delete sentinels, re-run, and verify they are NOT re-written - _, err = sqlDB.ExecContext(ctx, `DELETE FROM ingest_store WHERE key LIKE 'test_testproto_%_written'`) - 
s.Require().NoError(err) - - _, _, err = svc.PersistLedgerData(ctx, testLedger, buffer, "test_cursor") - s.Require().NoError(err) - - histSentinel, err = models.IngestStore.Get(ctx, "test_testproto_history_written") - s.Require().NoError(err) - s.Assert().Equal(uint32(0), histSentinel, "sentinels should NOT be re-written after CAS lose") - - csSentinel, err = models.IngestStore.Get(ctx, "test_testproto_current_state_written") - s.Require().NoError(err) - s.Assert().Equal(uint32(0), csSentinel, "sentinels should NOT be re-written after CAS lose") - - // Phase 3: Cursor behind — second protocol at testLedger-2 - s.T().Log("Phase 3: Cursor behind (second protocol)") - _, err = sqlDB.ExecContext(ctx, - `INSERT INTO ingest_store (key, value) VALUES ($1, $2)`, - "protocol_testproto2_history_cursor", testLedger-2) - s.Require().NoError(err) - _, err = sqlDB.ExecContext(ctx, - `INSERT INTO ingest_store (key, value) VALUES ($1, $2)`, - "protocol_testproto2_current_state_cursor", testLedger-2) - s.Require().NoError(err) - - processor2 := &integrationTestProcessor{id: "testproto2", ingestStore: models.IngestStore, processedLedger: testLedger} - - svc2, err := services.NewIngestService(services.IngestServiceConfig{ - IngestionMode: services.IngestionModeLive, - Models: models, - LatestLedgerCursorName: "test_cursor", - OldestLedgerCursorName: "test_cursor", - AppTracker: &apptracker.MockAppTracker{}, - RPCService: s.testEnv.RPCService, - LedgerBackend: &services.LedgerBackendMock{}, - ChannelAccountStore: &store.ChannelAccountStoreMock{}, - TokenIngestionService: mockTokenIngestionService, - MetricsService: mockMetrics, - Network: "Test SDF Network ; September 2015", - NetworkPassphrase: "Test SDF Network ; September 2015", - Archive: &services.HistoryArchiveMock{}, - ProtocolProcessors: []services.ProtocolProcessor{processor2}, - }) - s.Require().NoError(err) - - _, _, err = svc2.PersistLedgerData(ctx, testLedger, buffer, "test_cursor") - s.Require().NoError(err) - - hist2, 
err := models.IngestStore.Get(ctx, "protocol_testproto2_history_cursor") - s.Require().NoError(err) - s.Assert().Equal(testLedger-2, hist2, "testproto2 history cursor should stay behind") - - cs2, err := models.IngestStore.Get(ctx, "protocol_testproto2_current_state_cursor") - s.Require().NoError(err) - s.Assert().Equal(testLedger-2, cs2, "testproto2 current state cursor should stay behind") - - histSentinel2, err := models.IngestStore.Get(ctx, "test_testproto2_history_written") - s.Require().NoError(err) - s.Assert().Equal(uint32(0), histSentinel2, "no sentinels for behind protocol") - - csSentinel2, err := models.IngestStore.Get(ctx, "test_testproto2_current_state_written") - s.Require().NoError(err) - s.Assert().Equal(uint32(0), csSentinel2, "no sentinels for behind protocol") -} - -func TestProtocolStateProductionTestSuiteStandalone(t *testing.T) { - t.Skip("Run via TestIntegrationTests") -} From 2363a4889c3b0d1ab1ec03a3cdc7d73a64039a55 Mon Sep 17 00:00:00 2001 From: Aristides Staffieri Date: Wed, 18 Mar 2026 14:48:28 -0600 Subject: [PATCH 04/12] Fix query storm from partial protocol contract cache refresh failure MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When any GetByProtocolID call fails during cache refresh, lastRefreshLedger was never updated, causing the staleness check to trigger on every ledger instead of every 100th — a 100x query amplification. Make the ledger update unconditional since the cache already preserves previous entries on partial failure, so data integrity is not at risk. Add warn-level logging to distinguish partial from full refreshes. 
--- .../resolvers/statechange.resolvers.go | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/internal/serve/graphql/resolvers/statechange.resolvers.go b/internal/serve/graphql/resolvers/statechange.resolvers.go index 895f0070..b9a4eded 100644 --- a/internal/serve/graphql/resolvers/statechange.resolvers.go +++ b/internal/serve/graphql/resolvers/statechange.resolvers.go @@ -431,12 +431,14 @@ func (r *Resolver) TrustlineChange() graphql1.TrustlineChangeResolver { return &trustlineChangeResolver{r} } -type accountChangeResolver struct{ *Resolver } -type balanceAuthorizationChangeResolver struct{ *Resolver } -type flagsChangeResolver struct{ *Resolver } -type metadataChangeResolver struct{ *Resolver } -type reservesChangeResolver struct{ *Resolver } -type signerChangeResolver struct{ *Resolver } -type signerThresholdsChangeResolver struct{ *Resolver } -type standardBalanceChangeResolver struct{ *Resolver } -type trustlineChangeResolver struct{ *Resolver } +type ( + accountChangeResolver struct{ *Resolver } + balanceAuthorizationChangeResolver struct{ *Resolver } + flagsChangeResolver struct{ *Resolver } + metadataChangeResolver struct{ *Resolver } + reservesChangeResolver struct{ *Resolver } + signerChangeResolver struct{ *Resolver } + signerThresholdsChangeResolver struct{ *Resolver } + standardBalanceChangeResolver struct{ *Resolver } + trustlineChangeResolver struct{ *Resolver } +) From ce94c62c491512657445a99a3e8c6eb4a9fe87be Mon Sep 17 00:00:00 2001 From: Aristides Staffieri Date: Mon, 30 Mar 2026 16:48:26 -0600 Subject: [PATCH 05/12] Add write-through current state cache for protocol processors Enable efficient additive state computation during live ingestion by caching protocol current state in processor memory. On the first successful CAS advance of the current_state_cursor (migration handoff), LoadCurrentState reads state from DB into the processor. 
Subsequent ledgers use the in-memory state maintained by PersistCurrentState, avoiding per-ledger DB reads. Cache-loaded flags reset on transaction rollback to force a DB reload on retry. --- .../integrationtests/data_migration_test.go | 4 + internal/services/ingest.go | 56 ++++++++------ internal/services/ingest_live.go | 24 ++++++ internal/services/ingest_test.go | 76 ++++++++++++++++++- internal/services/mocks.go | 5 ++ internal/services/protocol_migrate_test.go | 4 + internal/services/protocol_processor.go | 6 ++ 7 files changed, 148 insertions(+), 27 deletions(-) diff --git a/internal/integrationtests/data_migration_test.go b/internal/integrationtests/data_migration_test.go index 5154c6ec..ca0489ac 100644 --- a/internal/integrationtests/data_migration_test.go +++ b/internal/integrationtests/data_migration_test.go @@ -239,6 +239,10 @@ func (p *integrationTestProcessor) PersistCurrentState(ctx context.Context, dbTx return p.ingestStore.Update(ctx, dbTx, fmt.Sprintf("test_%s_current_state_written", p.id), p.processedLedger) } +func (p *integrationTestProcessor) LoadCurrentState(_ context.Context, _ pgx.Tx) error { + return nil +} + func (s *DataMigrationTestSuite) mustLedgerCloseMeta() xdr.LedgerCloseMeta { var ledgerMeta xdr.LedgerCloseMeta err := xdr.SafeUnmarshalBase64(protocolStateProductionLedgerMetaWith0Tx, &ledgerMeta) diff --git a/internal/services/ingest.go b/internal/services/ingest.go index 4e202aec..7730bc53 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -129,6 +129,13 @@ type ingestService struct { // to PersistLedgerData, scoping the CAS loop to only processors that had // ProcessLedger called. Only accessed from the single-threaded live ingestion loop. eligibleProtocolProcessors map[string]ProtocolProcessor + // protocolCurrentStateLoaded tracks which protocols have had their current + // state loaded into processor memory via LoadCurrentState. 
On the first + // successful CAS advance of the current_state_cursor (handoff from migration), + // LoadCurrentState reads state from DB; subsequent ledgers use the in-memory + // state maintained by PersistCurrentState. Reset on transaction failure to + // force a reload. Only accessed from the single-threaded live ingestion loop. + protocolCurrentStateLoaded map[string]bool } // SetEligibleProtocolProcessorsForTest sets the eligible protocol processors for testing. @@ -172,30 +179,31 @@ func NewIngestService(cfg IngestServiceConfig) (*ingestService, error) { } return &ingestService{ - ingestionMode: cfg.IngestionMode, - models: cfg.Models, - latestLedgerCursorName: cfg.LatestLedgerCursorName, - oldestLedgerCursorName: cfg.OldestLedgerCursorName, - advisoryLockID: generateAdvisoryLockID(cfg.Network), - appTracker: cfg.AppTracker, - rpcService: cfg.RPCService, - ledgerBackend: cfg.LedgerBackend, - ledgerBackendFactory: cfg.LedgerBackendFactory, - chAccStore: cfg.ChannelAccountStore, - tokenIngestionService: cfg.TokenIngestionService, - checkpointService: cfg.CheckpointService, - metricsService: cfg.MetricsService, - networkPassphrase: cfg.NetworkPassphrase, - getLedgersLimit: cfg.GetLedgersLimit, - ledgerIndexer: indexer.NewIndexer(cfg.NetworkPassphrase, ledgerIndexerPool, cfg.MetricsService, cfg.SkipTxMeta, cfg.SkipTxEnvelope), - archive: cfg.Archive, - backfillPool: backfillPool, - backfillBatchSize: uint32(cfg.BackfillBatchSize), - backfillDBInsertBatchSize: uint32(cfg.BackfillDBInsertBatchSize), - catchupThreshold: uint32(cfg.CatchupThreshold), - knownContractIDs: set.NewSet[string](), - protocolProcessors: ppMap, - protocolContractCache: ppCache, + ingestionMode: cfg.IngestionMode, + models: cfg.Models, + latestLedgerCursorName: cfg.LatestLedgerCursorName, + oldestLedgerCursorName: cfg.OldestLedgerCursorName, + advisoryLockID: generateAdvisoryLockID(cfg.Network), + appTracker: cfg.AppTracker, + rpcService: cfg.RPCService, + ledgerBackend: cfg.LedgerBackend, + 
ledgerBackendFactory: cfg.LedgerBackendFactory, + chAccStore: cfg.ChannelAccountStore, + tokenIngestionService: cfg.TokenIngestionService, + checkpointService: cfg.CheckpointService, + metricsService: cfg.MetricsService, + networkPassphrase: cfg.NetworkPassphrase, + getLedgersLimit: cfg.GetLedgersLimit, + ledgerIndexer: indexer.NewIndexer(cfg.NetworkPassphrase, ledgerIndexerPool, cfg.MetricsService, cfg.SkipTxMeta, cfg.SkipTxEnvelope), + archive: cfg.Archive, + backfillPool: backfillPool, + backfillBatchSize: uint32(cfg.BackfillBatchSize), + backfillDBInsertBatchSize: uint32(cfg.BackfillDBInsertBatchSize), + catchupThreshold: uint32(cfg.CatchupThreshold), + knownContractIDs: set.NewSet[string](), + protocolProcessors: ppMap, + protocolContractCache: ppCache, + protocolCurrentStateLoaded: make(map[string]bool), }, nil } diff --git a/internal/services/ingest_live.go b/internal/services/ingest_live.go index 61b5326d..7e6caa0d 100644 --- a/internal/services/ingest_live.go +++ b/internal/services/ingest_live.go @@ -82,8 +82,13 @@ func (m *ingestService) protocolProcessorsEligibleForProduction(ctx context.Cont // token changes, and cursor update. Channel unlock is a no-op when chAccStore is nil. func (m *ingestService) PersistLedgerData(ctx context.Context, ledgerSeq uint32, buffer *indexer.IndexerBuffer, cursorName string) (int, int, error) { var numTxs, numOps int + // Track protocols that persisted current state in this transaction attempt + // so we can reset their cache-loaded flag on rollback. + var currentStatePersistedProtocols []string err := db.RunInPgxTransaction(ctx, m.models.DB, func(dbTx pgx.Tx) error { + currentStatePersistedProtocols = currentStatePersistedProtocols[:0] + // 1. 
Insert unique trustline assets (FK prerequisite for trustline balances) uniqueAssets := buffer.GetUniqueTrustlineAssets() if len(uniqueAssets) > 0 { @@ -179,12 +184,25 @@ func (m *ingestService) PersistLedgerData(ctx context.Context, ledgerSeq uint32, return fmt.Errorf("CAS current state cursor for %s: %w", protocolID, casErr) } if swapped { + // On first CAS success (handoff from migration), load current state + // from DB into processor memory. Subsequent ledgers use the in-memory + // state maintained by PersistCurrentState (write-through cache). + if !m.protocolCurrentStateLoaded[protocolID] { + loadStart := time.Now() + if loadErr := processor.LoadCurrentState(ctx, dbTx); loadErr != nil { + return fmt.Errorf("loading current state for %s at ledger %d: %w", protocolID, ledgerSeq, loadErr) + } + m.metricsService.ObserveProtocolStateProcessingDuration(protocolID, "load_current_state", time.Since(loadStart).Seconds()) + m.protocolCurrentStateLoaded[protocolID] = true + } + start := time.Now() persistErr := processor.PersistCurrentState(ctx, dbTx) m.metricsService.ObserveProtocolStateProcessingDuration(protocolID, "persist_current_state", time.Since(start).Seconds()) if persistErr != nil { return fmt.Errorf("persisting current state for %s at ledger %d: %w", protocolID, ledgerSeq, persistErr) } + currentStatePersistedProtocols = append(currentStatePersistedProtocols, protocolID) } } } @@ -197,6 +215,12 @@ func (m *ingestService) PersistLedgerData(ctx context.Context, ledgerSeq uint32, return nil }) if err != nil { + // Transaction rolled back — processor in-memory state may be stale from + // the failed PersistCurrentState call. Reset loaded flags to force a + // DB reload on the next successful CAS attempt. 
+ for _, pid := range currentStatePersistedProtocols { + m.protocolCurrentStateLoaded[pid] = false + } return 0, 0, fmt.Errorf("persisting ledger data for ledger %d: %w", ledgerSeq, err) } diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index cad9a6a9..ac7d7cee 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -2701,9 +2701,10 @@ func Test_ingestService_processBackfillBatchesParallel_BothModes(t *testing.T) { // values into ingest_store within the DB transaction, proving PersistHistory // and PersistCurrentState were called and committed atomically. type testProtocolProcessor struct { - id string - processedLedger uint32 - ingestStore *data.IngestStoreModel + id string + processedLedger uint32 + ingestStore *data.IngestStoreModel + loadCurrentStateCalls int } func (p *testProtocolProcessor) ProtocolID() string { return p.id } @@ -2721,6 +2722,11 @@ func (p *testProtocolProcessor) PersistCurrentState(ctx context.Context, dbTx pg return p.ingestStore.Update(ctx, dbTx, fmt.Sprintf("test_%s_current_state_written", p.id), p.processedLedger) } +func (p *testProtocolProcessor) LoadCurrentState(_ context.Context, _ pgx.Tx) error { + p.loadCurrentStateCalls++ + return nil +} + // setupProtocolCursors inserts protocol cursors into ingest_store. // Call AFTER setupDBCursors (which wipes the table). 
func setupProtocolCursors(t *testing.T, ctx context.Context, pool db.ConnectionPool, protocolID string, historyCursor, currentStateCursor uint32) { @@ -2934,6 +2940,70 @@ func Test_PersistLedgerData_ProtocolCASGating(t *testing.T) { require.NoError(t, err) assert.Equal(t, uint32(100), mainCursor) }) + + t.Run("F: first CAS success calls LoadCurrentState", func(t *testing.T) { + processor := &testProtocolProcessor{id: "testproto"} + ctx, svc, models, pool := setupTest(t, []ProtocolProcessor{processor}) + processor.ingestStore = models.IngestStore + processor.processedLedger = 100 + svc.eligibleProtocolProcessors = map[string]ProtocolProcessor{"testproto": processor} + + setupDBCursors(t, ctx, pool, 99, 99) + setupProtocolCursors(t, ctx, pool, "testproto", 99, 99) + + buffer := indexer.NewIndexerBuffer() + _, _, err := svc.PersistLedgerData(ctx, 100, buffer, "latest_ledger_cursor") + require.NoError(t, err) + + // LoadCurrentState should be called exactly once on the handoff ledger + assert.Equal(t, 1, processor.loadCurrentStateCalls) + // protocolCurrentStateLoaded should be set + assert.True(t, svc.protocolCurrentStateLoaded["testproto"]) + }) + + t.Run("G: subsequent CAS success skips LoadCurrentState", func(t *testing.T) { + processor := &testProtocolProcessor{id: "testproto"} + ctx, svc, models, pool := setupTest(t, []ProtocolProcessor{processor}) + processor.ingestStore = models.IngestStore + svc.eligibleProtocolProcessors = map[string]ProtocolProcessor{"testproto": processor} + + setupDBCursors(t, ctx, pool, 99, 99) + setupProtocolCursors(t, ctx, pool, "testproto", 99, 99) + + // First ledger — triggers LoadCurrentState + processor.processedLedger = 100 + buffer := indexer.NewIndexerBuffer() + _, _, err := svc.PersistLedgerData(ctx, 100, buffer, "latest_ledger_cursor") + require.NoError(t, err) + assert.Equal(t, 1, processor.loadCurrentStateCalls) + + // Second ledger — should NOT call LoadCurrentState again + processor.processedLedger = 101 + buffer = 
indexer.NewIndexerBuffer() + _, _, err = svc.PersistLedgerData(ctx, 101, buffer, "latest_ledger_cursor") + require.NoError(t, err) + assert.Equal(t, 1, processor.loadCurrentStateCalls) // still 1, not 2 + }) + + t.Run("H: CAS lose does not trigger LoadCurrentState", func(t *testing.T) { + processor := &testProtocolProcessor{id: "testproto"} + ctx, svc, models, pool := setupTest(t, []ProtocolProcessor{processor}) + processor.ingestStore = models.IngestStore + processor.processedLedger = 100 + svc.eligibleProtocolProcessors = map[string]ProtocolProcessor{"testproto": processor} + + setupDBCursors(t, ctx, pool, 99, 99) + // Current state cursor already at 100 — CAS will fail + setupProtocolCursors(t, ctx, pool, "testproto", 99, 100) + + buffer := indexer.NewIndexerBuffer() + _, _, err := svc.PersistLedgerData(ctx, 100, buffer, "latest_ledger_cursor") + require.NoError(t, err) + + // LoadCurrentState should NOT be called since current state CAS failed + assert.Equal(t, 0, processor.loadCurrentStateCalls) + assert.False(t, svc.protocolCurrentStateLoaded["testproto"]) + }) } func Test_protocolStateCursorReady(t *testing.T) { diff --git a/internal/services/mocks.go b/internal/services/mocks.go index 12f208a9..16a70184 100644 --- a/internal/services/mocks.go +++ b/internal/services/mocks.go @@ -284,6 +284,11 @@ func (m *ProtocolProcessorMock) PersistCurrentState(ctx context.Context, dbTx pg return args.Error(0) } +func (m *ProtocolProcessorMock) LoadCurrentState(ctx context.Context, dbTx pgx.Tx) error { + args := m.Called(ctx, dbTx) + return args.Error(0) +} + // NewProtocolProcessorMock creates a new instance of ProtocolProcessorMock. 
func NewProtocolProcessorMock(t interface { mock.TestingT diff --git a/internal/services/protocol_migrate_test.go b/internal/services/protocol_migrate_test.go index 950625f9..b5d78c54 100644 --- a/internal/services/protocol_migrate_test.go +++ b/internal/services/protocol_migrate_test.go @@ -204,6 +204,10 @@ func (p *testRecordingProcessor) PersistCurrentState(ctx context.Context, dbTx p return p.ingestStore.Update(ctx, dbTx, fmt.Sprintf("test_%s_current_state_%d", p.id, p.lastProcessed), p.lastProcessed) } +func (p *testRecordingProcessor) LoadCurrentState(_ context.Context, _ pgx.Tx) error { + return nil +} + // testCursorAdvancingProcessor embeds testRecordingProcessor and simulates // live ingestion taking over by advancing its own cursor in the DB during // ProcessLedger at a specific sequence, causing the subsequent CAS to fail. diff --git a/internal/services/protocol_processor.go b/internal/services/protocol_processor.go index 3eaa057b..53822638 100644 --- a/internal/services/protocol_processor.go +++ b/internal/services/protocol_processor.go @@ -15,6 +15,12 @@ type ProtocolProcessor interface { ProcessLedger(ctx context.Context, input ProtocolProcessorInput) error PersistHistory(ctx context.Context, dbTx pgx.Tx) error PersistCurrentState(ctx context.Context, dbTx pgx.Tx) error + // LoadCurrentState reads the protocol's current state from its state tables + // into processor memory. Called once per protocol inside the DB transaction + // on the first successful CAS advance of the current_state_cursor (the + // handoff moment from migration to live ingestion). Subsequent ledgers use + // the in-memory state maintained by PersistCurrentState. + LoadCurrentState(ctx context.Context, dbTx pgx.Tx) error } // ProtocolProcessorInput contains the data needed by a processor to analyze a ledger. 
From c139d76876c036f07b0715c3cc4fcd6e419a51d7 Mon Sep 17 00:00:00 2001 From: Aristides Staffieri Date: Tue, 31 Mar 2026 09:51:44 -0600 Subject: [PATCH 06/12] Add ledger backend factory in order to recreate the ledger backend on outer loop iterations --- cmd/protocol_migrate.go | 24 ++++++++++++------- internal/services/protocol_migrate.go | 21 ++++++++++++++++ .../protocol_migrate_current_state.go | 2 ++ internal/services/protocol_migrate_history.go | 2 ++ 4 files changed, 41 insertions(+), 8 deletions(-) diff --git a/cmd/protocol_migrate.go b/cmd/protocol_migrate.go index 8cd22814..ed31391a 100644 --- a/cmd/protocol_migrate.go +++ b/cmd/protocol_migrate.go @@ -123,6 +123,7 @@ func runMigration( ctx context.Context, dbPool db.ConnectionPool, ledgerBackend ledgerbackend.LedgerBackend, + ledgerBackendFactory func() ledgerbackend.LedgerBackend, models *data.Models, processors []services.ProtocolProcessor, ) error, @@ -161,18 +162,23 @@ func runMigration( return fmt.Errorf("creating models: %w", err) } - // Create ledger backend - ledgerBackend := ledgerbackend.NewRPCLedgerBackend(ledgerbackend.RPCLedgerBackendOptions{ - RPCServerURL: opts.rpcURL, - BufferSize: 10, - }) + // Create ledger backend factory for re-creating backends between range preparations. + // RPCLedgerBackend does not support calling PrepareRange more than once.
+ newBackend := func() ledgerbackend.LedgerBackend { + return ledgerbackend.NewRPCLedgerBackend(ledgerbackend.RPCLedgerBackendOptions{ + RPCServerURL: opts.rpcURL, + BufferSize: 10, + }) + } + + ledgerBackend := newBackend() defer func() { if closeErr := ledgerBackend.Close(); closeErr != nil { log.Ctx(ctx).Errorf("error closing ledger backend: %v", closeErr) } }() - return createAndRun(ctx, dbPool, ledgerBackend, models, processors) + return createAndRun(ctx, dbPool, ledgerBackend, newBackend, models, processors) } func (c *protocolMigrateCmd) historyCommand() *cobra.Command { @@ -187,10 +193,11 @@ func (c *protocolMigrateCmd) historyCommand() *cobra.Command { }, nil, func(opts *migrationCommandOpts) error { - return runMigration("history", opts, func(ctx context.Context, dbPool db.ConnectionPool, ledgerBackend ledgerbackend.LedgerBackend, models *data.Models, processors []services.ProtocolProcessor) error { + return runMigration("history", opts, func(ctx context.Context, dbPool db.ConnectionPool, ledgerBackend ledgerbackend.LedgerBackend, ledgerBackendFactory func() ledgerbackend.LedgerBackend, models *data.Models, processors []services.ProtocolProcessor) error { service, err := services.NewProtocolMigrateHistoryService(services.ProtocolMigrateHistoryConfig{ DB: dbPool, LedgerBackend: ledgerBackend, + LedgerBackendFactory: ledgerBackendFactory, ProtocolsModel: models.Protocols, ProtocolContractsModel: models.ProtocolContracts, IngestStore: models.IngestStore, @@ -228,10 +235,11 @@ func (c *protocolMigrateCmd) currentStateCommand() *cobra.Command { return nil }, func(opts *migrationCommandOpts) error { - return runMigration("current-state", opts, func(ctx context.Context, dbPool db.ConnectionPool, ledgerBackend ledgerbackend.LedgerBackend, models *data.Models, processors []services.ProtocolProcessor) error { + return runMigration("current-state", opts, func(ctx context.Context, dbPool db.ConnectionPool, ledgerBackend ledgerbackend.LedgerBackend, 
ledgerBackendFactory func() ledgerbackend.LedgerBackend, models *data.Models, processors []services.ProtocolProcessor) error { service, err := services.NewProtocolMigrateCurrentStateService(services.ProtocolMigrateCurrentStateConfig{ DB: dbPool, LedgerBackend: ledgerBackend, + LedgerBackendFactory: ledgerBackendFactory, ProtocolsModel: models.Protocols, ProtocolContractsModel: models.ProtocolContracts, IngestStore: models.IngestStore, diff --git a/internal/services/protocol_migrate.go b/internal/services/protocol_migrate.go index a3734c4c..b88aebb0 100644 --- a/internal/services/protocol_migrate.go +++ b/internal/services/protocol_migrate.go @@ -57,6 +57,7 @@ type migrationStrategy struct { type protocolMigrateEngine struct { db db.ConnectionPool ledgerBackend ledgerbackend.LedgerBackend + ledgerBackendFactory func() ledgerbackend.LedgerBackend protocolsModel data.ProtocolsModelInterface protocolContractsModel data.ProtocolContractsModelInterface ingestStore *data.IngestStoreModel @@ -66,6 +67,22 @@ type protocolMigrateEngine struct { strategy migrationStrategy } +// resetLedgerBackend closes the current backend and creates a fresh one. +// RPCLedgerBackend does not support calling PrepareRange more than once, +// so the backend must be recreated between range preparations. +// No-op when no factory is configured (e.g. in tests using mock backends). +func (s *protocolMigrateEngine) resetLedgerBackend(ctx context.Context) { + if s.ledgerBackendFactory == nil { + return + } + if s.ledgerBackend != nil { + if err := s.ledgerBackend.Close(); err != nil { + log.Ctx(ctx).Warnf("error closing ledger backend during reset: %v", err) + } + } + s.ledgerBackend = s.ledgerBackendFactory() +} + // Run performs migration for the given protocol IDs using the configured strategy. 
func (s *protocolMigrateEngine) Run(ctx context.Context, protocolIDs []string) error { // Phase 1: Validate @@ -260,6 +277,9 @@ func (s *protocolMigrateEngine) processAllProtocols(ctx context.Context, protoco log.Ctx(ctx).Infof("Processing ledgers %d to %d for %d protocol(s)", startLedger, latestLedger, len(protocolIDs)) + // RPCLedgerBackend does not support re-preparing; close and recreate. + s.resetLedgerBackend(ctx) + if prepErr := s.ledgerBackend.PrepareRange(ctx, ledgerbackend.BoundedRange(startLedger, latestLedger)); prepErr != nil { return handedOffProtocolIDs(trackers), fmt.Errorf("preparing ledger range [%d, %d]: %w", startLedger, latestLedger, prepErr) } @@ -364,6 +384,7 @@ func (s *protocolMigrateEngine) processAllProtocols(ctx context.Context, protoco } // At tip — poll briefly for convergence. + s.resetLedgerBackend(ctx) pollCtx, cancel := context.WithTimeout(ctx, convergencePollTimeout) prepErr := s.ledgerBackend.PrepareRange(pollCtx, ledgerbackend.UnboundedRange(latestLedger+1)) if prepErr != nil { diff --git a/internal/services/protocol_migrate_current_state.go b/internal/services/protocol_migrate_current_state.go index 9ad673ea..82a8da3b 100644 --- a/internal/services/protocol_migrate_current_state.go +++ b/internal/services/protocol_migrate_current_state.go @@ -27,6 +27,7 @@ type protocolMigrateCurrentStateService struct { type ProtocolMigrateCurrentStateConfig struct { DB db.ConnectionPool LedgerBackend ledgerbackend.LedgerBackend + LedgerBackendFactory func() ledgerbackend.LedgerBackend ProtocolsModel data.ProtocolsModelInterface ProtocolContractsModel data.ProtocolContractsModelInterface IngestStore *data.IngestStoreModel @@ -65,6 +66,7 @@ func NewProtocolMigrateCurrentStateService(cfg ProtocolMigrateCurrentStateConfig engine: protocolMigrateEngine{ db: cfg.DB, ledgerBackend: cfg.LedgerBackend, + ledgerBackendFactory: cfg.LedgerBackendFactory, protocolsModel: cfg.ProtocolsModel, protocolContractsModel: cfg.ProtocolContractsModel, ingestStore: 
cfg.IngestStore, diff --git a/internal/services/protocol_migrate_history.go b/internal/services/protocol_migrate_history.go index 378580e0..85c71e37 100644 --- a/internal/services/protocol_migrate_history.go +++ b/internal/services/protocol_migrate_history.go @@ -27,6 +27,7 @@ type protocolMigrateHistoryService struct { type ProtocolMigrateHistoryConfig struct { DB db.ConnectionPool LedgerBackend ledgerbackend.LedgerBackend + LedgerBackendFactory func() ledgerbackend.LedgerBackend ProtocolsModel data.ProtocolsModelInterface ProtocolContractsModel data.ProtocolContractsModelInterface IngestStore *data.IngestStoreModel @@ -65,6 +66,7 @@ func NewProtocolMigrateHistoryService(cfg ProtocolMigrateHistoryConfig) (*protoc engine: protocolMigrateEngine{ db: cfg.DB, ledgerBackend: cfg.LedgerBackend, + ledgerBackendFactory: cfg.LedgerBackendFactory, protocolsModel: cfg.ProtocolsModel, protocolContractsModel: cfg.ProtocolContractsModel, ingestStore: cfg.IngestStore, From d06c3d42361b67f57b39f9116fa9a281647c8cf6 Mon Sep 17 00:00:00 2001 From: Aristides Staffieri Date: Tue, 31 Mar 2026 09:53:51 -0600 Subject: [PATCH 07/12] Fix rollback reset gap in write-through cache for protocol processors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move currentStatePersistedProtocols append to immediately after setting protocolCurrentStateLoaded flag, so that any transaction failure after LoadCurrentState (including PersistCurrentState or cursor Update errors) resets the flag and forces a DB reload on retry. Previously, the append only ran after PersistCurrentState succeeded, leaving the flag stuck true on PersistCurrentState failure — causing retries to skip LoadCurrentState and write stale in-memory state. 
--- internal/services/ingest_live.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/services/ingest_live.go b/internal/services/ingest_live.go index 7e6caa0d..cea1c09b 100644 --- a/internal/services/ingest_live.go +++ b/internal/services/ingest_live.go @@ -194,6 +194,7 @@ func (m *ingestService) PersistLedgerData(ctx context.Context, ledgerSeq uint32, } m.metricsService.ObserveProtocolStateProcessingDuration(protocolID, "load_current_state", time.Since(loadStart).Seconds()) m.protocolCurrentStateLoaded[protocolID] = true + currentStatePersistedProtocols = append(currentStatePersistedProtocols, protocolID) } start := time.Now() @@ -202,7 +203,6 @@ func (m *ingestService) PersistLedgerData(ctx context.Context, ledgerSeq uint32, if persistErr != nil { return fmt.Errorf("persisting current state for %s at ledger %d: %w", protocolID, ledgerSeq, persistErr) } - currentStatePersistedProtocols = append(currentStatePersistedProtocols, protocolID) } } } @@ -215,9 +215,9 @@ func (m *ingestService) PersistLedgerData(ctx context.Context, ledgerSeq uint32, return nil }) if err != nil { - // Transaction rolled back — processor in-memory state may be stale from - // the failed PersistCurrentState call. Reset loaded flags to force a - // DB reload on the next successful CAS attempt. + // Transaction rolled back — processor in-memory state loaded inside the + // rolled-back transaction may not match the committed DB state. Reset + // loaded flags to force a DB reload on the next successful CAS attempt. 
for _, pid := range currentStatePersistedProtocols { m.protocolCurrentStateLoaded[pid] = false } From 989c345f9682a461402aba4974d1eb20bb33a1a3 Mon Sep 17 00:00:00 2001 From: Aristides Staffieri Date: Tue, 31 Mar 2026 10:21:54 -0600 Subject: [PATCH 08/12] fix(ingest): reset current-state cache after rollback Track every protocol that wins current-state CAS before calling PersistCurrentState so any later transaction failure clears protocolCurrentStateLoaded and forces a DB reload on retry. Add a regression test covering a post-handoff PersistCurrentState failure and successful retry reload. --- internal/services/ingest_live.go | 7 ++++- internal/services/ingest_test.go | 52 +++++++++++++++++++++++++++++--- 2 files changed, 54 insertions(+), 5 deletions(-) diff --git a/internal/services/ingest_live.go b/internal/services/ingest_live.go index cea1c09b..c8c563c8 100644 --- a/internal/services/ingest_live.go +++ b/internal/services/ingest_live.go @@ -194,9 +194,14 @@ func (m *ingestService) PersistLedgerData(ctx context.Context, ledgerSeq uint32, } m.metricsService.ObserveProtocolStateProcessingDuration(protocolID, "load_current_state", time.Since(loadStart).Seconds()) m.protocolCurrentStateLoaded[protocolID] = true - currentStatePersistedProtocols = append(currentStatePersistedProtocols, protocolID) } + // Any rollback after a successful current-state CAS can leave the + // processor's in-memory cache ahead of committed DB state, either + // because we just loaded it for handoff or because PersistCurrentState + // mutates the write-through cache before a later transactional failure. 
+ currentStatePersistedProtocols = append(currentStatePersistedProtocols, protocolID) + start := time.Now() persistErr := processor.PersistCurrentState(ctx, dbTx) m.metricsService.ObserveProtocolStateProcessingDuration(protocolID, "persist_current_state", time.Since(start).Seconds()) diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index ac7d7cee..a5cb66fa 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -2701,10 +2701,13 @@ func Test_ingestService_processBackfillBatchesParallel_BothModes(t *testing.T) { // values into ingest_store within the DB transaction, proving PersistHistory // and PersistCurrentState were called and committed atomically. type testProtocolProcessor struct { - id string - processedLedger uint32 - ingestStore *data.IngestStoreModel - loadCurrentStateCalls int + id string + processedLedger uint32 + ingestStore *data.IngestStoreModel + loadCurrentStateCalls int + persistCurrentStateCalls int + failPersistCurrentStateAt uint32 + persistCurrentStateVersion int } func (p *testProtocolProcessor) ProtocolID() string { return p.id } @@ -2719,6 +2722,11 @@ func (p *testProtocolProcessor) PersistHistory(ctx context.Context, dbTx pgx.Tx) } func (p *testProtocolProcessor) PersistCurrentState(ctx context.Context, dbTx pgx.Tx) error { + p.persistCurrentStateCalls++ + p.persistCurrentStateVersion++ + if p.failPersistCurrentStateAt != 0 && p.processedLedger == p.failPersistCurrentStateAt { + return fmt.Errorf("simulated current state persist failure at ledger %d", p.processedLedger) + } return p.ingestStore.Update(ctx, dbTx, fmt.Sprintf("test_%s_current_state_written", p.id), p.processedLedger) } @@ -3004,6 +3012,42 @@ func Test_PersistLedgerData_ProtocolCASGating(t *testing.T) { assert.Equal(t, 0, processor.loadCurrentStateCalls) assert.False(t, svc.protocolCurrentStateLoaded["testproto"]) }) + + t.Run("I: rollback after later PersistCurrentState failure clears loaded flag for retry", 
func(t *testing.T) { + processor := &testProtocolProcessor{id: "testproto", failPersistCurrentStateAt: 101} + ctx, svc, models, pool := setupTest(t, []ProtocolProcessor{processor}) + processor.ingestStore = models.IngestStore + svc.eligibleProtocolProcessors = map[string]ProtocolProcessor{"testproto": processor} + + setupDBCursors(t, ctx, pool, 99, 99) + setupProtocolCursors(t, ctx, pool, "testproto", 99, 99) + + // First ledger succeeds and establishes the in-memory cache handoff. + processor.processedLedger = 100 + _, _, err := svc.PersistLedgerData(ctx, 100, indexer.NewIndexerBuffer(), "latest_ledger_cursor") + require.NoError(t, err) + assert.Equal(t, 1, processor.loadCurrentStateCalls) + assert.True(t, svc.protocolCurrentStateLoaded["testproto"]) + + // Later ledger mutates in-memory state during PersistCurrentState, then fails. + processor.processedLedger = 101 + _, _, err = svc.PersistLedgerData(ctx, 101, indexer.NewIndexerBuffer(), "latest_ledger_cursor") + require.Error(t, err) + assert.Equal(t, 2, processor.persistCurrentStateVersion) + assert.False(t, svc.protocolCurrentStateLoaded["testproto"]) + + currentStateCursor, err := models.IngestStore.Get(ctx, "protocol_testproto_current_state_cursor") + require.NoError(t, err) + assert.Equal(t, uint32(100), currentStateCursor) + + // Retrying the same ledger should reload current state because the flag was reset. 
+ processor.failPersistCurrentStateAt = 0 + processor.processedLedger = 101 + _, _, err = svc.PersistLedgerData(ctx, 101, indexer.NewIndexerBuffer(), "latest_ledger_cursor") + require.NoError(t, err) + assert.Equal(t, 2, processor.loadCurrentStateCalls) + assert.True(t, svc.protocolCurrentStateLoaded["testproto"]) + }) } func Test_protocolStateCursorReady(t *testing.T) { From d29e51b0d40302c54aa7eb31221a69fd69a513c8 Mon Sep 17 00:00:00 2001 From: Aristides Staffieri Date: Tue, 31 Mar 2026 10:45:53 -0600 Subject: [PATCH 09/12] Retry protocol state production on ingest retries Rerun protocol ProcessLedger on every PersistLedgerData retry so protocol processors rebuild staged state after rolled-back transactions instead of reusing cleared or mutated buffers. Document the retry contract for protocol processors and add a regression test covering protocol state rebuild on retry. --- internal/services/ingest.go | 9 +-- internal/services/ingest_live.go | 16 ++--- internal/services/ingest_test.go | 93 ++++++++++++++++++++++++- internal/services/protocol_processor.go | 6 ++ 4 files changed, 109 insertions(+), 15 deletions(-) diff --git a/internal/services/ingest.go b/internal/services/ingest.go index 7730bc53..a418fb2c 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -125,9 +125,10 @@ type ingestService struct { knownContractIDs set.Set[string] protocolProcessors map[string]ProtocolProcessor protocolContractCache *protocolContractCache - // eligibleProtocolProcessors is set by ingestLiveLedgers before each call - // to PersistLedgerData, scoping the CAS loop to only processors that had - // ProcessLedger called. Only accessed from the single-threaded live ingestion loop. + // eligibleProtocolProcessors is set by ingestLiveLedgers before each retry + // sequence, scoping protocol processing and the CAS loop to processors that + // may persist the current ledger. Only accessed from the single-threaded live + // ingestion loop. 
eligibleProtocolProcessors map[string]ProtocolProcessor // protocolCurrentStateLoaded tracks which protocols have had their current // state loaded into processor memory via LoadCurrentState. On the first @@ -139,7 +140,7 @@ type ingestService struct { } // SetEligibleProtocolProcessorsForTest sets the eligible protocol processors for testing. -// In production, this is set by ingestLiveLedgers before each PersistLedgerData call. +// In production, this is set by ingestLiveLedgers before each retry sequence. func (m *ingestService) SetEligibleProtocolProcessorsForTest(processors map[string]ProtocolProcessor) { m.eligibleProtocolProcessors = processors } diff --git a/internal/services/ingest_live.go b/internal/services/ingest_live.go index c8c563c8..cf696b12 100644 --- a/internal/services/ingest_live.go +++ b/internal/services/ingest_live.go @@ -347,15 +347,9 @@ func (m *ingestService) ingestLiveLedgers(ctx context.Context, startLedger uint3 } m.eligibleProtocolProcessors = eligibleProcessors - // Run protocol state production (in-memory analysis before DB transaction) only - // for processors that may actually persist this ledger. - if produceErr := m.produceProtocolStateForProcessors(ctx, ledgerMeta, currentLedger, eligibleProcessors); produceErr != nil { - return fmt.Errorf("producing protocol state for ledger %d: %w", currentLedger, produceErr) - } - // All DB operations in a single atomic transaction with retry dbStart := time.Now() - numTransactionProcessed, numOperationProcessed, err := m.ingestProcessedDataWithRetry(ctx, currentLedger, buffer) + numTransactionProcessed, numOperationProcessed, err := m.ingestProcessedDataWithRetry(ctx, ledgerMeta, currentLedger, buffer) if err != nil { return fmt.Errorf("processing ledger %d: %w", currentLedger, err) } @@ -477,7 +471,9 @@ func (m *ingestService) refreshProtocolContractCache(ctx context.Context, curren } // ingestProcessedDataWithRetry wraps PersistLedgerData with retry logic. 
-func (m *ingestService) ingestProcessedDataWithRetry(ctx context.Context, currentLedger uint32, buffer *indexer.IndexerBuffer) (int, int, error) { +// Protocol state is re-produced on every attempt so processor-owned staged state +// is rebuilt after rolled-back transactions. +func (m *ingestService) ingestProcessedDataWithRetry(ctx context.Context, ledgerMeta xdr.LedgerCloseMeta, currentLedger uint32, buffer *indexer.IndexerBuffer) (int, int, error) { var lastErr error for attempt := 0; attempt < maxIngestProcessedDataRetries; attempt++ { select { @@ -486,6 +482,10 @@ func (m *ingestService) ingestProcessedDataWithRetry(ctx context.Context, curren default: } + if produceErr := m.produceProtocolStateForProcessors(ctx, ledgerMeta, currentLedger, m.eligibleProtocolProcessors); produceErr != nil { + return 0, 0, fmt.Errorf("producing protocol state for ledger %d: %w", currentLedger, produceErr) + } + numTxs, numOps, err := m.PersistLedgerData(ctx, currentLedger, buffer, m.latestLedgerCursorName) if err == nil { return numTxs, numOps, nil diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index a5cb66fa..5fa70cf0 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -1860,7 +1860,7 @@ func Test_ingestProcessedDataWithRetry(t *testing.T) { // Call ingestProcessedDataWithRetry - should succeed // Note: assetIDMap and contractIDMap are no longer passed - operations use direct DB queries - numTx, numOps, err := svc.ingestProcessedDataWithRetry(ctx, 100, buffer) + numTx, numOps, err := svc.ingestProcessedDataWithRetry(ctx, xdr.LedgerCloseMeta{}, 100, buffer) // Verify success require.NoError(t, err) @@ -1950,7 +1950,7 @@ func Test_ingestProcessedDataWithRetry(t *testing.T) { // Call ingestProcessedDataWithRetry - should fail after retries due to DB error // Note: assetIDMap and contractIDMap are no longer passed - operations use direct DB queries - _, _, err = svc.ingestProcessedDataWithRetry(ctx, 100, buffer) 
+ _, _, err = svc.ingestProcessedDataWithRetry(ctx, xdr.LedgerCloseMeta{}, 100, buffer) // Verify error propagates with retry failure message require.Error(t, err) @@ -2048,7 +2048,7 @@ func Test_ingestProcessedDataWithRetry(t *testing.T) { // Call ingestProcessedDataWithRetry - should succeed after retry // Note: assetIDMap and contractIDMap are no longer passed - operations use direct DB queries - numTx, numOps, err := svc.ingestProcessedDataWithRetry(ctx, 100, buffer) + numTx, numOps, err := svc.ingestProcessedDataWithRetry(ctx, xdr.LedgerCloseMeta{}, 100, buffer) // Verify success after retry require.NoError(t, err) @@ -2062,6 +2062,91 @@ func Test_ingestProcessedDataWithRetry(t *testing.T) { mockTokenIngestionService.AssertExpectations(t) }) + + t.Run("rebuilds protocol state on retry", func(t *testing.T) { + dbt := dbtest.Open(t) + defer dbt.Close() + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + defer dbConnectionPool.Close() + + ctx := context.Background() + initialCursor := uint32(99) + setupDBCursors(t, ctx, dbConnectionPool, initialCursor, initialCursor) + setupProtocolCursors(t, ctx, dbConnectionPool, "testproto", initialCursor, initialCursor) + + mockMetricsService := metrics.NewMockMetricsService() + mockMetricsService.On("RegisterPoolMetrics", "ledger_indexer", mock.Anything).Return() + mockMetricsService.On("RegisterPoolMetrics", "backfill", mock.Anything).Return() + mockMetricsService.On("ObserveDBQueryDuration", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + mockMetricsService.On("IncDBQuery", mock.Anything, mock.Anything).Return().Maybe() + mockMetricsService.On("ObserveProtocolStateProcessingDuration", mock.Anything, mock.Anything, mock.Anything).Return().Maybe() + defer mockMetricsService.AssertExpectations(t) + + models, err := data.NewModels(dbConnectionPool, mockMetricsService) + require.NoError(t, err) + + mockRPCService := &RPCServiceMock{} + 
mockRPCService.On("NetworkPassphrase").Return(network.TestNetworkPassphrase).Maybe() + + mockTokenIngestionService := NewTokenIngestionServiceMock(t) + mockTokenIngestionService.On("ProcessTokenChanges", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(fmt.Errorf("transient error")).Once() + mockTokenIngestionService.On("ProcessTokenChanges", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Return(nil).Once() + + processor := &testProtocolProcessor{id: "testproto"} + svc, err := NewIngestService(IngestServiceConfig{ + IngestionMode: IngestionModeLive, + Models: models, + LatestLedgerCursorName: "latest_ledger_cursor", + OldestLedgerCursorName: "oldest_ledger_cursor", + AppTracker: &apptracker.MockAppTracker{}, + RPCService: mockRPCService, + LedgerBackend: &LedgerBackendMock{}, + ChannelAccountStore: &store.ChannelAccountStoreMock{}, + TokenIngestionService: mockTokenIngestionService, + MetricsService: mockMetricsService, + GetLedgersLimit: defaultGetLedgersLimit, + Network: network.TestNetworkPassphrase, + NetworkPassphrase: network.TestNetworkPassphrase, + Archive: &HistoryArchiveMock{}, + ProtocolProcessors: []ProtocolProcessor{processor}, + }) + require.NoError(t, err) + + processor.ingestStore = models.IngestStore + svc.protocolContractCache = nil + svc.SetEligibleProtocolProcessorsForTest(map[string]ProtocolProcessor{"testproto": processor}) + + numTx, numOps, err := svc.ingestProcessedDataWithRetry(ctx, xdr.LedgerCloseMeta{}, 100, indexer.NewIndexerBuffer()) + require.NoError(t, err) + assert.Equal(t, 0, numTx) + assert.Equal(t, 0, numOps) + + assert.Equal(t, 2, processor.processLedgerCalls) + assert.Equal(t, 1, processor.loadCurrentStateCalls) + assert.Equal(t, 1, processor.persistCurrentStateCalls) + assert.True(t, svc.protocolCurrentStateLoaded["testproto"]) + + currentStateCursor, err := models.IngestStore.Get(ctx, 
"protocol_testproto_current_state_cursor") + require.NoError(t, err) + assert.Equal(t, uint32(100), currentStateCursor) + + mockTokenIngestionService.AssertExpectations(t) + }) } // Test_ingestService_processBatchChanges tests the processBatchChanges method which processes @@ -2702,6 +2787,7 @@ func Test_ingestService_processBackfillBatchesParallel_BothModes(t *testing.T) { // and PersistCurrentState were called and committed atomically. type testProtocolProcessor struct { id string + processLedgerCalls int processedLedger uint32 ingestStore *data.IngestStoreModel loadCurrentStateCalls int @@ -2713,6 +2799,7 @@ type testProtocolProcessor struct { func (p *testProtocolProcessor) ProtocolID() string { return p.id } func (p *testProtocolProcessor) ProcessLedger(_ context.Context, input ProtocolProcessorInput) error { + p.processLedgerCalls++ p.processedLedger = input.LedgerSequence return nil } diff --git a/internal/services/protocol_processor.go b/internal/services/protocol_processor.go index 53822638..44f28010 100644 --- a/internal/services/protocol_processor.go +++ b/internal/services/protocol_processor.go @@ -12,6 +12,12 @@ import ( // ProtocolProcessor produces and persists protocol-specific state for a ledger. type ProtocolProcessor interface { ProtocolID() string + // ProcessLedger analyzes a ledger and stages any per-ledger protocol state + // needed by PersistHistory/PersistCurrentState. It runs before any + // transaction-scoped LoadCurrentState reload for the ledger, and it may be + // called more than once for the same ledger when persistence retries, so + // implementations must deterministically rebuild staged state from the + // provided input. 
ProcessLedger(ctx context.Context, input ProtocolProcessorInput) error PersistHistory(ctx context.Context, dbTx pgx.Tx) error PersistCurrentState(ctx context.Context, dbTx pgx.Tx) error From d466d526bade8d19ed14410c98a90ca9e9cfe3cb Mon Sep 17 00:00:00 2001 From: Aristides Staffieri Date: Tue, 31 Mar 2026 11:01:12 -0600 Subject: [PATCH 10/12] fix: skip terminal retry backoff waits Stop sleeping after the final failed retry attempt in the shared backoff helper and the matching ingest retry loops. This removes unnecessary delay and avoids misleading "retrying" logs when no retry remains. --- ...6-03-09.1-protocol_wasms_and_contracts.sql | 4 ++++ internal/services/ingest_backfill.go | 3 +++ internal/services/ingest_live.go | 3 +++ internal/services/ingest_test.go | 19 ++++++++++--------- internal/utils/retry.go | 3 +++ internal/utils/retry_test.go | 7 ++----- 6 files changed, 25 insertions(+), 14 deletions(-) diff --git a/internal/db/migrations/2026-03-09.1-protocol_wasms_and_contracts.sql b/internal/db/migrations/2026-03-09.1-protocol_wasms_and_contracts.sql index 80dc75b5..aec96970 100644 --- a/internal/db/migrations/2026-03-09.1-protocol_wasms_and_contracts.sql +++ b/internal/db/migrations/2026-03-09.1-protocol_wasms_and_contracts.sql @@ -12,6 +12,10 @@ CREATE TABLE protocol_contracts ( created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); +CREATE INDEX idx_protocol_wasms_protocol_id ON protocol_wasms (protocol_id); +CREATE INDEX idx_protocol_contracts_wasm_hash ON protocol_contracts (wasm_hash); + + -- +migrate Down DROP TABLE IF EXISTS protocol_contracts; DROP TABLE IF EXISTS protocol_wasms; diff --git a/internal/services/ingest_backfill.go b/internal/services/ingest_backfill.go index 23f41754..105a447f 100644 --- a/internal/services/ingest_backfill.go +++ b/internal/services/ingest_backfill.go @@ -476,6 +476,9 @@ func (m *ingestService) flushBatchBufferWithRetry(ctx context.Context, buffer *i return nil } lastErr = err + if attempt == maxIngestProcessedDataRetries-1 { 
+ break + } backoff := time.Duration(1< maxIngestProcessedDataRetryBackoff { diff --git a/internal/services/ingest_live.go b/internal/services/ingest_live.go index cf696b12..2f438f15 100644 --- a/internal/services/ingest_live.go +++ b/internal/services/ingest_live.go @@ -491,6 +491,9 @@ func (m *ingestService) ingestProcessedDataWithRetry(ctx context.Context, ledger return numTxs, numOps, nil } lastErr = err + if attempt == maxIngestProcessedDataRetries-1 { + break + } backoff := time.Duration(1< maxIngestProcessedDataRetryBackoff { diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index 5fa70cf0..46125de6 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -2073,7 +2073,7 @@ func Test_ingestProcessedDataWithRetry(t *testing.T) { ctx := context.Background() initialCursor := uint32(99) setupDBCursors(t, ctx, dbConnectionPool, initialCursor, initialCursor) - setupProtocolCursors(t, ctx, dbConnectionPool, "testproto", initialCursor, initialCursor) + setupProtocolCursors(t, ctx, dbConnectionPool, initialCursor, initialCursor) mockMetricsService := metrics.NewMockMetricsService() mockMetricsService.On("RegisterPoolMetrics", "ledger_indexer", mock.Anything).Return() @@ -2824,8 +2824,9 @@ func (p *testProtocolProcessor) LoadCurrentState(_ context.Context, _ pgx.Tx) er // setupProtocolCursors inserts protocol cursors into ingest_store. // Call AFTER setupDBCursors (which wipes the table). 
-func setupProtocolCursors(t *testing.T, ctx context.Context, pool db.ConnectionPool, protocolID string, historyCursor, currentStateCursor uint32) { +func setupProtocolCursors(t *testing.T, ctx context.Context, pool db.ConnectionPool, historyCursor, currentStateCursor uint32) { t.Helper() + const protocolID = "testproto" _, err := pool.ExecContext(ctx, `INSERT INTO ingest_store (key, value) VALUES ($1, $2)`, utils.ProtocolHistoryCursorName(protocolID), historyCursor) @@ -2897,7 +2898,7 @@ func Test_PersistLedgerData_ProtocolCASGating(t *testing.T) { svc.eligibleProtocolProcessors = map[string]ProtocolProcessor{"testproto": processor} setupDBCursors(t, ctx, pool, 99, 99) - setupProtocolCursors(t, ctx, pool, "testproto", 99, 99) + setupProtocolCursors(t, ctx, pool, 99, 99) buffer := indexer.NewIndexerBuffer() _, _, err := svc.PersistLedgerData(ctx, 100, buffer, "latest_ledger_cursor") @@ -2930,7 +2931,7 @@ func Test_PersistLedgerData_ProtocolCASGating(t *testing.T) { svc.eligibleProtocolProcessors = map[string]ProtocolProcessor{"testproto": processor} setupDBCursors(t, ctx, pool, 99, 99) - setupProtocolCursors(t, ctx, pool, "testproto", 100, 100) + setupProtocolCursors(t, ctx, pool, 100, 100) buffer := indexer.NewIndexerBuffer() _, _, err := svc.PersistLedgerData(ctx, 100, buffer, "latest_ledger_cursor") @@ -2963,7 +2964,7 @@ func Test_PersistLedgerData_ProtocolCASGating(t *testing.T) { svc.eligibleProtocolProcessors = map[string]ProtocolProcessor{"testproto": processor} setupDBCursors(t, ctx, pool, 99, 99) - setupProtocolCursors(t, ctx, pool, "testproto", 98, 98) + setupProtocolCursors(t, ctx, pool, 98, 98) buffer := indexer.NewIndexerBuffer() _, _, err := svc.PersistLedgerData(ctx, 100, buffer, "latest_ledger_cursor") @@ -3044,7 +3045,7 @@ func Test_PersistLedgerData_ProtocolCASGating(t *testing.T) { svc.eligibleProtocolProcessors = map[string]ProtocolProcessor{"testproto": processor} setupDBCursors(t, ctx, pool, 99, 99) - setupProtocolCursors(t, ctx, pool, 
"testproto", 99, 99) + setupProtocolCursors(t, ctx, pool, 99, 99) buffer := indexer.NewIndexerBuffer() _, _, err := svc.PersistLedgerData(ctx, 100, buffer, "latest_ledger_cursor") @@ -3063,7 +3064,7 @@ func Test_PersistLedgerData_ProtocolCASGating(t *testing.T) { svc.eligibleProtocolProcessors = map[string]ProtocolProcessor{"testproto": processor} setupDBCursors(t, ctx, pool, 99, 99) - setupProtocolCursors(t, ctx, pool, "testproto", 99, 99) + setupProtocolCursors(t, ctx, pool, 99, 99) // First ledger — triggers LoadCurrentState processor.processedLedger = 100 @@ -3089,7 +3090,7 @@ func Test_PersistLedgerData_ProtocolCASGating(t *testing.T) { setupDBCursors(t, ctx, pool, 99, 99) // Current state cursor already at 100 — CAS will fail - setupProtocolCursors(t, ctx, pool, "testproto", 99, 100) + setupProtocolCursors(t, ctx, pool, 99, 100) buffer := indexer.NewIndexerBuffer() _, _, err := svc.PersistLedgerData(ctx, 100, buffer, "latest_ledger_cursor") @@ -3107,7 +3108,7 @@ func Test_PersistLedgerData_ProtocolCASGating(t *testing.T) { svc.eligibleProtocolProcessors = map[string]ProtocolProcessor{"testproto": processor} setupDBCursors(t, ctx, pool, 99, 99) - setupProtocolCursors(t, ctx, pool, "testproto", 99, 99) + setupProtocolCursors(t, ctx, pool, 99, 99) // First ledger succeeds and establishes the in-memory cache handoff. 
processor.processedLedger = 100 diff --git a/internal/utils/retry.go b/internal/utils/retry.go index 5f4bb6b5..d4068c27 100644 --- a/internal/utils/retry.go +++ b/internal/utils/retry.go @@ -37,6 +37,9 @@ func RetryWithBackoff[T any]( return result, nil } lastErr = err + if attempt == maxRetries-1 { + break + } backoff := time.Duration(1< maxBackoff { diff --git a/internal/utils/retry_test.go b/internal/utils/retry_test.go index ffe12e66..de53478e 100644 --- a/internal/utils/retry_test.go +++ b/internal/utils/retry_test.go @@ -74,7 +74,7 @@ func TestRetryWithBackoff_CallsOnRetry(t *testing.T) { assert.Greater(t, backoff, time.Duration(0)) }) require.Error(t, err) - assert.Equal(t, []int{0, 1, 2}, retryAttempts) + assert.Equal(t, []int{0, 1}, retryAttempts) } func TestRetryWithBackoff_CapsBackoff(t *testing.T) { @@ -89,10 +89,7 @@ func TestRetryWithBackoff_CapsBackoff(t *testing.T) { observedBackoffs = append(observedBackoffs, backoff) }) require.Error(t, err) - - for _, b := range observedBackoffs { - assert.LessOrEqual(t, b, maxBackoff) - } + assert.Equal(t, []time.Duration{time.Second, maxBackoff, maxBackoff, maxBackoff}, observedBackoffs) } func TestRetryWithBackoff_RejectsZeroMaxRetries(t *testing.T) { From b895d22d74ae38ecc5aa48d2c0ea3e148486f4b8 Mon Sep 17 00:00:00 2001 From: Aristides Staffieri Date: Tue, 31 Mar 2026 14:35:04 -0600 Subject: [PATCH 11/12] fix: move ledger-backend lifecycle ownership into migration engine The cmd layer deferred a close on the initial backend instance, but resetLedgerBackend() closes and replaces it during Run(). This caused the original instance to be double-closed and the final factory-created instance to never be closed. Add closeLedgerBackend() on the engine with a defer in Run() so the current backend is always closed on exit, and remove the cmd-layer defer. 
--- cmd/protocol_migrate.go | 9 +-------- internal/services/protocol_migrate.go | 10 ++++++++++ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/cmd/protocol_migrate.go b/cmd/protocol_migrate.go index ed31391a..5f0d8567 100644 --- a/cmd/protocol_migrate.go +++ b/cmd/protocol_migrate.go @@ -171,14 +171,7 @@ func runMigration( }) } - ledgerBackend := newBackend() - defer func() { - if closeErr := ledgerBackend.Close(); closeErr != nil { - log.Ctx(ctx).Errorf("error closing ledger backend: %v", closeErr) - } - }() - - return createAndRun(ctx, dbPool, ledgerBackend, newBackend, models, processors) + return createAndRun(ctx, dbPool, newBackend(), newBackend, models, processors) } func (c *protocolMigrateCmd) historyCommand() *cobra.Command { diff --git a/internal/services/protocol_migrate.go b/internal/services/protocol_migrate.go index b88aebb0..1fd38aa3 100644 --- a/internal/services/protocol_migrate.go +++ b/internal/services/protocol_migrate.go @@ -83,8 +83,18 @@ func (s *protocolMigrateEngine) resetLedgerBackend(ctx context.Context) { s.ledgerBackend = s.ledgerBackendFactory() } +// closeLedgerBackend closes the current backend instance held by the engine. +func (s *protocolMigrateEngine) closeLedgerBackend(ctx context.Context) { + if s.ledgerBackend != nil { + if err := s.ledgerBackend.Close(); err != nil { + log.Ctx(ctx).Warnf("error closing ledger backend: %v", err) + } + } +} + // Run performs migration for the given protocol IDs using the configured strategy. 
func (s *protocolMigrateEngine) Run(ctx context.Context, protocolIDs []string) error { + defer s.closeLedgerBackend(ctx) // Phase 1: Validate activeProtocolIDs, err := s.validate(ctx, protocolIDs) if err != nil { From 08bfdfd397b716888d606602685951cdc145e064 Mon Sep 17 00:00:00 2001 From: Aristides Staffieri Date: Tue, 31 Mar 2026 14:37:57 -0600 Subject: [PATCH 12/12] docs: clarify LoadCurrentState may be called multiple times after rollbacks The interface docstring claimed LoadCurrentState was called once per protocol, but live ingestion resets the loaded flag on rollback and re-invokes it on retry. Update the comment to reflect repeated-call semantics so processor implementations handle reloads safely. --- internal/services/protocol_processor.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/internal/services/protocol_processor.go b/internal/services/protocol_processor.go index 44f28010..74a39f1d 100644 --- a/internal/services/protocol_processor.go +++ b/internal/services/protocol_processor.go @@ -22,10 +22,13 @@ type ProtocolProcessor interface { PersistHistory(ctx context.Context, dbTx pgx.Tx) error PersistCurrentState(ctx context.Context, dbTx pgx.Tx) error // LoadCurrentState reads the protocol's current state from its state tables - // into processor memory. Called once per protocol inside the DB transaction - // on the first successful CAS advance of the current_state_cursor (the - // handoff moment from migration to live ingestion). Subsequent ledgers use - // the in-memory state maintained by PersistCurrentState. + // into processor memory. Called inside the DB transaction on the first + // successful CAS advance of the current_state_cursor (the handoff moment + // from migration to live ingestion). It may be called again after a + // rollback resets the loaded flag, so implementations must be safe to + // invoke multiple times — each call should fully replace in-memory state + // from the DB. 
Between successful loads, subsequent ledgers use the + // in-memory state maintained by PersistCurrentState. LoadCurrentState(ctx context.Context, dbTx pgx.Tx) error }