diff --git a/cmd/protocol_migrate.go b/cmd/protocol_migrate.go new file mode 100644 index 000000000..128ca0b6f --- /dev/null +++ b/cmd/protocol_migrate.go @@ -0,0 +1,227 @@ +package cmd + +import ( + "context" + "fmt" + "go/types" + "io" + + _ "github.com/lib/pq" + "github.com/sirupsen/logrus" + "github.com/spf13/cobra" + "github.com/stellar/go-stellar-sdk/support/config" + "github.com/stellar/go-stellar-sdk/support/log" + + "github.com/stellar/wallet-backend/cmd/utils" + "github.com/stellar/wallet-backend/internal/data" + "github.com/stellar/wallet-backend/internal/db" + "github.com/stellar/wallet-backend/internal/ingest" + "github.com/stellar/wallet-backend/internal/metrics" + "github.com/stellar/wallet-backend/internal/services" + internalutils "github.com/stellar/wallet-backend/internal/utils" +) + +type protocolMigrateCmd struct{} + +func (c *protocolMigrateCmd) Command() *cobra.Command { + cmd := &cobra.Command{ + Use: "protocol-migrate", + Short: "Data migration commands for protocol state", + Long: "Parent command for protocol data migrations. Use subcommands to run specific migration tasks.", + Run: func(cmd *cobra.Command, args []string) { + if err := cmd.Help(); err != nil { + log.Fatalf("Error calling help command: %s", err.Error()) + } + }, + } + + cmd.AddCommand(c.historyCommand()) + + return cmd +} + +// historyCmdOpts holds the resolved flag values for `protocol-migrate history`. +type historyCmdOpts struct { + databaseURL string + rpcURL string + networkPassphrase string + protocolIDs []string + logLevel string + oldestLedgerCursorName string + ledgerBackendType string + datastoreConfigPath string + getLedgersLimit int +} + +func (c *protocolMigrateCmd) historyCommand() *cobra.Command { + var opts historyCmdOpts + + cfgOpts := config.ConfigOptions{ + utils.DatabaseURLOption(&opts.databaseURL), + utils.NetworkPassphraseOption(&opts.networkPassphrase), + // RPC URL is only required when --ledger-backend-type=rpc; validated in PersistentPreRunE. 
+ { + Name: "rpc-url", + Usage: "The URL of the RPC Server. Required when --ledger-backend-type=rpc.", + OptType: types.String, + ConfigKey: &opts.rpcURL, + FlagDefault: "", + Required: false, + }, + { + Name: "ledger-backend-type", + Usage: "Type of ledger backend to use for fetching historical ledgers. Options: 'rpc' or 'datastore' (default). Datastore is recommended for migrations because it can reach ledgers outside the RPC retention window.", + OptType: types.String, + ConfigKey: &opts.ledgerBackendType, + FlagDefault: string(ingest.LedgerBackendTypeDatastore), + Required: false, + }, + { + Name: "datastore-config-path", + Usage: "Path to TOML config file for datastore backend. Required when --ledger-backend-type=datastore.", + OptType: types.String, + ConfigKey: &opts.datastoreConfigPath, + FlagDefault: "config/datastore-pubnet.toml", + Required: false, + }, + { + Name: "get-ledgers-limit", + Usage: "Per-request ledger buffer size for the RPC backend. Ignored for datastore.", + OptType: types.Int, + ConfigKey: &opts.getLedgersLimit, + FlagDefault: 10, + Required: false, + }, + } + + cmd := &cobra.Command{ + Use: "history", + Short: "Backfill protocol history state from oldest to latest ingested ledger", + Long: "Processes historical ledgers from oldest_ingest_ledger to the tip, producing protocol state changes and converging with live ingestion via CAS-gated cursors.", + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + if err := cfgOpts.RequireE(); err != nil { + return fmt.Errorf("requiring values of config options: %w", err) + } + if err := cfgOpts.SetValues(); err != nil { + return fmt.Errorf("setting values of config options: %w", err) + } + + if opts.logLevel != "" { + ll, err := logrus.ParseLevel(opts.logLevel) + if err != nil { + return fmt.Errorf("invalid log level %q: %w", opts.logLevel, err) + } + log.DefaultLogger.SetLevel(ll) + } + + if len(opts.protocolIDs) == 0 { + return fmt.Errorf("at least one --protocol-id is required") + 
} + + // Per-backend required-field validation. + switch opts.ledgerBackendType { + case string(ingest.LedgerBackendTypeRPC): + if opts.rpcURL == "" { + return fmt.Errorf("--rpc-url is required when --ledger-backend-type=rpc") + } + case string(ingest.LedgerBackendTypeDatastore): + if opts.datastoreConfigPath == "" { + return fmt.Errorf("--datastore-config-path is required when --ledger-backend-type=datastore") + } + default: + return fmt.Errorf("invalid --ledger-backend-type %q, must be 'rpc' or 'datastore'", opts.ledgerBackendType) + } + return nil + }, + RunE: func(_ *cobra.Command, _ []string) error { + return c.RunHistory(opts) + }, + } + + if err := cfgOpts.Init(cmd); err != nil { + log.Fatalf("Error initializing a config option: %s", err.Error()) + } + + cmd.Flags().StringSliceVar(&opts.protocolIDs, "protocol-id", nil, "Protocol ID(s) to migrate (required, repeatable)") + cmd.Flags().StringVar(&opts.logLevel, "log-level", "", `Log level: "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL", "PANIC"`) + cmd.Flags().StringVar(&opts.oldestLedgerCursorName, "oldest-ledger-cursor-name", data.OldestLedgerCursorName, "Name of the oldest ledger cursor in the ingest store. 
Must match the value used by the ingest service.") + + return cmd +} + +func (c *protocolMigrateCmd) RunHistory(opts historyCmdOpts) error { + ctx := context.Background() + + // Build processors from protocol IDs using the dynamic registry + var processors []services.ProtocolProcessor + for _, pid := range opts.protocolIDs { + factory, ok := services.GetProcessor(pid) + if !ok { + return fmt.Errorf("unknown protocol ID %q — no processor registered", pid) + } + p := factory() + if p == nil { + return fmt.Errorf("processor factory for protocol %q returned nil", pid) + } + processors = append(processors, p) + } + + // Open DB connection + dbPool, err := db.OpenDBConnectionPool(opts.databaseURL) + if err != nil { + return fmt.Errorf("opening database connection: %w", err) + } + defer internalutils.DeferredClose(ctx, dbPool, "closing dbPool in protocol migrate history") + + // Create models + sqlxDB, err := dbPool.SqlxDB(ctx) + if err != nil { + return fmt.Errorf("getting sqlx DB: %w", err) + } + metricsService := metrics.NewMetricsService(sqlxDB) + models, err := data.NewModels(dbPool, metricsService) + if err != nil { + return fmt.Errorf("creating models: %w", err) + } + + // Build a ledger backend using the same selector the ingest service uses, + // so protocol-migrate inherits the datastore path (recommended for + // backfills — unbounded history, unlike RPC retention windows). 
+ ledgerBackend, err := ingest.NewLedgerBackend(ctx, ingest.Configs{ + LedgerBackendType: ingest.LedgerBackendType(opts.ledgerBackendType), + DatastoreConfigPath: opts.datastoreConfigPath, + NetworkPassphrase: opts.networkPassphrase, + RPCURL: opts.rpcURL, + GetLedgersLimit: opts.getLedgersLimit, + }) + if err != nil { + return fmt.Errorf("creating ledger backend: %w", err) + } + defer func() { + if closer, ok := ledgerBackend.(io.Closer); ok { + if closeErr := closer.Close(); closeErr != nil { + log.Ctx(ctx).Errorf("error closing ledger backend: %v", closeErr) + } + } + }() + + service, err := services.NewProtocolMigrateHistoryService(services.ProtocolMigrateHistoryConfig{ + DB: dbPool, + LedgerBackend: ledgerBackend, + ProtocolsModel: models.Protocols, + ProtocolContractsModel: models.ProtocolContracts, + IngestStore: models.IngestStore, + NetworkPassphrase: opts.networkPassphrase, + Processors: processors, + OldestLedgerCursorName: opts.oldestLedgerCursorName, + }) + if err != nil { + return fmt.Errorf("creating protocol migrate history service: %w", err) + } + + if err := service.Run(ctx, opts.protocolIDs); err != nil { + return fmt.Errorf("running protocol migrate history: %w", err) + } + + return nil +} diff --git a/cmd/root.go b/cmd/root.go index ff0aa4aca..ba6c10bc0 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -53,4 +53,5 @@ func SetupCLI(cfg RootConfig) { rootCmd.AddCommand((&distributionAccountCmd{}).Command()) rootCmd.AddCommand((&loadtestCmd{}).Command()) rootCmd.AddCommand((&protocolSetupCmd{}).Command()) + rootCmd.AddCommand((&protocolMigrateCmd{}).Command()) } diff --git a/internal/data/ingest_store.go b/internal/data/ingest_store.go index 2d7e1951b..50e5e7bac 100644 --- a/internal/data/ingest_store.go +++ b/internal/data/ingest_store.go @@ -15,6 +15,11 @@ import ( "github.com/stellar/wallet-backend/internal/utils" ) +const ( + LatestLedgerCursorName = "latest_ingest_ledger" + OldestLedgerCursorName = "oldest_ingest_ledger" +) + type LedgerRange 
struct { GapStart uint32 `db:"gap_start"` GapEnd uint32 `db:"gap_end"` diff --git a/internal/data/mocks.go b/internal/data/mocks.go index c5b14eadd..c1406eac1 100644 --- a/internal/data/mocks.go +++ b/internal/data/mocks.go @@ -253,6 +253,11 @@ func (m *ProtocolsModelMock) UpdateClassificationStatus(ctx context.Context, dbT return args.Error(0) } +func (m *ProtocolsModelMock) UpdateHistoryMigrationStatus(ctx context.Context, dbTx pgx.Tx, protocolIDs []string, status string) error { + args := m.Called(ctx, dbTx, protocolIDs, status) + return args.Error(0) +} + func (m *ProtocolsModelMock) GetByIDs(ctx context.Context, protocolIDs []string) ([]Protocols, error) { args := m.Called(ctx, protocolIDs) if args.Get(0) == nil { diff --git a/internal/data/protocols.go b/internal/data/protocols.go index cd77250a8..9841affb1 100644 --- a/internal/data/protocols.go +++ b/internal/data/protocols.go @@ -33,6 +33,7 @@ type Protocols struct { // ProtocolsModelInterface defines the interface for protocols operations. type ProtocolsModelInterface interface { UpdateClassificationStatus(ctx context.Context, dbTx pgx.Tx, protocolIDs []string, status string) error + UpdateHistoryMigrationStatus(ctx context.Context, dbTx pgx.Tx, protocolIDs []string, status string) error GetByIDs(ctx context.Context, protocolIDs []string) ([]Protocols, error) GetClassified(ctx context.Context) ([]Protocols, error) InsertIfNotExists(ctx context.Context, dbTx pgx.Tx, protocolID string) error @@ -70,6 +71,30 @@ func (m *ProtocolsModel) UpdateClassificationStatus(ctx context.Context, dbTx pg return nil } +// UpdateHistoryMigrationStatus updates history_migration_status and updated_at for the given protocol IDs. 
+func (m *ProtocolsModel) UpdateHistoryMigrationStatus(ctx context.Context, dbTx pgx.Tx, protocolIDs []string, status string) error { + if len(protocolIDs) == 0 { + return nil + } + + const query = ` + UPDATE protocols + SET history_migration_status = $1, updated_at = NOW() + WHERE id = ANY($2) + ` + + start := time.Now() + _, err := dbTx.Exec(ctx, query, status, protocolIDs) + if err != nil { + m.MetricsService.IncDBQueryError("UpdateHistoryMigrationStatus", "protocols", utils.GetDBErrorType(err)) + return fmt.Errorf("updating history migration status for protocols: %w", err) + } + + m.MetricsService.ObserveDBQueryDuration("UpdateHistoryMigrationStatus", "protocols", time.Since(start).Seconds()) + m.MetricsService.IncDBQuery("UpdateHistoryMigrationStatus", "protocols") + return nil +} + // GetByIDs returns protocols matching the given IDs. func (m *ProtocolsModel) GetByIDs(ctx context.Context, protocolIDs []string) ([]Protocols, error) { if len(protocolIDs) == 0 { diff --git a/internal/data/protocols_test.go b/internal/data/protocols_test.go index 096893d86..d6ee8f1f5 100644 --- a/internal/data/protocols_test.go +++ b/internal/data/protocols_test.go @@ -98,6 +98,34 @@ func TestProtocolsModel(t *testing.T) { assert.Equal(t, StatusInProgress, status) }) + t.Run("UpdateHistoryMigrationStatus updates status", func(t *testing.T) { + cleanUpDB() + mockMetricsService := metrics.NewMockMetricsService() + mockMetricsService.On("ObserveDBQueryDuration", mock.Anything, mock.Anything, mock.Anything).Return() + mockMetricsService.On("IncDBQuery", mock.Anything, mock.Anything).Return() + defer mockMetricsService.AssertExpectations(t) + + model := &ProtocolsModel{DB: dbConnectionPool, MetricsService: mockMetricsService} + + // Insert protocol first + err := db.RunInPgxTransaction(ctx, dbConnectionPool, func(dbTx pgx.Tx) error { + return model.InsertIfNotExists(ctx, dbTx, "SEP41") + }) + require.NoError(t, err) + + // Update status + err = db.RunInPgxTransaction(ctx, 
dbConnectionPool, func(dbTx pgx.Tx) error { + return model.UpdateHistoryMigrationStatus(ctx, dbTx, []string{"SEP41"}, StatusInProgress) + }) + require.NoError(t, err) + + // Verify + var status string + err = dbConnectionPool.GetContext(ctx, &status, `SELECT history_migration_status FROM protocols WHERE id = 'SEP41'`) + require.NoError(t, err) + assert.Equal(t, StatusInProgress, status) + }) + t.Run("GetByIDs returns matching protocols", func(t *testing.T) { cleanUpDB() mockMetricsService := metrics.NewMockMetricsService() diff --git a/internal/db/migrations/2026-03-09.1-protocol_wasms.sql b/internal/db/migrations/2026-03-09.1-protocol_wasms.sql deleted file mode 100644 index e3eac642e..000000000 --- a/internal/db/migrations/2026-03-09.1-protocol_wasms.sql +++ /dev/null @@ -1,9 +0,0 @@ --- +migrate Up -CREATE TABLE protocol_wasms ( - wasm_hash BYTEA PRIMARY KEY, - protocol_id TEXT REFERENCES protocols(id), - created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() -); - --- +migrate Down -DROP TABLE IF EXISTS protocol_wasms; diff --git a/internal/db/migrations/2026-03-09.2-protocol_contracts.sql b/internal/db/migrations/2026-03-09.1-protocol_wasms_and_contracts.sql similarity index 58% rename from internal/db/migrations/2026-03-09.2-protocol_contracts.sql rename to internal/db/migrations/2026-03-09.1-protocol_wasms_and_contracts.sql index a3d055e38..80dc75b50 100644 --- a/internal/db/migrations/2026-03-09.2-protocol_contracts.sql +++ b/internal/db/migrations/2026-03-09.1-protocol_wasms_and_contracts.sql @@ -1,4 +1,10 @@ -- +migrate Up +CREATE TABLE protocol_wasms ( + wasm_hash BYTEA PRIMARY KEY, + protocol_id TEXT REFERENCES protocols(id), + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + CREATE TABLE protocol_contracts ( contract_id BYTEA PRIMARY KEY, wasm_hash BYTEA NOT NULL REFERENCES protocol_wasms(wasm_hash), @@ -6,7 +12,6 @@ CREATE TABLE protocol_contracts ( created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); -CREATE INDEX idx_protocol_contracts_wasm_hash ON 
protocol_contracts(wasm_hash); - -- +migrate Down DROP TABLE IF EXISTS protocol_contracts; +DROP TABLE IF EXISTS protocol_wasms; diff --git a/internal/integrationtests/data_migration_test.go b/internal/integrationtests/data_migration_test.go index 15d96f4a6..23487e53c 100644 --- a/internal/integrationtests/data_migration_test.go +++ b/internal/integrationtests/data_migration_test.go @@ -23,6 +23,8 @@ import ( "github.com/stellar/wallet-backend/internal/services" ) +const sep41ProtocolID = "SEP41" + type DataMigrationTestSuite struct { suite.Suite testEnv *infrastructure.TestEnvironment @@ -94,7 +96,7 @@ func (s *DataMigrationTestSuite) ingestStoreKeyExists(ctx context.Context, pool func (s *DataMigrationTestSuite) runSEP41ProtocolSetup(ctx context.Context, pool db.ConnectionPool, models *data.Models) { err := db.RunInPgxTransaction(ctx, pool, func(dbTx pgx.Tx) error { - return models.Protocols.InsertIfNotExists(ctx, dbTx, "SEP41") + return models.Protocols.InsertIfNotExists(ctx, dbTx, sep41ProtocolID) }) s.Require().NoError(err) @@ -110,7 +112,7 @@ func (s *DataMigrationTestSuite) runSEP41ProtocolSetup(ctx context.Context, pool []services.ProtocolValidator{validator}, ) - s.Require().NoError(svc.Run(ctx, []string{"SEP41"})) + s.Require().NoError(svc.Run(ctx, []string{sep41ProtocolID})) } func (s *DataMigrationTestSuite) newServiceMetricsMock() *metrics.MockMetricsService { @@ -181,11 +183,42 @@ func (b *singleLedgerBackend) Close() error { return nil } +type rangeLedgerBackend struct { + startSeq, endSeq uint32 + ledgerMeta xdr.LedgerCloseMeta +} + +func (b *rangeLedgerBackend) GetLatestLedgerSequence(context.Context) (uint32, error) { + return b.endSeq, nil +} + +func (b *rangeLedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { + if sequence >= b.startSeq && sequence <= b.endSeq { + return b.ledgerMeta, nil + } + <-ctx.Done() + return xdr.LedgerCloseMeta{}, ctx.Err() +} + +func (b *rangeLedgerBackend) 
PrepareRange(context.Context, ledgerbackend.Range) error { + return nil +} + +func (b *rangeLedgerBackend) IsPrepared(context.Context, ledgerbackend.Range) (bool, error) { + return true, nil +} + +func (b *rangeLedgerBackend) Close() error { + return nil +} + type integrationTestProcessor struct { - id string - processedLedger uint32 - seenContracts []data.ProtocolContracts - ingestStore *data.IngestStoreModel + id string + processedLedger uint32 + seenContracts []data.ProtocolContracts + ingestStore *data.IngestStoreModel + persistedHistorySeqs []uint32 + persistedCurrentStateSeqs []uint32 } func (p *integrationTestProcessor) ProtocolID() string { return p.id } @@ -197,10 +230,12 @@ func (p *integrationTestProcessor) ProcessLedger(_ context.Context, input servic } func (p *integrationTestProcessor) PersistHistory(ctx context.Context, dbTx pgx.Tx) error { + p.persistedHistorySeqs = append(p.persistedHistorySeqs, p.processedLedger) return p.ingestStore.Update(ctx, dbTx, fmt.Sprintf("test_%s_history_written", p.id), p.processedLedger) } func (p *integrationTestProcessor) PersistCurrentState(ctx context.Context, dbTx pgx.Tx) error { + p.persistedCurrentStateSeqs = append(p.persistedCurrentStateSeqs, p.processedLedger) return p.ingestStore.Update(ctx, dbTx, fmt.Sprintf("test_%s_current_state_written", p.id), p.processedLedger) } @@ -237,6 +272,29 @@ func (s *DataMigrationTestSuite) newLiveRunService( return svc } +func (s *DataMigrationTestSuite) newHistoryMigrationService( + pool db.ConnectionPool, + models *data.Models, + ledgerBackend ledgerbackend.LedgerBackend, + processor services.ProtocolProcessor, +) services.ProtocolMigrateHistoryService { + svc, err := services.NewProtocolMigrateHistoryService(services.ProtocolMigrateHistoryConfig{ + DB: pool, + LedgerBackend: ledgerBackend, + ProtocolsModel: models.Protocols, + ProtocolContractsModel: models.ProtocolContracts, + IngestStore: models.IngestStore, + NetworkPassphrase: "Test SDF Network ; September 2015", + 
Processors: []services.ProtocolProcessor{processor}, + OldestLedgerCursorName: data.OldestLedgerCursorName, + // Short poll so the integration test converges quickly once the + // migration reaches the RPC tip. + ConvergencePollTimeout: 500 * time.Millisecond, + }) + s.Require().NoError(err) + return svc +} + func protocolContractKeys(contracts []data.ProtocolContracts) []string { keys := make([]string, len(contracts)) for i, contract := range contracts { @@ -267,7 +325,7 @@ func (s *DataMigrationTestSuite) TestProtocolSetupClassifiesIngestedWasms() { s.Assert().Greater(classifiedCount, 0, "at least one WASM should have been classified as SEP41") s.T().Logf("Classified %d/%d WASMs as SEP41", classifiedCount, totalWasms) - protocols, err := models.Protocols.GetByIDs(ctx, []string{"SEP41"}) + protocols, err := models.Protocols.GetByIDs(ctx, []string{sep41ProtocolID}) s.Require().NoError(err) s.Require().Len(protocols, 1) s.Assert().Equal(data.StatusSuccess, protocols[0].ClassificationStatus) @@ -297,7 +355,7 @@ func (s *DataMigrationTestSuite) TestLiveIngestionProcessesSetupClassifiedSEP41W s.runSEP41ProtocolSetup(ctx, pool, models) - classifiedContracts, err := models.ProtocolContracts.GetByProtocolID(ctx, "SEP41") + classifiedContracts, err := models.ProtocolContracts.GetByProtocolID(ctx, sep41ProtocolID) s.Require().NoError(err) s.Require().NotEmpty(classifiedContracts, "setup-classified SEP41 contracts should be queryable for live ingestion") expectedContractKeys := protocolContractKeys(classifiedContracts) @@ -307,7 +365,7 @@ func (s *DataMigrationTestSuite) TestLiveIngestionProcessesSetupClassifiedSEP41W s.upsertIngestStoreValue(ctx, pool, "protocol_SEP41_history_cursor", testLedger-1) s.upsertIngestStoreValue(ctx, pool, "protocol_SEP41_current_state_cursor", testLedger-1) - processor := &integrationTestProcessor{id: "SEP41", ingestStore: models.IngestStore} + processor := &integrationTestProcessor{id: sep41ProtocolID, ingestStore: models.IngestStore} 
rpcService := services.NewRPCServiceMock(s.T()) rpcService.On("GetHealth").Return(entities.RPCGetHealthResult{ Status: "healthy", @@ -391,14 +449,14 @@ func (s *DataMigrationTestSuite) TestLiveIngestionSkipsSetupClassifiedSEP41Witho s.runSEP41ProtocolSetup(ctx, pool, models) - classifiedContracts, err := models.ProtocolContracts.GetByProtocolID(ctx, "SEP41") + classifiedContracts, err := models.ProtocolContracts.GetByProtocolID(ctx, sep41ProtocolID) s.Require().NoError(err) s.Require().NotEmpty(classifiedContracts, "setup-classified SEP41 contracts should be queryable for live ingestion") const cursorName = "test_live_run_cursor" s.upsertIngestStoreValue(ctx, pool, cursorName, testLedger-1) - processor := &integrationTestProcessor{id: "SEP41", ingestStore: models.IngestStore} + processor := &integrationTestProcessor{id: sep41ProtocolID, ingestStore: models.IngestStore} rpcService := services.NewRPCServiceMock(s.T()) rpcService.On("GetHealth").Return(entities.RPCGetHealthResult{ Status: "healthy", @@ -467,7 +525,7 @@ func (s *DataMigrationTestSuite) TestLiveIngestionSkipsSetupClassifiedSEP41WhenP s.runSEP41ProtocolSetup(ctx, pool, models) - classifiedContracts, err := models.ProtocolContracts.GetByProtocolID(ctx, "SEP41") + classifiedContracts, err := models.ProtocolContracts.GetByProtocolID(ctx, sep41ProtocolID) s.Require().NoError(err) s.Require().NotEmpty(classifiedContracts, "setup-classified SEP41 contracts should be queryable for live ingestion") @@ -476,7 +534,7 @@ func (s *DataMigrationTestSuite) TestLiveIngestionSkipsSetupClassifiedSEP41WhenP s.upsertIngestStoreValue(ctx, pool, "protocol_SEP41_history_cursor", testLedger-2) s.upsertIngestStoreValue(ctx, pool, "protocol_SEP41_current_state_cursor", testLedger-2) - processor := &integrationTestProcessor{id: "SEP41", ingestStore: models.IngestStore} + processor := &integrationTestProcessor{id: sep41ProtocolID, ingestStore: models.IngestStore} rpcService := services.NewRPCServiceMock(s.T()) 
rpcService.On("GetHealth").Return(entities.RPCGetHealthResult{ Status: "healthy", @@ -537,6 +595,249 @@ func (s *DataMigrationTestSuite) TestLiveIngestionSkipsSetupClassifiedSEP41WhenP } } +func (s *DataMigrationTestSuite) TestHistoryMigrationThenLiveIngestionHandoff() { + ctx := context.Background() + pool, cleanup := s.setupDB() + defer cleanup() + + models := s.setupModels(pool) + + latestLedger, err := models.IngestStore.Get(ctx, "latest_ingest_ledger") + s.Require().NoError(err) + s.Require().Greater(latestLedger, uint32(0)) + baseSeq := latestLedger + 2000 + + // Phase 1: Protocol setup — classify contracts, verify no protocol cursors yet. + s.runSEP41ProtocolSetup(ctx, pool, models) + + classifiedContracts, err := models.ProtocolContracts.GetByProtocolID(ctx, sep41ProtocolID) + s.Require().NoError(err) + s.Require().NotEmpty(classifiedContracts, "setup should classify at least one SEP41 contract") + expectedContractKeys := protocolContractKeys(classifiedContracts) + + s.Assert().False(s.ingestStoreKeyExists(ctx, pool, "protocol_SEP41_history_cursor"), "protocol cursors should not exist after setup") + s.Assert().False(s.ingestStoreKeyExists(ctx, pool, "protocol_SEP41_current_state_cursor"), "protocol cursors should not exist after setup") + + // Phase 2: History migration — backfill 3 ledgers [baseSeq, baseSeq+2]. 
+ s.upsertIngestStoreValue(ctx, pool, data.OldestLedgerCursorName, baseSeq) + s.upsertIngestStoreValue(ctx, pool, data.LatestLedgerCursorName, baseSeq+2) + s.upsertIngestStoreValue(ctx, pool, "protocol_SEP41_current_state_cursor", baseSeq+2) + + processor := &integrationTestProcessor{id: sep41ProtocolID, ingestStore: models.IngestStore} + + rangeBackend := &rangeLedgerBackend{ + startSeq: baseSeq, + endSeq: baseSeq + 2, + ledgerMeta: s.mustLedgerCloseMeta(), + } + + migrationSvc := s.newHistoryMigrationService( + pool, models, rangeBackend, processor, + ) + + err = migrationSvc.Run(ctx, []string{sep41ProtocolID}) + s.Require().NoError(err, "history migration should complete successfully") + + historyCursor, err := models.IngestStore.Get(ctx, "protocol_SEP41_history_cursor") + s.Require().NoError(err) + s.Assert().Equal(baseSeq+2, historyCursor, "history cursor should advance to the tip of the migration range") + + historyWritten, err := models.IngestStore.Get(ctx, "test_SEP41_history_written") + s.Require().NoError(err) + s.Assert().Equal(baseSeq+2, historyWritten, "PersistHistory should have committed data through the last migrated ledger") + + protocols, err := models.Protocols.GetByIDs(ctx, []string{sep41ProtocolID}) + s.Require().NoError(err) + s.Require().Len(protocols, 1) + s.Assert().Equal(data.StatusSuccess, protocols[0].HistoryMigrationStatus, "history migration status should be success") + + s.Assert().NotEmpty(processor.seenContracts, "processor should have seen classified contracts during history migration") + s.Assert().Equal(expectedContractKeys, protocolContractKeys(processor.seenContracts)) + s.Assert().Equal([]uint32{baseSeq, baseSeq + 1, baseSeq + 2}, processor.persistedHistorySeqs, + "PersistHistory should be called for every ledger in the migration range") + + // Phase 3: Live ingestion handoff — process baseSeq+3, proving CAS picks up where migration left off. 
+ const liveCursorName = "test_handoff_live_cursor" + s.upsertIngestStoreValue(ctx, pool, liveCursorName, baseSeq+2) + s.upsertIngestStoreValue(ctx, pool, data.LatestLedgerCursorName, baseSeq+3) + + processor.processedLedger = 0 + processor.seenContracts = nil + processor.persistedHistorySeqs = nil + processor.persistedCurrentStateSeqs = nil + + liveBackend := &singleLedgerBackend{ + ledgerSeq: baseSeq + 3, + ledgerMeta: s.mustLedgerCloseMeta(), + } + + rpcService := services.NewRPCServiceMock(s.T()) + rpcService.On("GetHealth").Return(entities.RPCGetHealthResult{ + Status: "healthy", + LatestLedger: baseSeq + 3, + OldestLedger: 1, + }, nil).Once() + + metricsService := s.newServiceMetricsMock() + liveSvc := s.newLiveRunService(models, rpcService, liveBackend, metricsService, processor, liveCursorName) + + runCtx, cancel := context.WithCancel(ctx) + defer cancel() + + runErrCh := make(chan error, 1) + go func() { + runErrCh <- liveSvc.Run(runCtx, 0, 0) + }() + + var earlyRunErr error + require.Eventually(s.T(), func() bool { + select { + case earlyRunErr = <-runErrCh: + return true + default: + } + + hc, err := models.IngestStore.Get(ctx, "protocol_SEP41_history_cursor") + if err != nil || hc != baseSeq+3 { + return false + } + + hw, err := models.IngestStore.Get(ctx, "test_SEP41_history_written") + if err != nil || hw != baseSeq+3 { + return false + } + + csc, err := models.IngestStore.Get(ctx, "protocol_SEP41_current_state_cursor") + if err != nil || csc != baseSeq+3 { + return false + } + + csw, err := models.IngestStore.Get(ctx, "test_SEP41_current_state_written") + return err == nil && csw == baseSeq+3 + }, 10*time.Second, 100*time.Millisecond) + + s.Require().NoError(earlyRunErr, "live Run exited before the expected DB state was committed") + s.Assert().Equal(baseSeq+3, processor.processedLedger, "live ingestion should have processed the handoff ledger") + s.Assert().Equal(expectedContractKeys, protocolContractKeys(processor.seenContracts), "live ingestion 
should see the same classified contracts") + s.Assert().Equal([]uint32{baseSeq + 3}, processor.persistedHistorySeqs, + "live ingestion should call PersistHistory for the handoff ledger") + s.Assert().Equal([]uint32{baseSeq + 3}, processor.persistedCurrentStateSeqs, + "live ingestion should call PersistCurrentState for the handoff ledger") + + cancel() + + select { + case err := <-runErrCh: + s.Require().Error(err) + s.Require().ErrorIs(err, context.Canceled) + case <-time.After(5 * time.Second): + s.FailNow("timed out waiting for live Run to stop after context cancellation") + } +} + +func (s *DataMigrationTestSuite) TestLiveIngestionHistoryCursorReadyCurrentStateLags() { + ctx := context.Background() + pool, cleanup := s.setupDB() + defer cleanup() + + models := s.setupModels(pool) + + latestLedger, err := models.IngestStore.Get(ctx, "latest_ingest_ledger") + s.Require().NoError(err) + s.Require().Greater(latestLedger, uint32(0)) + testLedger := latestLedger + 1000 + + // Protocol setup — classify contracts. 
+ s.runSEP41ProtocolSetup(ctx, pool, models) + + classifiedContracts, err := models.ProtocolContracts.GetByProtocolID(ctx, sep41ProtocolID) + s.Require().NoError(err) + s.Require().NotEmpty(classifiedContracts) + + // Set up asymmetric cursors: + // history_cursor = testLedger-1 → ready (CAS expected matches) + // current_state_cursor = testLedger-2 → lags (CAS expected=testLedger-1 ≠ testLedger-2) + const cursorName = "test_asymmetric_cursor" + s.upsertIngestStoreValue(ctx, pool, cursorName, testLedger-1) + s.upsertIngestStoreValue(ctx, pool, "protocol_SEP41_history_cursor", testLedger-1) + s.upsertIngestStoreValue(ctx, pool, "protocol_SEP41_current_state_cursor", testLedger-2) + + processor := &integrationTestProcessor{id: sep41ProtocolID, ingestStore: models.IngestStore} + rpcService := services.NewRPCServiceMock(s.T()) + rpcService.On("GetHealth").Return(entities.RPCGetHealthResult{ + Status: "healthy", + LatestLedger: testLedger, + OldestLedger: 1, + }, nil).Once() + + metricsService := s.newServiceMetricsMock() + ledgerBackend := &singleLedgerBackend{ + ledgerSeq: testLedger, + ledgerMeta: s.mustLedgerCloseMeta(), + } + svc := s.newLiveRunService(models, rpcService, ledgerBackend, metricsService, processor, cursorName) + + runCtx, cancel := context.WithCancel(ctx) + defer cancel() + + runErrCh := make(chan error, 1) + go func() { + runErrCh <- svc.Run(runCtx, 0, 0) + }() + + var earlyRunErr error + require.Eventually(s.T(), func() bool { + select { + case earlyRunErr = <-runErrCh: + return true + default: + } + + cursor, err := models.IngestStore.Get(ctx, cursorName) + return err == nil && cursor == testLedger + }, 10*time.Second, 100*time.Millisecond) + + s.Require().NoError(earlyRunErr, "live Run exited before the main ingest cursor advanced") + + // ProcessLedger WAS called — OR precheck passed because history cursor was ready. 
+ s.Assert().Equal(testLedger, processor.processedLedger, + "ProcessLedger should run when at least one protocol cursor is ready (OR precheck)") + + // History CAS succeeded — cursor advanced and PersistHistory was called. + historyCursor, err := models.IngestStore.Get(ctx, "protocol_SEP41_history_cursor") + s.Require().NoError(err) + s.Assert().Equal(testLedger, historyCursor, + "history cursor should advance when its CAS succeeds independently") + + historyWritten, err := models.IngestStore.Get(ctx, "test_SEP41_history_written") + s.Require().NoError(err) + s.Assert().Equal(testLedger, historyWritten, + "PersistHistory should commit when history CAS succeeds") + s.Assert().Equal([]uint32{testLedger}, processor.persistedHistorySeqs, + "PersistHistory should be called exactly once for the processed ledger") + + // Current-state CAS failed — cursor unchanged, PersistCurrentState never called. + currentStateCursor, err := models.IngestStore.Get(ctx, "protocol_SEP41_current_state_cursor") + s.Require().NoError(err) + s.Assert().Equal(testLedger-2, currentStateCursor, + "current-state cursor should remain unchanged when CAS rejects mismatched expected value") + + s.Assert().False(s.ingestStoreKeyExists(ctx, pool, "test_SEP41_current_state_written"), + "PersistCurrentState should not be called when current-state CAS fails") + s.Assert().Empty(processor.persistedCurrentStateSeqs, + "persistedCurrentStateSeqs should be empty when current-state CAS fails") + + cancel() + + select { + case err := <-runErrCh: + s.Require().Error(err) + s.Require().ErrorIs(err, context.Canceled) + case <-time.After(5 * time.Second): + s.FailNow("timed out waiting for live Run to stop after context cancellation") + } +} + func TestDataMigrationTestSuiteStandalone(t *testing.T) { t.Skip("Run via TestIntegrationTests") } diff --git a/internal/serve/graphql/resolvers/statechange.resolvers.go b/internal/serve/graphql/resolvers/statechange.resolvers.go index b9a4ededf..895f00708 100644 --- 
a/internal/serve/graphql/resolvers/statechange.resolvers.go +++ b/internal/serve/graphql/resolvers/statechange.resolvers.go @@ -431,14 +431,12 @@ func (r *Resolver) TrustlineChange() graphql1.TrustlineChangeResolver { return &trustlineChangeResolver{r} } -type ( - accountChangeResolver struct{ *Resolver } - balanceAuthorizationChangeResolver struct{ *Resolver } - flagsChangeResolver struct{ *Resolver } - metadataChangeResolver struct{ *Resolver } - reservesChangeResolver struct{ *Resolver } - signerChangeResolver struct{ *Resolver } - signerThresholdsChangeResolver struct{ *Resolver } - standardBalanceChangeResolver struct{ *Resolver } - trustlineChangeResolver struct{ *Resolver } -) +type accountChangeResolver struct{ *Resolver } +type balanceAuthorizationChangeResolver struct{ *Resolver } +type flagsChangeResolver struct{ *Resolver } +type metadataChangeResolver struct{ *Resolver } +type reservesChangeResolver struct{ *Resolver } +type signerChangeResolver struct{ *Resolver } +type signerThresholdsChangeResolver struct{ *Resolver } +type standardBalanceChangeResolver struct{ *Resolver } +type trustlineChangeResolver struct{ *Resolver } diff --git a/internal/services/ingest.go b/internal/services/ingest.go index e8bb97c92..b9fcba77a 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -22,6 +22,7 @@ import ( "github.com/stellar/wallet-backend/internal/indexer/types" "github.com/stellar/wallet-backend/internal/metrics" "github.com/stellar/wallet-backend/internal/signing/store" + "github.com/stellar/wallet-backend/internal/utils" ) const ( @@ -145,16 +146,16 @@ func NewIngestService(cfg IngestServiceConfig) (*ingestService, error) { cfg.MetricsService.RegisterPoolMetrics("backfill", backfillPool) // Build protocol processor map from slice - ppMap := make(map[string]ProtocolProcessor, len(cfg.ProtocolProcessors)) for i, p := range cfg.ProtocolProcessors { if p == nil { return nil, fmt.Errorf("protocol processor at index %d is nil", i) } - id 
:= p.ProtocolID() - if _, exists := ppMap[id]; exists { - return nil, fmt.Errorf("duplicate protocol processor ID %q", id) - } - ppMap[id] = p + } + ppMap, err := utils.BuildMap(cfg.ProtocolProcessors, func(p ProtocolProcessor) string { + return p.ProtocolID() + }) + if err != nil { + return nil, fmt.Errorf("building protocol processor map: %w", err) } var ppCache *protocolContractCache @@ -206,39 +207,6 @@ func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger u } } -// getLedgerWithRetry fetches a ledger with exponential backoff retry logic. -// It respects context cancellation and limits retries to maxLedgerFetchRetries attempts. -func (m *ingestService) getLedgerWithRetry(ctx context.Context, backend ledgerbackend.LedgerBackend, ledgerSeq uint32) (xdr.LedgerCloseMeta, error) { - var lastErr error - for attempt := 0; attempt < maxLedgerFetchRetries; attempt++ { - select { - case <-ctx.Done(): - return xdr.LedgerCloseMeta{}, fmt.Errorf("context cancelled: %w", ctx.Err()) - default: - } - - ledgerMeta, err := backend.GetLedger(ctx, ledgerSeq) - if err == nil { - return ledgerMeta, nil - } - lastErr = err - - backoff := time.Duration(1<<attempt) * time.Second - if backoff > maxRetryBackoff { - backoff = maxRetryBackoff - } - log.Ctx(ctx).Warnf("Error fetching ledger %d (attempt %d/%d): %v, retrying in %v...", - ledgerSeq, attempt+1, maxLedgerFetchRetries, err, backoff) - - select { - case <-ctx.Done(): - return xdr.LedgerCloseMeta{}, fmt.Errorf("context cancelled during backoff: %w", ctx.Err()) - case <-time.After(backoff): - } - } - return xdr.LedgerCloseMeta{}, fmt.Errorf("failed after %d attempts: %w", maxLedgerFetchRetries, lastErr) -} - -// processLedger processes a single ledger - gets the transactions and processes them using indexer processors. 
func (m *ingestService) processLedger(ctx context.Context, ledgerMeta xdr.LedgerCloseMeta, buffer *indexer.IndexerBuffer) error { participantCount, err := indexer.ProcessLedger(ctx, m.networkPassphrase, ledgerMeta, m.ledgerIndexer, buffer) diff --git a/internal/services/ingest_backfill.go b/internal/services/ingest_backfill.go index f6bfdbd3d..23f41754e 100644 --- a/internal/services/ingest_backfill.go +++ b/internal/services/ingest_backfill.go @@ -12,11 +12,13 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" "github.com/stellar/go-stellar-sdk/support/log" + "github.com/stellar/go-stellar-sdk/xdr" "github.com/stellar/wallet-backend/internal/data" "github.com/stellar/wallet-backend/internal/db" "github.com/stellar/wallet-backend/internal/indexer" "github.com/stellar/wallet-backend/internal/indexer/types" + "github.com/stellar/wallet-backend/internal/utils" ) // BackfillMode indicates the purpose of backfilling. @@ -521,7 +523,15 @@ func (m *ingestService) processLedgersInBatch( } for ledgerSeq := batch.StartLedger; ledgerSeq <= batch.EndLedger; ledgerSeq++ { - ledgerMeta, err := m.getLedgerWithRetry(ctx, backend, ledgerSeq) + ledgerMeta, err := utils.RetryWithBackoff(ctx, maxLedgerFetchRetries, maxRetryBackoff, + func(ctx context.Context) (xdr.LedgerCloseMeta, error) { + return backend.GetLedger(ctx, ledgerSeq) + }, + func(attempt int, err error, backoff time.Duration) { + log.Ctx(ctx).Warnf("Error fetching ledger %d (attempt %d/%d): %v, retrying in %v...", + ledgerSeq, attempt+1, maxLedgerFetchRetries, err, backoff) + }, + ) if err != nil { return ledgersProcessed, nil, startTime, endTime, fmt.Errorf("getting ledger %d: %w", ledgerSeq, err) } diff --git a/internal/services/ingest_live.go b/internal/services/ingest_live.go index 984d43357..8daf352e4 100644 --- a/internal/services/ingest_live.go +++ b/internal/services/ingest_live.go @@ -17,6 +17,7 @@ import ( "github.com/stellar/wallet-backend/internal/db" 
"github.com/stellar/wallet-backend/internal/indexer" "github.com/stellar/wallet-backend/internal/indexer/types" + "github.com/stellar/wallet-backend/internal/utils" ) const ( @@ -27,7 +28,7 @@ const ( ) // protocolContractCache caches classified protocol contracts to avoid per-ledger DB queries. -// Only accessed from the single-threaded live ingestion loop, so no mutex is needed. +// Accessed only from the single-goroutine live ingestion loop; no locking needed. type protocolContractCache struct { contractsByProtocol map[string][]data.ProtocolContracts lastRefreshLedger uint32 @@ -52,13 +53,13 @@ func (m *ingestService) protocolProcessorsEligibleForProduction(ctx context.Cont eligible := make(map[string]ProtocolProcessor, len(m.protocolProcessors)) for protocolID, processor := range m.protocolProcessors { - historyCursor := fmt.Sprintf("protocol_%s_history_cursor", protocolID) + historyCursor := utils.ProtocolHistoryCursorName(protocolID) historyVal, err := m.models.IngestStore.Get(ctx, historyCursor) if err != nil { return nil, fmt.Errorf("reading history cursor for %s: %w", protocolID, err) } - currentStateCursor := fmt.Sprintf("protocol_%s_current_state_cursor", protocolID) + currentStateCursor := utils.ProtocolCurrentStateCursorName(protocolID) currentStateVal, err := m.models.IngestStore.Get(ctx, currentStateCursor) if err != nil { return nil, fmt.Errorf("reading current state cursor for %s: %w", protocolID, err) @@ -149,8 +150,8 @@ func (m *ingestService) PersistLedgerData(ctx context.Context, ledgerSeq uint32, // No previous ledger to form an expected cursor value; skip CAS for this ledger. 
continue } - historyCursor := fmt.Sprintf("protocol_%s_history_cursor", protocolID) - currentStateCursor := fmt.Sprintf("protocol_%s_current_state_cursor", protocolID) + historyCursor := utils.ProtocolHistoryCursorName(protocolID) + currentStateCursor := utils.ProtocolCurrentStateCursorName(protocolID) expected := strconv.FormatUint(uint64(ledgerSeq-1), 10) next := strconv.FormatUint(uint64(ledgerSeq), 10) @@ -286,7 +287,15 @@ func (m *ingestService) ingestLiveLedgers(ctx context.Context, startLedger uint3 currentLedger := startLedger log.Ctx(ctx).Infof("Starting ingestion from ledger: %d", currentLedger) for { - ledgerMeta, ledgerErr := m.getLedgerWithRetry(ctx, m.ledgerBackend, currentLedger) + ledgerMeta, ledgerErr := utils.RetryWithBackoff(ctx, maxLedgerFetchRetries, maxRetryBackoff, + func(ctx context.Context) (xdr.LedgerCloseMeta, error) { + return m.ledgerBackend.GetLedger(ctx, currentLedger) + }, + func(attempt int, err error, backoff time.Duration) { + log.Ctx(ctx).Warnf("Error fetching ledger %d (attempt %d/%d): %v, retrying in %v...", + currentLedger, attempt+1, maxLedgerFetchRetries, err, backoff) + }, + ) if ledgerErr != nil { return fmt.Errorf("fetching ledger %d: %w", currentLedger, ledgerErr) } @@ -308,8 +317,8 @@ func (m *ingestService) ingestLiveLedgers(ctx context.Context, startLedger uint3 // Run protocol state production (in-memory analysis before DB transaction) only // for processors that may actually persist this ledger. 
- if err := m.produceProtocolStateForProcessors(ctx, ledgerMeta, currentLedger, eligibleProcessors); err != nil { - return fmt.Errorf("producing protocol state for ledger %d: %w", currentLedger, err) + if produceErr := m.produceProtocolStateForProcessors(ctx, ledgerMeta, currentLedger, eligibleProcessors); produceErr != nil { + return fmt.Errorf("producing protocol state for ledger %d: %w", currentLedger, produceErr) } // All DB operations in a single atomic transaction with retry @@ -337,6 +346,9 @@ func (m *ingestService) ingestLiveLedgers(ctx context.Context, startLedger uint3 } } +// produceProtocolStateForProcessors runs the given protocol processors against +// a ledger. Callers pass either `m.protocolProcessors` (all registered) or a +// filtered subset (e.g., live ingestion scopes to `eligibleProtocolProcessors`). func (m *ingestService) produceProtocolStateForProcessors(ctx context.Context, ledgerMeta xdr.LedgerCloseMeta, ledgerSeq uint32, processors map[string]ProtocolProcessor) error { if len(processors) == 0 { return nil @@ -377,8 +389,10 @@ func (m *ingestService) getProtocolContracts(ctx context.Context, protocolID str return m.protocolContractCache.contractsByProtocol[protocolID] } -// refreshProtocolContractCache reloads all protocol contracts from the DB. -// Only called from the single-threaded live ingestion loop. +// refreshProtocolContractCache reloads all protocol contracts from the DB in a +// single batch query. On failure, the existing cache is left untouched — a +// single SELECT has the same failure domain as N per-protocol SELECTs, so +// there's no partial-failure path to handle. 
func (m *ingestService) refreshProtocolContractCache(ctx context.Context, currentLedger uint32) { start := time.Now() protocolIDs := make([]string, 0, len(m.protocolProcessors)) @@ -386,19 +400,14 @@ func (m *ingestService) refreshProtocolContractCache(ctx context.Context, curren protocolIDs = append(protocolIDs, protocolID) } newMap, err := m.models.ProtocolContracts.BatchGetByProtocolIDs(ctx, protocolIDs) - if err != nil { - log.Ctx(ctx).Warnf("Error refreshing protocol contract cache: %v; preserving previous entries", err) - newMap = m.protocolContractCache.contractsByProtocol - } - - m.protocolContractCache.contractsByProtocol = newMap m.protocolContractCache.lastRefreshLedger = currentLedger m.metricsService.ObserveProtocolContractCacheRefreshDuration(time.Since(start).Seconds()) if err != nil { - log.Ctx(ctx).Warnf("Protocol contract cache refresh failed at ledger %d; will retry at next interval", currentLedger) - } else { - log.Ctx(ctx).Infof("Refreshed protocol contract cache at ledger %d", currentLedger) + log.Ctx(ctx).Warnf("Protocol contract cache refresh failed at ledger %d; preserving previous entries, will retry at next interval: %v", currentLedger, err) + return } + m.protocolContractCache.contractsByProtocol = newMap + log.Ctx(ctx).Infof("Refreshed protocol contract cache at ledger %d", currentLedger) } // ingestProcessedDataWithRetry wraps PersistLedgerData with retry logic. 
diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index 720af440e..a9ad4fb7d 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "slices" "testing" "time" @@ -27,6 +28,7 @@ import ( "github.com/stellar/wallet-backend/internal/indexer/types" "github.com/stellar/wallet-backend/internal/metrics" "github.com/stellar/wallet-backend/internal/signing/store" + "github.com/stellar/wallet-backend/internal/utils" ) var ( @@ -391,7 +393,7 @@ func Test_NewIngestService_ProtocolProcessorValidation(t *testing.T) { cfg.ProtocolProcessors = []ProtocolProcessor{p1, p2} _, err := NewIngestService(cfg) require.Error(t, err) - assert.Contains(t, err.Error(), `duplicate protocol processor ID "dup-id"`) + assert.Contains(t, err.Error(), `duplicate key "dup-id"`) }) } @@ -678,120 +680,6 @@ func Test_analyzeBatchResults(t *testing.T) { } } -func Test_ingestService_getLedgerWithRetry(t *testing.T) { - dbt := dbtest.Open(t) - defer dbt.Close() - dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) - require.NoError(t, err) - defer dbConnectionPool.Close() - - ctx := context.Background() - - testCases := []struct { - name string - setupBackend func(*LedgerBackendMock) - ctxFunc func() (context.Context, context.CancelFunc) - wantErr bool - wantErrContains string - }{ - { - name: "success_on_first_try", - setupBackend: func(lb *LedgerBackendMock) { - var meta xdr.LedgerCloseMeta - err := xdr.SafeUnmarshalBase64(ledgerMetadataWith0Tx, &meta) - require.NoError(t, err) - lb.On("GetLedger", mock.Anything, uint32(100)).Return(meta, nil).Once() - }, - ctxFunc: func() (context.Context, context.CancelFunc) { - return context.WithCancel(ctx) - }, - wantErr: false, - }, - { - name: "success_after_retries", - setupBackend: func(lb *LedgerBackendMock) { - var meta xdr.LedgerCloseMeta - err := xdr.SafeUnmarshalBase64(ledgerMetadataWith0Tx, &meta) - require.NoError(t, err) - // Fail 
twice, then succeed - lb.On("GetLedger", mock.Anything, uint32(100)).Return(xdr.LedgerCloseMeta{}, fmt.Errorf("temporary error")).Twice() - lb.On("GetLedger", mock.Anything, uint32(100)).Return(meta, nil).Once() - }, - ctxFunc: func() (context.Context, context.CancelFunc) { - return context.WithCancel(ctx) - }, - wantErr: false, - }, - { - name: "context_cancelled_immediately", - setupBackend: func(lb *LedgerBackendMock) { - // May or may not be called depending on timing - lb.On("GetLedger", mock.Anything, uint32(100)).Return(xdr.LedgerCloseMeta{}, fmt.Errorf("error")).Maybe() - }, - ctxFunc: func() (context.Context, context.CancelFunc) { - cancelledCtx, cancel := context.WithCancel(ctx) - cancel() // Cancel immediately - return cancelledCtx, cancel - }, - wantErr: true, - wantErrContains: "context cancelled", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - mockMetricsService := metrics.NewMockMetricsService() - mockMetricsService.On("RegisterPoolMetrics", "ledger_indexer", mock.Anything).Return() - mockMetricsService.On("RegisterPoolMetrics", "backfill", mock.Anything).Return() - defer mockMetricsService.AssertExpectations(t) - - models, err := data.NewModels(dbConnectionPool, mockMetricsService) - require.NoError(t, err) - - mockLedgerBackend := &LedgerBackendMock{} - tc.setupBackend(mockLedgerBackend) - defer mockLedgerBackend.AssertExpectations(t) - - mockRPCService := &RPCServiceMock{} - mockRPCService.On("NetworkPassphrase").Return(network.TestNetworkPassphrase).Maybe() - - svc, err := NewIngestService(IngestServiceConfig{ - IngestionMode: IngestionModeBackfill, - Models: models, - LatestLedgerCursorName: "latest_ledger_cursor", - OldestLedgerCursorName: "oldest_ledger_cursor", - AppTracker: &apptracker.MockAppTracker{}, - RPCService: mockRPCService, - LedgerBackend: mockLedgerBackend, - MetricsService: mockMetricsService, - GetLedgersLimit: defaultGetLedgersLimit, - Network: network.TestNetworkPassphrase, - 
NetworkPassphrase: network.TestNetworkPassphrase, - Archive: &HistoryArchiveMock{}, - }) - require.NoError(t, err) - - testCtx, cancel := tc.ctxFunc() - defer cancel() - - ledger, err := svc.getLedgerWithRetry(testCtx, mockLedgerBackend, 100) - if tc.wantErr { - require.Error(t, err) - if tc.wantErrContains != "" { - assert.Contains(t, err.Error(), tc.wantErrContains) - } - } else { - require.NoError(t, err) - - var meta xdr.LedgerCloseMeta - err := xdr.SafeUnmarshalBase64(ledgerMetadataWith0Tx, &meta) - require.NoError(t, err) - assert.Equal(t, meta, ledger) - } - }) - } -} - func Test_ingestService_setupBatchBackend(t *testing.T) { dbt := dbtest.Open(t) defer dbt.Close() @@ -2840,11 +2728,11 @@ func setupProtocolCursors(t *testing.T, ctx context.Context, pool db.ConnectionP t.Helper() _, err := pool.ExecContext(ctx, `INSERT INTO ingest_store (key, value) VALUES ($1, $2)`, - fmt.Sprintf("protocol_%s_history_cursor", protocolID), historyCursor) + utils.ProtocolHistoryCursorName(protocolID), historyCursor) require.NoError(t, err) _, err = pool.ExecContext(ctx, `INSERT INTO ingest_store (key, value) VALUES ($1, $2)`, - fmt.Sprintf("protocol_%s_current_state_cursor", protocolID), currentStateCursor) + utils.ProtocolCurrentStateCursorName(protocolID), currentStateCursor) require.NoError(t, err) } @@ -3107,11 +2995,6 @@ func Test_ingestService_produceProtocolStateForProcessors_ProcessesOnlyProvidedP mockMetrics.AssertExpectations(t) } -// produceProtocolState runs all registered protocol processors against a ledger. 
-func (m *ingestService) produceProtocolState(ctx context.Context, ledgerMeta xdr.LedgerCloseMeta, ledgerSeq uint32) error { - return m.produceProtocolStateForProcessors(ctx, ledgerMeta, ledgerSeq, m.protocolProcessors) -} - func Test_ingestService_produceProtocolState_RecordsMetrics(t *testing.T) { t.Parallel() @@ -3142,7 +3025,7 @@ func Test_ingestService_produceProtocolState_RecordsMetrics(t *testing.T) { }, } - err := svc.produceProtocolState(ctx, xdr.LedgerCloseMeta{}, 123) + err := svc.produceProtocolStateForProcessors(ctx, xdr.LedgerCloseMeta{}, 123, svc.protocolProcessors) require.NoError(t, err) mockMetrics.AssertExpectations(t) } @@ -3183,36 +3066,32 @@ func Test_ingestService_refreshProtocolContractCache_Failure_StillUpdatesLedger( ctx := context.Background() mockMetrics := metrics.NewMockMetricsService() - mockMetrics.On("IncProtocolContractCacheAccess", "proto_a", "miss").Return().Once() - mockMetrics.On("IncProtocolContractCacheAccess", "proto_a", "hit").Return().Once() mockMetrics.On("ObserveProtocolContractCacheRefreshDuration", mock.Anything).Return().Once() protocolContractsModel := data.NewProtocolContractsModelMock(t) - protocolContractsModel.On("BatchGetByProtocolIDs", ctx, mock.AnythingOfType("[]string")). 
- Return(nil, fmt.Errorf("db error")).Once() + protocolContractsModel.On("BatchGetByProtocolIDs", ctx, mock.MatchedBy(func(ids []string) bool { + return len(ids) == 2 && slices.Contains(ids, "proto_ok") && slices.Contains(ids, "proto_fail") + })).Return(nil, fmt.Errorf("db error")).Once() svc := &ingestService{ metricsService: mockMetrics, models: &data.Models{ProtocolContracts: protocolContractsModel}, protocolProcessors: map[string]ProtocolProcessor{ - "proto_a": NewProtocolProcessorMock(t), - "proto_b": NewProtocolProcessorMock(t), + "proto_ok": NewProtocolProcessorMock(t), + "proto_fail": NewProtocolProcessorMock(t), }, protocolContractCache: &protocolContractCache{ contractsByProtocol: make(map[string][]data.ProtocolContracts), }, } - // First call triggers refresh (cache is empty, so stale) - svc.getProtocolContracts(ctx, "proto_a", 200) + svc.refreshProtocolContractCache(ctx, 200) - // lastRefreshLedger must advance despite failure + // lastRefreshLedger must advance despite batch failure so we don't + // hammer the DB on every subsequent ledger (staleness is gated by + // getProtocolContracts against this value). assert.Equal(t, uint32(200), svc.protocolContractCache.lastRefreshLedger) - // Calling again at currentLedger+1 should be a cache hit (not stale yet). - // The .Once() expectations on the mock ensure no extra DB calls happen. 
- svc.getProtocolContracts(ctx, "proto_a", 201) - mockMetrics.AssertExpectations(t) } @@ -3223,32 +3102,34 @@ func Test_ingestService_refreshProtocolContractCache_Failure_PreservesPreviousEn mockMetrics := metrics.NewMockMetricsService() mockMetrics.On("ObserveProtocolContractCacheRefreshDuration", mock.Anything).Return().Once() - previousContracts := map[string][]data.ProtocolContracts{ - "proto_a": {{ContractID: types.HashBytea(txHash1)}}, - "proto_b": {{ContractID: types.HashBytea(txHash2)}}, - } + previousContracts := []data.ProtocolContracts{{ContractID: types.HashBytea(txHash1)}} protocolContractsModel := data.NewProtocolContractsModelMock(t) - protocolContractsModel.On("BatchGetByProtocolIDs", ctx, mock.AnythingOfType("[]string")). - Return(nil, fmt.Errorf("db error")).Once() + protocolContractsModel.On("BatchGetByProtocolIDs", ctx, mock.MatchedBy(func(ids []string) bool { + return len(ids) == 2 && slices.Contains(ids, "proto_ok") && slices.Contains(ids, "proto_fail") + })).Return(nil, fmt.Errorf("db error")).Once() svc := &ingestService{ metricsService: mockMetrics, models: &data.Models{ProtocolContracts: protocolContractsModel}, protocolProcessors: map[string]ProtocolProcessor{ - "proto_a": NewProtocolProcessorMock(t), - "proto_b": NewProtocolProcessorMock(t), + "proto_ok": NewProtocolProcessorMock(t), + "proto_fail": NewProtocolProcessorMock(t), }, protocolContractCache: &protocolContractCache{ - contractsByProtocol: previousContracts, - lastRefreshLedger: 0, // force refresh + contractsByProtocol: map[string][]data.ProtocolContracts{ + "proto_ok": previousContracts, + "proto_fail": previousContracts, + }, + lastRefreshLedger: 0, // force refresh }, } svc.refreshProtocolContractCache(ctx, 300) - // All previous entries preserved on failure - assert.Equal(t, previousContracts, svc.protocolContractCache.contractsByProtocol) + // Batch failure → both protocols' previous entries are preserved wholesale. 
+ assert.Equal(t, previousContracts, svc.protocolContractCache.contractsByProtocol["proto_ok"]) + assert.Equal(t, previousContracts, svc.protocolContractCache.contractsByProtocol["proto_fail"]) mockMetrics.AssertExpectations(t) } diff --git a/internal/services/protocol_migrate_history.go b/internal/services/protocol_migrate_history.go new file mode 100644 index 000000000..d934f81e8 --- /dev/null +++ b/internal/services/protocol_migrate_history.go @@ -0,0 +1,509 @@ +package services + +import ( + "context" + "errors" + "fmt" + "strconv" + "time" + + "github.com/jackc/pgx/v5" + "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" + "github.com/stellar/go-stellar-sdk/support/log" + "github.com/stellar/go-stellar-sdk/xdr" + + "github.com/stellar/wallet-backend/internal/data" + "github.com/stellar/wallet-backend/internal/db" + "github.com/stellar/wallet-backend/internal/utils" +) + +const ( + // defaultConvergencePollTimeout bounds how long a single GetLedger call may + // block waiting for a new ledger at the tip. Exceeding it signals convergence. + defaultConvergencePollTimeout = 5 * time.Second +) + +// protocolTracker holds per-protocol state for the ledger-first migration loop. +type protocolTracker struct { + protocolID string + cursorName string + cursorValue uint32 + processor ProtocolProcessor + handedOff bool +} + +// ProtocolMigrateHistoryService backfills protocol state changes for historical ledgers. 
+type ProtocolMigrateHistoryService interface { + Run(ctx context.Context, protocolIDs []string) error +} + +var _ ProtocolMigrateHistoryService = (*protocolMigrateHistoryService)(nil) + +type protocolMigrateHistoryService struct { + db db.ConnectionPool + ledgerBackend ledgerbackend.LedgerBackend + protocolsModel data.ProtocolsModelInterface + protocolContractsModel data.ProtocolContractsModelInterface + ingestStore *data.IngestStoreModel + networkPassphrase string + processors map[string]ProtocolProcessor + oldestLedgerCursorName string + convergencePollTimeout time.Duration +} + +// ProtocolMigrateHistoryConfig holds the configuration for creating a protocolMigrateHistoryService. +type ProtocolMigrateHistoryConfig struct { + DB db.ConnectionPool + LedgerBackend ledgerbackend.LedgerBackend + ProtocolsModel data.ProtocolsModelInterface + ProtocolContractsModel data.ProtocolContractsModelInterface + IngestStore *data.IngestStoreModel + NetworkPassphrase string + Processors []ProtocolProcessor + OldestLedgerCursorName string + // ConvergencePollTimeout optionally overrides the per-ledger fetch deadline + // used to detect convergence at the RPC tip. Zero uses defaultConvergencePollTimeout. + ConvergencePollTimeout time.Duration +} + +// NewProtocolMigrateHistoryService creates a new protocolMigrateHistoryService from the given config. 
+func NewProtocolMigrateHistoryService(cfg ProtocolMigrateHistoryConfig) (*protocolMigrateHistoryService, error) { + for i, p := range cfg.Processors { + if p == nil { + return nil, fmt.Errorf("protocol processor at index %d is nil", i) + } + } + ppMap, err := utils.BuildMap(cfg.Processors, func(p ProtocolProcessor) string { + return p.ProtocolID() + }) + if err != nil { + return nil, fmt.Errorf("building protocol processor map: %w", err) + } + + oldestCursor := cfg.OldestLedgerCursorName + if oldestCursor == "" { + oldestCursor = data.OldestLedgerCursorName + } + + pollTimeout := cfg.ConvergencePollTimeout + if pollTimeout == 0 { + pollTimeout = defaultConvergencePollTimeout + } + + return &protocolMigrateHistoryService{ + db: cfg.DB, + ledgerBackend: cfg.LedgerBackend, + protocolsModel: cfg.ProtocolsModel, + protocolContractsModel: cfg.ProtocolContractsModel, + ingestStore: cfg.IngestStore, + networkPassphrase: cfg.NetworkPassphrase, + processors: ppMap, + oldestLedgerCursorName: oldestCursor, + convergencePollTimeout: pollTimeout, + }, nil +} + +// Run performs history migration for the given protocol IDs. 
+func (s *protocolMigrateHistoryService) Run(ctx context.Context, protocolIDs []string) error { + // Phase 1: Validate + activeProtocolIDs, err := s.validate(ctx, protocolIDs) + if err != nil { + return fmt.Errorf("validating protocols: %w", err) + } + + if len(activeProtocolIDs) == 0 { + log.Ctx(ctx).Info("All protocols already completed history migration, nothing to do") + return nil + } + + if txErr := db.RunInPgxTransaction(ctx, s.db, func(dbTx pgx.Tx) error { + return s.protocolsModel.UpdateHistoryMigrationStatus(ctx, dbTx, activeProtocolIDs, data.StatusInProgress) + }); txErr != nil { + return fmt.Errorf("setting history migration status to in_progress: %w", txErr) + } + + // Phase 2: Process each protocol + handedOffIDs, err := s.processAllProtocols(ctx, activeProtocolIDs) + if err != nil { + cleanupCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Mark handed-off protocols as success — live ingestion owns them now + if len(handedOffIDs) > 0 { + if txErr := db.RunInPgxTransaction(cleanupCtx, s.db, func(dbTx pgx.Tx) error { + return s.protocolsModel.UpdateHistoryMigrationStatus(cleanupCtx, dbTx, handedOffIDs, data.StatusSuccess) + }); txErr != nil { + log.Ctx(ctx).Errorf("error setting handed-off protocols to success: %v", txErr) + } + } + + // Mark only non-handed-off protocols as failed + failedIDs := subtract(activeProtocolIDs, handedOffIDs) + if len(failedIDs) > 0 { + if txErr := db.RunInPgxTransaction(cleanupCtx, s.db, func(dbTx pgx.Tx) error { + return s.protocolsModel.UpdateHistoryMigrationStatus(cleanupCtx, dbTx, failedIDs, data.StatusFailed) + }); txErr != nil { + log.Ctx(ctx).Errorf("error setting history migration status to failed: %v", txErr) + } + } + + return fmt.Errorf("processing protocols: %w", err) + } + + // Phase 3: Set status to success + if txErr := db.RunInPgxTransaction(ctx, s.db, func(dbTx pgx.Tx) error { + return s.protocolsModel.UpdateHistoryMigrationStatus(ctx, dbTx, activeProtocolIDs, 
data.StatusSuccess) + }); txErr != nil { + return fmt.Errorf("setting history migration status to success: %w", txErr) + } + + log.Ctx(ctx).Infof("History migration completed successfully for protocols: %v", activeProtocolIDs) + return nil +} + +// validate checks that all protocol IDs are valid and ready for history migration. +// Returns the list of protocol IDs that need processing (excludes already-success ones). +func (s *protocolMigrateHistoryService) validate(ctx context.Context, protocolIDs []string) ([]string, error) { + // De-duplicate protocolIDs, preserving order. + seen := make(map[string]struct{}, len(protocolIDs)) + unique := make([]string, 0, len(protocolIDs)) + for _, pid := range protocolIDs { + if _, dup := seen[pid]; !dup { + seen[pid] = struct{}{} + unique = append(unique, pid) + } + } + protocolIDs = unique + + // Check each protocol has a registered processor + for _, pid := range protocolIDs { + if _, ok := s.processors[pid]; !ok { + return nil, fmt.Errorf("no processor registered for protocol %q", pid) + } + } + + // Verify all protocols exist in the DB and classification is complete + protocols, err := s.protocolsModel.GetByIDs(ctx, protocolIDs) + if err != nil { + return nil, fmt.Errorf("querying protocols: %w", err) + } + + foundSet := make(map[string]*data.Protocols, len(protocols)) + for i := range protocols { + foundSet[protocols[i].ID] = &protocols[i] + } + + var missing []string + for _, pid := range protocolIDs { + if _, ok := foundSet[pid]; !ok { + missing = append(missing, pid) + } + } + if len(missing) > 0 { + return nil, fmt.Errorf("protocols not found in DB: %v", missing) + } + + // Check classification status and filter out already-completed migrations + var active []string + for _, pid := range protocolIDs { + p := foundSet[pid] + if p.ClassificationStatus != data.StatusSuccess { + return nil, fmt.Errorf("protocol %q classification not complete (status: %s)", pid, p.ClassificationStatus) + } + if p.HistoryMigrationStatus == 
data.StatusSuccess { + log.Ctx(ctx).Infof("Protocol %q history migration already completed, skipping", pid) + continue + } + active = append(active, pid) + } + + return active, nil +} + +// processAllProtocols runs history migration for all protocols using ledger-first iteration. +// Each ledger is fetched once and processed by all eligible protocols, avoiding redundant RPC calls. +// +// The backend is prepared exactly once with an UnboundedRange starting at the +// minimum tracker cursor + 1. Convergence with live ingestion is detected in +// two ways: +// - Per-protocol CAS failure (live ingestion advanced the cursor past us) → handoff. +// - GetLedger blocks past convergencePollTimeout because no newer ledger is +// available at the RPC tip → all remaining work is done. +func (s *protocolMigrateHistoryService) processAllProtocols(ctx context.Context, protocolIDs []string) ([]string, error) { + oldestLedger, err := s.ingestStore.Get(ctx, s.oldestLedgerCursorName) + if err != nil { + return nil, fmt.Errorf("reading oldest ingest ledger: %w", err) + } + if oldestLedger == 0 { + return nil, fmt.Errorf("ingestion has not started yet (oldest_ingest_ledger is 0)") + } + + trackers, err := s.initTrackers(ctx, protocolIDs, oldestLedger) + if err != nil { + return nil, err + } + + contractsByProtocol, err := s.loadContracts(ctx, trackers) + if err != nil { + return nil, err + } + + startLedger := minNonHandedOffCursor(trackers) + 1 + + log.Ctx(ctx).Infof("Processing ledgers starting at %d (unbounded) for %d protocol(s)", startLedger, len(protocolIDs)) + + prepareFn := func(ctx context.Context) (struct{}, error) { + return struct{}{}, s.ledgerBackend.PrepareRange(ctx, ledgerbackend.UnboundedRange(startLedger)) + } + if _, prepErr := utils.RetryWithBackoff(ctx, maxLedgerFetchRetries, maxRetryBackoff, prepareFn, + func(attempt int, err error, backoff time.Duration) { + log.Ctx(ctx).Warnf("Error preparing unbounded range from %d (attempt %d/%d): %v, retrying in %v...", + 
startLedger, attempt+1, maxLedgerFetchRetries, err, backoff) + }, + ); prepErr != nil { + return handedOffProtocolIDs(trackers), fmt.Errorf("preparing unbounded range from %d: %w", startLedger, prepErr) + } + + for seq := startLedger; ; seq++ { + if err := ctx.Err(); err != nil { + return handedOffProtocolIDs(trackers), fmt.Errorf("context cancelled: %w", err) + } + if allHandedOff(trackers) { + return handedOffProtocolIDs(trackers), nil + } + + // Skip if no non-handed-off tracker needs this ledger. + if !anyTrackerNeedsLedger(trackers, seq) { + continue + } + + ledgerMeta, fetchErr := s.fetchLedgerOrConverge(ctx, seq) + if errors.Is(fetchErr, errConverged) { + log.Ctx(ctx).Infof("Converged at ledger %d (no new ledger within %v)", seq-1, s.convergencePollTimeout) + return handedOffProtocolIDs(trackers), nil + } + if fetchErr != nil { + return handedOffProtocolIDs(trackers), fmt.Errorf("fetching ledger %d: %w", seq, fetchErr) + } + + for _, t := range trackers { + if t.handedOff || t.cursorValue >= seq { + continue + } + if err := s.processTrackerAtLedger(ctx, t, seq, ledgerMeta, contractsByProtocol[t.protocolID]); err != nil { + return handedOffProtocolIDs(trackers), err + } + } + + if seq%100 == 0 { + log.Ctx(ctx).Infof("Progress: processed ledger %d", seq) + } + } +} + +// errConverged signals that a GetLedger poll hit its deadline without a new +// ledger arriving — migration has caught up to the RPC tip. +var errConverged = errors.New("converged: no new ledger within poll timeout") + +// fetchLedgerOrConverge fetches a single ledger, retrying on transient RPC errors +// with exponential backoff. Returns errConverged when the per-call deadline +// elapses without a response (signaling we've caught up to the tip). Returns +// the wrapped parent-context error if the caller's context is cancelled. 
+func (s *protocolMigrateHistoryService) fetchLedgerOrConverge(ctx context.Context, seq uint32) (xdr.LedgerCloseMeta, error) { + var lastErr error + for attempt := 0; attempt < maxLedgerFetchRetries; attempt++ { + if err := ctx.Err(); err != nil { + return xdr.LedgerCloseMeta{}, fmt.Errorf("context cancelled: %w", err) + } + + pollCtx, cancel := context.WithTimeout(ctx, s.convergencePollTimeout) + meta, err := s.ledgerBackend.GetLedger(pollCtx, seq) + pollDeadline := pollCtx.Err() == context.DeadlineExceeded + cancel() + + if err == nil { + return meta, nil + } + if ctx.Err() != nil { + return xdr.LedgerCloseMeta{}, fmt.Errorf("context cancelled: %w", ctx.Err()) + } + // If the per-call deadline fired with no result, treat as convergence. + if pollDeadline || errors.Is(err, context.DeadlineExceeded) { + return xdr.LedgerCloseMeta{}, errConverged + } + lastErr = err + + backoff := time.Duration(1< maxRetryBackoff { + backoff = maxRetryBackoff + } + log.Ctx(ctx).Warnf("Error fetching ledger %d (attempt %d/%d): %v, retrying in %v...", + seq, attempt+1, maxLedgerFetchRetries, err, backoff) + + select { + case <-ctx.Done(): + return xdr.LedgerCloseMeta{}, fmt.Errorf("context cancelled during backoff: %w", ctx.Err()) + case <-time.After(backoff): + } + } + return xdr.LedgerCloseMeta{}, fmt.Errorf("failed after %d attempts: %w", maxLedgerFetchRetries, lastErr) +} + +// initTrackers reads (or initializes) each protocol's history cursor and builds +// the per-protocol tracker slice. Freshly-seen protocols have their cursor set +// to oldestLedger-1 so the first processed ledger is oldestLedger. 
func (s *protocolMigrateHistoryService) initTrackers(ctx context.Context, protocolIDs []string, oldestLedger uint32) ([]*protocolTracker, error) {
	trackers := make([]*protocolTracker, 0, len(protocolIDs))
	for _, pid := range protocolIDs {
		cursorName := utils.ProtocolHistoryCursorName(pid)
		cursorValue, readErr := s.ingestStore.Get(ctx, cursorName)
		if readErr != nil {
			return nil, fmt.Errorf("reading history cursor for %s: %w", pid, readErr)
		}

		// A zero cursor means this protocol has never been migrated: seed it
		// to oldestLedger-1 so the first ledger processed is oldestLedger.
		// The seed is committed in its own transaction so later CAS calls see it.
		if cursorValue == 0 {
			initValue := oldestLedger - 1
			if initErr := db.RunInPgxTransaction(ctx, s.db, func(dbTx pgx.Tx) error {
				return s.ingestStore.Update(ctx, dbTx, cursorName, initValue)
			}); initErr != nil {
				return nil, fmt.Errorf("initializing history cursor for %s: %w", pid, initErr)
			}
			cursorValue = initValue
		}

		trackers = append(trackers, &protocolTracker{
			protocolID:  pid,
			cursorName:  cursorName,
			cursorValue: cursorValue,
			processor:   s.processors[pid],
		})
	}
	return trackers, nil
}

// loadContracts preloads each protocol's contract set once up front. validate()
// already enforces ClassificationStatus == StatusSuccess, so the DB is the
// source of truth for the full contract set.
func (s *protocolMigrateHistoryService) loadContracts(ctx context.Context, trackers []*protocolTracker) (map[string][]data.ProtocolContracts, error) {
	contractsByProtocol := make(map[string][]data.ProtocolContracts, len(trackers))
	for _, t := range trackers {
		contracts, err := s.protocolContractsModel.GetByProtocolID(ctx, t.protocolID)
		if err != nil {
			return nil, fmt.Errorf("loading contracts for %s: %w", t.protocolID, err)
		}
		contractsByProtocol[t.protocolID] = contracts
	}
	return contractsByProtocol, nil
}

// processTrackerAtLedger runs one protocol's processor on the given ledger and,
// on success, performs the CAS+persist transaction that atomically commits the
// cursor advance and the processor's history rows. A failed CAS (cursor already
// advanced by live ingestion) marks the tracker as handed off.
func (s *protocolMigrateHistoryService) processTrackerAtLedger(
	ctx context.Context,
	t *protocolTracker,
	seq uint32,
	ledgerMeta xdr.LedgerCloseMeta,
	contracts []data.ProtocolContracts,
) error {
	input := ProtocolProcessorInput{
		LedgerSequence:    seq,
		LedgerCloseMeta:   ledgerMeta,
		ProtocolContracts: contracts,
		NetworkPassphrase: s.networkPassphrase,
	}
	if err := t.processor.ProcessLedger(ctx, input); err != nil {
		return fmt.Errorf("processing ledger %d for protocol %s: %w", seq, t.protocolID, err)
	}

	// CAS from seq-1 to seq: only succeeds if nobody (i.e. live ingestion)
	// moved the cursor since we last saw it.
	expected := strconv.FormatUint(uint64(seq-1), 10)
	next := strconv.FormatUint(uint64(seq), 10)

	var swapped bool
	if err := db.RunInPgxTransaction(ctx, s.db, func(dbTx pgx.Tx) error {
		var casErr error
		swapped, casErr = s.ingestStore.CompareAndSwap(ctx, dbTx, t.cursorName, expected, next)
		if casErr != nil {
			return fmt.Errorf("CAS history cursor for %s: %w", t.protocolID, casErr)
		}
		// History rows are persisted in the SAME transaction as the cursor
		// advance, so a rollback discards both; on CAS failure nothing is written.
		if swapped {
			return t.processor.PersistHistory(ctx, dbTx)
		}
		return nil
	}); err != nil {
		return fmt.Errorf("persisting ledger %d for protocol %s: %w", seq, t.protocolID, err)
	}

	if !swapped {
		// Live ingestion already advanced past us — this protocol is done here.
		log.Ctx(ctx).Infof("Protocol %s: CAS failed at ledger %d, handoff to live ingestion detected", t.protocolID, seq)
		t.handedOff = true
	} else {
		t.cursorValue = seq
	}
	return nil
}

// minNonHandedOffCursor returns the smallest cursorValue among trackers that
// have not yet been handed off. If every tracker is handed off, it returns 0.
+func minNonHandedOffCursor(trackers []*protocolTracker) uint32 { + var minCursor uint32 + first := true + for _, t := range trackers { + if t.handedOff { + continue + } + if first || t.cursorValue < minCursor { + minCursor = t.cursorValue + first = false + } + } + return minCursor +} + +// anyTrackerNeedsLedger reports whether at least one non-handed-off tracker +// still needs to process the given ledger sequence. +func anyTrackerNeedsLedger(trackers []*protocolTracker, seq uint32) bool { + for _, t := range trackers { + if !t.handedOff && t.cursorValue < seq { + return true + } + } + return false +} + +// allHandedOff returns true if every tracker has been handed off to live ingestion. +func allHandedOff(trackers []*protocolTracker) bool { + for _, t := range trackers { + if !t.handedOff { + return false + } + } + return true +} + +// handedOffProtocolIDs returns the IDs of trackers that have been handed off to live ingestion. +func handedOffProtocolIDs(trackers []*protocolTracker) []string { + var ids []string + for _, t := range trackers { + if t.handedOff { + ids = append(ids, t.protocolID) + } + } + return ids +} + +// subtract returns all elements in `all` that are not in `remove`. 
func subtract(all, remove []string) []string {
	// Build a set of IDs to exclude; struct{} values keep the set allocation-light.
	removeSet := make(map[string]struct{}, len(remove))
	for _, id := range remove {
		removeSet[id] = struct{}{}
	}
	// Preserve the order of `all`; result stays nil when nothing survives.
	var result []string
	for _, id := range all {
		if _, ok := removeSet[id]; !ok {
			result = append(result, id)
		}
	}
	return result
}
diff --git a/internal/services/protocol_migrate_history_test.go b/internal/services/protocol_migrate_history_test.go
new file mode 100644
index 000000000..77ce0edc8
--- /dev/null
+++ b/internal/services/protocol_migrate_history_test.go
@@ -0,0 +1,1382 @@
package services

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/stellar/go-stellar-sdk/ingest/ledgerbackend"
	"github.com/stellar/go-stellar-sdk/xdr"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	"github.com/stellar/wallet-backend/internal/data"
	"github.com/stellar/wallet-backend/internal/db"
	"github.com/stellar/wallet-backend/internal/db/dbtest"
	"github.com/stellar/wallet-backend/internal/metrics"
	"github.com/stellar/wallet-backend/internal/utils"
)

// testConvergenceTimeout keeps tests fast: 200ms is long enough for a block to
// land in a local goroutine and short enough that "no new ledger" cases don't
// dominate test runtime.
const testConvergenceTimeout = 200 * time.Millisecond

// multiLedgerBackend is a test double that serves ledger meta for a range of ledgers.
+type multiLedgerBackend struct { + ledgers map[uint32]xdr.LedgerCloseMeta +} + +func (b *multiLedgerBackend) GetLatestLedgerSequence(_ context.Context) (uint32, error) { + var max uint32 + for seq := range b.ledgers { + if seq > max { + max = seq + } + } + return max, nil +} + +func (b *multiLedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { + if meta, ok := b.ledgers[sequence]; ok { + return meta, nil + } + <-ctx.Done() + return xdr.LedgerCloseMeta{}, ctx.Err() +} + +func (b *multiLedgerBackend) PrepareRange(context.Context, ledgerbackend.Range) error { + return nil +} + +func (b *multiLedgerBackend) IsPrepared(context.Context, ledgerbackend.Range) (bool, error) { + return true, nil +} + +func (b *multiLedgerBackend) Close() error { + return nil +} + +// transientErrorBackend wraps multiLedgerBackend and injects transient errors +// on the initial PrepareRange call and on missing-ledger GetLedger calls +// before delegating normally. Simulates RPC blips that should not be mistaken +// for convergence. +type transientErrorBackend struct { + multiLedgerBackend + // prepareFailsLeft counts how many PrepareRange calls should return a + // transient error before succeeding. + prepareFailsLeft atomic.Int32 + // missingGetLedgerFailsLeft counts how many GetLedger calls for missing + // ledgers should return a transient error instead of blocking. 
+ missingGetLedgerFailsLeft atomic.Int32 +} + +func (b *transientErrorBackend) PrepareRange(ctx context.Context, r ledgerbackend.Range) error { + if b.prepareFailsLeft.Add(-1) >= 0 { + return fmt.Errorf("transient RPC error: connection refused") + } + return b.multiLedgerBackend.PrepareRange(ctx, r) +} + +func (b *transientErrorBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { + if _, ok := b.multiLedgerBackend.ledgers[sequence]; !ok { + if b.missingGetLedgerFailsLeft.Add(-1) >= 0 { + return xdr.LedgerCloseMeta{}, fmt.Errorf("transient RPC error: connection reset") + } + } + return b.multiLedgerBackend.GetLedger(ctx, sequence) +} + +// rangeTrackingBackend wraps multiLedgerBackend and records every +// PrepareRange call. It also exposes an onMiss hook that fires synchronously +// when GetLedger is called for a ledger that's not yet in the map, letting +// tests inject new ledgers mid-run to simulate tip advancement. +type rangeTrackingBackend struct { + multiLedgerBackend + mu sync.Mutex + ranges []rangeCall + onMiss func(sequence uint32) + onMissOnce sync.Once +} + +type rangeCall struct { + bounded bool + r ledgerbackend.Range +} + +func (b *rangeTrackingBackend) PrepareRange(ctx context.Context, r ledgerbackend.Range) error { + b.mu.Lock() + b.ranges = append(b.ranges, rangeCall{bounded: r.Bounded(), r: r}) + b.mu.Unlock() + return b.multiLedgerBackend.PrepareRange(ctx, r) +} + +// GetLedger checks the map, runs the onMiss hook (once) if the ledger isn't +// present so tests can inject new ledgers, then re-checks. If still missing, +// falls back to the base blocking behavior. 
+func (b *rangeTrackingBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { + b.mu.Lock() + meta, ok := b.multiLedgerBackend.ledgers[sequence] + b.mu.Unlock() + if ok { + return meta, nil + } + if b.onMiss != nil { + b.onMissOnce.Do(func() { b.onMiss(sequence) }) + } + b.mu.Lock() + meta, ok = b.multiLedgerBackend.ledgers[sequence] + b.mu.Unlock() + if ok { + return meta, nil + } + return b.multiLedgerBackend.GetLedger(ctx, sequence) +} + +// prepareEnforcingBackend wraps multiLedgerBackend and errors on any second +// PrepareRange call. Guards against regressing into the pre-refactor behavior +// where the service called PrepareRange multiple times (which RPCLedgerBackend +// rejects in production). +type prepareEnforcingBackend struct { + multiLedgerBackend + prepareCalls atomic.Int32 +} + +func (b *prepareEnforcingBackend) PrepareRange(ctx context.Context, r ledgerbackend.Range) error { + if n := b.prepareCalls.Add(1); n > 1 { + return fmt.Errorf("PrepareRange called %d times; RPCLedgerBackend only accepts one", n) + } + return b.multiLedgerBackend.PrepareRange(ctx, r) +} + +// recordingProcessor is a test double that records all ProcessLedger inputs +// and writes per-ledger sentinel keys to ingest_store during PersistHistory, +// proving that PersistHistory actually committed data inside the transaction. 
+type recordingProcessor struct { + id string + ingestStore *data.IngestStoreModel + processedInputs []ProtocolProcessorInput + persistedSeqs []uint32 + lastProcessed uint32 +} + +func (p *recordingProcessor) ProtocolID() string { return p.id } + +func (p *recordingProcessor) ProcessLedger(_ context.Context, input ProtocolProcessorInput) error { + p.processedInputs = append(p.processedInputs, input) + p.lastProcessed = input.LedgerSequence + return nil +} + +func (p *recordingProcessor) PersistHistory(ctx context.Context, dbTx pgx.Tx) error { + p.persistedSeqs = append(p.persistedSeqs, p.lastProcessed) + return p.ingestStore.Update(ctx, dbTx, fmt.Sprintf("test_%s_history_%d", p.id, p.lastProcessed), p.lastProcessed) +} + +func (p *recordingProcessor) PersistCurrentState(_ context.Context, _ pgx.Tx) error { + return nil +} + +// cursorAdvancingProcessor simulates live ingestion taking over by advancing +// its own cursor in the DB during ProcessLedger, causing the subsequent CAS to fail. +type cursorAdvancingProcessor struct { + recordingProcessor + dbPool db.ConnectionPool + advanceAtSeq uint32 +} + +func (p *cursorAdvancingProcessor) ProcessLedger(ctx context.Context, input ProtocolProcessorInput) error { + if input.LedgerSequence == p.advanceAtSeq { + if _, err := p.dbPool.ExecContext(ctx, + `UPDATE ingest_store SET value = $1 WHERE key = $2`, + strconv.FormatUint(uint64(p.advanceAtSeq+100), 10), + utils.ProtocolHistoryCursorName(p.id)); err != nil { + return fmt.Errorf("advancing cursor for test: %w", err) + } + } + return p.recordingProcessor.ProcessLedger(ctx, input) +} + +// errorAtSeqProcessor wraps recordingProcessor and returns an error when +// ProcessLedger is called for a specific ledger sequence. 
type errorAtSeqProcessor struct {
	recordingProcessor
	// errorAtSeq is the ledger sequence at which ProcessLedger fails.
	errorAtSeq uint32
}

func (p *errorAtSeqProcessor) ProcessLedger(ctx context.Context, input ProtocolProcessorInput) error {
	if input.LedgerSequence == p.errorAtSeq {
		return fmt.Errorf("simulated error at ledger %d", p.errorAtSeq)
	}
	return p.recordingProcessor.ProcessLedger(ctx, input)
}

// getHistorySentinel reads the per-ledger sentinel key written by
// recordingProcessor.PersistHistory; ok=false means the row is absent
// (i.e. the transaction never committed for that ledger).
func getHistorySentinel(t *testing.T, ctx context.Context, dbPool db.ConnectionPool, protocolID string, seq uint32) (uint32, bool) {
	t.Helper()
	var val uint32
	err := dbPool.GetContext(ctx, &val, `SELECT value FROM ingest_store WHERE key = $1`, fmt.Sprintf("test_%s_history_%d", protocolID, seq))
	if err != nil {
		return 0, false
	}
	return val, true
}

// dummyLedgerMeta builds a minimal V0 LedgerCloseMeta carrying only the sequence.
func dummyLedgerMeta(seq uint32) xdr.LedgerCloseMeta {
	return xdr.LedgerCloseMeta{
		V: 0,
		V0: &xdr.LedgerCloseMetaV0{
			LedgerHeader: xdr.LedgerHeaderHistoryEntry{
				Header: xdr.LedgerHeader{
					LedgerSeq: xdr.Uint32(seq),
				},
			},
		},
	}
}

// setupTestDB opens a throwaway test database and returns a pool plus an
// IngestStoreModel wired to permissive metrics mocks; cleanup is registered
// on t.
func setupTestDB(t *testing.T) (db.ConnectionPool, *data.IngestStoreModel) {
	t.Helper()
	dbt := dbtest.Open(t)
	t.Cleanup(func() { dbt.Close() })

	dbPool, err := db.OpenDBConnectionPool(dbt.DSN)
	require.NoError(t, err)
	t.Cleanup(func() { dbPool.Close() })

	// Metrics calls are incidental here — accept any and make them optional.
	mockMetrics := metrics.NewMockMetricsService()
	mockMetrics.On("ObserveDBQueryDuration", mock.Anything, mock.Anything, mock.Anything).Return().Maybe()
	mockMetrics.On("IncDBQuery", mock.Anything, mock.Anything).Return().Maybe()
	mockMetrics.On("IncDBQueryError", mock.Anything, mock.Anything, mock.Anything).Return().Maybe()

	ingestStore := &data.IngestStoreModel{DB: dbPool, MetricsService: mockMetrics}
	return dbPool, ingestStore
}

// setIngestStoreValue upserts a single ingest_store row (key → value).
func setIngestStoreValue(t *testing.T, ctx context.Context, dbPool db.ConnectionPool, key string, value uint32) {
	t.Helper()
	_, err := dbPool.ExecContext(ctx, `INSERT INTO ingest_store (key, value) VALUES ($1, $2)
	ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value`, key,
strconv.FormatUint(uint64(value), 10))
	require.NoError(t, err)
}

// getIngestStoreValue reads a single ingest_store row as a uint32; the row
// must exist (require fails the test otherwise).
func getIngestStoreValue(t *testing.T, ctx context.Context, dbPool db.ConnectionPool, key string) uint32 {
	t.Helper()
	var val uint32
	err := dbPool.GetContext(ctx, &val, `SELECT value FROM ingest_store WHERE key = $1`, key)
	require.NoError(t, err)
	return val
}

func TestProtocolMigrateHistory(t *testing.T) {
	t.Run("happy path — single protocol, 3 ledgers, all CAS succeed", func(t *testing.T) {
		ctx := context.Background()
		dbPool, ingestStore := setupTestDB(t)

		// Set up ingest cursors
		setIngestStoreValue(t, ctx, dbPool, "oldest_ingest_ledger", 100)
		setIngestStoreValue(t, ctx, dbPool, "latest_ingest_ledger", 102)

		// Set up protocol in DB
		_, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('testproto', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`)
		require.NoError(t, err)

		protocolsModel := data.NewProtocolsModelMock(t)
		protocolContractsModel := data.NewProtocolContractsModelMock(t)
		processor := &recordingProcessor{id: "testproto", ingestStore: ingestStore}

		protocolsModel.On("GetByIDs", ctx, []string{"testproto"}).Return([]data.Protocols{
			{ID: "testproto", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted},
		}, nil)
		protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusInProgress).Return(nil)
		protocolContractsModel.On("GetByProtocolID", mock.Anything, "testproto").Return([]data.ProtocolContracts{}, nil)
		protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusSuccess).Return(nil)

		backend := &multiLedgerBackend{
			ledgers: map[uint32]xdr.LedgerCloseMeta{
				100: dummyLedgerMeta(100),
				101: dummyLedgerMeta(101),
				102: dummyLedgerMeta(102),
			},
		}

		svc, err :=
NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + + Processors: []ProtocolProcessor{processor}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"testproto"}) + require.NoError(t, err) + + // Verify cursor advanced + cursorVal := getIngestStoreValue(t, ctx, dbPool, "protocol_testproto_history_cursor") + assert.Equal(t, uint32(102), cursorVal) + + // Verify PersistHistory actually committed sentinel values to the DB + for _, seq := range []uint32{100, 101, 102} { + val, ok := getHistorySentinel(t, ctx, dbPool, "testproto", seq) + require.True(t, ok, "sentinel for ledger %d should exist", seq) + assert.Equal(t, seq, val, "sentinel value for ledger %d", seq) + } + + // Verify processor recorded all inputs + require.Len(t, processor.processedInputs, 3) + for i, seq := range []uint32{100, 101, 102} { + assert.Equal(t, seq, processor.processedInputs[i].LedgerSequence) + assert.Equal(t, "Test SDF Network ; September 2015", processor.processedInputs[i].NetworkPassphrase) + } + assert.Equal(t, []uint32{100, 101, 102}, processor.persistedSeqs) + }) + + t.Run("CAS failure (handoff) — CAS fails at ledger N, status success", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + setIngestStoreValue(t, ctx, dbPool, "oldest_ingest_ledger", 100) + setIngestStoreValue(t, ctx, dbPool, "latest_ingest_ledger", 102) + // Pre-set cursor to 100, so processing starts at 101 + // But we'll simulate CAS failure at 101 by having someone else advance it + setIngestStoreValue(t, ctx, dbPool, "protocol_testproto_history_cursor", 100) + + _, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('testproto', 'success') ON CONFLICT (id) DO 
UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processorMock := NewProtocolProcessorMock(t) + + protocolsModel.On("GetByIDs", ctx, []string{"testproto"}).Return([]data.Protocols{ + {ID: "testproto", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusInProgress).Return(nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusSuccess).Return(nil) + + protocolContractsModel.On("GetByProtocolID", mock.Anything, "testproto").Return([]data.ProtocolContracts{}, nil).Maybe() + + processorMock.On("ProtocolID").Return("testproto") + processorMock.On("ProcessLedger", mock.Anything, mock.Anything).Return(nil).Maybe() + processorMock.On("PersistHistory", mock.Anything, mock.Anything).Return(nil).Maybe() + + backend := &multiLedgerBackend{ + ledgers: map[uint32]xdr.LedgerCloseMeta{ + 101: dummyLedgerMeta(101), + 102: dummyLedgerMeta(102), + }, + } + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + + Processors: []ProtocolProcessor{processorMock}, + }) + require.NoError(t, err) + + // Simulate CAS failure: advance cursor externally before service runs + setIngestStoreValue(t, ctx, dbPool, "protocol_testproto_history_cursor", 105) + + err = svc.Run(ctx, []string{"testproto"}) + require.NoError(t, err) // Handoff is success + }) + + t.Run("validation: classification not complete", func(t *testing.T) { + ctx := context.Background() + 
dbPool, ingestStore := setupTestDB(t) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processorMock := NewProtocolProcessorMock(t) + + processorMock.On("ProtocolID").Return("testproto") + protocolsModel.On("GetByIDs", ctx, []string{"testproto"}).Return([]data.Protocols{ + {ID: "testproto", ClassificationStatus: data.StatusInProgress, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + + backend := &multiLedgerBackend{ledgers: map[uint32]xdr.LedgerCloseMeta{}} + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + + Processors: []ProtocolProcessor{processorMock}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"testproto"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "classification not complete") + }) + + t.Run("validation: protocol not found in DB", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processorMock := NewProtocolProcessorMock(t) + + processorMock.On("ProtocolID").Return("testproto") + protocolsModel.On("GetByIDs", ctx, []string{"testproto"}).Return([]data.Protocols{}, nil) + + backend := &multiLedgerBackend{ledgers: map[uint32]xdr.LedgerCloseMeta{}} + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + + Processors: 
[]ProtocolProcessor{processorMock}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"testproto"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "not found in DB") + }) + + t.Run("validation: no processor registered for protocol", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processorMock := NewProtocolProcessorMock(t) + + processorMock.On("ProtocolID").Return("otherproto") + + backend := &multiLedgerBackend{ledgers: map[uint32]xdr.LedgerCloseMeta{}} + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + + Processors: []ProtocolProcessor{processorMock}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"testproto"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "no processor registered") + }) + + t.Run("duplicate protocol IDs are deduplicated — each processed once", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + setIngestStoreValue(t, ctx, dbPool, "oldest_ingest_ledger", 100) + setIngestStoreValue(t, ctx, dbPool, "latest_ingest_ledger", 101) + + _, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('testproto', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processor := &recordingProcessor{id: "testproto", ingestStore: ingestStore} + + // Mock expects the deduplicated slice (single element), not the duplicated input. 
+ protocolsModel.On("GetByIDs", ctx, []string{"testproto"}).Return([]data.Protocols{ + {ID: "testproto", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusInProgress).Return(nil) + protocolContractsModel.On("GetByProtocolID", mock.Anything, "testproto").Return([]data.ProtocolContracts{}, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusSuccess).Return(nil) + + backend := &multiLedgerBackend{ + ledgers: map[uint32]xdr.LedgerCloseMeta{ + 100: dummyLedgerMeta(100), + 101: dummyLedgerMeta(101), + }, + } + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + Processors: []ProtocolProcessor{processor}, + }) + require.NoError(t, err) + + // Pass duplicate IDs — should be deduplicated internally. + err = svc.Run(ctx, []string{"testproto", "testproto", "testproto"}) + require.NoError(t, err) + + // Single cursor write — only one tracker was created. + cursorVal := getIngestStoreValue(t, ctx, dbPool, "protocol_testproto_history_cursor") + assert.Equal(t, uint32(101), cursorVal) + + // Each ledger processed exactly once. 
+ require.Len(t, processor.processedInputs, 2) + assert.Equal(t, uint32(100), processor.processedInputs[0].LedgerSequence) + assert.Equal(t, uint32(101), processor.processedInputs[1].LedgerSequence) + assert.Equal(t, []uint32{100, 101}, processor.persistedSeqs) + }) + + t.Run("resume from cursor — cursor already at N, process from N+1", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + setIngestStoreValue(t, ctx, dbPool, "oldest_ingest_ledger", 100) + setIngestStoreValue(t, ctx, dbPool, "latest_ingest_ledger", 103) + // Cursor already at 101 (previous partial run) + setIngestStoreValue(t, ctx, dbPool, "protocol_testproto_history_cursor", 101) + + _, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('testproto', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processor := &recordingProcessor{id: "testproto", ingestStore: ingestStore} + + protocolsModel.On("GetByIDs", ctx, []string{"testproto"}).Return([]data.Protocols{ + {ID: "testproto", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusInProgress).Return(nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusSuccess).Return(nil) + + protocolContractsModel.On("GetByProtocolID", mock.Anything, "testproto").Return([]data.ProtocolContracts{}, nil) + + backend := &multiLedgerBackend{ + ledgers: map[uint32]xdr.LedgerCloseMeta{ + 102: dummyLedgerMeta(102), + 103: dummyLedgerMeta(103), + }, + } + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: 
protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + + Processors: []ProtocolProcessor{processor}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"testproto"}) + require.NoError(t, err) + + cursorVal := getIngestStoreValue(t, ctx, dbPool, "protocol_testproto_history_cursor") + assert.Equal(t, uint32(103), cursorVal) + + // Verify sentinels exist only for 102, 103 (not 100, 101) + for _, seq := range []uint32{100, 101} { + _, ok := getHistorySentinel(t, ctx, dbPool, "testproto", seq) + assert.False(t, ok, "sentinel for ledger %d should NOT exist (already processed)", seq) + } + for _, seq := range []uint32{102, 103} { + val, ok := getHistorySentinel(t, ctx, dbPool, "testproto", seq) + require.True(t, ok, "sentinel for ledger %d should exist", seq) + assert.Equal(t, seq, val, "sentinel value for ledger %d", seq) + } + assert.Equal(t, []uint32{102, 103}, processor.persistedSeqs) + }) + + t.Run("error during ProcessLedger — status failed", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + setIngestStoreValue(t, ctx, dbPool, "oldest_ingest_ledger", 100) + setIngestStoreValue(t, ctx, dbPool, "latest_ingest_ledger", 101) + + _, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('testproto', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processorMock := NewProtocolProcessorMock(t) + + protocolsModel.On("GetByIDs", ctx, []string{"testproto"}).Return([]data.Protocols{ + {ID: "testproto", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, 
mock.Anything, []string{"testproto"}, data.StatusInProgress).Return(nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusFailed).Return(nil) + + protocolContractsModel.On("GetByProtocolID", mock.Anything, "testproto").Return([]data.ProtocolContracts{}, nil) + + processorMock.On("ProtocolID").Return("testproto") + processorMock.On("ProcessLedger", mock.Anything, mock.Anything).Return(fmt.Errorf("simulated ProcessLedger error")) + + backend := &multiLedgerBackend{ + ledgers: map[uint32]xdr.LedgerCloseMeta{ + 100: dummyLedgerMeta(100), + }, + } + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + + Processors: []ProtocolProcessor{processorMock}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"testproto"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "simulated ProcessLedger error") + }) + + t.Run("error during PersistHistory — tx rolls back, status failed", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + setIngestStoreValue(t, ctx, dbPool, "oldest_ingest_ledger", 100) + setIngestStoreValue(t, ctx, dbPool, "latest_ingest_ledger", 100) + + _, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('testproto', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processorMock := NewProtocolProcessorMock(t) + + protocolsModel.On("GetByIDs", ctx, []string{"testproto"}).Return([]data.Protocols{ + {ID: "testproto", ClassificationStatus: 
data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusInProgress).Return(nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusFailed).Return(nil) + + protocolContractsModel.On("GetByProtocolID", mock.Anything, "testproto").Return([]data.ProtocolContracts{}, nil) + + processorMock.On("ProtocolID").Return("testproto") + processorMock.On("ProcessLedger", mock.Anything, mock.Anything).Return(nil) + processorMock.On("PersistHistory", mock.Anything, mock.Anything).Return(fmt.Errorf("simulated PersistHistory error")) + + backend := &multiLedgerBackend{ + ledgers: map[uint32]xdr.LedgerCloseMeta{ + 100: dummyLedgerMeta(100), + }, + } + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + + Processors: []ProtocolProcessor{processorMock}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"testproto"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "simulated PersistHistory error") + + // Cursor should NOT have advanced because tx rolled back + cursorVal := getIngestStoreValue(t, ctx, dbPool, "protocol_testproto_history_cursor") + assert.Equal(t, uint32(99), cursorVal) // initialized to oldest-1 + }) + + t.Run("already at tip — cursor equals latest, immediate success", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + setIngestStoreValue(t, ctx, dbPool, "oldest_ingest_ledger", 100) + setIngestStoreValue(t, ctx, dbPool, "latest_ingest_ledger", 105) + setIngestStoreValue(t, ctx, dbPool, "protocol_testproto_history_cursor", 105) + + 
_, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('testproto', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processor := &recordingProcessor{id: "testproto", ingestStore: ingestStore} + + protocolsModel.On("GetByIDs", ctx, []string{"testproto"}).Return([]data.Protocols{ + {ID: "testproto", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusInProgress).Return(nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusSuccess).Return(nil) + + protocolContractsModel.On("GetByProtocolID", mock.Anything, "testproto").Return([]data.ProtocolContracts{}, nil) + + backend := &multiLedgerBackend{ledgers: map[uint32]xdr.LedgerCloseMeta{}} + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + + Processors: []ProtocolProcessor{processor}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"testproto"}) + require.NoError(t, err) + + // No processing happened — no sentinels should exist + _, ok := getHistorySentinel(t, ctx, dbPool, "testproto", 105) + assert.False(t, ok, "no sentinel should exist when already at tip") + assert.Empty(t, processor.processedInputs) + }) + + t.Run("multiple protocols — both process each ledger via shared fetch", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + 
setIngestStoreValue(t, ctx, dbPool, "oldest_ingest_ledger", 100) + setIngestStoreValue(t, ctx, dbPool, "latest_ingest_ledger", 101) + + _, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('proto1', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + _, err = dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('proto2', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + proc1 := &recordingProcessor{id: "proto1", ingestStore: ingestStore} + proc2 := &recordingProcessor{id: "proto2", ingestStore: ingestStore} + + protocolsModel.On("GetByIDs", ctx, []string{"proto1", "proto2"}).Return([]data.Protocols{ + {ID: "proto1", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + {ID: "proto2", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"proto1", "proto2"}, data.StatusInProgress).Return(nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"proto1", "proto2"}, data.StatusSuccess).Return(nil) + + protocolContractsModel.On("GetByProtocolID", mock.Anything, "proto1").Return([]data.ProtocolContracts{}, nil) + protocolContractsModel.On("GetByProtocolID", mock.Anything, "proto2").Return([]data.ProtocolContracts{}, nil) + + backend := &multiLedgerBackend{ + ledgers: map[uint32]xdr.LedgerCloseMeta{ + 100: dummyLedgerMeta(100), + 101: dummyLedgerMeta(101), + }, + } + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + 
IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + + Processors: []ProtocolProcessor{proc1, proc2}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"proto1", "proto2"}) + require.NoError(t, err) + + cursor1 := getIngestStoreValue(t, ctx, dbPool, "protocol_proto1_history_cursor") + cursor2 := getIngestStoreValue(t, ctx, dbPool, "protocol_proto2_history_cursor") + assert.Equal(t, uint32(101), cursor1) + assert.Equal(t, uint32(101), cursor2) + + // Verify each protocol has independently keyed sentinels + for _, id := range []string{"proto1", "proto2"} { + for _, seq := range []uint32{100, 101} { + val, ok := getHistorySentinel(t, ctx, dbPool, id, seq) + require.True(t, ok, "sentinel for %s ledger %d should exist", id, seq) + assert.Equal(t, seq, val, "sentinel value for %s ledger %d", id, seq) + } + } + assert.Equal(t, []uint32{100, 101}, proc1.persistedSeqs) + assert.Equal(t, []uint32{100, 101}, proc2.persistedSeqs) + }) + + t.Run("protocols at different cursors — each starts from its own position", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + setIngestStoreValue(t, ctx, dbPool, "oldest_ingest_ledger", 50) + setIngestStoreValue(t, ctx, dbPool, "latest_ingest_ledger", 102) + // proto1 cursor at 98, proto2 cursor at 100 + setIngestStoreValue(t, ctx, dbPool, "protocol_proto1_history_cursor", 98) + setIngestStoreValue(t, ctx, dbPool, "protocol_proto2_history_cursor", 100) + + _, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('proto1', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + _, err = dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('proto2', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := 
data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + proc1 := &recordingProcessor{id: "proto1", ingestStore: ingestStore} + proc2 := &recordingProcessor{id: "proto2", ingestStore: ingestStore} + + protocolsModel.On("GetByIDs", ctx, []string{"proto1", "proto2"}).Return([]data.Protocols{ + {ID: "proto1", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + {ID: "proto2", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"proto1", "proto2"}, data.StatusInProgress).Return(nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"proto1", "proto2"}, data.StatusSuccess).Return(nil) + + protocolContractsModel.On("GetByProtocolID", mock.Anything, "proto1").Return([]data.ProtocolContracts{}, nil) + protocolContractsModel.On("GetByProtocolID", mock.Anything, "proto2").Return([]data.ProtocolContracts{}, nil) + + backend := &multiLedgerBackend{ + ledgers: map[uint32]xdr.LedgerCloseMeta{ + 99: dummyLedgerMeta(99), + 100: dummyLedgerMeta(100), + 101: dummyLedgerMeta(101), + 102: dummyLedgerMeta(102), + }, + } + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + + Processors: []ProtocolProcessor{proc1, proc2}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"proto1", "proto2"}) + require.NoError(t, err) + + cursor1 := getIngestStoreValue(t, ctx, dbPool, "protocol_proto1_history_cursor") + cursor2 := getIngestStoreValue(t, ctx, dbPool, "protocol_proto2_history_cursor") + assert.Equal(t, uint32(102), cursor1) + assert.Equal(t, 
uint32(102), cursor2) + + // proto1 should process 99-102, proto2 should process 101-102 + require.Len(t, proc1.processedInputs, 4) + for i, seq := range []uint32{99, 100, 101, 102} { + assert.Equal(t, seq, proc1.processedInputs[i].LedgerSequence) + } + require.Len(t, proc2.processedInputs, 2) + for i, seq := range []uint32{101, 102} { + assert.Equal(t, seq, proc2.processedInputs[i].LedgerSequence) + } + + assert.Equal(t, []uint32{99, 100, 101, 102}, proc1.persistedSeqs) + assert.Equal(t, []uint32{101, 102}, proc2.persistedSeqs) + }) + + t.Run("one protocol hands off, other continues", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + setIngestStoreValue(t, ctx, dbPool, "oldest_ingest_ledger", 100) + setIngestStoreValue(t, ctx, dbPool, "latest_ingest_ledger", 102) + + _, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('proto1', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + _, err = dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('proto2', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + // proc1 advances its own cursor during ProcessLedger at seq 100, causing CAS failure + proc1 := &cursorAdvancingProcessor{ + recordingProcessor: recordingProcessor{id: "proto1", ingestStore: ingestStore}, + dbPool: dbPool, + advanceAtSeq: 100, + } + proc2 := &recordingProcessor{id: "proto2", ingestStore: ingestStore} + + protocolsModel.On("GetByIDs", ctx, []string{"proto1", "proto2"}).Return([]data.Protocols{ + {ID: "proto1", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + {ID: "proto2", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + 
protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"proto1", "proto2"}, data.StatusInProgress).Return(nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"proto1", "proto2"}, data.StatusSuccess).Return(nil) + + protocolContractsModel.On("GetByProtocolID", mock.Anything, "proto1").Return([]data.ProtocolContracts{}, nil) + protocolContractsModel.On("GetByProtocolID", mock.Anything, "proto2").Return([]data.ProtocolContracts{}, nil) + + backend := &multiLedgerBackend{ + ledgers: map[uint32]xdr.LedgerCloseMeta{ + 100: dummyLedgerMeta(100), + 101: dummyLedgerMeta(101), + 102: dummyLedgerMeta(102), + }, + } + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + + Processors: []ProtocolProcessor{proc1, proc2}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"proto1", "proto2"}) + require.NoError(t, err) + + // proto2 should have processed all 3 ledgers + cursor2 := getIngestStoreValue(t, ctx, dbPool, "protocol_proto2_history_cursor") + assert.Equal(t, uint32(102), cursor2) + assert.Equal(t, []uint32{100, 101, 102}, proc2.persistedSeqs) + + // proto1 should have processed only ledger 100 (then CAS failed, handed off) + require.Len(t, proc1.processedInputs, 1) + assert.Equal(t, uint32(100), proc1.processedInputs[0].LedgerSequence) + // proto1 PersistHistory was NOT called because CAS failed + assert.Empty(t, proc1.persistedSeqs) + + // Verify proto2 sentinels exist for all ledgers + for _, seq := range []uint32{100, 101, 102} { + val, ok := getHistorySentinel(t, ctx, dbPool, "proto2", seq) + require.True(t, ok, "sentinel for proto2 ledger %d should exist", seq) + assert.Equal(t, seq, val) + } + }) + + 
t.Run("multi-protocol failure with handoff — handed-off gets success, other gets failed", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + setIngestStoreValue(t, ctx, dbPool, "oldest_ingest_ledger", 100) + setIngestStoreValue(t, ctx, dbPool, "latest_ingest_ledger", 102) + + _, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('proto1', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + _, err = dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('proto2', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + + // proto1: hands off via CAS failure at ledger 100 + proc1 := &cursorAdvancingProcessor{ + recordingProcessor: recordingProcessor{id: "proto1", ingestStore: ingestStore}, + dbPool: dbPool, + advanceAtSeq: 100, + } + // proto2: errors at ledger 101 + proc2 := &errorAtSeqProcessor{ + recordingProcessor: recordingProcessor{id: "proto2", ingestStore: ingestStore}, + errorAtSeq: 101, + } + + protocolsModel.On("GetByIDs", ctx, []string{"proto1", "proto2"}).Return([]data.Protocols{ + {ID: "proto1", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + {ID: "proto2", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"proto1", "proto2"}, data.StatusInProgress).Return(nil) + // proto1 should be marked success (handed off to live ingestion) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"proto1"}, data.StatusSuccess).Return(nil) + // proto2 should be marked failed (ProcessLedger error) + 
protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"proto2"}, data.StatusFailed).Return(nil) + + protocolContractsModel.On("GetByProtocolID", mock.Anything, "proto1").Return([]data.ProtocolContracts{}, nil) + protocolContractsModel.On("GetByProtocolID", mock.Anything, "proto2").Return([]data.ProtocolContracts{}, nil) + + backend := &multiLedgerBackend{ + ledgers: map[uint32]xdr.LedgerCloseMeta{ + 100: dummyLedgerMeta(100), + 101: dummyLedgerMeta(101), + 102: dummyLedgerMeta(102), + }, + } + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + + Processors: []ProtocolProcessor{proc1, proc2}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"proto1", "proto2"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "simulated error at ledger 101") + + // Verify the mock expectations — proto1 got StatusSuccess, proto2 got StatusFailed + protocolsModel.AssertExpectations(t) + }) + + t.Run("already success — skips without error", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processorMock := NewProtocolProcessorMock(t) + + processorMock.On("ProtocolID").Return("testproto") + protocolsModel.On("GetByIDs", ctx, []string{"testproto"}).Return([]data.Protocols{ + {ID: "testproto", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusSuccess}, + }, nil) + + backend := &multiLedgerBackend{ledgers: map[uint32]xdr.LedgerCloseMeta{}} + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: 
protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + + Processors: []ProtocolProcessor{processorMock}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"testproto"}) + require.NoError(t, err) // No-op, nothing to do + }) + + t.Run("transient PrepareRange error retries then converges", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + setIngestStoreValue(t, ctx, dbPool, "oldest_ingest_ledger", 100) + setIngestStoreValue(t, ctx, dbPool, "latest_ingest_ledger", 101) + + _, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('testproto', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processor := &recordingProcessor{id: "testproto", ingestStore: ingestStore} + + protocolsModel.On("GetByIDs", ctx, []string{"testproto"}).Return([]data.Protocols{ + {ID: "testproto", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusInProgress).Return(nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusSuccess).Return(nil) + protocolContractsModel.On("GetByProtocolID", mock.Anything, "testproto").Return([]data.ProtocolContracts{}, nil) + + backend := &transientErrorBackend{ + multiLedgerBackend: multiLedgerBackend{ + ledgers: map[uint32]xdr.LedgerCloseMeta{ + 100: dummyLedgerMeta(100), + 101: dummyLedgerMeta(101), + }, + }, + } + // First PrepareRange call fails transiently; RetryWithBackoff must retry + // until success. 
A second call would be the retry and will succeed. + backend.prepareFailsLeft.Store(1) + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + Processors: []ProtocolProcessor{processor}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"testproto"}) + require.NoError(t, err) + + // Verify all ledgers were processed — the transient error did not cause premature convergence. + cursorVal := getIngestStoreValue(t, ctx, dbPool, "protocol_testproto_history_cursor") + assert.Equal(t, uint32(101), cursorVal) + assert.Equal(t, []uint32{100, 101}, processor.persistedSeqs) + }) + + t.Run("transient GetLedger error retries then converges", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + setIngestStoreValue(t, ctx, dbPool, "oldest_ingest_ledger", 100) + setIngestStoreValue(t, ctx, dbPool, "latest_ingest_ledger", 101) + + _, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('testproto', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processor := &recordingProcessor{id: "testproto", ingestStore: ingestStore} + + protocolsModel.On("GetByIDs", ctx, []string{"testproto"}).Return([]data.Protocols{ + {ID: "testproto", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusInProgress).Return(nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, 
[]string{"testproto"}, data.StatusSuccess).Return(nil) + protocolContractsModel.On("GetByProtocolID", mock.Anything, "testproto").Return([]data.ProtocolContracts{}, nil) + + backend := &transientErrorBackend{ + multiLedgerBackend: multiLedgerBackend{ + ledgers: map[uint32]xdr.LedgerCloseMeta{ + 100: dummyLedgerMeta(100), + 101: dummyLedgerMeta(101), + }, + }, + } + // First GetLedger call for the convergence poll (ledger 102, which doesn't exist) + // will fail transiently instead of blocking until context done. + backend.missingGetLedgerFailsLeft.Store(1) + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + Processors: []ProtocolProcessor{processor}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"testproto"}) + require.NoError(t, err) + + // Verify all ledgers were processed — the transient error did not cause premature convergence. 
+ cursorVal := getIngestStoreValue(t, ctx, dbPool, "protocol_testproto_history_cursor") + assert.Equal(t, uint32(101), cursorVal) + assert.Equal(t, []uint32{100, 101}, processor.persistedSeqs) + }) + + t.Run("tip advances mid-run — PrepareRange called once, new ledgers picked up", func(t *testing.T) { + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + setIngestStoreValue(t, ctx, dbPool, "oldest_ingest_ledger", 100) + + _, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('testproto', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processor := &recordingProcessor{id: "testproto", ingestStore: ingestStore} + + protocolsModel.On("GetByIDs", ctx, []string{"testproto"}).Return([]data.Protocols{ + {ID: "testproto", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusInProgress).Return(nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusSuccess).Return(nil) + protocolContractsModel.On("GetByProtocolID", mock.Anything, "testproto").Return([]data.ProtocolContracts{}, nil) + + backend := &rangeTrackingBackend{ + multiLedgerBackend: multiLedgerBackend{ + ledgers: map[uint32]xdr.LedgerCloseMeta{ + 100: dummyLedgerMeta(100), + 101: dummyLedgerMeta(101), + }, + }, + } + + // When the service first calls GetLedger for a missing sequence (102), + // inject ledgers 102 and 103 synchronously. The refactored design fetches + // them on the retry path inside the same GetLedger loop; no extra + // PrepareRange is needed. 
+ backend.onMiss = func(_ uint32) { + backend.mu.Lock() + backend.multiLedgerBackend.ledgers[102] = dummyLedgerMeta(102) + backend.multiLedgerBackend.ledgers[103] = dummyLedgerMeta(103) + backend.mu.Unlock() + } + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + Processors: []ProtocolProcessor{processor}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"testproto"}) + require.NoError(t, err) + + // The refactor guarantees PrepareRange is called exactly once with an + // UnboundedRange — anything else would regress the RPCLedgerBackend fix. + backend.mu.Lock() + ranges := make([]rangeCall, len(backend.ranges)) + copy(ranges, backend.ranges) + backend.mu.Unlock() + + require.Len(t, ranges, 1, "PrepareRange must be called exactly once") + assert.False(t, ranges[0].bounded, "PrepareRange must be unbounded") + + // All ledgers 100-103 processed despite the tip advancing mid-run. + cursorVal := getIngestStoreValue(t, ctx, dbPool, "protocol_testproto_history_cursor") + assert.Equal(t, uint32(103), cursorVal) + assert.Equal(t, []uint32{100, 101, 102, 103}, processor.persistedSeqs) + + for _, seq := range []uint32{100, 101, 102, 103} { + val, ok := getHistorySentinel(t, ctx, dbPool, "testproto", seq) + require.True(t, ok, "sentinel for ledger %d should exist", seq) + assert.Equal(t, seq, val, "sentinel value for ledger %d", seq) + } + }) + + t.Run("PrepareRange called exactly once — guards against RPCLedgerBackend re-prepare error", func(t *testing.T) { + // Simulates RPCLedgerBackend's single-prepare constraint. If the service + // ever regresses and calls PrepareRange a second time, this backend + // returns an error and Run fails. 
+ ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + setIngestStoreValue(t, ctx, dbPool, "oldest_ingest_ledger", 100) + + _, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('testproto', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processor := &recordingProcessor{id: "testproto", ingestStore: ingestStore} + + protocolsModel.On("GetByIDs", ctx, []string{"testproto"}).Return([]data.Protocols{ + {ID: "testproto", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusInProgress).Return(nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusSuccess).Return(nil) + protocolContractsModel.On("GetByProtocolID", mock.Anything, "testproto").Return([]data.ProtocolContracts{}, nil) + + backend := &prepareEnforcingBackend{ + multiLedgerBackend: multiLedgerBackend{ + ledgers: map[uint32]xdr.LedgerCloseMeta{ + 100: dummyLedgerMeta(100), + 101: dummyLedgerMeta(101), + 102: dummyLedgerMeta(102), + }, + }, + } + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + Processors: []ProtocolProcessor{processor}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"testproto"}) + require.NoError(t, err) + + assert.Equal(t, int32(1), backend.prepareCalls.Load(), "PrepareRange must be called exactly once") + }) + + t.Run("context cancelled 
during fetch — returns context error, status failed", func(t *testing.T) { + // Aditya review comment #8: the ctx.Done() select inside the fetch loop + // had no test coverage. Deterministic: we cancel the parent context from + // inside a GetLedger onMiss hook (ledger 101 is absent), so cancellation + // fires while fetchLedgerOrConverge is trying to serve 101 — before any + // tracker processes it. + parentCtx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + setIngestStoreValue(t, parentCtx, dbPool, "oldest_ingest_ledger", 100) + + _, err := dbPool.ExecContext(parentCtx, `INSERT INTO protocols (id, classification_status) VALUES ('testproto', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processor := &recordingProcessor{id: "testproto", ingestStore: ingestStore} + + protocolsModel.On("GetByIDs", mock.Anything, []string{"testproto"}).Return([]data.Protocols{ + {ID: "testproto", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusInProgress).Return(nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusFailed).Return(nil) + protocolContractsModel.On("GetByProtocolID", mock.Anything, "testproto").Return([]data.ProtocolContracts{}, nil) + + cancelCtx, cancel := context.WithCancel(parentCtx) + backend := &rangeTrackingBackend{ + multiLedgerBackend: multiLedgerBackend{ + ledgers: map[uint32]xdr.LedgerCloseMeta{ + 100: dummyLedgerMeta(100), + }, + }, + } + // On the first miss (seq 101 is absent), cancel the parent context. + // fetchLedgerOrConverge then sees ctx.Err() != nil and returns a + // "context cancelled" error. 
+ backend.onMiss = func(_ uint32) { cancel() } + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + Processors: []ProtocolProcessor{processor}, + }) + require.NoError(t, err) + + err = svc.Run(cancelCtx, []string{"testproto"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "context") + + // Ledger 100 was served before cancellation and should be committed. + // Ledger 101 was the trigger for cancellation and must not be committed. + assert.Equal(t, []uint32{100}, processor.persistedSeqs, "only ledger 100 persisted") + cursorVal := getIngestStoreValue(t, parentCtx, dbPool, "protocol_testproto_history_cursor") + assert.Equal(t, uint32(100), cursorVal, "cursor advances only for committed ledger 100") + }) + + t.Run("oldest_ingest_ledger is 0 — returns error, does not call backend", func(t *testing.T) { + // Aditya review comment #8: the "ingestion has not started yet" guard + // has no test coverage. + ctx := context.Background() + dbPool, ingestStore := setupTestDB(t) + + // Do NOT set oldest_ingest_ledger; the ingest store returns 0 by default. 
+ + _, err := dbPool.ExecContext(ctx, `INSERT INTO protocols (id, classification_status) VALUES ('testproto', 'success') ON CONFLICT (id) DO UPDATE SET classification_status = 'success'`) + require.NoError(t, err) + + protocolsModel := data.NewProtocolsModelMock(t) + protocolContractsModel := data.NewProtocolContractsModelMock(t) + processorMock := NewProtocolProcessorMock(t) + + processorMock.On("ProtocolID").Return("testproto") + protocolsModel.On("GetByIDs", ctx, []string{"testproto"}).Return([]data.Protocols{ + {ID: "testproto", ClassificationStatus: data.StatusSuccess, HistoryMigrationStatus: data.StatusNotStarted}, + }, nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusInProgress).Return(nil) + protocolsModel.On("UpdateHistoryMigrationStatus", mock.Anything, mock.Anything, []string{"testproto"}, data.StatusFailed).Return(nil) + + backend := &prepareEnforcingBackend{ + multiLedgerBackend: multiLedgerBackend{ledgers: map[uint32]xdr.LedgerCloseMeta{}}, + } + + svc, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + DB: dbPool, LedgerBackend: backend, + ProtocolsModel: protocolsModel, ProtocolContractsModel: protocolContractsModel, + IngestStore: ingestStore, NetworkPassphrase: "Test SDF Network ; September 2015", + ConvergencePollTimeout: testConvergenceTimeout, + Processors: []ProtocolProcessor{processorMock}, + }) + require.NoError(t, err) + + err = svc.Run(ctx, []string{"testproto"}) + require.Error(t, err) + assert.Contains(t, err.Error(), "ingestion has not started yet") + + // Critical: the backend must not be touched if oldest_ingest_ledger is 0. 
+ assert.Equal(t, int32(0), backend.prepareCalls.Load(), "PrepareRange should not be called when oldest_ingest_ledger is 0") + }) +} + +func TestNewProtocolMigrateHistoryService(t *testing.T) { + t.Run("nil processor returns error", func(t *testing.T) { + _, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + Processors: []ProtocolProcessor{nil}, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "nil") + }) + + t.Run("duplicate processor ID returns error", func(t *testing.T) { + proc1 := &ProtocolProcessorMock{} + proc1.On("ProtocolID").Return("dup") + proc2 := &ProtocolProcessorMock{} + proc2.On("ProtocolID").Return("dup") + + _, err := NewProtocolMigrateHistoryService(ProtocolMigrateHistoryConfig{ + Processors: []ProtocolProcessor{proc1, proc2}, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "duplicate") + }) +} diff --git a/internal/utils/collections.go b/internal/utils/collections.go new file mode 100644 index 000000000..3ad3c05b2 --- /dev/null +++ b/internal/utils/collections.go @@ -0,0 +1,17 @@ +package utils + +import "fmt" + +// BuildMap converts a slice into a map keyed by keyFn. Returns an error if +// any two elements produce the same key. 
func BuildMap[T any](items []T, keyFn func(T) string) (map[string]T, error) {
	out := make(map[string]T, len(items))
	for _, it := range items {
		k := keyFn(it)
		if _, dup := out[k]; dup {
			return nil, fmt.Errorf("duplicate key %q", k)
		}
		out[k] = it
	}
	return out, nil
}

// ---- new file: internal/utils/collections_test.go (package utils; imports "testing") ----

// TestBuildMap_Success: distinct keys produce one entry per element.
func TestBuildMap_Success(t *testing.T) {
	type item struct {
		id   string
		name string
	}
	in := []item{
		{id: "a", name: "Alice"},
		{id: "b", name: "Bob"},
	}
	got, err := BuildMap(in, func(i item) string { return i.id })
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(got) != 2 {
		t.Fatalf("want 2 entries, got %d", len(got))
	}
	if got["a"].name != "Alice" || got["b"].name != "Bob" {
		t.Fatalf("unexpected map contents: %v", got)
	}
}

// TestBuildMap_EmptySlice: an empty input yields an empty (non-error) map.
func TestBuildMap_EmptySlice(t *testing.T) {
	got, err := BuildMap([]string{}, func(s string) string { return s })
	if err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	if len(got) != 0 {
		t.Fatalf("want empty map, got %v", got)
	}
}

// TestBuildMap_DuplicateKey: a repeated key is reported by value in the error.
func TestBuildMap_DuplicateKey(t *testing.T) {
	_, err := BuildMap([]string{"x", "y", "x"}, func(s string) string { return s })
	if err == nil {
		t.Fatal("expected duplicate-key error")
	}
	if err.Error() != `duplicate key "x"` {
		t.Fatalf("unexpected error message: %v", err)
	}
}

// ---- modified file: internal/utils/ingestion_utils.go ----
// Hunk adds "fmt" to the existing import block (alongside "strconv" and the
// stellar xdr package) and appends the two cursor-name helpers after Memo.
// NOTE(review): the Memo function body is only partially visible in this hunk's
// context lines and is unchanged by the patch.

// ProtocolHistoryCursorName
returns the ingest_store key for a protocol's history migration cursor. +func ProtocolHistoryCursorName(protocolID string) string { + return fmt.Sprintf("protocol_%s_history_cursor", protocolID) +} + +// ProtocolCurrentStateCursorName returns the ingest_store key for a protocol's current state cursor. +func ProtocolCurrentStateCursorName(protocolID string) string { + return fmt.Sprintf("protocol_%s_current_state_cursor", protocolID) +} diff --git a/internal/utils/retry.go b/internal/utils/retry.go new file mode 100644 index 000000000..d82f973f5 --- /dev/null +++ b/internal/utils/retry.go @@ -0,0 +1,51 @@ +package utils + +import ( + "context" + "fmt" + "time" +) + +// RetryWithBackoff calls fn up to maxRetries times with exponential backoff +// capped at maxBackoff. It respects context cancellation between attempts. +// onRetry, if non-nil, is called before each backoff wait with the attempt +// number (0-indexed), the error, and the backoff duration. +func RetryWithBackoff[T any]( + ctx context.Context, + maxRetries int, + maxBackoff time.Duration, + fn func(ctx context.Context) (T, error), + onRetry func(attempt int, err error, backoff time.Duration), +) (T, error) { + var zero T + var lastErr error + for attempt := 0; attempt < maxRetries; attempt++ { + select { + case <-ctx.Done(): + return zero, fmt.Errorf("context cancelled: %w", ctx.Err()) + default: + } + + result, err := fn(ctx) + if err == nil { + return result, nil + } + lastErr = err + + backoff := time.Duration(1< maxBackoff { + backoff = maxBackoff + } + + if onRetry != nil { + onRetry(attempt, err, backoff) + } + + select { + case <-ctx.Done(): + return zero, fmt.Errorf("context cancelled during backoff: %w", ctx.Err()) + case <-time.After(backoff): + } + } + return zero, fmt.Errorf("failed after %d attempts: %w", maxRetries, lastErr) +} diff --git a/internal/utils/retry_test.go b/internal/utils/retry_test.go new file mode 100644 index 000000000..4cb23c7ac --- /dev/null +++ 
b/internal/utils/retry_test.go @@ -0,0 +1,96 @@ +package utils + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRetryWithBackoff_SucceedsFirstAttempt(t *testing.T) { + result, err := RetryWithBackoff(context.Background(), 3, 10*time.Second, + func(ctx context.Context) (string, error) { + return "ok", nil + }, nil) + require.NoError(t, err) + assert.Equal(t, "ok", result) +} + +func TestRetryWithBackoff_SucceedsAfterRetries(t *testing.T) { + attempts := 0 + result, err := RetryWithBackoff(context.Background(), 5, 1*time.Second, + func(ctx context.Context) (int, error) { + attempts++ + if attempts < 3 { + return 0, errors.New("not yet") + } + return 42, nil + }, nil) + require.NoError(t, err) + assert.Equal(t, 42, result) + assert.Equal(t, 3, attempts) +} + +func TestRetryWithBackoff_ExhaustsRetries(t *testing.T) { + sentinel := errors.New("persistent failure") + attempts := 0 + _, err := RetryWithBackoff(context.Background(), 3, 1*time.Second, + func(ctx context.Context) (string, error) { + attempts++ + return "", sentinel + }, nil) + require.Error(t, err) + assert.ErrorIs(t, err, sentinel) + assert.Contains(t, err.Error(), "failed after 3 attempts") + assert.Equal(t, 3, attempts) +} + +func TestRetryWithBackoff_RespectsContextCancellation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + _, err := RetryWithBackoff(ctx, 5, 10*time.Second, + func(ctx context.Context) (string, error) { + return "", errors.New("should not reach") + }, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "context cancelled") +} + +func TestRetryWithBackoff_CallsOnRetry(t *testing.T) { + var retryAttempts []int + sentinel := errors.New("fail") + + _, err := RetryWithBackoff(context.Background(), 3, 1*time.Second, + func(ctx context.Context) (string, error) { + return "", sentinel + }, + func(attempt int, err error, backoff time.Duration) 
{ + retryAttempts = append(retryAttempts, attempt) + assert.ErrorIs(t, err, sentinel) + assert.Greater(t, backoff, time.Duration(0)) + }) + require.Error(t, err) + assert.Equal(t, []int{0, 1, 2}, retryAttempts) +} + +func TestRetryWithBackoff_CapsBackoff(t *testing.T) { + maxBackoff := 2 * time.Second + var observedBackoffs []time.Duration + + _, err := RetryWithBackoff(context.Background(), 5, maxBackoff, + func(ctx context.Context) (string, error) { + return "", errors.New("fail") + }, + func(attempt int, err error, backoff time.Duration) { + observedBackoffs = append(observedBackoffs, backoff) + }) + require.Error(t, err) + + for _, b := range observedBackoffs { + assert.LessOrEqual(t, b, maxBackoff) + } +}