
feat:(internal, cmd): Add write-through current state cache for protocol processors#557

Open
aristidesstaffieri wants to merge 57 commits into feature/protocol-migrate-current-state from feature/write-through-cache

Conversation

@aristidesstaffieri
Contributor

What

  • Enable efficient additive state computation during live ingestion by caching protocol current state in processor memory. On the first successful CAS advance of the current_state_cursor (migration handoff), LoadCurrentState reads state from DB into the processor. Subsequent ledgers use the in-memory state maintained by PersistCurrentState, avoiding per-ledger DB reads. Cache-loaded flags reset on transaction rollback to force a DB reload on retry.
  • Fix bug in migration where LedgerBackend was not being recreated on outer loop iterations.
  • Fix rollback reset gap in write-through cache for protocol processors.
  • Reset the current-state cache after a transaction rollback.
  • Re-run protocol-state production (ProcessLedger) on every ingest retry.
  • Skip the backoff sleep after the terminal retry attempt.

Why

Optimizes live ingestion and fixes bugs found during a live smoke test.

Known limitations

No protocols implemented yet

Issue that this PR addresses

#518

Checklist

PR Structure

  • It is not possible to break this PR down into smaller PRs.
  • This PR does not mix refactoring changes with feature changes.
  • This PR's title starts with name of package that is most changed in the PR, or all if the changes are broad or impact many packages.

Thoroughness

  • This PR adds tests for the new functionality or fixes.
  • All updated queries have been tested (refer to this check if the data set returned by the updated query is expected to be same as the original one).

Release

  • This is not a breaking change.
  • This is ready to be tested in development.
  • The new functionality is gated with a feature flag if this is not ready for production.

aristidesstaffieri and others added 30 commits March 30, 2026 12:39
…_status to differentiate between not started and in progress migrations
…steps in the ContractData branch, removes Balance branch
…istinction between uploads and upgrades/deployments
…col-setup in the "When Checkpoint Classification Runs" section
…dow, in order to discard state changes outside of retention.

1. Schema changes: enabled field removed, display_name removed, status default is not_started
2. Status values: All updated to new naming scheme (not_started, classification_in_progress, classification_success, backfilling_in_progress, backfilling_success, failed)
3. protocol-setup: Now uses --protocol-id flag (opt-in), updated command examples and workflow
4. Classification section (line 125): Updated to describe ContractCode validation and ContractData lookup
5. Checkpoint population diagram: Removed Balance branch, updated to show WASM hash storage in known_wasms
6. Live ingestion classification diagram: Separated into ContractCode and ContractData paths with RPC fallback
7. Live State Production diagram: Updated classification box to mention ContractCode uploads and ContractData Instance changes
8. Backfill migration: Added retention-aware processing throughout (flow diagram, workflow diagram, parallel processing)
9. Parallel backfill worker pool: Added steps for retention window filtering
… relationship between classification and state production
…s tracking

  - Add known_wasms table (migration, model, mock, and data layer tests) for tracking WASM hashes during checkpoint population
  - Add KnownWasm field to Models struct
  - Create WasmIngestionService (wasm_ingestion.go) that runs protocol validators against WASM bytecode and batch-persists hashes to known_wasms
  - Create CheckpointService (checkpoint.go) that orchestrates single-pass checkpoint population, delegating ContractCode entries to both WasmIngestionService and
  TokenProcessor, and all other entries to TokenProcessor
  - Extract readerFactory on checkpointService for injectable checkpoint reader creation
  - Extract TokenProcessor interface and NewTokenProcessor from TokenIngestionService, moving checkpoint iteration logic out of token_ingestion.go into checkpoint.go
  - Remove db, archive, and PopulateAccountTokens from TokenIngestionService interface and struct
  - Remove dbPool parameter from NewTokenIngestionServiceForLoadtest
  - Wire CheckpointService into IngestServiceConfig and ingestService
  - Update ingest_live.go to call checkpointService.PopulateFromCheckpoint instead of tokenIngestionService.PopulateAccountTokens
  - Update ingest.go setupDeps to construct WasmIngestionService and CheckpointService
  - Add ContractValidatorMock, ProtocolValidatorMock, ChangeReaderMock, CheckpointServiceMock, WasmIngestionServiceMock, TokenProcessorMock, and TokenIngestionServiceMock
  updates to mocks.go
  - Add unit tests for WasmIngestionService (10 cases covering ProcessContractCode and PersistKnownWasms)
  - Add unit tests for CheckpointService (16 cases covering entry routing, error propagation, and context cancellation)
…IngestionService (#524)

* Initial plan

* Remove validator execution from WasmIngestionService

Co-authored-by: aristidesstaffieri <6886006+aristidesstaffieri@users.noreply.github.com>

* services/wasm_ingestion: remove ProtocolValidator execution from WasmIngestionService

Co-authored-by: aristidesstaffieri <6886006+aristidesstaffieri@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: aristidesstaffieri <6886006+aristidesstaffieri@users.noreply.github.com>
…IngestionService to use config struct

  WasmIngestionService.ProcessContractCode no longer receives the full
  bytecode—it only needs the hash to track protocol WASMs. This reduces
  memory pressure during checkpoint population.

  TokenIngestionService construction is consolidated into a single
  NewTokenIngestionService(config) constructor, eliminating the separate
  NewTokenIngestionServiceForLoadtest variant. The loadtest runner now
  uses the same constructor with only the fields it needs.

  Also refactors processContractInstanceChange to return a
  contractInstanceResult struct instead of multiple return values,
  extracts newCheckpointData() helper, uses idiomatic nil slices
  instead of make([]T, 0), and introduces a checkpointTestFixture
  struct to reduce boilerplate in checkpoint tests. Constructors
  return concrete types instead of interfaces to allow direct field
  access in tests.
  Persist contract-to-WASM-hash mappings by extending WasmIngestionService
  with ProcessContractData and PersistProtocolContracts methods. During
  checkpoint population, ContractData Instance entries are parsed to extract
  the wasm_hash and contract_id relationship, which is stored in a new
  protocol_contracts table (FK to protocol_wasms). This mapping will be used
  by protocol-setup and live ingestion to classify contracts by protocol.
…and backfill

  Add two new LedgerChangeProcessors (ProtocolWasmProcessor, ProtocolContractProcessor)
  that extract WASM hashes and contract-to-WASM mappings from ledger changes during
  live ingestion, catchup, and historical backfill. Previously this data was only
  populated during checkpoint.

  - ProtocolWasmProcessor extracts hashes from ContractCode entries
  - ProtocolContractProcessor extracts contract-to-WASM mappings from ContractData Instance entries
  - Extended IndexerBuffer with protocolWasmsByHash/protocolContractsByID maps (Push/Get/Merge/Clear)
  - PersistLedgerData inserts wasms before contracts (FK ordering) with ON CONFLICT DO NOTHING
  - BatchChanges and processBatchChanges extended for backfill paths
  ContractData Instance entries can outlive their referenced ContractCode
  entries due to independent TTLs, causing FK violations when inserting
  protocol_contracts during checkpoint population.

  - Skip contracts referencing unknown WASM hashes in PersistProtocolContracts
  - Add WHERE EXISTS guard in BatchInsert SQL for live/backfill path
  - Add test for contracts_with_missing_wasm_skipped scenario
  Store wasm_hash and contract_id as raw bytes instead of hex/strkey-encoded
  strings. Both values originate as [32]byte arrays in XDR, so BYTEA reduces
  storage by ~50%, improves index performance on fixed-size keys, and removes
  unnecessary encoding/decoding at the persistence boundary.
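
The ~50% figure for the hash column follows from hex encoding doubling the byte count. A small sketch, where `encodedSizes` is a hypothetical helper and the input bytes are arbitrary:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// encodedSizes returns the length of a 32-byte hash stored raw and the
// length of the same hash stored as a hex string.
func encodedSizes(hash [32]byte) (raw, hexLen int) {
	return len(hash), len(hex.EncodeToString(hash[:]))
}

func main() {
	hash := sha256.Sum256([]byte("example wasm bytecode"))
	raw, hexLen := encodedSizes(hash)
	fmt.Printf("raw=%d bytes, hex=%d bytes\n", raw, hexLen) // raw=32 bytes, hex=64 bytes
}
```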
  The protocol_id on protocol_contracts was always NULL and never queried.
  It's derivable via the existing FK join: protocol_contracts.wasm_hash →
  protocol_wasms.wasm_hash → protocol_wasms.protocol_id.
   Replace raw []byte with types.HashBytea for WasmHash and ContractID
   fields in ProtocolWasm and ProtocolContract models. HashBytea implements
   sql.Scanner and driver.Valuer to auto-convert between raw bytes (DB)
   and hex strings (Go), consistent with how Transaction.Hash is handled.

   Updated files:
   - internal/data/protocol_wasms.go, protocol_contracts.go (models + BatchInsert)
   - internal/indexer/processors/protocol_wasms.go, protocol_contracts.go
   - internal/services/wasm_ingestion.go
   - All corresponding test files
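
A type that converts between raw bytes in the DB and hex strings in Go could look roughly like this. This is an illustrative re-implementation under assumptions, not the repository's `types.HashBytea`. In particular, treating NULL as an empty string and rejecting non-`[]byte` inputs are choices made for this sketch.

```go
package main

import (
	"database/sql/driver"
	"encoding/hex"
	"fmt"
)

// HashBytea holds a hash as a hex string on the Go side (sketch only).
type HashBytea string

// Scan implements sql.Scanner: raw bytes from the DB become a hex string.
func (h *HashBytea) Scan(src any) error {
	switch v := src.(type) {
	case nil:
		*h = ""
		return nil
	case []byte:
		*h = HashBytea(hex.EncodeToString(v))
		return nil
	default:
		return fmt.Errorf("unsupported type %T for HashBytea", src)
	}
}

// Value implements driver.Valuer: the hex string becomes raw bytes.
func (h HashBytea) Value() (driver.Value, error) {
	b, err := hex.DecodeString(string(h))
	if err != nil {
		return nil, fmt.Errorf("decoding hash %q: %w", string(h), err)
	}
	return b, nil
}

func main() {
	var h HashBytea
	if err := h.Scan([]byte{0xde, 0xad, 0xbe, 0xef}); err != nil {
		panic(err)
	}
	v, err := h.Value()
	if err != nil {
		panic(err)
	}
	fmt.Println(h, v)
}
```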
…kpointService

  WasmIngestionService was only used by CheckpointService, and
  TokenIngestionService's NewTokenProcessor/TokenProcessor interface was
  only used by CheckpointService. This inlines all checkpoint-specific
  logic directly into CheckpointService, eliminating unnecessary
  intermediate service abstractions.

  - Rewrite checkpoint.go to absorb all checkpoint logic: checkpointData,
    batch, trustline/contract/WASM processing, and protocol persistence
  - Replace positional NewCheckpointService args with CheckpointServiceConfig
  - Strip token_ingestion.go to live-only (ProcessTokenChanges); remove
    TokenProcessor interface, NewTokenProcessor, and checkpoint-only fields
    from TokenIngestionServiceConfig
  - Delete wasm_ingestion.go (absorbed into checkpoint.go)
  - Remove WasmIngestionServiceMock, TokenProcessorMock from mocks.go
  - Update ingest.go wiring and simplify TokenIngestionServiceConfig
  - Rewrite checkpoint_test.go with data model mocks; port WASM and
    checkpoint processor tests from deleted test files
  - Add TrustlineAssetModelMock to data/mocks.go
  - Add valid AccountId to makeAccountChange() helper to prevent nil pointer dereference
  - Add missing protocolWasmModel.BatchInsert mock expectation in ContractCodeEntry test
  - Fix ContextCancellation test to cancel context during reader.Read() instead of before PopulateFromCheckpoint, matching the expected error path
   The known_wasms table was renamed to protocol_wasms, and the new
   ProtocolWasm model already exists. Remove the obsolete KnownWasm
   model, its tests, and the old migration file.
… names

  This aligns Protocol→Protocols and ProtocolWasm→ProtocolWasms (structs, interfaces, mocks, and Models struct fields) to match the protocols and protocol_wasms table names,
   consistent with the existing ProtocolContracts convention.
  Enable atomic cursor advancement by adding a compare-and-swap primitive
  that only updates a value when it matches the expected current value.
  This prevents concurrent writers from overwriting each other's progress,
  supporting exactly-once processing for data migrations and live ingestion.
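
The compare-and-swap semantics can be illustrated with an in-memory stand-in. The real primitive operates on the database, so `cursorStore` and its fields are hypothetical and only the "update when the value matches the expected one" rule is taken from the commit message.

```go
package main

import (
	"fmt"
	"sync"
)

// cursorStore is a hypothetical in-memory stand-in for a cursor table.
type cursorStore struct {
	mu     sync.Mutex
	values map[string]uint32
}

// CompareAndSwap sets key to next only if its current value equals
// expected, and reports whether the swap happened.
func (s *cursorStore) CompareAndSwap(key string, expected, next uint32) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	cur, ok := s.values[key]
	if !ok || cur != expected {
		return false
	}
	s.values[key] = next
	return true
}

func main() {
	s := &cursorStore{values: map[string]uint32{"cursor": 100}}
	fmt.Println(s.CompareAndSwap("cursor", 100, 101)) // true: first writer advances 100 -> 101
	fmt.Println(s.CompareAndSwap("cursor", 100, 101)) // false: second writer holds a stale value
}
```

The losing writer learns that someone else already advanced the cursor and can skip its write instead of overwriting progress.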
  Introduces the infrastructure for protocol processors to produce and
  persist protocol-specific state during live ledger ingestion, gated by
  per-protocol compare-and-swap cursors that coordinate with concurrent
  migration processes.

  Key changes:
  - ProtocolProcessor interface and ProtocolProcessorInput for protocol-
    specific ledger analysis and state persistence
  - Processor registry (RegisterProcessor/GetAllProcessors) for protocol
    processor discovery at startup
  - Dual CAS gating in PersistLedgerData (step 5.5): per-protocol history
    and current_state cursors ensure exactly-once writes even when live
    ingestion and migration run concurrently
  - Protocol contract cache with periodic refresh to avoid per-ledger DB
    queries for classified contracts
  - Data layer additions: IngestStoreModel.GetTx, CompareAndSwap,
    ProtocolContractsModel.GetByProtocolID, ProtocolsModel.GetClassified

  Tests:
  - Unit tests for processor registry (concurrent safety, overwrite, etc.)
  - 5 subtests for PersistLedgerData CAS gating (win, lose, behind, no
    cursor, no processors) using a real test DB and sentinel-writing
    testProtocolProcessor
  - Docker integration test (ProtocolStateProductionTestSuite) exercising
    CAS gating against a live ingest container's DB in three phases
   Combine protocol setup and protocol state tests into a shared
   DataMigrationTestSuite. Use real SEP41 setup classification plus
   manual cursor seeding to verify live ingestion produces protocol
   history/current state only when the protocol cursors are ready,
   and stays inert when they are absent.
… tests

  Introduce ProtocolMigrateHistoryService that backfills protocol state
  changes for historical ledgers, walking forward from the oldest ingest
  cursor to the latest cursor and persisting PersistHistory at each ledger.
  The service tracks progress via a per-protocol history_cursor using CAS,
  refreshes the protocol contract cache periodically, and marks
  history_migration_status on completion.

  Supporting changes:
  - Add `protocol-data-migrate` CLI command (cmd/protocol_data_migrate.go)
  - Add UpdateHistoryMigrationStatus to ProtocolsModel and its mock/tests
  - Add per-call tracking (persistedHistorySeqs, persistedCurrentStateSeqs)
    to integrationTestProcessor for verifying persistence call counts

  Integration test additions:
  - Enhance TestHistoryMigrationThenLiveIngestionHandoff with per-ledger
    PersistHistory verification across migration and live handoff phases
  - Add TestLiveIngestionHistoryCursorReadyCurrentStateLags proving the
    asymmetric cursor CAS path: when history_cursor is ready but
    current_state_cursor lags, only PersistHistory executes while
    PersistCurrentState is correctly skipped
  Move duplicated logic into ingest_helpers.go:
  - getLedgerWithRetry: was identical method on both ingestService and
    protocolMigrateHistoryService, now a package-level function
  - buildProtocolProcessorMap: deduplicates processor slice-to-map
    conversion with nil/duplicate validation
  - protocolHistoryCursorName/protocolCurrentStateCursorName: replaces
    scattered Sprintf calls for cursor key formatting

  Simplifies getLedgerWithRetry test to call the function directly
  without constructing a full ingestService.
  The convergence poll in processAllProtocols treated any error from
  PrepareRange/GetLedger as convergence, including transient RPC failures
  like connection refused. This could prematurely mark protocols as
  StatusSuccess during network blips. Now discriminates three cases:
  poll deadline exceeded (converged), parent context cancelled (propagate),
  anything else (transient — retry).
  The history migration service read cursor positions using hardcoded
  constants (data.OldestLedgerCursorName, data.LatestLedgerCursorName),
  ignoring operator overrides via CLI flags. Add configurable cursor name
  fields with defaults matching the ingest command, so operators who
  override --latest-ledger-cursor-name or --oldest-ledger-cursor-name
  get consistent behavior across live ingestion and history migration.
…dger backend

  The outer loop in protocol history migration transitions the same
  LedgerBackend instance between BoundedRange and UnboundedRange without
  explicit reset. This works because captive core internally closes the
  subprocess before opening a new range, but that behavior is an
  implementation detail not guaranteed by the LedgerBackend interface.

  Add an explanatory comment at the transition point and a new integration
  test (rangeTrackingBackend) that verifies the Bounded→Unbounded→Bounded
  PrepareRange sequence when the tip advances during the convergence poll.
  When processAllProtocols fails, the Run() method was marking all active
  protocols as StatusFailed, including ones already handed off to live
  ingestion via CAS failure. This caused handed-off protocols to be
  re-processed on the next Run(), conflicting with live ingestion's cursor
  ownership.

  Change processAllProtocols to return handed-off protocol IDs alongside
  the error, then split the status update: handed-off protocols get
  StatusSuccess (live ingestion owns them), while only non-handed-off
  protocols get StatusFailed.
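
A sketch of the split status update, assuming string protocol IDs and string status values. `statusesAfterFailure` is a hypothetical helper, and the constant values are placeholders rather than the repository's definitions.

```go
package main

import "fmt"

const (
	StatusSuccess = "success" // placeholder value
	StatusFailed  = "failed"  // placeholder value
)

// statusesAfterFailure decides the status for each active protocol after
// a failed run: handed-off protocols are owned by live ingestion and are
// marked successful, while the rest are marked failed.
func statusesAfterFailure(active, handedOff []string) map[string]string {
	handed := make(map[string]struct{}, len(handedOff))
	for _, id := range handedOff {
		handed[id] = struct{}{}
	}
	out := make(map[string]string, len(active))
	for _, id := range active {
		if _, ok := handed[id]; ok {
			out[id] = StatusSuccess
		} else {
			out[id] = StatusFailed
		}
	}
	return out
}

func main() {
	fmt.Println(statusesAfterFailure([]string{"a", "b"}, []string{"a"}))
}
```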
  If the caller passes duplicate protocol IDs (e.g. --protocol-id foo
  --protocol-id foo), duplicate trackers would be created for the same
  protocol, causing self-induced CAS failures and incorrect handoff
  detection. Add order-preserving deduplication as the first operation
  in validate(), which is the single choke-point for both Run() and
  processAllProtocols().
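
Order-preserving deduplication is commonly written with a "seen" set. The function name below is hypothetical and the sketch is independent of how validate() is actually implemented.

```go
package main

import "fmt"

// dedupePreservingOrder removes later duplicates, keeping the first
// occurrence of each ID in its original position.
func dedupePreservingOrder(ids []string) []string {
	seen := make(map[string]struct{}, len(ids))
	out := make([]string, 0, len(ids))
	for _, id := range ids {
		if _, ok := seen[id]; ok {
			continue
		}
		seen[id] = struct{}{}
		out = append(out, id)
	}
	return out
}

func main() {
	fmt.Println(dedupePreservingOrder([]string{"foo", "bar", "foo"})) // [foo bar]
}
```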
  Move reusable logic into internal/utils/ as generic functions
  (RetryWithBackoff[T], BuildMap[T]) and move cursor name helpers to
  ingestion_utils.go. Inline all call sites in services to use utils
  directly and delete the ingest helpers file entirely.

  Also fix variable shadow lint errors in ingest_live.go and
  protocol_migrate_history.go.
…on engine

  Implement the current-state migration CLI command and underlying service,
  then extract a shared protocolMigrateEngine parameterized by a
  migrationStrategy struct to eliminate ~1100 lines of duplication between
  the history and current-state implementations.

  The current-state migration builds protocol state from a user-specified
  --start-ledger forward to the tip, converging with live ingestion via
  CAS-gated cursors on protocol_{ID}_current_state_cursor. It mirrors the
  history migration with 5 substitutions: status field, status update
  method, cursor name function, persist method, and start ledger source.

  Changes:
  - Add UpdateCurrentStateMigrationStatus to ProtocolsModel and interface
  - Create protocolMigrateEngine with migrationStrategy in protocol_migrate.go
  - Rewrite history and current-state services as thin wrappers (~100 lines each)
  - Consolidate tests into shared engine suite (18 cases) + thin strategy tests
  - Extract buildMigrationCommand/runMigration helpers in CLI
  - Add integration test: TestCurrentStateMigrationThenLiveIngestionHandoff
…sy-loops

  Return a clear error upfront when maxRetries <= 0 or maxBackoff <= 0
  instead of silently producing a confusing "failed after 0 attempts: <nil>"
  or spinning with zero backoff. Add tests for both validation cases.
  Add IngestStoreModel.GetMany to fetch multiple ingest_store keys in a
  single WHERE key = ANY($1) query. Refactor
  protocolProcessorsEligibleForProduction to collect all cursor names
  upfront and issue one GetMany call instead of 2N individual Get calls
  (one history + one current-state cursor per protocol per ledger).
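
The batching idea, collecting all 2N cursor names and then issuing one lookup, can be sketched with an in-memory map standing in for the single `WHERE key = ANY($1)` query. `cursorNames` and `getMany` are hypothetical helpers. The `protocol_{ID}_current_state_cursor` format appears elsewhere in this PR, while the history cursor format is assumed by analogy.

```go
package main

import "fmt"

// cursorNames builds the history and current-state cursor keys for
// every protocol, so they can be fetched in one round trip.
func cursorNames(protocolIDs []string) []string {
	names := make([]string, 0, 2*len(protocolIDs))
	for _, id := range protocolIDs {
		names = append(names,
			fmt.Sprintf("protocol_%s_history_cursor", id), // assumed format
			fmt.Sprintf("protocol_%s_current_state_cursor", id),
		)
	}
	return names
}

// getMany stands in for one batched query: it returns the values of all
// requested keys that exist, omitting missing ones.
func getMany(store map[string]uint32, keys []string) map[string]uint32 {
	out := make(map[string]uint32, len(keys))
	for _, k := range keys {
		if v, ok := store[k]; ok {
			out[k] = v
		}
	}
	return out
}

func main() {
	store := map[string]uint32{"protocol_sep41_current_state_cursor": 42}
	got := getMany(store, cursorNames([]string{"sep41", "other"}))
	fmt.Println(len(got), got["protocol_sep41_current_state_cursor"])
}
```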
  Enable efficient additive state computation during live ingestion by
  caching protocol current state in processor memory. On the first
  successful CAS advance of the current_state_cursor (migration handoff),
  LoadCurrentState reads state from DB into the processor. Subsequent
  ledgers use the in-memory state maintained by PersistCurrentState,
  avoiding per-ledger DB reads. Cache-loaded flags reset on transaction
  rollback to force a DB reload on retry.
  Move currentStatePersistedProtocols append to immediately after setting
  protocolCurrentStateLoaded flag, so that any transaction failure after
  LoadCurrentState (including PersistCurrentState or cursor Update errors)
  resets the flag and forces a DB reload on retry. Previously, the append
  only ran after PersistCurrentState succeeded, leaving the flag stuck true
  on PersistCurrentState failure — causing retries to skip LoadCurrentState
  and write stale in-memory state.
 Track every protocol that wins current-state CAS before calling
 PersistCurrentState so any later transaction failure clears
 protocolCurrentStateLoaded and forces a DB reload on retry.

 Add a regression test covering a post-handoff PersistCurrentState
 failure and successful retry reload.
   Rerun protocol ProcessLedger on every PersistLedgerData retry so protocol
   processors rebuild staged state after rolled-back transactions instead of
   reusing cleared or mutated buffers. Document the retry contract for protocol
   processors and add a regression test covering protocol state rebuild on retry.
@aristidesstaffieri aristidesstaffieri self-assigned this Mar 31, 2026
 Stop sleeping after the final failed retry attempt in the shared
 backoff helper and the matching ingest retry loops. This removes
 unnecessary delay and avoids misleading "retrying" logs when no
 retry remains.

Copilot AI left a comment


Pull request overview

Adds a write-through, in-memory “current state” cache for protocol processors during live ingestion (loaded once at migration handoff via CAS, then maintained by PersistCurrentState), along with related ingestion retry/rollback fixes and a migration-side ledger backend reset fix.

Changes:

  • Extend ProtocolProcessor with LoadCurrentState() and update live ingestion to load once on first successful current-state CAS, then reuse in-memory state; reset “loaded” flags on rollback to force reload on retry.
  • Re-produce protocol state (ProcessLedger) on every ingest retry attempt; avoid sleeping/backing off after the final retry attempt in multiple retry loops.
  • Add a ledger backend factory + reset path in protocol migration to handle RPCLedgerBackend’s inability to PrepareRange more than once; add protocol wasm/contract tables migration and related plumbing.

Reviewed changes

Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| internal/utils/retry.go | Skip backoff sleep after the terminal retry attempt. |
| internal/utils/retry_test.go | Update tests to reflect revised retry/onRetry/backoff behavior. |
| internal/services/protocol_processor.go | Add LoadCurrentState() to processor interface and document retry behavior. |
| internal/services/protocol_migrate.go | Add backend reset mechanism via factory before repeated PrepareRange calls. |
| internal/services/protocol_migrate_test.go | Update test processors to satisfy new interface (LoadCurrentState). |
| internal/services/protocol_migrate_history.go | Wire LedgerBackendFactory into history migration service. |
| internal/services/protocol_migrate_current_state.go | Wire LedgerBackendFactory into current-state migration service. |
| internal/services/mocks.go | Update ProtocolProcessorMock for new interface method. |
| internal/services/ingest.go | Track per-protocol “current state loaded” flags in ingest service. |
| internal/services/ingest_test.go | Add/adjust tests for protocol-state rebuild on retry and current-state loading semantics. |
| internal/services/ingest_live.go | Implement write-through cache load/reset logic and move protocol production into retry loop. |
| internal/services/ingest_backfill.go | Skip backoff sleep after final retry attempt when flushing batch buffers. |
| internal/serve/graphql/resolvers/statechange.resolvers.go | Minor refactor: group resolver type declarations. |
| internal/integrationtests/data_migration_test.go | Update integration test processor for LoadCurrentState(). |
| internal/db/migrations/2026-03-09.1-protocol_wasms_and_contracts.sql | Add protocol_wasms and protocol_contracts tables + indexes. |
| cmd/protocol_migrate.go | Create and pass a ledger backend factory to enable backend recreation during migration. |


  The cmd layer deferred a close on the initial backend instance, but
  resetLedgerBackend() closes and replaces it during Run(). This caused
  the original instance to be double-closed and the final factory-created
  instance to never be closed.

  Add closeLedgerBackend() on the engine with a defer in Run() so the
  current backend is always closed on exit, and remove the cmd-layer defer.
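
The ownership rule, where the engine closes whichever backend is current when Run() exits, can be sketched as below. The `backend` and `engine` types and the `resets` parameter are hypothetical. Only the method names resetLedgerBackend and closeLedgerBackend come from the commit message.

```go
package main

import "fmt"

// backend is a hypothetical ledger backend that counts Close calls.
type backend struct {
	id     int
	closed int
}

func (b *backend) Close() { b.closed++ }

// engine owns the current backend and a factory to recreate it.
type engine struct {
	current *backend
	factory func() *backend
}

// resetLedgerBackend closes the current backend and replaces it.
func (e *engine) resetLedgerBackend() {
	e.current.Close()
	e.current = e.factory()
}

// closeLedgerBackend closes whichever backend is current, if any.
func (e *engine) closeLedgerBackend() {
	if e.current != nil {
		e.current.Close()
		e.current = nil
	}
}

// Run defers the close so the final backend is released on every exit
// path, while earlier instances are closed by the resets themselves.
func (e *engine) Run(resets int) {
	defer e.closeLedgerBackend()
	for i := 0; i < resets; i++ {
		e.resetLedgerBackend()
	}
}

func main() {
	var created []*backend
	factory := func() *backend {
		b := &backend{id: len(created)}
		created = append(created, b)
		return b
	}
	e := &engine{current: factory(), factory: factory}
	e.Run(2)
	for _, b := range created {
		fmt.Printf("backend %d closed %d time(s)\n", b.id, b.closed)
	}
}
```

With the close owned by the engine, every instance in this sketch is closed exactly once, including the one created by the last reset.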
…lbacks

  The interface docstring claimed LoadCurrentState was called once per protocol,
  but live ingestion resets the loaded flag on rollback and re-invokes it on
  retry. Update the comment to reflect repeated-call semantics so processor
  implementations handle reloads safely.
@aristidesstaffieri aristidesstaffieri marked this pull request as ready for review March 31, 2026 20:45
@aristidesstaffieri aristidesstaffieri requested a review from a team March 31, 2026 20:45
@aristidesstaffieri aristidesstaffieri force-pushed the feature/protocol-migrate-current-state branch 3 times, most recently from 3b89302 to 702bef3 Compare April 6, 2026 16:18


3 participants