From 5188e5626d03115cfbf4c3b3e1c894f2c96bb509 Mon Sep 17 00:00:00 2001 From: vietddude Date: Wed, 25 Mar 2026 16:21:17 +0700 Subject: [PATCH 1/5] feat: add health status endpoint and improve health monitoring for chains --- cmd/indexer/main.go | 27 +++- configs/config.example.yaml | 7 + internal/status/status.go | 249 +++++++++++++++++++++++++++++ internal/status/status_test.go | 104 ++++++++++++ internal/worker/base.go | 10 ++ internal/worker/catchup.go | 48 ++++++ internal/worker/factory.go | 14 ++ internal/worker/factory_test.go | 4 +- internal/worker/manager.go | 6 + internal/worker/manual.go | 3 + internal/worker/mempool.go | 5 +- internal/worker/regular.go | 31 +++- internal/worker/rescanner.go | 12 ++ pkg/common/config/chains.go | 3 + pkg/common/config/chains_test.go | 38 +++++ pkg/common/config/types.go | 23 +++ pkg/common/config/validate.go | 8 + pkg/common/config/validate_test.go | 30 ++++ 18 files changed, 614 insertions(+), 8 deletions(-) create mode 100644 internal/status/status.go create mode 100644 internal/status/status_test.go create mode 100644 pkg/common/config/chains_test.go diff --git a/cmd/indexer/main.go b/cmd/indexer/main.go index 40ad16d..5d4feff 100644 --- a/cmd/indexer/main.go +++ b/cmd/indexer/main.go @@ -14,6 +14,7 @@ import ( "github.com/alecthomas/kong" "gorm.io/gorm" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/internal/worker" "github.com/fystack/multichain-indexer/pkg/addressbloomfilter" "github.com/fystack/multichain-indexer/pkg/common/config" @@ -212,7 +213,7 @@ func runIndexer(chains []string, configPath string, debug, manual, catchup, from managerCfg, ) - healthServer := startHealthServer(cfg.Services.Port, cfg) + healthServer := startHealthServer(cfg.Services.Port, cfg, manager) // Start all workers logger.Info("Starting all workers") @@ -246,7 +247,7 @@ type HealthResponse struct { Version string `json:"version"` } -func startHealthServer(port int, cfg *config.Config) *http.Server { +func startHealthServer(port int, cfg *config.Config, manager *worker.Manager) *http.Server { mux := http.NewServeMux() version := cfg.Version @@ -266,13 +267,33 @@ func startHealthServer(port int, cfg *config.Config) *http.Server { json.NewEncoder(w).Encode(response) }) + mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + response := status.StatusResponse{ + Timestamp: time.Now().UTC(), + Version: version, + Networks: []status.NetworkStatus{}, + } + if manager != nil && manager.Registry() != nil { + response = manager.Registry().Snapshot(version) + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(response) + }) + server := &http.Server{ Addr: fmt.Sprintf(":%d", port), Handler: mux, } go func() { - logger.Info("Health check server started", "port", port, "endpoint", "/health") + logger.Info("Health check server started", "port", port, "endpoints", []string{"/health", "/status"}) if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { logger.Error("Health server failed to start", "error", err) } diff --git a/configs/config.example.yaml b/configs/config.example.yaml index 3a86e4a..95dece5 100644 --- a/configs/config.example.yaml +++ b/configs/config.example.yaml @@ -8,6 +8,9 @@ defaults: poll_interval: "5s" # how often to poll for new blocks reorg_rollback_window: 20 # number of blocks to roll back on reorg two_way_indexing: false # enable two-way indexing for all chains + status: + healthy_max_pending_blocks: 50 + slow_max_pending_blocks: 250 client: timeout: "20s" # RPC timeout per request max_retries: 3 # max retries per RPC call @@ -38,6 +41,10 @@ chains: throttle: rps: 5 burst: 8 + status: + # Optional per-chain override for /status health condition. + healthy_max_pending_blocks: 80 + slow_max_pending_blocks: 400 ethereum_mainnet: network_id: "ethereum_mainnet" diff --git a/internal/status/status.go b/internal/status/status.go new file mode 100644 index 0000000..67c0d7e --- /dev/null +++ b/internal/status/status.go @@ -0,0 +1,249 @@ +package status + +import ( + "sort" + "strings" + "sync" + "time" + + "github.com/fystack/multichain-indexer/pkg/common/config" +) + +type HealthStatus string + +const ( + HealthHealthy HealthStatus = "healthy" + HealthSlow HealthStatus = "slow" + HealthDegraded HealthStatus = "degraded" +) + +type NetworkStatus struct { + NetworkID string `json:"network_id"` + ChainName string `json:"chain_name"` + InternalCode string `json:"internal_code"` + NetworkType string `json:"network_type"` + Health HealthStatus `json:"health"` + LatestBlock uint64 `json:"latest_block"` + IndexedBlock uint64 `json:"indexed_block"` + PendingBlocks uint64 `json:"pending_blocks"` + HeadGap uint64 `json:"head_gap"` + CatchupPendingBlocks uint64 `json:"catchup_pending_blocks"` + CatchupRanges int `json:"catchup_ranges"` + FailedBlocks int `json:"failed_blocks"` + LastIndexedAt *time.Time `json:"last_indexed_at,omitempty"` +} + +type StatusResponse struct { + Timestamp time.Time `json:"timestamp"` + Version string `json:"version"` + Networks []NetworkStatus `json:"networks"` +} + +type chainState struct { + networkID string + chainName string + internalCode string + networkType string + thresholds config.StatusConfig + latestBlock uint64 + indexedBlock uint64 + catchupRemain uint64 + catchupRanges int + lastIndexedAt time.Time + failedBlocks map[uint64]struct{} +} + +type Registry struct { + mu sync.RWMutex + chains map[string]*chainState +} + +func NewRegistry() *Registry { + return &Registry{ + chains: make(map[string]*chainState), + } +} + +func (r *Registry) RegisterChain(chainKey, chainName string, chainCfg config.ChainConfig) { + key := normalizeChainKey(chainKey) + if key == "" { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state, exists := r.chains[key] + if !exists { + state = &chainState{ + failedBlocks: make(map[uint64]struct{}), + } + r.chains[key] = state + } + + if strings.TrimSpace(chainCfg.NetworkId) != "" { + state.networkID = chainCfg.NetworkId + } else { + state.networkID = chainName + } + state.chainName = chainName + state.internalCode = chainCfg.InternalCode + state.networkType = string(chainCfg.Type) + state.thresholds = chainCfg.Status.Normalize() +} + +func (r *Registry) UpdateHead(chainKey string, latestBlock, indexedBlock uint64, indexedAt time.Time) { + key := normalizeChainKey(chainKey) + if key == "" { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + state.latestBlock = latestBlock + state.indexedBlock = indexedBlock + if !indexedAt.IsZero() { + state.lastIndexedAt = indexedAt.UTC() + } +} + +func (r *Registry) UpdateCatchup(chainKey string, pendingBlocks uint64, ranges int) { + key := normalizeChainKey(chainKey) + if key == "" { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + state.catchupRemain = pendingBlocks + state.catchupRanges = ranges +} + +func (r *Registry) MarkFailedBlock(chainKey string, blockNumber uint64) { + key := normalizeChainKey(chainKey) + if key == "" || blockNumber == 0 { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + state.failedBlocks[blockNumber] = struct{}{} +} + +func (r *Registry) ClearFailedBlocks(chainKey string, blockNumbers []uint64) { + key := normalizeChainKey(chainKey) + if key == "" || len(blockNumbers) == 0 { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + for _, block := range blockNumbers { + delete(state.failedBlocks, block) + } +} + +func (r *Registry) SetFailedBlocks(chainKey string, blockNumbers []uint64) { + key := normalizeChainKey(chainKey) + if key == "" { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + state.failedBlocks = make(map[uint64]struct{}, len(blockNumbers)) + for _, block := range blockNumbers { + if block == 0 { + continue + } + state.failedBlocks[block] = struct{}{} + } +} + +func (r *Registry) Snapshot(version string) StatusResponse { + r.mu.RLock() + defer r.mu.RUnlock() + + networks := make([]NetworkStatus, 0, len(r.chains)) + for _, state := range r.chains { + headGap := uint64(0) + if state.latestBlock > state.indexedBlock { + headGap = state.latestBlock - state.indexedBlock + } + pending := headGap + state.catchupRemain + thresholds := state.thresholds.Normalize() + + item := NetworkStatus{ + NetworkID: state.networkID, + ChainName: state.chainName, + InternalCode: state.internalCode, + NetworkType: state.networkType, + Health: deriveHealth(pending, thresholds), + LatestBlock: state.latestBlock, + IndexedBlock: state.indexedBlock, + PendingBlocks: pending, + HeadGap: headGap, + CatchupPendingBlocks: state.catchupRemain, + CatchupRanges: state.catchupRanges, + FailedBlocks: len(state.failedBlocks), + } + if !state.lastIndexedAt.IsZero() { + t := state.lastIndexedAt + item.LastIndexedAt = &t + } + networks = append(networks, item) + } + + sort.Slice(networks, func(i, j int) bool { + return networks[i].ChainName < networks[j].ChainName + }) + + return StatusResponse{ + Timestamp: time.Now().UTC(), + Version: version, + Networks: networks, + } +} + +func (r *Registry) ensureStateLocked(key string) *chainState { + state, exists := r.chains[key] + if exists { + return state + } + + state = &chainState{ + networkID: strings.ToLower(key), + chainName: strings.ToLower(key), + internalCode: key, + thresholds: config.StatusConfig{}.Normalize(), + failedBlocks: make(map[uint64]struct{}), + } + r.chains[key] = state + return state +} + +func deriveHealth(pendingBlocks uint64, thresholds config.StatusConfig) HealthStatus { + normalized := thresholds.Normalize() + switch { + case pendingBlocks < normalized.HealthyMaxPendingBlocks: + return HealthHealthy + case pendingBlocks < normalized.SlowMaxPendingBlocks: + return HealthSlow + default: + return HealthDegraded + } +} + +func normalizeChainKey(key string) string { + return strings.ToUpper(strings.TrimSpace(key)) +} diff --git a/internal/status/status_test.go b/internal/status/status_test.go new file mode 100644 index 0000000..27ecb8b --- /dev/null +++ b/internal/status/status_test.go @@ -0,0 +1,104 @@ +package status + +import ( + "testing" + "time" + + "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/fystack/multichain-indexer/pkg/common/enum" + "github.com/stretchr/testify/require" +) + +func TestRegistrySnapshotDerivesHealthWithPerChainThresholds(t *testing.T) { + t.Parallel() + + registry := NewRegistry() + registry.RegisterChain( + "ETH_MAINNET", + "ethereum_mainnet", + config.ChainConfig{ + NetworkId: "eth-mainnet", + InternalCode: "ETH_MAINNET", + Type: enum.NetworkTypeEVM, + Status: config.StatusConfig{ + HealthyMaxPendingBlocks: 20, + SlowMaxPendingBlocks: 100, + }, + }, + ) + + indexedAt := time.Date(2026, 3, 25, 12, 0, 0, 0, time.UTC) + registry.UpdateHead("eth_mainnet", 1_000, 980, indexedAt) + registry.UpdateCatchup("eth_mainnet", 5, 2) + registry.MarkFailedBlock("eth_mainnet", 981) + registry.MarkFailedBlock("eth_mainnet", 982) + + resp := registry.Snapshot("1.2.3") + require.Equal(t, "1.2.3", resp.Version) + require.Len(t, resp.Networks, 1) + + network := resp.Networks[0] + require.Equal(t, "eth-mainnet", network.NetworkID) + require.Equal(t, "ethereum_mainnet", network.ChainName) + require.Equal(t, "ETH_MAINNET", network.InternalCode) + require.Equal(t, "evm", network.NetworkType) + require.Equal(t, uint64(1_000), network.LatestBlock) + require.Equal(t, uint64(980), network.IndexedBlock) + require.Equal(t, uint64(20), network.HeadGap) + require.Equal(t, uint64(5), network.CatchupPendingBlocks) + require.Equal(t, uint64(25), network.PendingBlocks) + require.Equal(t, 2, network.CatchupRanges) + require.Equal(t, 2, network.FailedBlocks) + require.Equal(t, HealthSlow, network.Health) + require.NotNil(t, network.LastIndexedAt) + require.True(t, network.LastIndexedAt.Equal(indexedAt)) +} + +func TestRegistrySnapshotUsesDefaultThresholdWhenMissing(t *testing.T) { + t.Parallel() + + registry := NewRegistry() + registry.RegisterChain( + "TRON_MAINNET", + "tron_mainnet", + config.ChainConfig{ + NetworkId: "tron-mainnet", + InternalCode: "TRON_MAINNET", + Type: enum.NetworkTypeTron, + }, + ) + + registry.UpdateHead("tron_mainnet", 500, 260, time.Time{}) + registry.UpdateCatchup("tron_mainnet", 20, 1) + + resp := registry.Snapshot("1.0.0") + require.Len(t, resp.Networks, 1) + + network := resp.Networks[0] + // default healthy<50, slow<250 => pending=260 should be degraded + require.Equal(t, uint64(260), network.PendingBlocks) + require.Equal(t, HealthDegraded, network.Health) +} + +func TestRegistryClearFailedBlocks(t *testing.T) { + t.Parallel() + + registry := NewRegistry() + registry.RegisterChain("BTC_MAINNET", "bitcoin_mainnet", config.ChainConfig{ + NetworkId: "btc-mainnet", + InternalCode: "BTC_MAINNET", + Type: enum.NetworkTypeBtc, + }) + + registry.MarkFailedBlock("btc_mainnet", 10) + registry.MarkFailedBlock("btc_mainnet", 11) + registry.ClearFailedBlocks("btc_mainnet", []uint64{10}) + + resp := registry.Snapshot("1.0.0") + require.Len(t, resp.Networks, 1) + require.Equal(t, 1, resp.Networks[0].FailedBlocks) + + registry.SetFailedBlocks("btc_mainnet", []uint64{21, 22, 22}) + resp = registry.Snapshot("1.0.0") + require.Equal(t, 2, resp.Networks[0].FailedBlocks) +} diff --git a/internal/worker/base.go b/internal/worker/base.go index 53cde6d..31a16e0 100644 --- a/internal/worker/base.go +++ b/internal/worker/base.go @@ -8,6 +8,7 @@ import ( "log/slog" "github.com/fystack/multichain-indexer/internal/indexer" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/logger" "github.com/fystack/multichain-indexer/pkg/common/types" @@ -46,6 +47,7 @@ type BaseWorker struct { emitter events.Emitter failedChan chan FailedBlockEvent observer BlockResultObserver + registry *status.Registry } // Stop stops the worker and cleans up internal resources @@ -65,6 +67,7 @@ func newWorkerWithMode( pubkeyStore pubkeystore.Store, mode WorkerMode, failedChan chan FailedBlockEvent, + registry *status.Registry, ) *BaseWorker { ctx, cancel := context.WithCancel(ctx) log := logger.With( @@ -84,6 +87,7 @@ func newWorkerWithMode( pubkeyStore: pubkeyStore, emitter: emitter, failedChan: failedChan, + registry: registry, } } @@ -140,6 +144,9 @@ func (bw *BaseWorker) notifyObserver(blockNumber uint64, status BlockStatus) { func (bw *BaseWorker) handleBlockResult(result indexer.BlockResult) bool { if result.Error != nil { _ = bw.blockStore.SaveFailedBlock(bw.chain.GetNetworkInternalCode(), result.Number) + if bw.registry != nil { + bw.registry.MarkFailedBlock(bw.chain.GetName(), result.Number) + } // Non-blocking push to failedChan select { @@ -182,6 +189,9 @@ func (bw *BaseWorker) handleBlockResult(result indexer.BlockResult) bool { "chain", bw.chain.GetName(), "block", result.Block.Number, ) + if bw.registry != nil { + bw.registry.ClearFailedBlocks(bw.chain.GetName(), []uint64{result.Number}) + } bw.notifyObserver(result.Number, BlockStatusProcessed) return true } diff --git a/internal/worker/catchup.go b/internal/worker/catchup.go index a45fdce..c0980b9 100644 --- a/internal/worker/catchup.go +++ b/internal/worker/catchup.go @@ -7,6 +7,7 @@ import ( "time" "github.com/fystack/multichain-indexer/internal/indexer" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/enum" "github.com/fystack/multichain-indexer/pkg/events" @@ -38,6 +39,7 @@ func NewCatchupWorker( emitter events.Emitter, pubkeyStore pubkeystore.Store, failedChan chan FailedBlockEvent, + registry *status.Registry, ) *CatchupWorker { worker := newWorkerWithMode( ctx, @@ -49,12 +51,14 @@ func NewCatchupWorker( pubkeyStore, ModeCatchup, failedChan, + registry, ) cw := &CatchupWorker{ BaseWorker: worker, workerPool: make(chan struct{}, CATCHUP_WORKERS), } cw.blockRanges = cw.loadCatchupProgress() + cw.syncCatchupStatus() return cw } @@ -223,6 +227,7 @@ func (cw *CatchupWorker) processCatchupBlocksParallel() error { // Reload ranges to check for any remaining work cw.blockRanges = cw.loadCatchupProgress() + cw.syncCatchupStatus() return nil } @@ -352,6 +357,13 @@ func (cw *CatchupWorker) saveProgress(r blockstore.CatchupRange, current uint64) "current", current, ) _ = cw.blockStore.SaveCatchupProgress(cw.chain.GetNetworkInternalCode(), r.Start, r.End, current) + for i := range cw.blockRanges { + if cw.blockRanges[i].Start == r.Start && cw.blockRanges[i].End == r.End { + cw.blockRanges[i].Current = min(current, cw.blockRanges[i].End) + break + } + } + cw.syncCatchupStatusLocked() } func (cw *CatchupWorker) completeRange(r blockstore.CatchupRange) error { @@ -372,6 +384,7 @@ func (cw *CatchupWorker) completeRange(r blockstore.CatchupRange) error { break } } + cw.syncCatchupStatusLocked() return nil } @@ -410,6 +423,41 @@ func (cw *CatchupWorker) Close() error { "error", err, ) } + cw.syncCatchupStatusLocked() return nil } + +func (cw *CatchupWorker) syncCatchupStatus() { + cw.progressMu.Lock() + defer cw.progressMu.Unlock() + cw.syncCatchupStatusLocked() +} + +func (cw *CatchupWorker) syncCatchupStatusLocked() { + if cw.registry == nil { + return + } + cw.registry.UpdateCatchup( + cw.chain.GetName(), + catchupPendingBlocks(cw.blockRanges), + len(cw.blockRanges), + ) +} + +func catchupPendingBlocks(ranges []blockstore.CatchupRange) uint64 { + var pending uint64 + for _, r := range ranges { + if r.End < r.Start { + continue + } + if r.Current < r.Start { + pending += r.End - r.Start + 1 + continue + } + if r.Current < r.End { + pending += r.End - r.Current + } + } + return pending +} diff --git a/internal/worker/factory.go b/internal/worker/factory.go index 71064ef..1569442 100644 --- a/internal/worker/factory.go +++ b/internal/worker/factory.go @@ -18,6 +18,7 @@ import ( "github.com/fystack/multichain-indexer/internal/rpc/sui" tonrpc "github.com/fystack/multichain-indexer/internal/rpc/ton" "github.com/fystack/multichain-indexer/internal/rpc/tron" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/addressbloomfilter" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/enum" @@ -43,6 +44,7 @@ type WorkerDeps struct { Redis infra.RedisClient FailedChan chan FailedBlockEvent Observer BlockResultObserver + Registry *status.Registry } // ManagerConfig defines which workers to enable per chain. @@ -105,6 +107,7 @@ func BuildWorkers( deps.Emitter, deps.Pubkey, deps.FailedChan, + deps.Registry, ), } case ModeCatchup: @@ -118,6 +121,7 @@ func BuildWorkers( deps.Emitter, deps.Pubkey, deps.FailedChan, + deps.Registry, ), } case ModeRescanner: @@ -131,6 +135,7 @@ func BuildWorkers( deps.Emitter, deps.Pubkey, deps.FailedChan, + deps.Registry, ), } case ModeManual: @@ -145,6 +150,7 @@ func BuildWorkers( deps.Emitter, deps.Pubkey, deps.FailedChan, + deps.Registry, ), } case ModeMempool: @@ -158,6 +164,7 @@ func BuildWorkers( deps.Emitter, deps.Pubkey, deps.FailedChan, + deps.Registry, ), } default: @@ -772,8 +779,10 @@ func CreateManagerWithWorkers( // Shared stores blockStore := blockstore.NewBlockStore(kvstore) pubkeyStore := pubkeystore.NewPublicKeyStore(addressBF) + registry := status.NewRegistry() manager := NewManager(ctx, kvstore, blockStore, emitter, pubkeyStore) + manager.registry = registry // Loop each chain for _, chainName := range managerCfg.Chains { @@ -805,6 +814,10 @@ func CreateManagerWithWorkers( default: logger.Fatal("Unsupported network type", "chain", chainName, "type", chainCfg.Type) } + registry.RegisterChain(idxr.GetName(), chainName, chainCfg) + if existingFailed, err := blockStore.GetFailedBlocks(idxr.GetNetworkInternalCode()); err == nil { + registry.SetFailedBlocks(idxr.GetName(), existingFailed) + } failedChan := make(chan FailedBlockEvent, 100) @@ -818,6 +831,7 @@ func CreateManagerWithWorkers( Redis: redisClient, FailedChan: failedChan, Observer: managerCfg.Observer, + Registry: registry, } // Helper: add workers if enabled (all modes share the same indexer and global rate limiter) diff --git a/internal/worker/factory_test.go b/internal/worker/factory_test.go index 187d5dc..94c28cf 100644 --- a/internal/worker/factory_test.go +++ b/internal/worker/factory_test.go @@ -116,8 +116,8 @@ func TestRescannerFailedChannelIsolationByChain(t *testing.T) { chA := make(chan FailedBlockEvent, 1) chB := make(chan FailedBlockEvent, 1) - rwA := NewRescannerWorker(ctx, chainA, testChainConfig(), noopKVStore{}, &stubBlockStore{}, events.Emitter(nil), nil, chA) - rwB := NewRescannerWorker(ctx, chainB, testChainConfig(), noopKVStore{}, &stubBlockStore{}, events.Emitter(nil), nil, chB) + rwA := NewRescannerWorker(ctx, chainA, testChainConfig(), noopKVStore{}, &stubBlockStore{}, events.Emitter(nil), nil, chA, nil) + rwB := NewRescannerWorker(ctx, chainB, testChainConfig(), noopKVStore{}, &stubBlockStore{}, events.Emitter(nil), nil, chB, nil) doneA := make(chan struct{}) doneB := make(chan struct{}) diff --git a/internal/worker/manager.go b/internal/worker/manager.go index 59d64bf..525c394 100644 --- a/internal/worker/manager.go +++ b/internal/worker/manager.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/logger" "github.com/fystack/multichain-indexer/pkg/events" "github.com/fystack/multichain-indexer/pkg/infra" @@ -22,6 +23,11 @@ type Manager struct { blockStore blockstore.Store emitter events.Emitter pubkeyStore pubkeystore.Store + registry *status.Registry +} + +func (m *Manager) Registry() *status.Registry { + return m.registry } func NewManager( diff --git a/internal/worker/manual.go b/internal/worker/manual.go index 34d4340..6f3d706 100644 --- a/internal/worker/manual.go +++ b/internal/worker/manual.go @@ -5,6 +5,7 @@ import ( "time" "github.com/fystack/multichain-indexer/internal/indexer" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/events" "github.com/fystack/multichain-indexer/pkg/infra" @@ -41,6 +42,7 @@ func NewManualWorker( emitter events.Emitter, pubkeyStore pubkeystore.Store, failedChan chan FailedBlockEvent, + registry *status.Registry, ) *ManualWorker { return &ManualWorker{ BaseWorker: newWorkerWithMode( @@ -53,6 +55,7 @@ func NewManualWorker( pubkeyStore, ModeManual, failedChan, + registry, ), mbs: missingblockstore.NewMissingBlocksStore(redisClient), config: DefaultManualConfig, diff --git a/internal/worker/mempool.go b/internal/worker/mempool.go index 7955a41..387c327 100644 --- a/internal/worker/mempool.go +++ b/internal/worker/mempool.go @@ -6,6 +6,7 @@ import ( "time" "github.com/fystack/multichain-indexer/internal/indexer" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/types" "github.com/fystack/multichain-indexer/pkg/events" @@ -33,6 +34,7 @@ func NewMempoolWorker( emitter events.Emitter, pubkeyStore pubkeystore.Store, failedChan chan FailedBlockEvent, + registry *status.Registry, ) *MempoolWorker { worker := newWorkerWithMode( ctx, @@ -44,6 +46,7 @@ func NewMempoolWorker( pubkeyStore, ModeMempool, failedChan, + registry, ) // Cast to Bitcoin indexer (mempool is Bitcoin-specific) @@ -156,7 +159,7 @@ func (mw *MempoolWorker) processMempool() error { if mw.config.IndexUTXO { for i := range utxoEvents { event := &utxoEvents[i] - + if mw.seenTxs[event.TxHash+":utxo"] { continue } diff --git a/internal/worker/regular.go b/internal/worker/regular.go index e99646e..52938c4 100644 --- a/internal/worker/regular.go +++ b/internal/worker/regular.go @@ -7,6 +7,7 @@ import ( "time" "github.com/fystack/multichain-indexer/internal/indexer" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/constant" "github.com/fystack/multichain-indexer/pkg/common/enum" @@ -16,7 +17,6 @@ import ( "github.com/fystack/multichain-indexer/pkg/store/pubkeystore" ) - const ( MaxBlockHashSize = 50 regularGapRetryAttempts = 2 @@ -31,7 +31,7 @@ type RegularWorker struct { *BaseWorker currentBlock uint64 blockHashes []blockstore.BlockHashEntry - hashesModified bool + hashesModified bool persistTicker *time.Ticker } @@ -44,6 +44,7 @@ func NewRegularWorker( emitter events.Emitter, pubkeyStore pubkeystore.Store, failedChan chan FailedBlockEvent, + registry *status.Registry, ) *RegularWorker { worker := newWorkerWithMode( ctx, @@ -55,6 +56,7 @@ func NewRegularWorker( pubkeyStore, ModeRegular, failedChan, + registry, ) rw := &RegularWorker{BaseWorker: worker} rw.currentBlock = rw.determineStartingBlock() @@ -94,17 +96,20 @@ func (rw *RegularWorker) processRegularBlocks() error { if err != nil { return fmt.Errorf("get latest block: %w", err) } + rw.updateHeadStatus(latest, time.Time{}) rw.logger.Info("Got latest block", "latest", latest, "current", rw.currentBlock) // Lag detection: if we're too far behind, jump to chain head and queue skipped range for catchup if rw.skipAheadIfLagging(latest) { + rw.updateHeadStatus(latest, time.Time{}) return nil } if rw.currentBlock > latest { rw.logger.Info("Waiting for new blocks...", "current", rw.currentBlock, "latest", latest) time.Sleep(rw.config.PollInterval) + rw.updateHeadStatus(latest, time.Time{}) return nil } @@ -152,14 +157,17 @@ func (rw *RegularWorker) processRegularBlocks() error { return nil } + indexedAt := time.Time{} if lastSuccess >= rw.currentBlock { rw.currentBlock = lastSuccess + 1 _ = rw.blockStore.SaveLatestBlock(rw.chain.GetNetworkInternalCode(), lastSuccess) + indexedAt = time.Now().UTC() if lastSuccessHash != "" { rw.addBlockHash(lastSuccess, lastSuccessHash) } } + rw.updateHeadStatus(latest, indexedAt) rw.logger.Info("Processed latest blocks", "chain", rw.chain.GetName(), @@ -391,6 +399,25 @@ func (rw *RegularWorker) skipAheadIfLagging(latest uint64) bool { return true } +func (rw *RegularWorker) currentIndexedBlock() uint64 { + if rw.currentBlock == 0 { + return 0 + } + return rw.currentBlock - 1 +} + +func (rw *RegularWorker) updateHeadStatus(latest uint64, indexedAt time.Time) { + if rw.registry == nil { + return + } + rw.registry.UpdateHead( + rw.chain.GetName(), + latest, + rw.currentIndexedBlock(), + indexedAt, + ) +} + func (rw *RegularWorker) processReorgCheckedBatch( results []indexer.BlockResult, end uint64, diff --git a/internal/worker/rescanner.go b/internal/worker/rescanner.go index adcfd8d..703a2c0 100644 --- a/internal/worker/rescanner.go +++ b/internal/worker/rescanner.go @@ -7,6 +7,7 @@ import ( "time" "github.com/fystack/multichain-indexer/internal/indexer" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/enum" "github.com/fystack/multichain-indexer/pkg/events" @@ -45,6 +46,7 @@ func NewRescannerWorker( emitter events.Emitter, pubkeyStore pubkeystore.Store, failedChan chan FailedBlockEvent, + registry *status.Registry, ) *RescannerWorker { return &RescannerWorker{ BaseWorker: newWorkerWithMode( @@ -57,6 +59,7 @@ func NewRescannerWorker( pubkeyStore, ModeRescanner, failedChan, + registry, ), failedBlocks: make(map[uint64]uint8), maxRetries: RescannerMaxRetries, @@ -112,6 +115,9 @@ func (rw *RescannerWorker) addFailedBlock(block uint64, errMsg string) { if _, exists := rw.failedBlocks[block]; !exists { rw.failedBlocks[block] = 0 rw.addSave(block) + if rw.registry != nil { + rw.registry.MarkFailedBlock(rw.chain.GetName(), block) + } rw.logger.Info("Added failed block", "block", block, "error", errMsg) } } @@ -121,6 +127,9 @@ func (rw *RescannerWorker) syncFromKV() error { if err != nil { return err } + if rw.registry != nil { + rw.registry.SetFailedBlocks(rw.chain.GetName(), blocks) + } rw.mu.Lock() defer rw.mu.Unlock() for _, num := range blocks { @@ -141,6 +150,9 @@ func (rw *RescannerWorker) removeBlocks(blocks []uint64) { } rw.mu.Unlock() rw.addRemove(blocks...) + if rw.registry != nil { + rw.registry.ClearFailedBlocks(rw.chain.GetName(), blocks) + } } func (rw *RescannerWorker) incrementRetry(block uint64) { diff --git a/pkg/common/config/chains.go b/pkg/common/config/chains.go index 0f44d3d..8d7d5ac 100644 --- a/pkg/common/config/chains.go +++ b/pkg/common/config/chains.go @@ -72,6 +72,9 @@ func (c Chains) ApplyDefaults(def Defaults) error { if err := mergo.Merge(&chain.Throttle, def.Throttle); err != nil { return fmt.Errorf("merge throttle defaults for %s: %w", name, err) } + if err := mergo.Merge(&chain.Status, def.Status); err != nil { + return fmt.Errorf("merge status defaults for %s: %w", name, err) + } c[name] = chain } return nil diff --git a/pkg/common/config/chains_test.go b/pkg/common/config/chains_test.go new file mode 100644 index 0000000..fb9fea3 --- /dev/null +++ b/pkg/common/config/chains_test.go @@ -0,0 +1,38 @@ +package config + +import ( + "testing" + "time" + + "github.com/fystack/multichain-indexer/pkg/common/enum" + "github.com/stretchr/testify/require" +) + +func TestApplyDefaults_MergesStatusThresholds(t *testing.T) { + t.Parallel() + + chains := Chains{ + "ethereum_mainnet": { + Type: enum.NetworkTypeEVM, + PollInterval: time.Second, + Nodes: []NodeConfig{{URL: "https://example.com"}}, + Status: StatusConfig{ + SlowMaxPendingBlocks: 120, + }, + }, + } + + err := chains.ApplyDefaults(Defaults{ + PollInterval: time.Second, + ReorgRollbackWindow: 20, + Status: StatusConfig{ + HealthyMaxPendingBlocks: 30, + SlowMaxPendingBlocks: 250, + }, + }) + require.NoError(t, err) + + chain := chains["ethereum_mainnet"] + require.Equal(t, uint64(30), chain.Status.HealthyMaxPendingBlocks) + require.Equal(t, uint64(120), chain.Status.SlowMaxPendingBlocks) +} diff --git a/pkg/common/config/types.go b/pkg/common/config/types.go index 42a002b..510c102 100644 --- a/pkg/common/config/types.go +++ b/pkg/common/config/types.go @@ -14,6 +14,11 @@ const ( ProdEnv Env = "production" ) +const ( + DefaultStatusHealthyMaxPendingBlocks uint64 = 50 + DefaultStatusSlowMaxPendingBlocks uint64 = 250 +) + type Config struct { Version string `yaml:"version"` Environment Env `yaml:"env" validate:"required,oneof=development production"` @@ -27,6 +32,7 @@ type Defaults struct { TwoWayIndexing bool `yaml:"two_way_indexing"` PollInterval time.Duration `yaml:"poll_interval" validate:"required"` ReorgRollbackWindow int `yaml:"reorg_rollback_window" validate:"required,min=1"` + Status StatusConfig `yaml:"status"` Client ClientConfig `yaml:"client"` Throttle Throttle `yaml:"throttle"` Failover rpc.FailoverConfig `yaml:"failover"` @@ -50,12 +56,29 @@ type ChainConfig struct { IndexUTXO bool `yaml:"index_utxo"` DebugTrace bool `yaml:"debug_trace"` TraceThrottle TraceThrottle `yaml:"trace_throttle"` + Status StatusConfig `yaml:"status"` Client ClientConfig `yaml:"client"` Throttle Throttle `yaml:"throttle"` Ton TonConfig `yaml:"ton"` Nodes []NodeConfig `yaml:"nodes" validate:"required,min=1"` } +type StatusConfig struct { + HealthyMaxPendingBlocks uint64 `yaml:"healthy_max_pending_blocks"` + SlowMaxPendingBlocks uint64 `yaml:"slow_max_pending_blocks"` +} + +func (s StatusConfig) Normalize() StatusConfig { + normalized := s + if normalized.HealthyMaxPendingBlocks == 0 { + normalized.HealthyMaxPendingBlocks = DefaultStatusHealthyMaxPendingBlocks + } + if normalized.SlowMaxPendingBlocks == 0 { + normalized.SlowMaxPendingBlocks = DefaultStatusSlowMaxPendingBlocks + } + return normalized +} + type ClientConfig struct { Timeout time.Duration `yaml:"timeout"` MaxRetries int `yaml:"max_retries" validate:"min=0"` diff --git a/pkg/common/config/validate.go b/pkg/common/config/validate.go index db300b5..e00b080 100644 --- a/pkg/common/config/validate.go +++ b/pkg/common/config/validate.go @@ -10,5 +10,13 @@ func validateChainConfig(chain ChainConfig) error { if chain.Type == enum.NetworkTypeCosmos && chain.NativeDenom == "" { return fmt.Errorf("native_denom is required for cosmos chains") } + statusCfg := chain.Status.Normalize() + if statusCfg.HealthyMaxPendingBlocks >= statusCfg.SlowMaxPendingBlocks { + return fmt.Errorf( + "status thresholds invalid: healthy_max_pending_blocks (%d) must be less than slow_max_pending_blocks (%d)", + statusCfg.HealthyMaxPendingBlocks, + statusCfg.SlowMaxPendingBlocks, + ) + } return nil } diff --git a/pkg/common/config/validate_test.go b/pkg/common/config/validate_test.go index b074050..25ff2f2 100644 --- a/pkg/common/config/validate_test.go +++ b/pkg/common/config/validate_test.go @@ -30,3 +30,33 @@ func TestValidateChainConfig_DoesNotRequireNativeDenomForNonCosmos(t *testing.T) }) require.NoError(t, err) } + +func TestValidateChainConfig_AllowsDefaultStatusThresholds(t *testing.T) { + err := validateChainConfig(ChainConfig{ + Type: enum.NetworkTypeEVM, + }) + require.NoError(t, err) +} + +func TestValidateChainConfig_RejectsInvalidStatusThresholds(t *testing.T) { + err := validateChainConfig(ChainConfig{ + Type: enum.NetworkTypeEVM, + Status: StatusConfig{ + HealthyMaxPendingBlocks: 300, + SlowMaxPendingBlocks: 200, + }, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "status thresholds invalid") +} + +func TestValidateChainConfig_RejectsSlowLowerThanDefaultHealthy(t *testing.T) { + err := validateChainConfig(ChainConfig{ + Type: enum.NetworkTypeEVM, + Status: StatusConfig{ + SlowMaxPendingBlocks: 40, + }, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "status thresholds invalid") +} From 76fd3ca2c0021e7488cc4851453552f88ed1ecf7 Mon Sep 17 00:00:00 2001 From: vietddude Date: Wed, 25 Mar 2026 17:02:37 +0700 Subject: [PATCH 2/5] refactor: update health status snapshot to include catchup progress from blockstore --- cmd/indexer/main.go | 4 ++-- internal/status/status.go | 40 +++++++++++++++++----------------- internal/status/status_test.go | 40 ++++++++++++++++++++++++++++------ internal/worker/catchup.go | 39 --------------------------------- internal/worker/factory.go | 10 +++++++-- internal/worker/manager.go | 30 +++++++++++++++++-------- pkg/store/blockstore/store.go | 18 +++++++++++++++ 7 files changed, 102 insertions(+), 79 deletions(-) diff --git a/cmd/indexer/main.go b/cmd/indexer/main.go index 5d4feff..93aa69a 100644 --- a/cmd/indexer/main.go +++ b/cmd/indexer/main.go @@ -278,8 +278,8 @@ func startHealthServer(port int, cfg *config.Config, manager *worker.Manager) *h Version: version, Networks: []status.NetworkStatus{}, } - if manager != nil && manager.Registry() != nil { - response = manager.Registry().Snapshot(version) + if manager != nil { + response = manager.StatusSnapshot(version) } w.Header().Set("Content-Type", "application/json") diff --git a/internal/status/status.go b/internal/status/status.go index 67c0d7e..a61fa7e 100644 --- a/internal/status/status.go +++ b/internal/status/status.go @@ -7,6 +7,7 @@ import ( "time" "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/fystack/multichain-indexer/pkg/store/blockstore" ) type HealthStatus string @@ -47,12 +48,15 @@ type chainState struct { thresholds config.StatusConfig latestBlock uint64 indexedBlock uint64 - catchupRemain uint64 - catchupRanges int lastIndexedAt time.Time failedBlocks map[uint64]struct{} } +// CatchupProgressSource supplies persisted catchup ranges (e.g. blockstore.Store). +type CatchupProgressSource interface { + GetCatchupProgress(chain string) ([]blockstore.CatchupRange, error) +} + type Registry struct { mu sync.RWMutex chains map[string]*chainState @@ -109,20 +113,6 @@ func (r *Registry) UpdateHead(chainKey string, latestBlock, indexedBlock uint64, } } -func (r *Registry) UpdateCatchup(chainKey string, pendingBlocks uint64, ranges int) { - key := normalizeChainKey(chainKey) - if key == "" { - return - } - - r.mu.Lock() - defer r.mu.Unlock() - - state := r.ensureStateLocked(key) - state.catchupRemain = pendingBlocks - state.catchupRanges = ranges -} - func (r *Registry) MarkFailedBlock(chainKey string, blockNumber uint64) { key := normalizeChainKey(chainKey) if key == "" || blockNumber == 0 { @@ -170,7 +160,7 @@ func (r *Registry) SetFailedBlocks(chainKey string, blockNumbers []uint64) { } } -func (r *Registry) Snapshot(version string) StatusResponse { +func (r *Registry) Snapshot(version string, src CatchupProgressSource) StatusResponse { r.mu.RLock() defer r.mu.RUnlock() @@ -180,7 +170,17 @@ func (r *Registry) Snapshot(version string) StatusResponse { if state.latestBlock > state.indexedBlock { headGap = state.latestBlock - state.indexedBlock } - pending := headGap + state.catchupRemain + + var catchupPending uint64 + catchupRanges := 0 + if src != nil && state.internalCode != "" { + if ranges, err := src.GetCatchupProgress(state.internalCode); err == nil { + catchupRanges = len(ranges) + catchupPending = blockstore.CatchupPendingBlocks(ranges) + } + } + + pending := headGap + catchupPending thresholds := state.thresholds.Normalize() item := NetworkStatus{ @@ -193,8 +193,8 @@ func (r *Registry) Snapshot(version string) StatusResponse { IndexedBlock: state.indexedBlock, PendingBlocks: pending, HeadGap: headGap, - CatchupPendingBlocks: state.catchupRemain, - CatchupRanges: state.catchupRanges, + CatchupPendingBlocks: catchupPending, + CatchupRanges: catchupRanges, FailedBlocks: len(state.failedBlocks), } if !state.lastIndexedAt.IsZero() { diff --git a/internal/status/status_test.go b/internal/status/status_test.go index 27ecb8b..cc08920 100644 --- a/internal/status/status_test.go +++ b/internal/status/status_test.go @@ -6,9 +6,26 @@ import ( "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/enum" + "github.com/fystack/multichain-indexer/pkg/store/blockstore" "github.com/stretchr/testify/require" ) +// mapCatchupStore implements CatchupProgressSource for tests. +type mapCatchupStore map[string][]blockstore.CatchupRange + +func (m mapCatchupStore) GetCatchupProgress(chain string) ([]blockstore.CatchupRange, error) { + if m == nil { + return nil, nil + } + ranges := m[chain] + if ranges == nil { + return nil, nil + } + out := make([]blockstore.CatchupRange, len(ranges)) + copy(out, ranges) + return out, nil +} + func TestRegistrySnapshotDerivesHealthWithPerChainThresholds(t *testing.T) { t.Parallel() @@ -29,11 +46,17 @@ func TestRegistrySnapshotDerivesHealthWithPerChainThresholds(t *testing.T) { indexedAt := time.Date(2026, 3, 25, 12, 0, 0, 0, time.UTC) registry.UpdateHead("eth_mainnet", 1_000, 980, indexedAt) - registry.UpdateCatchup("eth_mainnet", 5, 2) registry.MarkFailedBlock("eth_mainnet", 981) registry.MarkFailedBlock("eth_mainnet", 982) - resp := registry.Snapshot("1.2.3") + kvCatchup := mapCatchupStore{ + "ETH_MAINNET": { + {Start: 100, End: 102, Current: 99}, + {Start: 200, End: 201, Current: 199}, + }, + } + + resp := registry.Snapshot("1.2.3", kvCatchup) require.Equal(t, "1.2.3", resp.Version) require.Len(t, resp.Networks, 1) @@ -69,13 +92,16 @@ func TestRegistrySnapshotUsesDefaultThresholdWhenMissing(t *testing.T) { ) registry.UpdateHead("tron_mainnet", 500, 260, time.Time{}) - registry.UpdateCatchup("tron_mainnet", 20, 1) - resp := registry.Snapshot("1.0.0") + kvCatchup := mapCatchupStore{ + "TRON_MAINNET": {{Start: 1, End: 20, Current: 0}}, + } + + resp := registry.Snapshot("1.0.0", kvCatchup) require.Len(t, resp.Networks, 1) network := resp.Networks[0] - // default healthy<50, slow<250 => pending=260 should be degraded + // head_gap 240 + catchup 20 = 260; default healthy<50, slow<250 => degraded require.Equal(t, uint64(260), network.PendingBlocks) require.Equal(t, HealthDegraded, network.Health) } @@ -94,11 +120,11 @@ func TestRegistryClearFailedBlocks(t *testing.T) { registry.MarkFailedBlock("btc_mainnet", 11) registry.ClearFailedBlocks("btc_mainnet", []uint64{10}) - resp := registry.Snapshot("1.0.0") + resp := registry.Snapshot("1.0.0", mapCatchupStore{}) require.Len(t, resp.Networks, 1) require.Equal(t, 1, resp.Networks[0].FailedBlocks) registry.SetFailedBlocks("btc_mainnet", []uint64{21, 22, 22}) - resp = registry.Snapshot("1.0.0") + resp = registry.Snapshot("1.0.0", mapCatchupStore{}) require.Equal(t, 2, resp.Networks[0].FailedBlocks) } diff --git a/internal/worker/catchup.go b/internal/worker/catchup.go index c0980b9..c96e9ef 100644 --- a/internal/worker/catchup.go +++ b/internal/worker/catchup.go @@ -58,7 +58,6 @@ func NewCatchupWorker( workerPool: make(chan struct{}, CATCHUP_WORKERS), } cw.blockRanges = cw.loadCatchupProgress() - cw.syncCatchupStatus() return cw } @@ -227,7 +226,6 @@ func (cw *CatchupWorker) processCatchupBlocksParallel() error { // Reload ranges to check for any remaining work cw.blockRanges = cw.loadCatchupProgress() - cw.syncCatchupStatus() return nil } @@ -363,7 +361,6 @@ func (cw *CatchupWorker) saveProgress(r blockstore.CatchupRange, current uint64) break } } - cw.syncCatchupStatusLocked() } func (cw *CatchupWorker) completeRange(r blockstore.CatchupRange) error { @@ -384,7 +381,6 @@ func (cw *CatchupWorker) completeRange(r blockstore.CatchupRange) error { break } } - cw.syncCatchupStatusLocked() return nil } @@ -423,41 +419,6 @@ func (cw *CatchupWorker) Close() error { "error", err, ) } - cw.syncCatchupStatusLocked() return nil } - -func (cw *CatchupWorker) syncCatchupStatus() { - cw.progressMu.Lock() - defer cw.progressMu.Unlock() - cw.syncCatchupStatusLocked() -} - -func (cw *CatchupWorker) syncCatchupStatusLocked() { - if cw.registry == nil { - return - } - cw.registry.UpdateCatchup( - cw.chain.GetName(), - catchupPendingBlocks(cw.blockRanges), - len(cw.blockRanges), - ) -} - -func catchupPendingBlocks(ranges []blockstore.CatchupRange) uint64 { - var pending uint64 - for _, r := range ranges { - if r.End < r.Start { - continue - } - if r.Current < r.Start { - pending += r.End - r.Start + 1 - continue - } - if r.Current < r.End { - pending += r.End - r.Current - } - } - return pending -} diff --git a/internal/worker/factory.go b/internal/worker/factory.go index 1569442..a491867 100644 --- a/internal/worker/factory.go +++ b/internal/worker/factory.go @@ -834,10 +834,16 @@ func CreateManagerWithWorkers( Registry: registry, } - // Helper: add workers if enabled (all modes share the same indexer and global rate limiter) + // Helper: add workers if enabled (all modes share the same indexer and global rate limiter). + // Status registry (head / failed-block counters for /status) is only wired to the regular worker; + // catchup progress is read from KV at snapshot time. addIfEnabled := func(mode WorkerMode, enabled bool) { if enabled { - ws := BuildWorkers(idxr, chainCfg, mode, deps) + wdeps := deps + if mode != ModeRegular { + wdeps.Registry = nil + } + ws := BuildWorkers(idxr, chainCfg, mode, wdeps) manager.AddWorkers(ws...) logger.Info("Worker enabled", "chain", chainName, "mode", mode) } else { diff --git a/internal/worker/manager.go b/internal/worker/manager.go index 525c394..5768e0a 100644 --- a/internal/worker/manager.go +++ b/internal/worker/manager.go @@ -26,10 +26,6 @@ type Manager struct { registry *status.Registry } -func (m *Manager) Registry() *status.Registry { - return m.registry -} - func NewManager( ctx context.Context, kvstore infra.KVStore, @@ -46,6 +42,10 @@ func NewManager( } } +func (m *Manager) Registry() *status.Registry { + return m.registry +} + // Start launches all injected workers func (m *Manager) Start() { for _, w := range m.workers { @@ -92,16 +92,28 @@ func (m *Manager) Stop() { logger.Info("Manager stopped") } -// closeResource is a helper to close resources with consistent error handling -func (m *Manager) closeResource(name string, resource interface{}, closer func() error) { - if resource != nil { - if err := closer(); err != nil { - logger.Error("Failed to close "+name, "err", err) +// StatusSnapshot returns /status payload; catchup fields are read from the block store. +func (m *Manager) StatusSnapshot(version string) status.StatusResponse { + if m.registry == nil { + return status.StatusResponse{ + Timestamp: time.Now().UTC(), + Version: version, + Networks: []status.NetworkStatus{}, } } + return m.registry.Snapshot(version, m.blockStore) } // Inject workers into manager func (m *Manager) AddWorkers(workers ...Worker) { m.workers = append(m.workers, workers...) } + +// closeResource is a helper to close resources with consistent error handling +func (m *Manager) closeResource(name string, resource any, closer func() error) { + if resource != nil { + if err := closer(); err != nil { + logger.Error("Failed to close "+name, "err", err) + } + } +} diff --git a/pkg/store/blockstore/store.go b/pkg/store/blockstore/store.go index e946752..69c2729 100644 --- a/pkg/store/blockstore/store.go +++ b/pkg/store/blockstore/store.go @@ -29,6 +29,24 @@ type CatchupRange struct { Current uint64 `json:"current"` } +// CatchupPendingBlocks returns how many blocks remain across all active catchup ranges. +func CatchupPendingBlocks(ranges []CatchupRange) uint64 { + var pending uint64 + for _, r := range ranges { + if r.End < r.Start { + continue + } + if r.Current < r.Start { + pending += r.End - r.Start + 1 + continue + } + if r.Current < r.End { + pending += r.End - r.Current + } + } + return pending +} + func latestBlockKey(chainName string) string { return fmt.Sprintf("%s/%s/%s", BlockStates, chainName, constant.KVPrefixLatestBlock) } From 253e7b619c9f68c95350630b5a994dec5205ee26 Mon Sep 17 00:00:00 2001 From: vietddude Date: Wed, 25 Mar 2026 17:17:14 +0700 Subject: [PATCH 3/5] feat: add health status configuration for chains with max pending blocks --- configs/config.example.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/configs/config.example.yaml b/configs/config.example.yaml index 95dece5..8b9e6dc 100644 --- a/configs/config.example.yaml +++ b/configs/config.example.yaml @@ -246,7 +246,10 @@ chains: batch_size: 200 concurrency: 12 parallel: true - + status: + healthy_max_pending_blocks: 50 + slow_max_pending_blocks: 250 + osmosis_mainnet: network_id: "osmosis-1" internal_code: "OSMO_MAINNET" From 2b38fced7c923bf63884b7af28e9399481b9eaa5 Mon Sep 17 00:00:00 2001 From: vietddude Date: Thu, 26 Mar 2026 13:59:32 +0700 Subject: [PATCH 4/5] fix(status): sync worker metrics and avoid snapshot lock contention --- internal/status/registry.go | 210 +++++++++++++++++++++++++++ internal/status/status.go | 257 +++------------------------------ internal/status/status_test.go | 13 +- internal/status/types.go | 67 +++++++++ internal/worker/base.go | 54 ++++--- internal/worker/catchup.go | 4 +- internal/worker/factory.go | 62 ++++---- internal/worker/manual.go | 4 +- internal/worker/mempool.go | 4 +- internal/worker/regular.go | 10 +- internal/worker/rescanner.go | 16 +- pkg/common/config/chains.go | 7 + pkg/common/config/load.go | 2 +- 13 files changed, 375 insertions(+), 335 deletions(-) create mode 100644 internal/status/registry.go create mode 100644 internal/status/types.go diff --git a/internal/status/registry.go b/internal/status/registry.go new file mode 100644 index 0000000..bf134cf --- /dev/null +++ b/internal/status/registry.go @@ -0,0 +1,210 @@ +package status + +import ( + "sort" + "strings" + "sync" + "time" + + "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/fystack/multichain-indexer/pkg/store/blockstore" +) + +type Registry struct { + mu sync.RWMutex + chains map[string]*chainState +} + +func NewRegistry() *Registry { + return &Registry{ + chains: make(map[string]*chainState), + } +} + +func (r *Registry) RegisterChain(chainKey, chainName string, chainCfg config.ChainConfig) { + key := config.CanonicalChainKey(chainKey) + if key == "" { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state, exists := r.chains[key] + if !exists { + state = &chainState{ + failedBlocks: make(map[uint64]struct{}), + } + r.chains[key] = state + } + + if strings.TrimSpace(chainCfg.NetworkId) != "" { + state.networkID = chainCfg.NetworkId + } else { + state.networkID = chainName + } + state.chainName = chainName + state.internalCode = chainCfg.InternalCode + state.networkType = string(chainCfg.Type) + state.thresholds = chainCfg.Status.Normalize() +} + +func (r *Registry) UpdateHead(chainKey string, latestBlock, indexedBlock uint64, indexedAt time.Time) { + key := config.CanonicalChainKey(chainKey) + if key == "" { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + state.latestBlock = latestBlock + state.indexedBlock = indexedBlock + if !indexedAt.IsZero() { + state.lastIndexedAt = indexedAt.UTC() + } +} + +func (r *Registry) MarkFailedBlock(chainKey string, blockNumber uint64) { + key := config.CanonicalChainKey(chainKey) + if key == "" || blockNumber == 0 { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + state.failedBlocks[blockNumber] = struct{}{} +} + +func (r *Registry) ClearFailedBlocks(chainKey string, blockNumbers []uint64) { + key := config.CanonicalChainKey(chainKey) + if key == "" || len(blockNumbers) == 0 { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + for _, block := range blockNumbers { + delete(state.failedBlocks, block) + } +} + +func (r *Registry) SetFailedBlocks(chainKey string, blockNumbers []uint64) { + key := config.CanonicalChainKey(chainKey) + if key == "" { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + state.failedBlocks = make(map[uint64]struct{}, len(blockNumbers)) + for _, block := range blockNumbers { + if block == 0 { + continue + } + state.failedBlocks[block] = struct{}{} + } +} + +func (r *Registry) Snapshot(version string, src CatchupProgressSource) StatusResponse { + r.mu.RLock() + states := make([]chainSnapshot, 0, len(r.chains)) + for _, state := range r.chains { + states = append(states, chainSnapshot{ + networkID: state.networkID, + chainName: state.chainName, + internalCode: state.internalCode, + networkType: state.networkType, + thresholds: state.thresholds, + latestBlock: state.latestBlock, + indexedBlock: state.indexedBlock, + lastIndexedAt: state.lastIndexedAt, + failedBlocksCount: len(state.failedBlocks), + }) + } + r.mu.RUnlock() + + networks := make([]NetworkStatus, 0, len(states)) + for _, state := range states { + headGap := uint64(0) + if state.latestBlock > state.indexedBlock { + headGap = state.latestBlock - state.indexedBlock + } + + var catchupPending uint64 + catchupRanges := 0 + if src != nil && state.internalCode != "" { + if ranges, err := src.GetCatchupProgress(state.internalCode); err == nil { + catchupRanges = len(ranges) + catchupPending = blockstore.CatchupPendingBlocks(ranges) + } + } + + pending := headGap + catchupPending + + item := NetworkStatus{ + NetworkID: state.networkID, + ChainName: state.chainName, + InternalCode: state.internalCode, + NetworkType: state.networkType, + Health: deriveHealth(pending, state.thresholds), + LatestBlock: state.latestBlock, + IndexedBlock: state.indexedBlock, + PendingBlocks: pending, + HeadGap: headGap, + CatchupPendingBlocks: catchupPending, + CatchupRanges: catchupRanges, + FailedBlocks: state.failedBlocksCount, + } + if !state.lastIndexedAt.IsZero() { + t := state.lastIndexedAt + item.LastIndexedAt = &t + } + networks = append(networks, item) + } + + sort.Slice(networks, func(i, j int) bool { + return networks[i].ChainName < networks[j].ChainName + }) + + return StatusResponse{ + Timestamp: time.Now().UTC(), + Version: version, + Networks: networks, + } +} + +func (r *Registry) ensureStateLocked(key string) *chainState { + state, exists := r.chains[key] + if exists { + return state + } + + state = &chainState{ + networkID: strings.ToLower(key), + chainName: strings.ToLower(key), + internalCode: key, + thresholds: config.StatusConfig{}.Normalize(), + failedBlocks: make(map[uint64]struct{}), + } + r.chains[key] = state + return state +} + +func deriveHealth(pendingBlocks uint64, thresholds config.StatusConfig) HealthStatus { + switch { + case pendingBlocks < thresholds.HealthyMaxPendingBlocks: + return HealthHealthy + case pendingBlocks < thresholds.SlowMaxPendingBlocks: + return HealthSlow + default: + return HealthDegraded + } +} diff --git a/internal/status/status.go b/internal/status/status.go index a61fa7e..590f023 100644 --- a/internal/status/status.go +++ b/internal/status/status.go @@ -1,249 +1,26 @@ package status -import ( - "sort" - "strings" - "sync" - "time" +import "time" - "github.com/fystack/multichain-indexer/pkg/common/config" - "github.com/fystack/multichain-indexer/pkg/store/blockstore" -) - -type HealthStatus string - -const ( - HealthHealthy HealthStatus = "healthy" - HealthSlow HealthStatus = "slow" - HealthDegraded HealthStatus = "degraded" -) - -type NetworkStatus struct { - NetworkID string `json:"network_id"` - ChainName string `json:"chain_name"` - InternalCode string `json:"internal_code"` - NetworkType string `json:"network_type"` - Health HealthStatus `json:"health"` - LatestBlock uint64 `json:"latest_block"` - IndexedBlock uint64 `json:"indexed_block"` - PendingBlocks uint64 `json:"pending_blocks"` - HeadGap uint64 `json:"head_gap"` - CatchupPendingBlocks uint64 `json:"catchup_pending_blocks"` - CatchupRanges int `json:"catchup_ranges"` - FailedBlocks int `json:"failed_blocks"` - LastIndexedAt *time.Time `json:"last_indexed_at,omitempty"` -} - -type StatusResponse struct { - Timestamp time.Time `json:"timestamp"` - Version string `json:"version"` - Networks []NetworkStatus `json:"networks"` -} - -type chainState struct { - networkID string - chainName string - internalCode string - networkType string - thresholds config.StatusConfig - latestBlock uint64 - indexedBlock uint64 - lastIndexedAt time.Time - failedBlocks map[uint64]struct{} -} - -// CatchupProgressSource supplies persisted catchup ranges (e.g. blockstore.Store). -type CatchupProgressSource interface { - GetCatchupProgress(chain string) ([]blockstore.CatchupRange, error) -} - -type Registry struct { - mu sync.RWMutex - chains map[string]*chainState -} - -func NewRegistry() *Registry { - return &Registry{ - chains: make(map[string]*chainState), - } -} - -func (r *Registry) RegisterChain(chainKey, chainName string, chainCfg config.ChainConfig) { - key := normalizeChainKey(chainKey) - if key == "" { - return - } - - r.mu.Lock() - defer r.mu.Unlock() - - state, exists := r.chains[key] - if !exists { - state = &chainState{ - failedBlocks: make(map[uint64]struct{}), - } - r.chains[key] = state - } - - if strings.TrimSpace(chainCfg.NetworkId) != "" { - state.networkID = chainCfg.NetworkId - } else { - state.networkID = chainName - } - state.chainName = chainName - state.internalCode = chainCfg.InternalCode - state.networkType = string(chainCfg.Type) - state.thresholds = chainCfg.Status.Normalize() -} - -func (r *Registry) UpdateHead(chainKey string, latestBlock, indexedBlock uint64, indexedAt time.Time) { - key := normalizeChainKey(chainKey) - if key == "" { - return - } - - r.mu.Lock() - defer r.mu.Unlock() - - state := r.ensureStateLocked(key) - state.latestBlock = latestBlock - state.indexedBlock = indexedBlock - if !indexedAt.IsZero() { - state.lastIndexedAt = indexedAt.UTC() - } -} - -func (r *Registry) MarkFailedBlock(chainKey string, blockNumber uint64) { - key := normalizeChainKey(chainKey) - if key == "" || blockNumber == 0 { - return - } - - r.mu.Lock() - defer r.mu.Unlock() - - state := r.ensureStateLocked(key) - state.failedBlocks[blockNumber] = struct{}{} -} - -func (r *Registry) ClearFailedBlocks(chainKey string, blockNumbers []uint64) { - key := normalizeChainKey(chainKey) - if key == "" || len(blockNumbers) == 0 { - return - } - - r.mu.Lock() - defer r.mu.Unlock() - - state := r.ensureStateLocked(key) - for _, block := range blockNumbers { - delete(state.failedBlocks, block) - } -} - -func (r *Registry) SetFailedBlocks(chainKey string, blockNumbers []uint64) { - key := normalizeChainKey(chainKey) - if key == "" { - return - } - - r.mu.Lock() - defer r.mu.Unlock() - - state := r.ensureStateLocked(key) - state.failedBlocks = make(map[uint64]struct{}, len(blockNumbers)) - for _, block := range blockNumbers { - if block == 0 { - continue - } - state.failedBlocks[block] = struct{}{} - } +// StatusRegistry captures status mutations used by workers. +type StatusRegistry interface { + UpdateHead(chainKey string, latestBlock, indexedBlock uint64, indexedAt time.Time) + MarkFailedBlock(chainKey string, blockNumber uint64) + ClearFailedBlocks(chainKey string, blockNumbers []uint64) + SetFailedBlocks(chainKey string, blockNumbers []uint64) } -func (r *Registry) Snapshot(version string, src CatchupProgressSource) StatusResponse { - r.mu.RLock() - defer r.mu.RUnlock() +// NoopStatusRegistry is a no-op implementation of StatusRegistry. +type NoopStatusRegistry struct{} - networks := make([]NetworkStatus, 0, len(r.chains)) - for _, state := range r.chains { - headGap := uint64(0) - if state.latestBlock > state.indexedBlock { - headGap = state.latestBlock - state.indexedBlock - } +func (NoopStatusRegistry) UpdateHead(string, uint64, uint64, time.Time) {} +func (NoopStatusRegistry) MarkFailedBlock(string, uint64) {} +func (NoopStatusRegistry) ClearFailedBlocks(string, []uint64) {} +func (NoopStatusRegistry) SetFailedBlocks(string, []uint64) {} - var catchupPending uint64 - catchupRanges := 0 - if src != nil && state.internalCode != "" { - if ranges, err := src.GetCatchupProgress(state.internalCode); err == nil { - catchupRanges = len(ranges) - catchupPending = blockstore.CatchupPendingBlocks(ranges) - } - } - - pending := headGap + catchupPending - thresholds := state.thresholds.Normalize() - - item := NetworkStatus{ - NetworkID: state.networkID, - ChainName: state.chainName, - InternalCode: state.internalCode, - NetworkType: state.networkType, - Health: deriveHealth(pending, thresholds), - LatestBlock: state.latestBlock, - IndexedBlock: state.indexedBlock, - PendingBlocks: pending, - HeadGap: headGap, - CatchupPendingBlocks: catchupPending, - CatchupRanges: catchupRanges, - FailedBlocks: len(state.failedBlocks), - } - if !state.lastIndexedAt.IsZero() { - t := state.lastIndexedAt - item.LastIndexedAt = &t - } - networks = append(networks, item) - } - - sort.Slice(networks, func(i, j int) bool { - return networks[i].ChainName < networks[j].ChainName - }) - - return StatusResponse{ - Timestamp: time.Now().UTC(), - Version: version, - Networks: networks, - } -} - -func (r *Registry) ensureStateLocked(key string) *chainState { - state, exists := r.chains[key] - if exists { - return state +func EnsureStatusRegistry(statusRegistry StatusRegistry) StatusRegistry { + if statusRegistry == nil { + return NoopStatusRegistry{} } - - state = &chainState{ - networkID: strings.ToLower(key), - chainName: strings.ToLower(key), - internalCode: key, - thresholds: config.StatusConfig{}.Normalize(), - failedBlocks: make(map[uint64]struct{}), - } - r.chains[key] = state - return state -} - -func deriveHealth(pendingBlocks uint64, thresholds config.StatusConfig) HealthStatus { - normalized := thresholds.Normalize() - switch { - case pendingBlocks < normalized.HealthyMaxPendingBlocks: - return HealthHealthy - case pendingBlocks < normalized.SlowMaxPendingBlocks: - return HealthSlow - default: - return HealthDegraded - } -} - -func normalizeChainKey(key string) string { - return strings.ToUpper(strings.TrimSpace(key)) + return statusRegistry } diff --git a/internal/status/status_test.go b/internal/status/status_test.go index cc08920..2338502 100644 --- a/internal/status/status_test.go +++ b/internal/status/status_test.go @@ -91,19 +91,14 @@ func TestRegistrySnapshotUsesDefaultThresholdWhenMissing(t *testing.T) { }, ) - registry.UpdateHead("tron_mainnet", 500, 260, time.Time{}) + registry.UpdateHead("tron_mainnet", 500, 470, time.Time{}) - kvCatchup := mapCatchupStore{ - "TRON_MAINNET": {{Start: 1, End: 20, Current: 0}}, - } - - resp := registry.Snapshot("1.0.0", kvCatchup) + resp := registry.Snapshot("1.0.0", mapCatchupStore{}) require.Len(t, resp.Networks, 1) network := resp.Networks[0] - // head_gap 240 + catchup 20 = 260; default healthy<50, slow<250 => degraded - require.Equal(t, uint64(260), network.PendingBlocks) - require.Equal(t, HealthDegraded, network.Health) + require.Equal(t, uint64(30), network.PendingBlocks) + require.Equal(t, HealthHealthy, network.Health) } func TestRegistryClearFailedBlocks(t *testing.T) { diff --git a/internal/status/types.go b/internal/status/types.go new file mode 100644 index 0000000..7c1f729 --- /dev/null +++ b/internal/status/types.go @@ -0,0 +1,67 @@ +package status + +import ( + "time" + + "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/fystack/multichain-indexer/pkg/store/blockstore" +) + +type HealthStatus string + +const ( + HealthHealthy HealthStatus = "healthy" + HealthSlow HealthStatus = "slow" + HealthDegraded HealthStatus = "degraded" +) + +type NetworkStatus struct { + NetworkID string `json:"network_id"` + ChainName string `json:"chain_name"` + InternalCode string `json:"internal_code"` + NetworkType string `json:"network_type"` + Health HealthStatus `json:"health"` + LatestBlock uint64 `json:"latest_block"` + IndexedBlock uint64 `json:"indexed_block"` + PendingBlocks uint64 `json:"pending_blocks"` + HeadGap uint64 `json:"head_gap"` + CatchupPendingBlocks uint64 `json:"catchup_pending_blocks"` + CatchupRanges int `json:"catchup_ranges"` + FailedBlocks int `json:"failed_blocks"` + LastIndexedAt *time.Time `json:"last_indexed_at,omitempty"` +} + +type StatusResponse struct { + Timestamp time.Time `json:"timestamp"` + Version string `json:"version"` + Networks []NetworkStatus `json:"networks"` +} + +// CatchupProgressSource supplies persisted catchup ranges (e.g. blockstore.Store). +type CatchupProgressSource interface { + GetCatchupProgress(chain string) ([]blockstore.CatchupRange, error) +} + +type chainState struct { + networkID string + chainName string + internalCode string + networkType string + thresholds config.StatusConfig + latestBlock uint64 + indexedBlock uint64 + lastIndexedAt time.Time + failedBlocks map[uint64]struct{} +} + +type chainSnapshot struct { + networkID string + chainName string + internalCode string + networkType string + thresholds config.StatusConfig + latestBlock uint64 + indexedBlock uint64 + lastIndexedAt time.Time + failedBlocksCount int +} diff --git a/internal/worker/base.go b/internal/worker/base.go index 31a16e0..546d0e4 100644 --- a/internal/worker/base.go +++ b/internal/worker/base.go @@ -39,15 +39,15 @@ type BaseWorker struct { mode WorkerMode logger *slog.Logger - config config.ChainConfig - chain indexer.Indexer - kvstore infra.KVStore - blockStore blockstore.Store - pubkeyStore pubkeystore.Store - emitter events.Emitter - failedChan chan FailedBlockEvent - observer BlockResultObserver - registry *status.Registry + config config.ChainConfig + chain indexer.Indexer + kvstore infra.KVStore + blockStore blockstore.Store + pubkeyStore pubkeystore.Store + emitter events.Emitter + failedChan chan FailedBlockEvent + observer BlockResultObserver + statusRegistry status.StatusRegistry } // Stop stops the worker and cleans up internal resources @@ -67,7 +67,7 @@ func newWorkerWithMode( pubkeyStore pubkeystore.Store, mode WorkerMode, failedChan chan FailedBlockEvent, - registry *status.Registry, + statusRegistry status.StatusRegistry, ) *BaseWorker { ctx, cancel := context.WithCancel(ctx) log := logger.With( @@ -76,18 +76,18 @@ func newWorkerWithMode( ) return &BaseWorker{ - ctx: ctx, - cancel: cancel, - mode: mode, - logger: log, - config: cfg, - chain: chain, - kvstore: kv, - blockStore: blockStore, - pubkeyStore: pubkeyStore, - emitter: emitter, - failedChan: failedChan, - registry: registry, + ctx: ctx, + cancel: cancel, + mode: mode, + logger: log, + config: cfg, + chain: chain, + kvstore: kv, + blockStore: blockStore, + pubkeyStore: pubkeyStore, + emitter: emitter, + failedChan: failedChan, + statusRegistry: status.EnsureStatusRegistry(statusRegistry), } } @@ -142,11 +142,11 @@ func (bw *BaseWorker) notifyObserver(blockNumber uint64, status BlockStatus) { // handleBlockResult processes a block result and persists/forwards errors if needed. func (bw *BaseWorker) handleBlockResult(result indexer.BlockResult) bool { + registry := status.EnsureStatusRegistry(bw.statusRegistry) + if result.Error != nil { _ = bw.blockStore.SaveFailedBlock(bw.chain.GetNetworkInternalCode(), result.Number) - if bw.registry != nil { - bw.registry.MarkFailedBlock(bw.chain.GetName(), result.Number) - } + registry.MarkFailedBlock(bw.chain.GetName(), result.Number) // Non-blocking push to failedChan select { @@ -189,9 +189,7 @@ func (bw *BaseWorker) handleBlockResult(result indexer.BlockResult) bool { "chain", bw.chain.GetName(), "block", result.Block.Number, ) - if bw.registry != nil { - bw.registry.ClearFailedBlocks(bw.chain.GetName(), []uint64{result.Number}) - } + registry.ClearFailedBlocks(bw.chain.GetName(), []uint64{result.Number}) bw.notifyObserver(result.Number, BlockStatusProcessed) return true } diff --git a/internal/worker/catchup.go b/internal/worker/catchup.go index c96e9ef..c6b5a87 100644 --- a/internal/worker/catchup.go +++ b/internal/worker/catchup.go @@ -39,7 +39,7 @@ func NewCatchupWorker( emitter events.Emitter, pubkeyStore pubkeystore.Store, failedChan chan FailedBlockEvent, - registry *status.Registry, + statusRegistry status.StatusRegistry, ) *CatchupWorker { worker := newWorkerWithMode( ctx, @@ -51,7 +51,7 @@ func NewCatchupWorker( pubkeyStore, ModeCatchup, failedChan, - registry, + statusRegistry, ) cw := &CatchupWorker{ BaseWorker: worker, diff --git a/internal/worker/factory.go b/internal/worker/factory.go index a491867..625cfd7 100644 --- a/internal/worker/factory.go +++ b/internal/worker/factory.go @@ -36,15 +36,15 @@ import ( // WorkerDeps bundles dependencies injected into workers. type WorkerDeps struct { - Ctx context.Context - KVStore infra.KVStore - BlockStore blockstore.Store - Emitter events.Emitter - Pubkey pubkeystore.Store - Redis infra.RedisClient - FailedChan chan FailedBlockEvent - Observer BlockResultObserver - Registry *status.Registry + Ctx context.Context + KVStore infra.KVStore + BlockStore blockstore.Store + Emitter events.Emitter + Pubkey pubkeystore.Store + Redis infra.RedisClient + FailedChan chan FailedBlockEvent + Observer BlockResultObserver + StatusRegistry status.StatusRegistry } // ManagerConfig defines which workers to enable per chain. @@ -107,7 +107,7 @@ func BuildWorkers( deps.Emitter, deps.Pubkey, deps.FailedChan, - deps.Registry, + deps.StatusRegistry, ), } case ModeCatchup: @@ -121,7 +121,7 @@ func BuildWorkers( deps.Emitter, deps.Pubkey, deps.FailedChan, - deps.Registry, + deps.StatusRegistry, ), } case ModeRescanner: @@ -135,7 +135,7 @@ func BuildWorkers( deps.Emitter, deps.Pubkey, deps.FailedChan, - deps.Registry, + deps.StatusRegistry, ), } case ModeManual: @@ -150,7 +150,7 @@ func BuildWorkers( deps.Emitter, deps.Pubkey, deps.FailedChan, - deps.Registry, + deps.StatusRegistry, ), } case ModeMempool: @@ -164,7 +164,7 @@ func BuildWorkers( deps.Emitter, deps.Pubkey, deps.FailedChan, - deps.Registry, + deps.StatusRegistry, ), } default: @@ -779,10 +779,10 @@ func CreateManagerWithWorkers( // Shared stores blockStore := blockstore.NewBlockStore(kvstore) pubkeyStore := pubkeystore.NewPublicKeyStore(addressBF) - registry := status.NewRegistry() + statusRegistry := status.NewRegistry() manager := NewManager(ctx, kvstore, blockStore, emitter, pubkeyStore) - manager.registry = registry + manager.registry = statusRegistry // Loop each chain for _, chainName := range managerCfg.Chains { @@ -814,36 +814,30 @@ func CreateManagerWithWorkers( default: logger.Fatal("Unsupported network type", "chain", chainName, "type", chainCfg.Type) } - registry.RegisterChain(idxr.GetName(), chainName, chainCfg) + statusRegistry.RegisterChain(idxr.GetName(), chainName, chainCfg) if existingFailed, err := blockStore.GetFailedBlocks(idxr.GetNetworkInternalCode()); err == nil { - registry.SetFailedBlocks(idxr.GetName(), existingFailed) + statusRegistry.SetFailedBlocks(idxr.GetName(), existingFailed) } failedChan := make(chan FailedBlockEvent, 100) // Worker deps deps := WorkerDeps{ - Ctx: ctx, - KVStore: kvstore, - BlockStore: blockStore, - Emitter: emitter, - Pubkey: pubkeyStore, - Redis: redisClient, - FailedChan: failedChan, - Observer: managerCfg.Observer, - Registry: registry, + Ctx: ctx, + KVStore: kvstore, + BlockStore: blockStore, + Emitter: emitter, + Pubkey: pubkeyStore, + Redis: redisClient, + FailedChan: failedChan, + Observer: managerCfg.Observer, + StatusRegistry: statusRegistry, } // Helper: add workers if enabled (all modes share the same indexer and global rate limiter). - // Status registry (head / failed-block counters for /status) is only wired to the regular worker; - // catchup progress is read from KV at snapshot time. addIfEnabled := func(mode WorkerMode, enabled bool) { if enabled { - wdeps := deps - if mode != ModeRegular { - wdeps.Registry = nil - } - ws := BuildWorkers(idxr, chainCfg, mode, wdeps) + ws := BuildWorkers(idxr, chainCfg, mode, deps) manager.AddWorkers(ws...) logger.Info("Worker enabled", "chain", chainName, "mode", mode) } else { diff --git a/internal/worker/manual.go b/internal/worker/manual.go index 6f3d706..64ebe62 100644 --- a/internal/worker/manual.go +++ b/internal/worker/manual.go @@ -42,7 +42,7 @@ func NewManualWorker( emitter events.Emitter, pubkeyStore pubkeystore.Store, failedChan chan FailedBlockEvent, - registry *status.Registry, + statusRegistry status.StatusRegistry, ) *ManualWorker { return &ManualWorker{ BaseWorker: newWorkerWithMode( @@ -55,7 +55,7 @@ func NewManualWorker( pubkeyStore, ModeManual, failedChan, - registry, + statusRegistry, ), mbs: missingblockstore.NewMissingBlocksStore(redisClient), config: DefaultManualConfig, diff --git a/internal/worker/mempool.go b/internal/worker/mempool.go index 387c327..9bf9f75 100644 --- a/internal/worker/mempool.go +++ b/internal/worker/mempool.go @@ -34,7 +34,7 @@ func NewMempoolWorker( emitter events.Emitter, pubkeyStore pubkeystore.Store, failedChan chan FailedBlockEvent, - registry *status.Registry, + statusRegistry status.StatusRegistry, ) *MempoolWorker { worker := newWorkerWithMode( ctx, @@ -46,7 +46,7 @@ func NewMempoolWorker( pubkeyStore, ModeMempool, failedChan, - registry, + statusRegistry, ) // Cast to Bitcoin indexer (mempool is Bitcoin-specific) diff --git a/internal/worker/regular.go b/internal/worker/regular.go index 52938c4..4e41114 100644 --- a/internal/worker/regular.go +++ b/internal/worker/regular.go @@ -44,7 +44,7 @@ func NewRegularWorker( emitter events.Emitter, pubkeyStore pubkeystore.Store, failedChan chan FailedBlockEvent, - registry *status.Registry, + statusRegistry status.StatusRegistry, ) *RegularWorker { worker := newWorkerWithMode( ctx, @@ -56,7 +56,7 @@ func NewRegularWorker( pubkeyStore, ModeRegular, failedChan, - registry, + statusRegistry, ) rw := &RegularWorker{BaseWorker: worker} rw.currentBlock = rw.determineStartingBlock() @@ -407,10 +407,8 @@ func (rw *RegularWorker) currentIndexedBlock() uint64 { } func (rw *RegularWorker) updateHeadStatus(latest uint64, indexedAt time.Time) { - if rw.registry == nil { - return - } - rw.registry.UpdateHead( + registry := status.EnsureStatusRegistry(rw.statusRegistry) + registry.UpdateHead( rw.chain.GetName(), latest, rw.currentIndexedBlock(), diff --git a/internal/worker/rescanner.go b/internal/worker/rescanner.go index 703a2c0..72ff67e 100644 --- a/internal/worker/rescanner.go +++ b/internal/worker/rescanner.go @@ -46,7 +46,7 @@ func NewRescannerWorker( emitter events.Emitter, pubkeyStore pubkeystore.Store, failedChan chan FailedBlockEvent, - registry *status.Registry, + statusRegistry status.StatusRegistry, ) *RescannerWorker { return &RescannerWorker{ BaseWorker: newWorkerWithMode( @@ -59,7 +59,7 @@ func NewRescannerWorker( pubkeyStore, ModeRescanner, failedChan, - registry, + statusRegistry, ), failedBlocks: make(map[uint64]uint8), maxRetries: RescannerMaxRetries, @@ -115,9 +115,7 @@ func (rw *RescannerWorker) addFailedBlock(block uint64, errMsg string) { if _, exists := rw.failedBlocks[block]; !exists { rw.failedBlocks[block] = 0 rw.addSave(block) - if rw.registry != nil { - rw.registry.MarkFailedBlock(rw.chain.GetName(), block) - } + rw.statusRegistry.MarkFailedBlock(rw.chain.GetName(), block) rw.logger.Info("Added failed block", "block", block, "error", errMsg) } } @@ -127,9 +125,7 @@ func (rw *RescannerWorker) syncFromKV() error { if err != nil { return err } - if rw.registry != nil { - rw.registry.SetFailedBlocks(rw.chain.GetName(), blocks) - } + rw.statusRegistry.SetFailedBlocks(rw.chain.GetName(), blocks) rw.mu.Lock() defer rw.mu.Unlock() for _, num := range blocks { @@ -150,9 +146,7 @@ func (rw *RescannerWorker) removeBlocks(blocks []uint64) { } rw.mu.Unlock() rw.addRemove(blocks...) - if rw.registry != nil { - rw.registry.ClearFailedBlocks(rw.chain.GetName(), blocks) - } + rw.statusRegistry.ClearFailedBlocks(rw.chain.GetName(), blocks) } func (rw *RescannerWorker) incrementRetry(block uint64) { diff --git a/pkg/common/config/chains.go b/pkg/common/config/chains.go index 8d7d5ac..505e032 100644 --- a/pkg/common/config/chains.go +++ b/pkg/common/config/chains.go @@ -7,6 +7,12 @@ import ( "dario.cat/mergo" ) +// CanonicalChainKey is the normalized chain identifier used as registry keys, indexer GetName(), +// and ChainConfig.Name after Load (uppercase + trim). Callers should pass raw YAML keys or CLI names. +func CanonicalChainKey(s string) string { + return strings.ToUpper(strings.TrimSpace(s)) +} + // GetChain returns a chain config by name. func (c Chains) GetChain(name string) (ChainConfig, error) { chain, ok := c[name] @@ -75,6 +81,7 @@ func (c Chains) ApplyDefaults(def Defaults) error { if err := mergo.Merge(&chain.Status, def.Status); err != nil { return fmt.Errorf("merge status defaults for %s: %w", name, err) } + chain.Status = chain.Status.Normalize() c[name] = chain } return nil diff --git a/pkg/common/config/load.go b/pkg/common/config/load.go index e2a764a..d1fc7e6 100644 --- a/pkg/common/config/load.go +++ b/pkg/common/config/load.go @@ -45,7 +45,7 @@ func Load(path string) (*Config, error) { for name, chain := range cfg.Chains { // apply name to struct name - chain.Name = strings.ToUpper(name) + chain.Name = CanonicalChainKey(name) chain.NativeDenom = strings.TrimSpace(chain.NativeDenom) if err := validate.Struct(chain); err != nil { return nil, fmt.Errorf("chain %s validation failed: %w", name, err) From abdf4890209121a3b5395ad2bc357cdaf82e6251 Mon Sep 17 00:00:00 2001 From: vietddude Date: Thu, 26 Mar 2026 15:17:22 +0700 Subject: [PATCH 5/5] fix(status): track catchup progress in registry --- internal/status/registry.go | 150 ++++++++++++++++++++++++++------ internal/status/status.go | 14 ++- internal/status/status_test.go | 71 +++++++++------ internal/status/types.go | 47 +++++----- internal/worker/catchup.go | 37 +++++++- internal/worker/factory.go | 9 ++ internal/worker/factory_test.go | 79 +++++++++++++++++ internal/worker/manager.go | 4 +- internal/worker/regular.go | 6 ++ internal/worker/regular_test.go | 141 ++++++++++++++++++++++++++++-- 10 files changed, 467 insertions(+), 91 deletions(-) diff --git a/internal/status/registry.go b/internal/status/registry.go index bf134cf..d723284 100644 --- a/internal/status/registry.go +++ b/internal/status/registry.go @@ -33,7 +33,8 @@ func (r *Registry) RegisterChain(chainKey, chainName string, chainCfg config.Cha state, exists := r.chains[key] if !exists { state = &chainState{ - failedBlocks: make(map[uint64]struct{}), + failedBlocks: make(map[uint64]struct{}), + catchupRanges: make(map[catchupRangeKey]uint64), } r.chains[key] = state } @@ -113,20 +114,79 @@ func (r *Registry) SetFailedBlocks(chainKey string, blockNumbers []uint64) { } } -func (r *Registry) Snapshot(version string, src CatchupProgressSource) StatusResponse { +func (r *Registry) SetCatchupRanges(chainKey string, ranges []blockstore.CatchupRange) { + key := config.CanonicalChainKey(chainKey) + if key == "" { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + state.catchupPendingBlocks = 0 + state.catchupRanges = make(map[catchupRangeKey]uint64, len(ranges)) + for _, rng := range ranges { + upsertCatchupRangeLocked(state, rng) + } +} + +func (r *Registry) UpsertCatchupRanges(chainKey string, ranges []blockstore.CatchupRange) { + key := config.CanonicalChainKey(chainKey) + if key == "" || len(ranges) == 0 { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + for _, rng := range ranges { + upsertCatchupRangeLocked(state, rng) + } +} + +func (r *Registry) DeleteCatchupRange(chainKey string, start, end uint64) { + key := config.CanonicalChainKey(chainKey) + if key == "" { + return + } + + rangeKey, ok := makeCatchupRangeKey(start, end) + if !ok { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + if pending, exists := state.catchupRanges[rangeKey]; exists { + if state.catchupPendingBlocks >= pending { + state.catchupPendingBlocks -= pending + } else { + state.catchupPendingBlocks = 0 + } + delete(state.catchupRanges, rangeKey) + } +} + +func (r *Registry) Snapshot(version string) StatusResponse { r.mu.RLock() states := make([]chainSnapshot, 0, len(r.chains)) for _, state := range r.chains { states = append(states, chainSnapshot{ - networkID: state.networkID, - chainName: state.chainName, - internalCode: state.internalCode, - networkType: state.networkType, - thresholds: state.thresholds, - latestBlock: state.latestBlock, - indexedBlock: state.indexedBlock, - lastIndexedAt: state.lastIndexedAt, - failedBlocksCount: len(state.failedBlocks), + networkID: state.networkID, + chainName: state.chainName, + internalCode: state.internalCode, + networkType: state.networkType, + thresholds: state.thresholds, + latestBlock: state.latestBlock, + indexedBlock: state.indexedBlock, + lastIndexedAt: state.lastIndexedAt, + failedBlocksCount: len(state.failedBlocks), + catchupRangesCount: len(state.catchupRanges), + catchupPendingBlocks: state.catchupPendingBlocks, }) } r.mu.RUnlock() @@ -137,17 +197,7 @@ func (r *Registry) Snapshot(version string, src CatchupProgressSource) StatusRes if state.latestBlock > state.indexedBlock { headGap = state.latestBlock - state.indexedBlock } - - var catchupPending uint64 - catchupRanges := 0 - if src != nil && state.internalCode != "" { - if ranges, err := src.GetCatchupProgress(state.internalCode); err == nil { - catchupRanges = len(ranges) - catchupPending = blockstore.CatchupPendingBlocks(ranges) - } - } - - pending := headGap + catchupPending + pending := headGap + state.catchupPendingBlocks item := NetworkStatus{ NetworkID: state.networkID, @@ -159,8 +209,8 @@ func (r *Registry) Snapshot(version string, src CatchupProgressSource) StatusRes IndexedBlock: state.indexedBlock, PendingBlocks: pending, HeadGap: headGap, - CatchupPendingBlocks: catchupPending, - CatchupRanges: catchupRanges, + CatchupPendingBlocks: state.catchupPendingBlocks, + CatchupRanges: state.catchupRangesCount, FailedBlocks: state.failedBlocksCount, } if !state.lastIndexedAt.IsZero() { @@ -188,16 +238,58 @@ func (r *Registry) ensureStateLocked(key string) *chainState { } state = &chainState{ - networkID: strings.ToLower(key), - chainName: strings.ToLower(key), - internalCode: key, - thresholds: config.StatusConfig{}.Normalize(), - failedBlocks: make(map[uint64]struct{}), + networkID: strings.ToLower(key), + chainName: strings.ToLower(key), + internalCode: key, + thresholds: config.StatusConfig{}.Normalize(), + failedBlocks: make(map[uint64]struct{}), + catchupRanges: make(map[catchupRangeKey]uint64), } r.chains[key] = state return state } +func makeCatchupRangeKey(start, end uint64) (catchupRangeKey, bool) { + if start == 0 || end < start { + return catchupRangeKey{}, false + } + return catchupRangeKey{start: start, end: end}, true +} + +func catchupPendingForRange(rng blockstore.CatchupRange) uint64 { + if rng.End < rng.Start || rng.Start == 0 { + return 0 + } + if rng.Current < rng.Start { + return rng.End - rng.Start + 1 + } + if rng.Current < rng.End { + return rng.End - rng.Current + } + return 0 +} + +func upsertCatchupRangeLocked(state *chainState, rng blockstore.CatchupRange) { + rangeKey, ok := makeCatchupRangeKey(rng.Start, rng.End) + if !ok { + return + } + if state.catchupRanges == nil { + state.catchupRanges = make(map[catchupRangeKey]uint64) + } + + newPending := catchupPendingForRange(rng) + oldPending := state.catchupRanges[rangeKey] + state.catchupRanges[rangeKey] = newPending + + if state.catchupPendingBlocks >= oldPending { + state.catchupPendingBlocks -= oldPending + } else { + state.catchupPendingBlocks = 0 + } + state.catchupPendingBlocks += newPending +} + func deriveHealth(pendingBlocks uint64, thresholds config.StatusConfig) HealthStatus { switch { case pendingBlocks < thresholds.HealthyMaxPendingBlocks: diff --git a/internal/status/status.go b/internal/status/status.go index 590f023..762a408 100644 --- a/internal/status/status.go +++ b/internal/status/status.go @@ -1,6 +1,10 @@ package status -import "time" +import ( + "time" + + "github.com/fystack/multichain-indexer/pkg/store/blockstore" +) // StatusRegistry captures status mutations used by workers. type StatusRegistry interface { @@ -8,6 +12,9 @@ type StatusRegistry interface { MarkFailedBlock(chainKey string, blockNumber uint64) ClearFailedBlocks(chainKey string, blockNumbers []uint64) SetFailedBlocks(chainKey string, blockNumbers []uint64) + SetCatchupRanges(chainKey string, ranges []blockstore.CatchupRange) + UpsertCatchupRanges(chainKey string, ranges []blockstore.CatchupRange) + DeleteCatchupRange(chainKey string, start, end uint64) } // NoopStatusRegistry is a no-op implementation of StatusRegistry. @@ -17,6 +24,11 @@ func (NoopStatusRegistry) UpdateHead(string, uint64, uint64, time.Time) {} func (NoopStatusRegistry) MarkFailedBlock(string, uint64) {} func (NoopStatusRegistry) ClearFailedBlocks(string, []uint64) {} func (NoopStatusRegistry) SetFailedBlocks(string, []uint64) {} +func (NoopStatusRegistry) SetCatchupRanges(string, []blockstore.CatchupRange) { +} +func (NoopStatusRegistry) UpsertCatchupRanges(string, []blockstore.CatchupRange) { +} +func (NoopStatusRegistry) DeleteCatchupRange(string, uint64, uint64) {} func EnsureStatusRegistry(statusRegistry StatusRegistry) StatusRegistry { if statusRegistry == nil { diff --git a/internal/status/status_test.go b/internal/status/status_test.go index 2338502..630ac6a 100644 --- a/internal/status/status_test.go +++ b/internal/status/status_test.go @@ -10,22 +10,6 @@ import ( "github.com/stretchr/testify/require" ) -// mapCatchupStore implements CatchupProgressSource for tests. -type mapCatchupStore map[string][]blockstore.CatchupRange - -func (m mapCatchupStore) GetCatchupProgress(chain string) ([]blockstore.CatchupRange, error) { - if m == nil { - return nil, nil - } - ranges := m[chain] - if ranges == nil { - return nil, nil - } - out := make([]blockstore.CatchupRange, len(ranges)) - copy(out, ranges) - return out, nil -} - func TestRegistrySnapshotDerivesHealthWithPerChainThresholds(t *testing.T) { t.Parallel() @@ -48,15 +32,12 @@ func TestRegistrySnapshotDerivesHealthWithPerChainThresholds(t *testing.T) { registry.UpdateHead("eth_mainnet", 1_000, 980, indexedAt) registry.MarkFailedBlock("eth_mainnet", 981) registry.MarkFailedBlock("eth_mainnet", 982) + registry.SetCatchupRanges("eth_mainnet", []blockstore.CatchupRange{ + {Start: 100, End: 102, Current: 99}, + {Start: 200, End: 201, Current: 199}, + }) - kvCatchup := mapCatchupStore{ - "ETH_MAINNET": { - {Start: 100, End: 102, Current: 99}, - {Start: 200, End: 201, Current: 199}, - }, - } - - resp := registry.Snapshot("1.2.3", kvCatchup) + resp := registry.Snapshot("1.2.3") require.Equal(t, "1.2.3", resp.Version) require.Len(t, resp.Networks, 1) @@ -93,7 +74,7 @@ func TestRegistrySnapshotUsesDefaultThresholdWhenMissing(t *testing.T) { registry.UpdateHead("tron_mainnet", 500, 470, time.Time{}) - resp := registry.Snapshot("1.0.0", mapCatchupStore{}) + resp := registry.Snapshot("1.0.0") require.Len(t, resp.Networks, 1) network := resp.Networks[0] @@ -115,11 +96,47 @@ func TestRegistryClearFailedBlocks(t *testing.T) { registry.MarkFailedBlock("btc_mainnet", 11) registry.ClearFailedBlocks("btc_mainnet", []uint64{10}) - resp := registry.Snapshot("1.0.0", mapCatchupStore{}) + resp := registry.Snapshot("1.0.0") require.Len(t, resp.Networks, 1) require.Equal(t, 1, resp.Networks[0].FailedBlocks) registry.SetFailedBlocks("btc_mainnet", []uint64{21, 22, 22}) - resp = registry.Snapshot("1.0.0", mapCatchupStore{}) + resp = registry.Snapshot("1.0.0") require.Equal(t, 2, resp.Networks[0].FailedBlocks) } + +func TestRegistryCatchupRangeMutations(t *testing.T) { + t.Parallel() + + registry := NewRegistry() + registry.RegisterChain("SOL_MAINNET", "sol_mainnet", config.ChainConfig{ + NetworkId: "sol-mainnet", + InternalCode: "SOL_MAINNET", + Type: enum.NetworkTypeSol, + }) + + registry.SetCatchupRanges("sol_mainnet", []blockstore.CatchupRange{ + {Start: 1, End: 10, Current: 0}, + {Start: 20, End: 25, Current: 22}, + }) + + resp := registry.Snapshot("1.0.0") + require.Len(t, resp.Networks, 1) + require.Equal(t, 2, resp.Networks[0].CatchupRanges) + require.Equal(t, uint64(13), resp.Networks[0].CatchupPendingBlocks) + + registry.UpsertCatchupRanges("sol_mainnet", []blockstore.CatchupRange{ + {Start: 1, End: 10, Current: 5}, + {Start: 30, End: 31, Current: 29}, + }) + + resp = registry.Snapshot("1.0.0") + require.Equal(t, 3, resp.Networks[0].CatchupRanges) + require.Equal(t, uint64(10), resp.Networks[0].CatchupPendingBlocks) + + registry.DeleteCatchupRange("sol_mainnet", 20, 25) + + resp = registry.Snapshot("1.0.0") + require.Equal(t, 2, resp.Networks[0].CatchupRanges) + require.Equal(t, uint64(7), resp.Networks[0].CatchupPendingBlocks) +} diff --git a/internal/status/types.go b/internal/status/types.go index 7c1f729..3b839da 100644 --- a/internal/status/types.go +++ b/internal/status/types.go @@ -4,7 +4,6 @@ import ( "time" "github.com/fystack/multichain-indexer/pkg/common/config" - "github.com/fystack/multichain-indexer/pkg/store/blockstore" ) type HealthStatus string @@ -37,31 +36,35 @@ type StatusResponse struct { Networks []NetworkStatus `json:"networks"` } -// CatchupProgressSource supplies persisted catchup ranges (e.g. blockstore.Store). -type CatchupProgressSource interface { - GetCatchupProgress(chain string) ([]blockstore.CatchupRange, error) +type catchupRangeKey struct { + start uint64 + end uint64 } type chainState struct { - networkID string - chainName string - internalCode string - networkType string - thresholds config.StatusConfig - latestBlock uint64 - indexedBlock uint64 - lastIndexedAt time.Time - failedBlocks map[uint64]struct{} + networkID string + chainName string + internalCode string + networkType string + thresholds config.StatusConfig + latestBlock uint64 + indexedBlock uint64 + lastIndexedAt time.Time + failedBlocks map[uint64]struct{} + catchupRanges map[catchupRangeKey]uint64 + catchupPendingBlocks uint64 } type chainSnapshot struct { - networkID string - chainName string - internalCode string - networkType string - thresholds config.StatusConfig - latestBlock uint64 - indexedBlock uint64 - lastIndexedAt time.Time - failedBlocksCount int + networkID string + chainName string + internalCode string + networkType string + thresholds config.StatusConfig + latestBlock uint64 + indexedBlock uint64 + lastIndexedAt time.Time + failedBlocksCount int + catchupRangesCount int + catchupPendingBlocks uint64 } diff --git a/internal/worker/catchup.go b/internal/worker/catchup.go index c6b5a87..2af7cdb 100644 --- a/internal/worker/catchup.go +++ b/internal/worker/catchup.go @@ -112,6 +112,7 @@ func (cw *CatchupWorker) runCatchup() { } func (cw *CatchupWorker) loadCatchupProgress() []blockstore.CatchupRange { + registry := status.EnsureStatusRegistry(cw.statusRegistry) var ranges []blockstore.CatchupRange // Load existing catchup ranges from database (they're already split when saved) @@ -121,6 +122,7 @@ func (cw *CatchupWorker) loadCatchupProgress() []blockstore.CatchupRange { "progress_ranges", len(progress), ) ranges = progress + registry.SetCatchupRanges(cw.chain.GetName(), progress) } else { cw.logger.Warn("Failed to load catchup progress, will create new range", "chain", cw.chain.GetName(), @@ -160,6 +162,8 @@ func (cw *CatchupWorker) loadCatchupProgress() []blockstore.CatchupRange { "count", len(newRanges), "error", err, ) + } else { + registry.UpsertCatchupRanges(cw.chain.GetName(), newRanges) } ranges = append(ranges, newRanges...) } @@ -349,15 +353,30 @@ func (cw *CatchupWorker) processRange(r blockstore.CatchupRange, workerID int) e func (cw *CatchupWorker) saveProgress(r blockstore.CatchupRange, current uint64) { cw.progressMu.Lock() defer cw.progressMu.Unlock() + registry := status.EnsureStatusRegistry(cw.statusRegistry) cw.logger.Debug("Saving catchup progress", "chain", cw.chain.GetName(), "range", fmt.Sprintf("%d-%d", r.Start, r.End), "current", current, ) - _ = cw.blockStore.SaveCatchupProgress(cw.chain.GetNetworkInternalCode(), r.Start, r.End, current) + current = min(current, r.End) + if err := cw.blockStore.SaveCatchupProgress(cw.chain.GetNetworkInternalCode(), r.Start, r.End, current); err != nil { + cw.logger.Warn("Failed to save catchup progress", + "chain", cw.chain.GetName(), + "range", fmt.Sprintf("%d-%d", r.Start, r.End), + "current", current, + "error", err, + ) + return + } + registry.UpsertCatchupRanges(cw.chain.GetName(), []blockstore.CatchupRange{{ + Start: r.Start, + End: r.End, + Current: current, + }}) for i := range cw.blockRanges { if cw.blockRanges[i].Start == r.Start && cw.blockRanges[i].End == r.End { - cw.blockRanges[i].Current = min(current, cw.blockRanges[i].End) + cw.blockRanges[i].Current = current break } } @@ -366,13 +385,22 @@ func (cw *CatchupWorker) saveProgress(r blockstore.CatchupRange, current uint64) func (cw *CatchupWorker) completeRange(r blockstore.CatchupRange) error { cw.progressMu.Lock() defer cw.progressMu.Unlock() + registry := status.EnsureStatusRegistry(cw.statusRegistry) cw.logger.Info("Completing catchup range", "chain", cw.chain.GetName(), "range", fmt.Sprintf("%d-%d", r.Start, r.End), ) - _ = cw.blockStore.DeleteCatchupRange(cw.chain.GetNetworkInternalCode(), r.Start, r.End) + if err := cw.blockStore.DeleteCatchupRange(cw.chain.GetNetworkInternalCode(), r.Start, r.End); err != nil { + cw.logger.Warn("Failed to delete catchup range", + "chain", cw.chain.GetName(), + "range", fmt.Sprintf("%d-%d", r.Start, r.End), + "error", err, + ) + return err + } + registry.DeleteCatchupRange(cw.chain.GetName(), r.Start, r.End) // Remove from local ranges for i, existing := range cw.blockRanges { @@ -418,6 +446,9 @@ func (cw *CatchupWorker) Close() error { "ranges", len(rangesToSave), "error", err, ) + } else { + registry := status.EnsureStatusRegistry(cw.statusRegistry) + registry.UpsertCatchupRanges(cw.chain.GetName(), rangesToSave) } return nil diff --git a/internal/worker/factory.go b/internal/worker/factory.go index 625cfd7..6d14611 100644 --- a/internal/worker/factory.go +++ b/internal/worker/factory.go @@ -818,6 +818,15 @@ func CreateManagerWithWorkers( if existingFailed, err := blockStore.GetFailedBlocks(idxr.GetNetworkInternalCode()); err == nil { statusRegistry.SetFailedBlocks(idxr.GetName(), existingFailed) } + if existingCatchup, err := blockStore.GetCatchupProgress(idxr.GetNetworkInternalCode()); err == nil { + statusRegistry.SetCatchupRanges(idxr.GetName(), existingCatchup) + } else { + logger.Warn("Failed to load catchup progress for status registry", + "chain", chainName, + "internal_code", idxr.GetNetworkInternalCode(), + "error", err, + ) + } failedChan := make(chan FailedBlockEvent, 100) diff --git a/internal/worker/factory_test.go b/internal/worker/factory_test.go index 94c28cf..be6261b 100644 --- a/internal/worker/factory_test.go +++ b/internal/worker/factory_test.go @@ -3,15 +3,18 @@ package worker import ( "context" "errors" + "fmt" "log/slog" "testing" "time" "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/fystack/multichain-indexer/pkg/common/constant" "github.com/fystack/multichain-indexer/pkg/common/enum" commonlogger "github.com/fystack/multichain-indexer/pkg/common/logger" "github.com/fystack/multichain-indexer/pkg/events" "github.com/fystack/multichain-indexer/pkg/infra" + "github.com/fystack/multichain-indexer/pkg/store/blockstore" "github.com/hashicorp/consul/api" "github.com/stretchr/testify/require" ) @@ -145,8 +148,63 @@ func TestRescannerFailedChannelIsolationByChain(t *testing.T) { require.NotContains(t, rwB.failedBlocks, uint64(101)) } +func TestCreateManagerWithWorkersBootstrapsCatchupRangesIntoStatusRegistry(t *testing.T) { + t.Parallel() + initTestLogger() + + cfg := &config.Config{ + Chains: config.Chains{ + "chain-a": { + Name: "chain-a", + NetworkId: "chain-a", + InternalCode: "a", + Type: enum.NetworkTypeEVM, + PollInterval: time.Millisecond, + Client: config.ClientConfig{ + Timeout: time.Second, + }, + Throttle: config.Throttle{ + BatchSize: 1, + RPS: 1, + Burst: 1, + }, + Nodes: []config.NodeConfig{{URL: "http://127.0.0.1:8545"}}, + }, + }, + } + + kv := &listKVStore{ + pairs: []*infra.KVPair{{ + Key: fmt.Sprintf("%s/%s/%s/%d-%d", blockstore.BlockStates, "a", constant.KVPrefixProgressCatchup, 1, 20), + Value: []byte("10"), + }}, + } + + manager := CreateManagerWithWorkers( + context.Background(), + cfg, + kv, + nil, + nil, + events.Emitter(nil), + nil, + ManagerConfig{ + Chains: []string{"chain-a"}, + }, + ) + + resp := manager.StatusSnapshot("1.0.0") + require.Len(t, resp.Networks, 1) + require.Equal(t, 1, resp.Networks[0].CatchupRanges) + require.Equal(t, uint64(10), resp.Networks[0].CatchupPendingBlocks) +} + type noopKVStore struct{} +type listKVStore struct { + pairs []*infra.KVPair +} + func initTestLogger() { commonlogger.Init(&commonlogger.Options{ Level: slog.LevelError, @@ -165,3 +223,24 @@ func (noopKVStore) List(string) ([]*infra.KVPair, error) { return nil, nil } func (noopKVStore) Delete(string) error { return nil } func (noopKVStore) BatchSet([]infra.KVPair) error { return nil } func (noopKVStore) Close() error { return nil } + +func (s *listKVStore) GetName() string { return "list" } +func (s *listKVStore) Set(string, string) error { return nil } +func (s *listKVStore) Get(string) (string, error) { return "", errors.New("not found") } +func (s *listKVStore) GetWithOptions(string, *api.QueryOptions) (string, error) { + return "", errors.New("not found") +} +func (s *listKVStore) SetAny(string, any) error { return nil } +func (s *listKVStore) GetAny(string, any) (bool, error) { return false, nil } +func (s *listKVStore) Delete(string) error { return nil } +func (s *listKVStore) BatchSet([]infra.KVPair) error { return nil } +func (s *listKVStore) Close() error { return nil } +func (s *listKVStore) List(prefix string) ([]*infra.KVPair, error) { + var out []*infra.KVPair + for _, pair := range s.pairs { + if len(pair.Key) >= len(prefix) && pair.Key[:len(prefix)] == prefix { + out = append(out, pair) + } + } + return out, nil +} diff --git a/internal/worker/manager.go b/internal/worker/manager.go index 5768e0a..bcf6e6a 100644 --- a/internal/worker/manager.go +++ b/internal/worker/manager.go @@ -92,7 +92,7 @@ func (m *Manager) Stop() { logger.Info("Manager stopped") } -// StatusSnapshot returns /status payload; catchup fields are read from the block store. +// StatusSnapshot returns /status payload from live in-memory registry state. func (m *Manager) StatusSnapshot(version string) status.StatusResponse { if m.registry == nil { return status.StatusResponse{ @@ -101,7 +101,7 @@ func (m *Manager) StatusSnapshot(version string) status.StatusResponse { Networks: []status.NetworkStatus{}, } } - return m.registry.Snapshot(version, m.blockStore) + return m.registry.Snapshot(version) } // Inject workers into manager diff --git a/internal/worker/regular.go b/internal/worker/regular.go index 4e41114..4d35d76 100644 --- a/internal/worker/regular.go +++ b/internal/worker/regular.go @@ -182,6 +182,7 @@ func (rw *RegularWorker) processRegularBlocks() error { } func (rw *RegularWorker) determineStartingBlock() uint64 { + registry := status.EnsureStatusRegistry(rw.statusRegistry) chainLatest, err1 := rw.chain.GetLatestBlockNumber(rw.ctx) kvLatest, err2 := rw.blockStore.GetLatestBlock(rw.chain.GetNetworkInternalCode()) @@ -224,6 +225,8 @@ func (rw *RegularWorker) determineStartingBlock() uint64 { "count", len(ranges), "error", err, ) + } else { + registry.UpsertCatchupRanges(rw.chain.GetName(), ranges) } rw.logger.Info("Queued catchup ranges", @@ -351,6 +354,7 @@ func (rw *RegularWorker) flushBlockHashes() { // skipAheadIfLagging checks if the regular worker is too far behind the chain head. // If so, it queues the skipped range for catchup and jumps currentBlock to chain head. func (rw *RegularWorker) skipAheadIfLagging(latest uint64) bool { + registry := status.EnsureStatusRegistry(rw.statusRegistry) maxLag := rw.config.MaxLag if maxLag == 0 { maxLag = constant.DefaultMaxLag @@ -385,6 +389,8 @@ func (rw *RegularWorker) skipAheadIfLagging(latest uint64) bool { "count", len(ranges), "error", err, ) + } else { + registry.UpsertCatchupRanges(rw.chain.GetName(), ranges) } rw.currentBlock = latest diff --git a/internal/worker/regular_test.go b/internal/worker/regular_test.go index 2156116..73573f1 100644 --- a/internal/worker/regular_test.go +++ b/internal/worker/regular_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/fystack/multichain-indexer/internal/indexer" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/enum" "github.com/fystack/multichain-indexer/pkg/common/types" @@ -144,6 +145,81 @@ func TestBaseWorkerExecuteRecoverableConvertsPanicToError(t *testing.T) { require.Equal(t, "test panic panic: boom", err.Error()) } +func TestRegularWorkerDetermineStartingBlockUpdatesCatchupRegistry(t *testing.T) { + t.Parallel() + + chain := &stubIndexer{ + name: "ethereum", + internalCode: "ETH", + networkType: enum.NetworkTypeEVM, + latest: 25, + } + store := &stubBlockStore{latestBlock: 20} + statusRegistry := status.NewRegistry() + statusRegistry.RegisterChain("ethereum", "ethereum", config.ChainConfig{ + NetworkId: "eth-mainnet", + InternalCode: "ETH", + Type: enum.NetworkTypeEVM, + }) + rw := newTestRegularWorker(chain, store, 20, 2) + rw.statusRegistry = statusRegistry + + start := rw.determineStartingBlock() + require.Equal(t, uint64(25), start) + + resp := statusRegistry.Snapshot("1.0.0") + require.Len(t, resp.Networks, 1) + require.Equal(t, 1, resp.Networks[0].CatchupRanges) + require.Equal(t, uint64(5), resp.Networks[0].CatchupPendingBlocks) +} + +func TestCatchupWorkerUpdatesCatchupRegistryOnProgressAndCompletion(t *testing.T) { + t.Parallel() + + statusRegistry := status.NewRegistry() + statusRegistry.RegisterChain("ethereum", "ethereum", config.ChainConfig{ + NetworkId: "eth-mainnet", + InternalCode: "ETH", + Type: enum.NetworkTypeEVM, + }) + statusRegistry.SetCatchupRanges("ethereum", []blockstore.CatchupRange{{ + Start: 1, + End: 10, + Current: 0, + }}) + + store := &stubBlockStore{} + cw := &CatchupWorker{ + BaseWorker: &BaseWorker{ + ctx: context.Background(), + cancel: func() {}, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + chain: &stubIndexer{name: "ethereum", internalCode: "ETH", networkType: enum.NetworkTypeEVM}, + blockStore: store, + statusRegistry: statusRegistry, + }, + blockRanges: []blockstore.CatchupRange{{ + Start: 1, + End: 10, + Current: 0, + }}, + } + + cw.saveProgress(blockstore.CatchupRange{Start: 1, End: 10, Current: 0}, 5) + + resp := statusRegistry.Snapshot("1.0.0") + require.Len(t, resp.Networks, 1) + require.Equal(t, 1, resp.Networks[0].CatchupRanges) + require.Equal(t, uint64(5), resp.Networks[0].CatchupPendingBlocks) + + err := cw.completeRange(blockstore.CatchupRange{Start: 1, End: 10, Current: 5}) + require.NoError(t, err) + + resp = statusRegistry.Snapshot("1.0.0") + require.Equal(t, 0, resp.Networks[0].CatchupRanges) + require.Equal(t, uint64(0), resp.Networks[0].CatchupPendingBlocks) +} + func testChainConfig() config.ChainConfig { return config.ChainConfig{ PollInterval: time.Millisecond, @@ -222,12 +298,27 @@ func (s *stubIndexer) IsHealthy() bool { } type stubBlockStore struct { - savedLatest []uint64 - failedBlocks []uint64 + latestBlock uint64 + savedLatest []uint64 + failedBlocks []uint64 + savedCatchupRanges []blockstore.CatchupRange + catchupProgress []blockstore.CatchupRange + deleteCatchupCalls []blockstore.CatchupRange + getLatestBlockErr error + getCatchupProgressErr error + saveCatchupRangesErr error + saveCatchupProgressErr error + deleteCatchupErr error } func (s *stubBlockStore) GetLatestBlock(string) (uint64, error) { - return 0, errors.New("not found") + if s.getLatestBlockErr != nil { + return 0, s.getLatestBlockErr + } + if s.latestBlock == 0 { + return 0, errors.New("not found") + } + return s.latestBlock, nil } func (s *stubBlockStore) SaveLatestBlock(_ string, blockNumber uint64) error { @@ -252,19 +343,55 @@ func (s *stubBlockStore) RemoveFailedBlocks(string, []uint64) error { return nil } -func (s *stubBlockStore) SaveCatchupRanges(string, []blockstore.CatchupRange) error { +func (s *stubBlockStore) SaveCatchupRanges(_ string, ranges []blockstore.CatchupRange) error { + if s.saveCatchupRangesErr != nil { + return s.saveCatchupRangesErr + } + s.savedCatchupRanges = append(s.savedCatchupRanges, ranges...) return nil } -func (s *stubBlockStore) SaveCatchupProgress(string, uint64, uint64, uint64) error { +func (s *stubBlockStore) SaveCatchupProgress(_ string, start, end, current uint64) error { + if s.saveCatchupProgressErr != nil { + return s.saveCatchupProgressErr + } + for i := range s.catchupProgress { + if s.catchupProgress[i].Start == start && s.catchupProgress[i].End == end { + s.catchupProgress[i].Current = current + return nil + } + } + s.catchupProgress = append(s.catchupProgress, blockstore.CatchupRange{ + Start: start, + End: end, + Current: current, + }) return nil } func (s *stubBlockStore) GetCatchupProgress(string) ([]blockstore.CatchupRange, error) { - return nil, nil + if s.getCatchupProgressErr != nil { + return nil, s.getCatchupProgressErr + } + return append([]blockstore.CatchupRange(nil), s.catchupProgress...), nil } -func (s *stubBlockStore) DeleteCatchupRange(string, uint64, uint64) error { +func (s *stubBlockStore) DeleteCatchupRange(_ string, start, end uint64) error { + if s.deleteCatchupErr != nil { + return s.deleteCatchupErr + } + s.deleteCatchupCalls = append(s.deleteCatchupCalls, blockstore.CatchupRange{ + Start: start, + End: end, + }) + filtered := s.catchupProgress[:0] + for _, rng := range s.catchupProgress { + if rng.Start == start && rng.End == end { + continue + } + filtered = append(filtered, rng) + } + s.catchupProgress = filtered return nil }