Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
0159aa3
removes the enabled flag on the protocols table, adds a new migration…
aristidesstaffieri Feb 19, 2026
8e56f1a
updates diagram for checkpoint population flow to better reflect the …
aristidesstaffieri Feb 19, 2026
049947e
updates live ingestion classification diagram to better reflect the d…
aristidesstaffieri Feb 19, 2026
e822401
removes incorrect details about live ingestions relationship to proto…
aristidesstaffieri Feb 19, 2026
e7de88e
Updates the migration design to be aware of the history retention win…
aristidesstaffieri Feb 19, 2026
882ddc4
updates live ingestion state production diagram to better reflect the…
aristidesstaffieri Feb 19, 2026
f9365f1
removes migration status checks from current state queries, exposes m…
aristidesstaffieri Feb 19, 2026
2c6669f
Extract checkpoint population into dedicated services, add known_wasm…
aristidesstaffieri Feb 20, 2026
89db549
renames known_wasms to protocol_wasms
aristidesstaffieri Feb 26, 2026
73c4d30
Add unit tests for tokenProcessor.ProcessContractCode
aristidesstaffieri Feb 27, 2026
25838f4
services/wasm_ingestion: remove ProtocolValidator execution from Wasm…
Copilot Feb 27, 2026
4e4854b
Simplify ProcessContractCode to pass only WASM hashes, refactor Token…
aristidesstaffieri Mar 9, 2026
bd8f7a0
Add protocol_contracts table and populate during checkpoint
aristidesstaffieri Mar 9, 2026
8ae0c61
Populate protocol_wasms and protocol_contracts during live ingestion …
aristidesstaffieri Mar 9, 2026
a0df89e
Fix FK violation when persisting protocol contracts with evicted WASMs
aristidesstaffieri Mar 9, 2026
1e24f98
renames known_wasms to protocol_wasms for missing references in desig…
aristidesstaffieri Mar 10, 2026
cfb4f19
Change protocol_wasms and protocol_contracts columns from TEXT to BYTEA
aristidesstaffieri Mar 10, 2026
55d3349
Remove redundant protocol_id column from protocol_contracts table
aristidesstaffieri Mar 10, 2026
e1ead2a
Use HashBytea type for protocol wasm/contract bytea fields
aristidesstaffieri Mar 11, 2026
11ad8e5
replaces remaining known_wasms references in diagrams with protocol_w…
aristidesstaffieri Mar 11, 2026
2ad8fcf
Rename ProtocolContract to ProtocolContracts to match table name conv…
aristidesstaffieri Mar 12, 2026
df342c8
runs fmt and tidy to abide by lint rules
aristidesstaffieri Mar 12, 2026
d56dc39
Consolidate WasmIngestionService and checkpoint token logic into Chec…
aristidesstaffieri Mar 13, 2026
f9266d3
Fix checkpoint test mock expectations for consolidated CheckpointService
aristidesstaffieri Mar 13, 2026
51cb115
Extract checkpoint population into dedicated services, add known_wasm…
aristidesstaffieri Feb 20, 2026
26908fb
Remove dead known_wasms model, tests, and migration
aristidesstaffieri Mar 11, 2026
e8dfa1c
Rename Protocol and ProtocolWasm models to plural form matching table…
aristidesstaffieri Mar 13, 2026
ef7c588
Add live protocol state production pipeline with dual CAS gating
aristidesstaffieri Mar 17, 2026
403b1fc
test: consolidate data migration integration coverage
aristidesstaffieri Mar 17, 2026
55c1c82
Validate ProtocolProcessors in NewIngestService for nil and duplicate…
aristidesstaffieri Mar 18, 2026
5af595e
Remove non-transactional reads from PersistLedgerData transaction
aristidesstaffieri Mar 18, 2026
0113123
guard against ledger sequence 0 edge case, dont treat 0 as a valid CAS
aristidesstaffieri Mar 18, 2026
9de6977
Preserve protocol contract cache entries on partial refresh failure
aristidesstaffieri Mar 18, 2026
f0d2ad4
Remove vacuous skippedProcessor assertion from produceProtocolStateFo…
aristidesstaffieri Mar 18, 2026
f1cf92c
Fix TOCTOU gap, missing metrics, and lock-during-IO in live ingestion
aristidesstaffieri Mar 18, 2026
ab14a8a
Fix query storm from partial protocol contract cache refresh failure
aristidesstaffieri Mar 18, 2026
b19dc66
Add protocol history migration service and asymmetric CAS integration…
aristidesstaffieri Mar 19, 2026
1afbb7d
Extract shared helpers between history migration and live ingestion
aristidesstaffieri Mar 20, 2026
69faa5f
Fix convergence poll to distinguish timeout from transient RPC errors
aristidesstaffieri Mar 20, 2026
987ec5a
Fix hardcoded cursor names in protocol history migration
aristidesstaffieri Mar 20, 2026
91413eb
Document and test BoundedRange/UnboundedRange transition on shared le…
aristidesstaffieri Mar 20, 2026
417d911
Fix bulk StatusFailed marking to exclude handed-off protocols
aristidesstaffieri Mar 20, 2026
b96e8f4
De-duplicate protocolIDs early in validate()
aristidesstaffieri Mar 20, 2026
791ae79
tweaks style and formatting to abide by the linter
aristidesstaffieri Mar 23, 2026
209b853
Extract generic utilities from ingest helpers and eliminate helper file
aristidesstaffieri Mar 23, 2026
a22f382
Add migration for protocol_wasms and protocol_contracts tables
aristidesstaffieri Mar 30, 2026
3c7714f
feature(internal): single UnboundedRange for history migration
aristidesstaffieri Apr 13, 2026
a5a260c
refactor(internal): remove test-only receiver methods from ingestService
aristidesstaffieri Apr 13, 2026
9a4f29b
refactor(internal): restore BatchGetByProtocolIDs for cache refresh
aristidesstaffieri Apr 13, 2026
64de5ec
refactor(internal): drop mutex from protocolContractCache
aristidesstaffieri Apr 13, 2026
4ee42c9
feature(cmd): use datastore backend for protocol history migration
aristidesstaffieri Apr 13, 2026
73c3821
fix(graphql): reorder imports to satisfy goimports -local
aristidesstaffieri Apr 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 227 additions & 0 deletions cmd/protocol_migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package cmd

import (
"context"
"fmt"
"go/types"
"io"

_ "github.com/lib/pq"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/stellar/go-stellar-sdk/support/config"
"github.com/stellar/go-stellar-sdk/support/log"

"github.com/stellar/wallet-backend/cmd/utils"
"github.com/stellar/wallet-backend/internal/data"
"github.com/stellar/wallet-backend/internal/db"
"github.com/stellar/wallet-backend/internal/ingest"
"github.com/stellar/wallet-backend/internal/metrics"
"github.com/stellar/wallet-backend/internal/services"
internalutils "github.com/stellar/wallet-backend/internal/utils"
)

type protocolMigrateCmd struct{}

func (c *protocolMigrateCmd) Command() *cobra.Command {
cmd := &cobra.Command{
Use: "protocol-migrate",
Short: "Data migration commands for protocol state",
Long: "Parent command for protocol data migrations. Use subcommands to run specific migration tasks.",
Run: func(cmd *cobra.Command, args []string) {
if err := cmd.Help(); err != nil {
log.Fatalf("Error calling help command: %s", err.Error())
}
},
}

cmd.AddCommand(c.historyCommand())

return cmd
}

// historyCmdOpts holds the resolved flag values for `protocol-migrate history`.
type historyCmdOpts struct {
databaseURL string
rpcURL string
networkPassphrase string
protocolIDs []string
logLevel string
oldestLedgerCursorName string
ledgerBackendType string
datastoreConfigPath string
getLedgersLimit int
}

func (c *protocolMigrateCmd) historyCommand() *cobra.Command {
var opts historyCmdOpts

cfgOpts := config.ConfigOptions{
utils.DatabaseURLOption(&opts.databaseURL),
utils.NetworkPassphraseOption(&opts.networkPassphrase),
// RPC URL is only required when --ledger-backend-type=rpc; validated in PersistentPreRunE.
{
Name: "rpc-url",
Usage: "The URL of the RPC Server. Required when --ledger-backend-type=rpc.",
OptType: types.String,
ConfigKey: &opts.rpcURL,
FlagDefault: "",
Required: false,
},
{
Name: "ledger-backend-type",
Usage: "Type of ledger backend to use for fetching historical ledgers. Options: 'rpc' or 'datastore' (default). Datastore is recommended for migrations because it can reach ledgers outside the RPC retention window.",
OptType: types.String,
ConfigKey: &opts.ledgerBackendType,
FlagDefault: string(ingest.LedgerBackendTypeDatastore),
Required: false,
},
{
Name: "datastore-config-path",
Usage: "Path to TOML config file for datastore backend. Required when --ledger-backend-type=datastore.",
OptType: types.String,
ConfigKey: &opts.datastoreConfigPath,
FlagDefault: "config/datastore-pubnet.toml",
Required: false,
},
{
Name: "get-ledgers-limit",
Usage: "Per-request ledger buffer size for the RPC backend. Ignored for datastore.",
OptType: types.Int,
ConfigKey: &opts.getLedgersLimit,
FlagDefault: 10,
Required: false,
},
}

cmd := &cobra.Command{
Use: "history",
Short: "Backfill protocol history state from oldest to latest ingested ledger",
Long: "Processes historical ledgers from oldest_ingest_ledger to the tip, producing protocol state changes and converging with live ingestion via CAS-gated cursors.",
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
if err := cfgOpts.RequireE(); err != nil {
return fmt.Errorf("requiring values of config options: %w", err)
}
if err := cfgOpts.SetValues(); err != nil {
return fmt.Errorf("setting values of config options: %w", err)
}

if opts.logLevel != "" {
ll, err := logrus.ParseLevel(opts.logLevel)
if err != nil {
return fmt.Errorf("invalid log level %q: %w", opts.logLevel, err)
}
log.DefaultLogger.SetLevel(ll)
}

if len(opts.protocolIDs) == 0 {
return fmt.Errorf("at least one --protocol-id is required")
}

// Per-backend required-field validation.
switch opts.ledgerBackendType {
case string(ingest.LedgerBackendTypeRPC):
if opts.rpcURL == "" {
return fmt.Errorf("--rpc-url is required when --ledger-backend-type=rpc")
}
case string(ingest.LedgerBackendTypeDatastore):
if opts.datastoreConfigPath == "" {
return fmt.Errorf("--datastore-config-path is required when --ledger-backend-type=datastore")
}
default:
return fmt.Errorf("invalid --ledger-backend-type %q, must be 'rpc' or 'datastore'", opts.ledgerBackendType)
}
return nil
},
RunE: func(_ *cobra.Command, _ []string) error {
return c.RunHistory(opts)
},
}

if err := cfgOpts.Init(cmd); err != nil {
log.Fatalf("Error initializing a config option: %s", err.Error())
}

cmd.Flags().StringSliceVar(&opts.protocolIDs, "protocol-id", nil, "Protocol ID(s) to migrate (required, repeatable)")
cmd.Flags().StringVar(&opts.logLevel, "log-level", "", `Log level: "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL", "PANIC"`)
cmd.Flags().StringVar(&opts.oldestLedgerCursorName, "oldest-ledger-cursor-name", data.OldestLedgerCursorName, "Name of the oldest ledger cursor in the ingest store. Must match the value used by the ingest service.")

return cmd
}

func (c *protocolMigrateCmd) RunHistory(opts historyCmdOpts) error {
ctx := context.Background()

// Build processors from protocol IDs using the dynamic registry
var processors []services.ProtocolProcessor
for _, pid := range opts.protocolIDs {
factory, ok := services.GetProcessor(pid)
if !ok {
return fmt.Errorf("unknown protocol ID %q — no processor registered", pid)
}
p := factory()
if p == nil {
return fmt.Errorf("processor factory for protocol %q returned nil", pid)
}
processors = append(processors, p)
}

// Open DB connection
dbPool, err := db.OpenDBConnectionPool(opts.databaseURL)
if err != nil {
return fmt.Errorf("opening database connection: %w", err)
}
defer internalutils.DeferredClose(ctx, dbPool, "closing dbPool in protocol migrate history")

// Create models
sqlxDB, err := dbPool.SqlxDB(ctx)
if err != nil {
return fmt.Errorf("getting sqlx DB: %w", err)
}
metricsService := metrics.NewMetricsService(sqlxDB)
models, err := data.NewModels(dbPool, metricsService)
if err != nil {
return fmt.Errorf("creating models: %w", err)
}

// Build a ledger backend using the same selector the ingest service uses,
// so protocol-migrate inherits the datastore path (recommended for
// backfills — unbounded history, unlike RPC retention windows).
ledgerBackend, err := ingest.NewLedgerBackend(ctx, ingest.Configs{
LedgerBackendType: ingest.LedgerBackendType(opts.ledgerBackendType),
DatastoreConfigPath: opts.datastoreConfigPath,
NetworkPassphrase: opts.networkPassphrase,
RPCURL: opts.rpcURL,
GetLedgersLimit: opts.getLedgersLimit,
})
if err != nil {
return fmt.Errorf("creating ledger backend: %w", err)
}
defer func() {
if closer, ok := ledgerBackend.(io.Closer); ok {
if closeErr := closer.Close(); closeErr != nil {
log.Ctx(ctx).Errorf("error closing ledger backend: %v", closeErr)
}
}
}()

service, err := services.NewProtocolMigrateHistoryService(services.ProtocolMigrateHistoryConfig{
DB: dbPool,
LedgerBackend: ledgerBackend,
ProtocolsModel: models.Protocols,
ProtocolContractsModel: models.ProtocolContracts,
IngestStore: models.IngestStore,
NetworkPassphrase: opts.networkPassphrase,
Processors: processors,
OldestLedgerCursorName: opts.oldestLedgerCursorName,
})
if err != nil {
return fmt.Errorf("creating protocol migrate history service: %w", err)
}

if err := service.Run(ctx, opts.protocolIDs); err != nil {
return fmt.Errorf("running protocol migrate history: %w", err)
}

return nil
}
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,5 @@ func SetupCLI(cfg RootConfig) {
rootCmd.AddCommand((&distributionAccountCmd{}).Command())
rootCmd.AddCommand((&loadtestCmd{}).Command())
rootCmd.AddCommand((&protocolSetupCmd{}).Command())
rootCmd.AddCommand((&protocolMigrateCmd{}).Command())
}
5 changes: 5 additions & 0 deletions internal/data/ingest_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ import (
"github.com/stellar/wallet-backend/internal/utils"
)

const (
LatestLedgerCursorName = "latest_ingest_ledger"
OldestLedgerCursorName = "oldest_ingest_ledger"
)

type LedgerRange struct {
GapStart uint32 `db:"gap_start"`
GapEnd uint32 `db:"gap_end"`
Expand Down
5 changes: 5 additions & 0 deletions internal/data/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ func (m *ProtocolsModelMock) UpdateClassificationStatus(ctx context.Context, dbT
return args.Error(0)
}

func (m *ProtocolsModelMock) UpdateHistoryMigrationStatus(ctx context.Context, dbTx pgx.Tx, protocolIDs []string, status string) error {
args := m.Called(ctx, dbTx, protocolIDs, status)
return args.Error(0)
}

func (m *ProtocolsModelMock) GetByIDs(ctx context.Context, protocolIDs []string) ([]Protocols, error) {
args := m.Called(ctx, protocolIDs)
if args.Get(0) == nil {
Expand Down
25 changes: 25 additions & 0 deletions internal/data/protocols.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Protocols struct {
// ProtocolsModelInterface defines the interface for protocols operations.
type ProtocolsModelInterface interface {
UpdateClassificationStatus(ctx context.Context, dbTx pgx.Tx, protocolIDs []string, status string) error
UpdateHistoryMigrationStatus(ctx context.Context, dbTx pgx.Tx, protocolIDs []string, status string) error
GetByIDs(ctx context.Context, protocolIDs []string) ([]Protocols, error)
GetClassified(ctx context.Context) ([]Protocols, error)
InsertIfNotExists(ctx context.Context, dbTx pgx.Tx, protocolID string) error
Expand Down Expand Up @@ -70,6 +71,30 @@ func (m *ProtocolsModel) UpdateClassificationStatus(ctx context.Context, dbTx pg
return nil
}

// UpdateHistoryMigrationStatus updates history_migration_status and updated_at for the given protocol IDs.
func (m *ProtocolsModel) UpdateHistoryMigrationStatus(ctx context.Context, dbTx pgx.Tx, protocolIDs []string, status string) error {
if len(protocolIDs) == 0 {
return nil
}

const query = `
UPDATE protocols
SET history_migration_status = $1, updated_at = NOW()
WHERE id = ANY($2)
`

start := time.Now()
_, err := dbTx.Exec(ctx, query, status, protocolIDs)
if err != nil {
m.MetricsService.IncDBQueryError("UpdateHistoryMigrationStatus", "protocols", utils.GetDBErrorType(err))
return fmt.Errorf("updating history migration status for protocols: %w", err)
}

m.MetricsService.ObserveDBQueryDuration("UpdateHistoryMigrationStatus", "protocols", time.Since(start).Seconds())
m.MetricsService.IncDBQuery("UpdateHistoryMigrationStatus", "protocols")
return nil
}

// GetByIDs returns protocols matching the given IDs.
func (m *ProtocolsModel) GetByIDs(ctx context.Context, protocolIDs []string) ([]Protocols, error) {
if len(protocolIDs) == 0 {
Expand Down
28 changes: 28 additions & 0 deletions internal/data/protocols_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,34 @@ func TestProtocolsModel(t *testing.T) {
assert.Equal(t, StatusInProgress, status)
})

t.Run("UpdateHistoryMigrationStatus updates status", func(t *testing.T) {
cleanUpDB()
mockMetricsService := metrics.NewMockMetricsService()
mockMetricsService.On("ObserveDBQueryDuration", mock.Anything, mock.Anything, mock.Anything).Return()
mockMetricsService.On("IncDBQuery", mock.Anything, mock.Anything).Return()
defer mockMetricsService.AssertExpectations(t)

model := &ProtocolsModel{DB: dbConnectionPool, MetricsService: mockMetricsService}

// Insert protocol first
err := db.RunInPgxTransaction(ctx, dbConnectionPool, func(dbTx pgx.Tx) error {
return model.InsertIfNotExists(ctx, dbTx, "SEP41")
})
require.NoError(t, err)

// Update status
err = db.RunInPgxTransaction(ctx, dbConnectionPool, func(dbTx pgx.Tx) error {
return model.UpdateHistoryMigrationStatus(ctx, dbTx, []string{"SEP41"}, StatusInProgress)
})
require.NoError(t, err)

// Verify
var status string
err = dbConnectionPool.GetContext(ctx, &status, `SELECT history_migration_status FROM protocols WHERE id = 'SEP41'`)
require.NoError(t, err)
assert.Equal(t, StatusInProgress, status)
})

t.Run("GetByIDs returns matching protocols", func(t *testing.T) {
cleanUpDB()
mockMetricsService := metrics.NewMockMetricsService()
Expand Down
9 changes: 0 additions & 9 deletions internal/db/migrations/2026-03-09.1-protocol_wasms.sql

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
-- +migrate Up
CREATE TABLE protocol_wasms (
wasm_hash BYTEA PRIMARY KEY,
protocol_id TEXT REFERENCES protocols(id),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE protocol_contracts (
contract_id BYTEA PRIMARY KEY,
wasm_hash BYTEA NOT NULL REFERENCES protocol_wasms(wasm_hash),
name TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_protocol_contracts_wasm_hash ON protocol_contracts(wasm_hash);

-- +migrate Down
DROP TABLE IF EXISTS protocol_contracts;
DROP TABLE IF EXISTS protocol_wasms;
Loading
Loading