-
Notifications
You must be signed in to change notification settings - Fork 13
feature(internal, cmd): protocol-migrate history #546
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
aristidesstaffieri
wants to merge
52
commits into
feature/data-migrations
Choose a base branch
from
feature/protocol-migrate-history
base: feature/data-migrations
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
52 commits
Select commit
Hold shift + click to select a range
0159aa3
removes the enabled flag on the protocols table, adds a new migration…
aristidesstaffieri 8e56f1a
updates diagram for checkpoint population flow to better reflect the …
aristidesstaffieri 049947e
updates live ingestion classification diagram to better reflect the d…
aristidesstaffieri e822401
removes incorrect details about live ingestions relationship to proto…
aristidesstaffieri e7de88e
Updates the migration design to be aware of the history retention win…
aristidesstaffieri 882ddc4
updates live ingestion state production diagram to better reflect the…
aristidesstaffieri f9365f1
removes migration status checks from current state queries, exposes m…
aristidesstaffieri 2c6669f
Extract checkpoint population into dedicated services, add known_wasm…
aristidesstaffieri 89db549
renames known_wasms to protocol_wasms
aristidesstaffieri 73c4d30
Add unit tests for tokenProcessor.ProcessContractCode
aristidesstaffieri 25838f4
services/wasm_ingestion: remove ProtocolValidator execution from Wasm…
Copilot 4e4854b
Simplify ProcessContractCode to pass only WASM hashes, refactor Token…
aristidesstaffieri bd8f7a0
Add protocol_contracts table and populate during checkpoint
aristidesstaffieri 8ae0c61
Populate protocol_wasms and protocol_contracts during live ingestion …
aristidesstaffieri a0df89e
Fix FK violation when persisting protocol contracts with evicted WASMs
aristidesstaffieri 1e24f98
renames known_wasms to protocol_wasms for missing references in desig…
aristidesstaffieri cfb4f19
Change protocol_wasms and protocol_contracts columns from TEXT to BYTEA
aristidesstaffieri 55d3349
Remove redundant protocol_id column from protocol_contracts table
aristidesstaffieri e1ead2a
Use HashBytea type for protocol wasm/contract bytea fields
aristidesstaffieri 11ad8e5
replaces remaining known_wasms references in diagrams with protocol_w…
aristidesstaffieri 2ad8fcf
Rename ProtocolContract to ProtocolContracts to match table name conv…
aristidesstaffieri df342c8
runs fmt and tidy to abide by lint rules
aristidesstaffieri d56dc39
Consolidate WasmIngestionService and checkpoint token logic into Chec…
aristidesstaffieri f9266d3
Fix checkpoint test mock expectations for consolidated CheckpointService
aristidesstaffieri 51cb115
Extract checkpoint population into dedicated services, add known_wasm…
aristidesstaffieri 26908fb
Remove dead known_wasms model, tests, and migration
aristidesstaffieri e8dfa1c
Rename Protocol and ProtocolWasm models to plural form matching table…
aristidesstaffieri ef7c588
Add live protocol state production pipeline with dual CAS gating
aristidesstaffieri 403b1fc
test: consolidate data migration integration coverage
aristidesstaffieri 55c1c82
Validate ProtocolProcessors in NewIngestService for nil and duplicate…
aristidesstaffieri 5af595e
Remove non-transactional reads from PersistLedgerData transaction
aristidesstaffieri 0113123
guard against ledger sequence 0 edge case, dont treat 0 as a valid CAS
aristidesstaffieri 9de6977
Preserve protocol contract cache entries on partial refresh failure
aristidesstaffieri f0d2ad4
Remove vacuous skippedProcessor assertion from produceProtocolStateFo…
aristidesstaffieri f1cf92c
Fix TOCTOU gap, missing metrics, and lock-during-IO in live ingestion
aristidesstaffieri ab14a8a
Fix query storm from partial protocol contract cache refresh failure
aristidesstaffieri b19dc66
Add protocol history migration service and asymmetric CAS integration…
aristidesstaffieri 1afbb7d
Extract shared helpers between history migration and live ingestion
aristidesstaffieri 69faa5f
Fix convergence poll to distinguish timeout from transient RPC errors
aristidesstaffieri 987ec5a
Fix hardcoded cursor names in protocol history migration
aristidesstaffieri 91413eb
Document and test BoundedRange/UnboundedRange transition on shared le…
aristidesstaffieri 417d911
Fix bulk StatusFailed marking to exclude handed-off protocols
aristidesstaffieri b96e8f4
De-duplicate protocolIDs early in validate()
aristidesstaffieri 791ae79
tweaks style and formatting to abide by the linter
aristidesstaffieri 209b853
Extract generic utilities from ingest helpers and eliminate helper file
aristidesstaffieri a22f382
Add migration for protocol_wasms and protocol_contracts tables
aristidesstaffieri 3c7714f
feature(internal): single UnboundedRange for history migration
aristidesstaffieri a5a260c
refactor(internal): remove test-only receiver methods from ingestService
aristidesstaffieri 9a4f29b
refactor(internal): restore BatchGetByProtocolIDs for cache refresh
aristidesstaffieri 64de5ec
refactor(internal): drop mutex from protocolContractCache
aristidesstaffieri 4ee42c9
feature(cmd): use datastore backend for protocol history migration
aristidesstaffieri 73c3821
fix(graphql): reorder imports to satisfy goimports -local
aristidesstaffieri File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,227 @@ | ||
| package cmd | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "go/types" | ||
| "io" | ||
|
|
||
| _ "github.com/lib/pq" | ||
| "github.com/sirupsen/logrus" | ||
| "github.com/spf13/cobra" | ||
| "github.com/stellar/go-stellar-sdk/support/config" | ||
| "github.com/stellar/go-stellar-sdk/support/log" | ||
|
|
||
| "github.com/stellar/wallet-backend/cmd/utils" | ||
| "github.com/stellar/wallet-backend/internal/data" | ||
| "github.com/stellar/wallet-backend/internal/db" | ||
| "github.com/stellar/wallet-backend/internal/ingest" | ||
| "github.com/stellar/wallet-backend/internal/metrics" | ||
| "github.com/stellar/wallet-backend/internal/services" | ||
| internalutils "github.com/stellar/wallet-backend/internal/utils" | ||
| ) | ||
|
|
||
| type protocolMigrateCmd struct{} | ||
|
|
||
| func (c *protocolMigrateCmd) Command() *cobra.Command { | ||
| cmd := &cobra.Command{ | ||
| Use: "protocol-migrate", | ||
| Short: "Data migration commands for protocol state", | ||
| Long: "Parent command for protocol data migrations. Use subcommands to run specific migration tasks.", | ||
| Run: func(cmd *cobra.Command, args []string) { | ||
| if err := cmd.Help(); err != nil { | ||
| log.Fatalf("Error calling help command: %s", err.Error()) | ||
| } | ||
| }, | ||
| } | ||
|
|
||
| cmd.AddCommand(c.historyCommand()) | ||
|
|
||
| return cmd | ||
| } | ||
|
|
||
| // historyCmdOpts holds the resolved flag values for `protocol-migrate history`. | ||
| type historyCmdOpts struct { | ||
| databaseURL string | ||
| rpcURL string | ||
| networkPassphrase string | ||
| protocolIDs []string | ||
| logLevel string | ||
| oldestLedgerCursorName string | ||
| ledgerBackendType string | ||
| datastoreConfigPath string | ||
| getLedgersLimit int | ||
| } | ||
|
|
||
| func (c *protocolMigrateCmd) historyCommand() *cobra.Command { | ||
| var opts historyCmdOpts | ||
|
|
||
| cfgOpts := config.ConfigOptions{ | ||
| utils.DatabaseURLOption(&opts.databaseURL), | ||
| utils.NetworkPassphraseOption(&opts.networkPassphrase), | ||
| // RPC URL is only required when --ledger-backend-type=rpc; validated in PersistentPreRunE. | ||
| { | ||
| Name: "rpc-url", | ||
| Usage: "The URL of the RPC Server. Required when --ledger-backend-type=rpc.", | ||
| OptType: types.String, | ||
| ConfigKey: &opts.rpcURL, | ||
| FlagDefault: "", | ||
| Required: false, | ||
| }, | ||
| { | ||
| Name: "ledger-backend-type", | ||
| Usage: "Type of ledger backend to use for fetching historical ledgers. Options: 'rpc' or 'datastore' (default). Datastore is recommended for migrations because it can reach ledgers outside the RPC retention window.", | ||
| OptType: types.String, | ||
| ConfigKey: &opts.ledgerBackendType, | ||
| FlagDefault: string(ingest.LedgerBackendTypeDatastore), | ||
| Required: false, | ||
| }, | ||
| { | ||
| Name: "datastore-config-path", | ||
| Usage: "Path to TOML config file for datastore backend. Required when --ledger-backend-type=datastore.", | ||
| OptType: types.String, | ||
| ConfigKey: &opts.datastoreConfigPath, | ||
| FlagDefault: "config/datastore-pubnet.toml", | ||
| Required: false, | ||
| }, | ||
| { | ||
| Name: "get-ledgers-limit", | ||
| Usage: "Per-request ledger buffer size for the RPC backend. Ignored for datastore.", | ||
| OptType: types.Int, | ||
| ConfigKey: &opts.getLedgersLimit, | ||
| FlagDefault: 10, | ||
| Required: false, | ||
| }, | ||
| } | ||
|
|
||
| cmd := &cobra.Command{ | ||
| Use: "history", | ||
| Short: "Backfill protocol history state from oldest to latest ingested ledger", | ||
| Long: "Processes historical ledgers from oldest_ingest_ledger to the tip, producing protocol state changes and converging with live ingestion via CAS-gated cursors.", | ||
| PersistentPreRunE: func(cmd *cobra.Command, args []string) error { | ||
| if err := cfgOpts.RequireE(); err != nil { | ||
| return fmt.Errorf("requiring values of config options: %w", err) | ||
| } | ||
| if err := cfgOpts.SetValues(); err != nil { | ||
| return fmt.Errorf("setting values of config options: %w", err) | ||
| } | ||
|
|
||
| if opts.logLevel != "" { | ||
| ll, err := logrus.ParseLevel(opts.logLevel) | ||
| if err != nil { | ||
| return fmt.Errorf("invalid log level %q: %w", opts.logLevel, err) | ||
| } | ||
| log.DefaultLogger.SetLevel(ll) | ||
| } | ||
|
|
||
| if len(opts.protocolIDs) == 0 { | ||
| return fmt.Errorf("at least one --protocol-id is required") | ||
| } | ||
|
|
||
| // Per-backend required-field validation. | ||
| switch opts.ledgerBackendType { | ||
| case string(ingest.LedgerBackendTypeRPC): | ||
| if opts.rpcURL == "" { | ||
| return fmt.Errorf("--rpc-url is required when --ledger-backend-type=rpc") | ||
| } | ||
| case string(ingest.LedgerBackendTypeDatastore): | ||
| if opts.datastoreConfigPath == "" { | ||
| return fmt.Errorf("--datastore-config-path is required when --ledger-backend-type=datastore") | ||
| } | ||
| default: | ||
| return fmt.Errorf("invalid --ledger-backend-type %q, must be 'rpc' or 'datastore'", opts.ledgerBackendType) | ||
| } | ||
| return nil | ||
| }, | ||
| RunE: func(_ *cobra.Command, _ []string) error { | ||
| return c.RunHistory(opts) | ||
| }, | ||
| } | ||
|
|
||
| if err := cfgOpts.Init(cmd); err != nil { | ||
| log.Fatalf("Error initializing a config option: %s", err.Error()) | ||
| } | ||
|
|
||
| cmd.Flags().StringSliceVar(&opts.protocolIDs, "protocol-id", nil, "Protocol ID(s) to migrate (required, repeatable)") | ||
| cmd.Flags().StringVar(&opts.logLevel, "log-level", "", `Log level: "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL", "PANIC"`) | ||
| cmd.Flags().StringVar(&opts.oldestLedgerCursorName, "oldest-ledger-cursor-name", data.OldestLedgerCursorName, "Name of the oldest ledger cursor in the ingest store. Must match the value used by the ingest service.") | ||
|
|
||
| return cmd | ||
| } | ||
|
|
||
| func (c *protocolMigrateCmd) RunHistory(opts historyCmdOpts) error { | ||
| ctx := context.Background() | ||
|
|
||
| // Build processors from protocol IDs using the dynamic registry | ||
| var processors []services.ProtocolProcessor | ||
| for _, pid := range opts.protocolIDs { | ||
| factory, ok := services.GetProcessor(pid) | ||
| if !ok { | ||
| return fmt.Errorf("unknown protocol ID %q — no processor registered", pid) | ||
| } | ||
| p := factory() | ||
| if p == nil { | ||
| return fmt.Errorf("processor factory for protocol %q returned nil", pid) | ||
| } | ||
| processors = append(processors, p) | ||
| } | ||
|
|
||
aristidesstaffieri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // Open DB connection | ||
| dbPool, err := db.OpenDBConnectionPool(opts.databaseURL) | ||
| if err != nil { | ||
| return fmt.Errorf("opening database connection: %w", err) | ||
| } | ||
| defer internalutils.DeferredClose(ctx, dbPool, "closing dbPool in protocol migrate history") | ||
|
|
||
| // Create models | ||
| sqlxDB, err := dbPool.SqlxDB(ctx) | ||
| if err != nil { | ||
| return fmt.Errorf("getting sqlx DB: %w", err) | ||
| } | ||
| metricsService := metrics.NewMetricsService(sqlxDB) | ||
| models, err := data.NewModels(dbPool, metricsService) | ||
| if err != nil { | ||
| return fmt.Errorf("creating models: %w", err) | ||
| } | ||
|
|
||
| // Build a ledger backend using the same selector the ingest service uses, | ||
| // so protocol-migrate inherits the datastore path (recommended for | ||
| // backfills — unbounded history, unlike RPC retention windows). | ||
| ledgerBackend, err := ingest.NewLedgerBackend(ctx, ingest.Configs{ | ||
| LedgerBackendType: ingest.LedgerBackendType(opts.ledgerBackendType), | ||
| DatastoreConfigPath: opts.datastoreConfigPath, | ||
| NetworkPassphrase: opts.networkPassphrase, | ||
| RPCURL: opts.rpcURL, | ||
| GetLedgersLimit: opts.getLedgersLimit, | ||
| }) | ||
| if err != nil { | ||
| return fmt.Errorf("creating ledger backend: %w", err) | ||
| } | ||
| defer func() { | ||
| if closer, ok := ledgerBackend.(io.Closer); ok { | ||
| if closeErr := closer.Close(); closeErr != nil { | ||
| log.Ctx(ctx).Errorf("error closing ledger backend: %v", closeErr) | ||
| } | ||
| } | ||
| }() | ||
|
|
||
| service, err := services.NewProtocolMigrateHistoryService(services.ProtocolMigrateHistoryConfig{ | ||
| DB: dbPool, | ||
| LedgerBackend: ledgerBackend, | ||
| ProtocolsModel: models.Protocols, | ||
| ProtocolContractsModel: models.ProtocolContracts, | ||
| IngestStore: models.IngestStore, | ||
| NetworkPassphrase: opts.networkPassphrase, | ||
| Processors: processors, | ||
| OldestLedgerCursorName: opts.oldestLedgerCursorName, | ||
| }) | ||
| if err != nil { | ||
| return fmt.Errorf("creating protocol migrate history service: %w", err) | ||
| } | ||
|
|
||
| if err := service.Run(ctx, opts.protocolIDs); err != nil { | ||
| return fmt.Errorf("running protocol migrate history: %w", err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
9 changes: 7 additions & 2 deletions
9
...tions/2026-03-09.2-protocol_contracts.sql → ...-03-09.1-protocol_wasms_and_contracts.sql
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,12 +1,17 @@ | ||
| -- +migrate Up | ||
| CREATE TABLE protocol_wasms ( | ||
| wasm_hash BYTEA PRIMARY KEY, | ||
| protocol_id TEXT REFERENCES protocols(id), | ||
| created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() | ||
| ); | ||
|
|
||
| CREATE TABLE protocol_contracts ( | ||
| contract_id BYTEA PRIMARY KEY, | ||
| wasm_hash BYTEA NOT NULL REFERENCES protocol_wasms(wasm_hash), | ||
| name TEXT, | ||
| created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() | ||
| ); | ||
|
|
||
| CREATE INDEX idx_protocol_contracts_wasm_hash ON protocol_contracts(wasm_hash); | ||
|
|
||
| -- +migrate Down | ||
| DROP TABLE IF EXISTS protocol_contracts; | ||
| DROP TABLE IF EXISTS protocol_wasms; |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.