diff --git a/cmd/indexer/main.go b/cmd/indexer/main.go index 40ad16d..93aa69a 100644 --- a/cmd/indexer/main.go +++ b/cmd/indexer/main.go @@ -14,6 +14,7 @@ import ( "github.com/alecthomas/kong" "gorm.io/gorm" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/internal/worker" "github.com/fystack/multichain-indexer/pkg/addressbloomfilter" "github.com/fystack/multichain-indexer/pkg/common/config" @@ -212,7 +213,7 @@ func runIndexer(chains []string, configPath string, debug, manual, catchup, from managerCfg, ) - healthServer := startHealthServer(cfg.Services.Port, cfg) + healthServer := startHealthServer(cfg.Services.Port, cfg, manager) // Start all workers logger.Info("Starting all workers") @@ -246,7 +247,7 @@ type HealthResponse struct { Version string `json:"version"` } -func startHealthServer(port int, cfg *config.Config) *http.Server { +func startHealthServer(port int, cfg *config.Config, manager *worker.Manager) *http.Server { mux := http.NewServeMux() version := cfg.Version @@ -266,13 +267,33 @@ func startHealthServer(port int, cfg *config.Config) *http.Server { json.NewEncoder(w).Encode(response) }) + mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + response := status.StatusResponse{ + Timestamp: time.Now().UTC(), + Version: version, + Networks: []status.NetworkStatus{}, + } + if manager != nil { + response = manager.StatusSnapshot(version) + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(response) + }) + server := &http.Server{ Addr: fmt.Sprintf(":%d", port), Handler: mux, } go func() { - logger.Info("Health check server started", "port", port, "endpoint", "/health") + logger.Info("Health check server started", "port", port, "endpoints", []string{"/health", "/status"}) if err := server.ListenAndServe(); err != nil && err != 
http.ErrServerClosed { logger.Error("Health server failed to start", "error", err) } diff --git a/configs/config.example.yaml b/configs/config.example.yaml index 3a86e4a..8b9e6dc 100644 --- a/configs/config.example.yaml +++ b/configs/config.example.yaml @@ -8,6 +8,9 @@ defaults: poll_interval: "5s" # how often to poll for new blocks reorg_rollback_window: 20 # number of blocks to roll back on reorg two_way_indexing: false # enable two-way indexing for all chains + status: + healthy_max_pending_blocks: 50 + slow_max_pending_blocks: 250 client: timeout: "20s" # RPC timeout per request max_retries: 3 # max retries per RPC call @@ -38,6 +41,10 @@ chains: throttle: rps: 5 burst: 8 + status: + # Optional per-chain override for /status health condition. + healthy_max_pending_blocks: 80 + slow_max_pending_blocks: 400 ethereum_mainnet: network_id: "ethereum_mainnet" @@ -239,7 +246,10 @@ chains: batch_size: 200 concurrency: 12 parallel: true - + status: + healthy_max_pending_blocks: 50 + slow_max_pending_blocks: 250 + osmosis_mainnet: network_id: "osmosis-1" internal_code: "OSMO_MAINNET" diff --git a/internal/status/registry.go b/internal/status/registry.go new file mode 100644 index 0000000..d723284 --- /dev/null +++ b/internal/status/registry.go @@ -0,0 +1,302 @@ +package status + +import ( + "sort" + "strings" + "sync" + "time" + + "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/fystack/multichain-indexer/pkg/store/blockstore" +) + +type Registry struct { + mu sync.RWMutex + chains map[string]*chainState +} + +func NewRegistry() *Registry { + return &Registry{ + chains: make(map[string]*chainState), + } +} + +func (r *Registry) RegisterChain(chainKey, chainName string, chainCfg config.ChainConfig) { + key := config.CanonicalChainKey(chainKey) + if key == "" { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state, exists := r.chains[key] + if !exists { + state = &chainState{ + failedBlocks: make(map[uint64]struct{}), + catchupRanges: 
make(map[catchupRangeKey]uint64), + } + r.chains[key] = state + } + + if strings.TrimSpace(chainCfg.NetworkId) != "" { + state.networkID = chainCfg.NetworkId + } else { + state.networkID = chainName + } + state.chainName = chainName + state.internalCode = chainCfg.InternalCode + state.networkType = string(chainCfg.Type) + state.thresholds = chainCfg.Status.Normalize() +} + +func (r *Registry) UpdateHead(chainKey string, latestBlock, indexedBlock uint64, indexedAt time.Time) { + key := config.CanonicalChainKey(chainKey) + if key == "" { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + state.latestBlock = latestBlock + state.indexedBlock = indexedBlock + if !indexedAt.IsZero() { + state.lastIndexedAt = indexedAt.UTC() + } +} + +func (r *Registry) MarkFailedBlock(chainKey string, blockNumber uint64) { + key := config.CanonicalChainKey(chainKey) + if key == "" || blockNumber == 0 { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + state.failedBlocks[blockNumber] = struct{}{} +} + +func (r *Registry) ClearFailedBlocks(chainKey string, blockNumbers []uint64) { + key := config.CanonicalChainKey(chainKey) + if key == "" || len(blockNumbers) == 0 { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + for _, block := range blockNumbers { + delete(state.failedBlocks, block) + } +} + +func (r *Registry) SetFailedBlocks(chainKey string, blockNumbers []uint64) { + key := config.CanonicalChainKey(chainKey) + if key == "" { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + state.failedBlocks = make(map[uint64]struct{}, len(blockNumbers)) + for _, block := range blockNumbers { + if block == 0 { + continue + } + state.failedBlocks[block] = struct{}{} + } +} + +func (r *Registry) SetCatchupRanges(chainKey string, ranges []blockstore.CatchupRange) { + key := config.CanonicalChainKey(chainKey) + if key == "" { + return + } + + 
r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + state.catchupPendingBlocks = 0 + state.catchupRanges = make(map[catchupRangeKey]uint64, len(ranges)) + for _, rng := range ranges { + upsertCatchupRangeLocked(state, rng) + } +} + +func (r *Registry) UpsertCatchupRanges(chainKey string, ranges []blockstore.CatchupRange) { + key := config.CanonicalChainKey(chainKey) + if key == "" || len(ranges) == 0 { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + for _, rng := range ranges { + upsertCatchupRangeLocked(state, rng) + } +} + +func (r *Registry) DeleteCatchupRange(chainKey string, start, end uint64) { + key := config.CanonicalChainKey(chainKey) + if key == "" { + return + } + + rangeKey, ok := makeCatchupRangeKey(start, end) + if !ok { + return + } + + r.mu.Lock() + defer r.mu.Unlock() + + state := r.ensureStateLocked(key) + if pending, exists := state.catchupRanges[rangeKey]; exists { + if state.catchupPendingBlocks >= pending { + state.catchupPendingBlocks -= pending + } else { + state.catchupPendingBlocks = 0 + } + delete(state.catchupRanges, rangeKey) + } +} + +func (r *Registry) Snapshot(version string) StatusResponse { + r.mu.RLock() + states := make([]chainSnapshot, 0, len(r.chains)) + for _, state := range r.chains { + states = append(states, chainSnapshot{ + networkID: state.networkID, + chainName: state.chainName, + internalCode: state.internalCode, + networkType: state.networkType, + thresholds: state.thresholds, + latestBlock: state.latestBlock, + indexedBlock: state.indexedBlock, + lastIndexedAt: state.lastIndexedAt, + failedBlocksCount: len(state.failedBlocks), + catchupRangesCount: len(state.catchupRanges), + catchupPendingBlocks: state.catchupPendingBlocks, + }) + } + r.mu.RUnlock() + + networks := make([]NetworkStatus, 0, len(states)) + for _, state := range states { + headGap := uint64(0) + if state.latestBlock > state.indexedBlock { + headGap = state.latestBlock - state.indexedBlock 
+ } + pending := headGap + state.catchupPendingBlocks + + item := NetworkStatus{ + NetworkID: state.networkID, + ChainName: state.chainName, + InternalCode: state.internalCode, + NetworkType: state.networkType, + Health: deriveHealth(pending, state.thresholds), + LatestBlock: state.latestBlock, + IndexedBlock: state.indexedBlock, + PendingBlocks: pending, + HeadGap: headGap, + CatchupPendingBlocks: state.catchupPendingBlocks, + CatchupRanges: state.catchupRangesCount, + FailedBlocks: state.failedBlocksCount, + } + if !state.lastIndexedAt.IsZero() { + t := state.lastIndexedAt + item.LastIndexedAt = &t + } + networks = append(networks, item) + } + + sort.Slice(networks, func(i, j int) bool { + return networks[i].ChainName < networks[j].ChainName + }) + + return StatusResponse{ + Timestamp: time.Now().UTC(), + Version: version, + Networks: networks, + } +} + +func (r *Registry) ensureStateLocked(key string) *chainState { + state, exists := r.chains[key] + if exists { + return state + } + + state = &chainState{ + networkID: strings.ToLower(key), + chainName: strings.ToLower(key), + internalCode: key, + thresholds: config.StatusConfig{}.Normalize(), + failedBlocks: make(map[uint64]struct{}), + catchupRanges: make(map[catchupRangeKey]uint64), + } + r.chains[key] = state + return state +} + +func makeCatchupRangeKey(start, end uint64) (catchupRangeKey, bool) { + if start == 0 || end < start { + return catchupRangeKey{}, false + } + return catchupRangeKey{start: start, end: end}, true +} + +func catchupPendingForRange(rng blockstore.CatchupRange) uint64 { + if rng.End < rng.Start || rng.Start == 0 { + return 0 + } + if rng.Current < rng.Start { + return rng.End - rng.Start + 1 + } + if rng.Current < rng.End { + return rng.End - rng.Current + } + return 0 +} + +func upsertCatchupRangeLocked(state *chainState, rng blockstore.CatchupRange) { + rangeKey, ok := makeCatchupRangeKey(rng.Start, rng.End) + if !ok { + return + } + if state.catchupRanges == nil { + 
state.catchupRanges = make(map[catchupRangeKey]uint64) + } + + newPending := catchupPendingForRange(rng) + oldPending := state.catchupRanges[rangeKey] + state.catchupRanges[rangeKey] = newPending + + if state.catchupPendingBlocks >= oldPending { + state.catchupPendingBlocks -= oldPending + } else { + state.catchupPendingBlocks = 0 + } + state.catchupPendingBlocks += newPending +} + +func deriveHealth(pendingBlocks uint64, thresholds config.StatusConfig) HealthStatus { + switch { + case pendingBlocks < thresholds.HealthyMaxPendingBlocks: + return HealthHealthy + case pendingBlocks < thresholds.SlowMaxPendingBlocks: + return HealthSlow + default: + return HealthDegraded + } +} diff --git a/internal/status/status.go b/internal/status/status.go new file mode 100644 index 0000000..762a408 --- /dev/null +++ b/internal/status/status.go @@ -0,0 +1,38 @@ +package status + +import ( + "time" + + "github.com/fystack/multichain-indexer/pkg/store/blockstore" +) + +// StatusRegistry captures status mutations used by workers. +type StatusRegistry interface { + UpdateHead(chainKey string, latestBlock, indexedBlock uint64, indexedAt time.Time) + MarkFailedBlock(chainKey string, blockNumber uint64) + ClearFailedBlocks(chainKey string, blockNumbers []uint64) + SetFailedBlocks(chainKey string, blockNumbers []uint64) + SetCatchupRanges(chainKey string, ranges []blockstore.CatchupRange) + UpsertCatchupRanges(chainKey string, ranges []blockstore.CatchupRange) + DeleteCatchupRange(chainKey string, start, end uint64) +} + +// NoopStatusRegistry is a no-op implementation of StatusRegistry. 
+type NoopStatusRegistry struct{} + +func (NoopStatusRegistry) UpdateHead(string, uint64, uint64, time.Time) {} +func (NoopStatusRegistry) MarkFailedBlock(string, uint64) {} +func (NoopStatusRegistry) ClearFailedBlocks(string, []uint64) {} +func (NoopStatusRegistry) SetFailedBlocks(string, []uint64) {} +func (NoopStatusRegistry) SetCatchupRanges(string, []blockstore.CatchupRange) { +} +func (NoopStatusRegistry) UpsertCatchupRanges(string, []blockstore.CatchupRange) { +} +func (NoopStatusRegistry) DeleteCatchupRange(string, uint64, uint64) {} + +func EnsureStatusRegistry(statusRegistry StatusRegistry) StatusRegistry { + if statusRegistry == nil { + return NoopStatusRegistry{} + } + return statusRegistry +} diff --git a/internal/status/status_test.go b/internal/status/status_test.go new file mode 100644 index 0000000..630ac6a --- /dev/null +++ b/internal/status/status_test.go @@ -0,0 +1,142 @@ +package status + +import ( + "testing" + "time" + + "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/fystack/multichain-indexer/pkg/common/enum" + "github.com/fystack/multichain-indexer/pkg/store/blockstore" + "github.com/stretchr/testify/require" +) + +func TestRegistrySnapshotDerivesHealthWithPerChainThresholds(t *testing.T) { + t.Parallel() + + registry := NewRegistry() + registry.RegisterChain( + "ETH_MAINNET", + "ethereum_mainnet", + config.ChainConfig{ + NetworkId: "eth-mainnet", + InternalCode: "ETH_MAINNET", + Type: enum.NetworkTypeEVM, + Status: config.StatusConfig{ + HealthyMaxPendingBlocks: 20, + SlowMaxPendingBlocks: 100, + }, + }, + ) + + indexedAt := time.Date(2026, 3, 25, 12, 0, 0, 0, time.UTC) + registry.UpdateHead("eth_mainnet", 1_000, 980, indexedAt) + registry.MarkFailedBlock("eth_mainnet", 981) + registry.MarkFailedBlock("eth_mainnet", 982) + registry.SetCatchupRanges("eth_mainnet", []blockstore.CatchupRange{ + {Start: 100, End: 102, Current: 99}, + {Start: 200, End: 201, Current: 199}, + }) + + resp := registry.Snapshot("1.2.3") + 
require.Equal(t, "1.2.3", resp.Version) + require.Len(t, resp.Networks, 1) + + network := resp.Networks[0] + require.Equal(t, "eth-mainnet", network.NetworkID) + require.Equal(t, "ethereum_mainnet", network.ChainName) + require.Equal(t, "ETH_MAINNET", network.InternalCode) + require.Equal(t, "evm", network.NetworkType) + require.Equal(t, uint64(1_000), network.LatestBlock) + require.Equal(t, uint64(980), network.IndexedBlock) + require.Equal(t, uint64(20), network.HeadGap) + require.Equal(t, uint64(5), network.CatchupPendingBlocks) + require.Equal(t, uint64(25), network.PendingBlocks) + require.Equal(t, 2, network.CatchupRanges) + require.Equal(t, 2, network.FailedBlocks) + require.Equal(t, HealthSlow, network.Health) + require.NotNil(t, network.LastIndexedAt) + require.True(t, network.LastIndexedAt.Equal(indexedAt)) +} + +func TestRegistrySnapshotUsesDefaultThresholdWhenMissing(t *testing.T) { + t.Parallel() + + registry := NewRegistry() + registry.RegisterChain( + "TRON_MAINNET", + "tron_mainnet", + config.ChainConfig{ + NetworkId: "tron-mainnet", + InternalCode: "TRON_MAINNET", + Type: enum.NetworkTypeTron, + }, + ) + + registry.UpdateHead("tron_mainnet", 500, 470, time.Time{}) + + resp := registry.Snapshot("1.0.0") + require.Len(t, resp.Networks, 1) + + network := resp.Networks[0] + require.Equal(t, uint64(30), network.PendingBlocks) + require.Equal(t, HealthHealthy, network.Health) +} + +func TestRegistryClearFailedBlocks(t *testing.T) { + t.Parallel() + + registry := NewRegistry() + registry.RegisterChain("BTC_MAINNET", "bitcoin_mainnet", config.ChainConfig{ + NetworkId: "btc-mainnet", + InternalCode: "BTC_MAINNET", + Type: enum.NetworkTypeBtc, + }) + + registry.MarkFailedBlock("btc_mainnet", 10) + registry.MarkFailedBlock("btc_mainnet", 11) + registry.ClearFailedBlocks("btc_mainnet", []uint64{10}) + + resp := registry.Snapshot("1.0.0") + require.Len(t, resp.Networks, 1) + require.Equal(t, 1, resp.Networks[0].FailedBlocks) + + 
registry.SetFailedBlocks("btc_mainnet", []uint64{21, 22, 22}) + resp = registry.Snapshot("1.0.0") + require.Equal(t, 2, resp.Networks[0].FailedBlocks) +} + +func TestRegistryCatchupRangeMutations(t *testing.T) { + t.Parallel() + + registry := NewRegistry() + registry.RegisterChain("SOL_MAINNET", "sol_mainnet", config.ChainConfig{ + NetworkId: "sol-mainnet", + InternalCode: "SOL_MAINNET", + Type: enum.NetworkTypeSol, + }) + + registry.SetCatchupRanges("sol_mainnet", []blockstore.CatchupRange{ + {Start: 1, End: 10, Current: 0}, + {Start: 20, End: 25, Current: 22}, + }) + + resp := registry.Snapshot("1.0.0") + require.Len(t, resp.Networks, 1) + require.Equal(t, 2, resp.Networks[0].CatchupRanges) + require.Equal(t, uint64(13), resp.Networks[0].CatchupPendingBlocks) + + registry.UpsertCatchupRanges("sol_mainnet", []blockstore.CatchupRange{ + {Start: 1, End: 10, Current: 5}, + {Start: 30, End: 31, Current: 29}, + }) + + resp = registry.Snapshot("1.0.0") + require.Equal(t, 3, resp.Networks[0].CatchupRanges) + require.Equal(t, uint64(10), resp.Networks[0].CatchupPendingBlocks) + + registry.DeleteCatchupRange("sol_mainnet", 20, 25) + + resp = registry.Snapshot("1.0.0") + require.Equal(t, 2, resp.Networks[0].CatchupRanges) + require.Equal(t, uint64(7), resp.Networks[0].CatchupPendingBlocks) +} diff --git a/internal/status/types.go b/internal/status/types.go new file mode 100644 index 0000000..3b839da --- /dev/null +++ b/internal/status/types.go @@ -0,0 +1,70 @@ +package status + +import ( + "time" + + "github.com/fystack/multichain-indexer/pkg/common/config" +) + +type HealthStatus string + +const ( + HealthHealthy HealthStatus = "healthy" + HealthSlow HealthStatus = "slow" + HealthDegraded HealthStatus = "degraded" +) + +type NetworkStatus struct { + NetworkID string `json:"network_id"` + ChainName string `json:"chain_name"` + InternalCode string `json:"internal_code"` + NetworkType string `json:"network_type"` + Health HealthStatus `json:"health"` + LatestBlock uint64 
`json:"latest_block"` + IndexedBlock uint64 `json:"indexed_block"` + PendingBlocks uint64 `json:"pending_blocks"` + HeadGap uint64 `json:"head_gap"` + CatchupPendingBlocks uint64 `json:"catchup_pending_blocks"` + CatchupRanges int `json:"catchup_ranges"` + FailedBlocks int `json:"failed_blocks"` + LastIndexedAt *time.Time `json:"last_indexed_at,omitempty"` +} + +type StatusResponse struct { + Timestamp time.Time `json:"timestamp"` + Version string `json:"version"` + Networks []NetworkStatus `json:"networks"` +} + +type catchupRangeKey struct { + start uint64 + end uint64 +} + +type chainState struct { + networkID string + chainName string + internalCode string + networkType string + thresholds config.StatusConfig + latestBlock uint64 + indexedBlock uint64 + lastIndexedAt time.Time + failedBlocks map[uint64]struct{} + catchupRanges map[catchupRangeKey]uint64 + catchupPendingBlocks uint64 +} + +type chainSnapshot struct { + networkID string + chainName string + internalCode string + networkType string + thresholds config.StatusConfig + latestBlock uint64 + indexedBlock uint64 + lastIndexedAt time.Time + failedBlocksCount int + catchupRangesCount int + catchupPendingBlocks uint64 +} diff --git a/internal/worker/base.go b/internal/worker/base.go index 53cde6d..546d0e4 100644 --- a/internal/worker/base.go +++ b/internal/worker/base.go @@ -8,6 +8,7 @@ import ( "log/slog" "github.com/fystack/multichain-indexer/internal/indexer" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/logger" "github.com/fystack/multichain-indexer/pkg/common/types" @@ -38,14 +39,15 @@ type BaseWorker struct { mode WorkerMode logger *slog.Logger - config config.ChainConfig - chain indexer.Indexer - kvstore infra.KVStore - blockStore blockstore.Store - pubkeyStore pubkeystore.Store - emitter events.Emitter - failedChan chan FailedBlockEvent - observer BlockResultObserver + config 
config.ChainConfig + chain indexer.Indexer + kvstore infra.KVStore + blockStore blockstore.Store + pubkeyStore pubkeystore.Store + emitter events.Emitter + failedChan chan FailedBlockEvent + observer BlockResultObserver + statusRegistry status.StatusRegistry } // Stop stops the worker and cleans up internal resources @@ -65,6 +67,7 @@ func newWorkerWithMode( pubkeyStore pubkeystore.Store, mode WorkerMode, failedChan chan FailedBlockEvent, + statusRegistry status.StatusRegistry, ) *BaseWorker { ctx, cancel := context.WithCancel(ctx) log := logger.With( @@ -73,17 +76,18 @@ func newWorkerWithMode( ) return &BaseWorker{ - ctx: ctx, - cancel: cancel, - mode: mode, - logger: log, - config: cfg, - chain: chain, - kvstore: kv, - blockStore: blockStore, - pubkeyStore: pubkeyStore, - emitter: emitter, - failedChan: failedChan, + ctx: ctx, + cancel: cancel, + mode: mode, + logger: log, + config: cfg, + chain: chain, + kvstore: kv, + blockStore: blockStore, + pubkeyStore: pubkeyStore, + emitter: emitter, + failedChan: failedChan, + statusRegistry: status.EnsureStatusRegistry(statusRegistry), } } @@ -138,8 +142,11 @@ func (bw *BaseWorker) notifyObserver(blockNumber uint64, status BlockStatus) { // handleBlockResult processes a block result and persists/forwards errors if needed. 
func (bw *BaseWorker) handleBlockResult(result indexer.BlockResult) bool { + registry := status.EnsureStatusRegistry(bw.statusRegistry) + if result.Error != nil { _ = bw.blockStore.SaveFailedBlock(bw.chain.GetNetworkInternalCode(), result.Number) + registry.MarkFailedBlock(bw.chain.GetName(), result.Number) // Non-blocking push to failedChan select { @@ -182,6 +189,7 @@ func (bw *BaseWorker) handleBlockResult(result indexer.BlockResult) bool { "chain", bw.chain.GetName(), "block", result.Block.Number, ) + registry.ClearFailedBlocks(bw.chain.GetName(), []uint64{result.Number}) bw.notifyObserver(result.Number, BlockStatusProcessed) return true } diff --git a/internal/worker/catchup.go b/internal/worker/catchup.go index a45fdce..2af7cdb 100644 --- a/internal/worker/catchup.go +++ b/internal/worker/catchup.go @@ -7,6 +7,7 @@ import ( "time" "github.com/fystack/multichain-indexer/internal/indexer" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/enum" "github.com/fystack/multichain-indexer/pkg/events" @@ -38,6 +39,7 @@ func NewCatchupWorker( emitter events.Emitter, pubkeyStore pubkeystore.Store, failedChan chan FailedBlockEvent, + statusRegistry status.StatusRegistry, ) *CatchupWorker { worker := newWorkerWithMode( ctx, @@ -49,6 +51,7 @@ func NewCatchupWorker( pubkeyStore, ModeCatchup, failedChan, + statusRegistry, ) cw := &CatchupWorker{ BaseWorker: worker, @@ -109,6 +112,7 @@ func (cw *CatchupWorker) runCatchup() { } func (cw *CatchupWorker) loadCatchupProgress() []blockstore.CatchupRange { + registry := status.EnsureStatusRegistry(cw.statusRegistry) var ranges []blockstore.CatchupRange // Load existing catchup ranges from database (they're already split when saved) @@ -118,6 +122,7 @@ func (cw *CatchupWorker) loadCatchupProgress() []blockstore.CatchupRange { "progress_ranges", len(progress), ) ranges = progress + 
registry.SetCatchupRanges(cw.chain.GetName(), progress) } else { cw.logger.Warn("Failed to load catchup progress, will create new range", "chain", cw.chain.GetName(), @@ -157,6 +162,8 @@ func (cw *CatchupWorker) loadCatchupProgress() []blockstore.CatchupRange { "count", len(newRanges), "error", err, ) + } else { + registry.UpsertCatchupRanges(cw.chain.GetName(), newRanges) } ranges = append(ranges, newRanges...) } @@ -346,24 +353,54 @@ func (cw *CatchupWorker) processRange(r blockstore.CatchupRange, workerID int) e func (cw *CatchupWorker) saveProgress(r blockstore.CatchupRange, current uint64) { cw.progressMu.Lock() defer cw.progressMu.Unlock() + registry := status.EnsureStatusRegistry(cw.statusRegistry) cw.logger.Debug("Saving catchup progress", "chain", cw.chain.GetName(), "range", fmt.Sprintf("%d-%d", r.Start, r.End), "current", current, ) - _ = cw.blockStore.SaveCatchupProgress(cw.chain.GetNetworkInternalCode(), r.Start, r.End, current) + current = min(current, r.End) + if err := cw.blockStore.SaveCatchupProgress(cw.chain.GetNetworkInternalCode(), r.Start, r.End, current); err != nil { + cw.logger.Warn("Failed to save catchup progress", + "chain", cw.chain.GetName(), + "range", fmt.Sprintf("%d-%d", r.Start, r.End), + "current", current, + "error", err, + ) + return + } + registry.UpsertCatchupRanges(cw.chain.GetName(), []blockstore.CatchupRange{{ + Start: r.Start, + End: r.End, + Current: current, + }}) + for i := range cw.blockRanges { + if cw.blockRanges[i].Start == r.Start && cw.blockRanges[i].End == r.End { + cw.blockRanges[i].Current = current + break + } + } } func (cw *CatchupWorker) completeRange(r blockstore.CatchupRange) error { cw.progressMu.Lock() defer cw.progressMu.Unlock() + registry := status.EnsureStatusRegistry(cw.statusRegistry) cw.logger.Info("Completing catchup range", "chain", cw.chain.GetName(), "range", fmt.Sprintf("%d-%d", r.Start, r.End), ) - _ = cw.blockStore.DeleteCatchupRange(cw.chain.GetNetworkInternalCode(), r.Start, r.End) + if 
err := cw.blockStore.DeleteCatchupRange(cw.chain.GetNetworkInternalCode(), r.Start, r.End); err != nil { + cw.logger.Warn("Failed to delete catchup range", + "chain", cw.chain.GetName(), + "range", fmt.Sprintf("%d-%d", r.Start, r.End), + "error", err, + ) + return err + } + registry.DeleteCatchupRange(cw.chain.GetName(), r.Start, r.End) // Remove from local ranges for i, existing := range cw.blockRanges { @@ -409,6 +446,9 @@ func (cw *CatchupWorker) Close() error { "ranges", len(rangesToSave), "error", err, ) + } else { + registry := status.EnsureStatusRegistry(cw.statusRegistry) + registry.UpsertCatchupRanges(cw.chain.GetName(), rangesToSave) } return nil diff --git a/internal/worker/factory.go b/internal/worker/factory.go index 71064ef..6d14611 100644 --- a/internal/worker/factory.go +++ b/internal/worker/factory.go @@ -18,6 +18,7 @@ import ( "github.com/fystack/multichain-indexer/internal/rpc/sui" tonrpc "github.com/fystack/multichain-indexer/internal/rpc/ton" "github.com/fystack/multichain-indexer/internal/rpc/tron" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/addressbloomfilter" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/enum" @@ -35,14 +36,15 @@ import ( // WorkerDeps bundles dependencies injected into workers. type WorkerDeps struct { - Ctx context.Context - KVStore infra.KVStore - BlockStore blockstore.Store - Emitter events.Emitter - Pubkey pubkeystore.Store - Redis infra.RedisClient - FailedChan chan FailedBlockEvent - Observer BlockResultObserver + Ctx context.Context + KVStore infra.KVStore + BlockStore blockstore.Store + Emitter events.Emitter + Pubkey pubkeystore.Store + Redis infra.RedisClient + FailedChan chan FailedBlockEvent + Observer BlockResultObserver + StatusRegistry status.StatusRegistry } // ManagerConfig defines which workers to enable per chain. 
@@ -105,6 +107,7 @@ func BuildWorkers( deps.Emitter, deps.Pubkey, deps.FailedChan, + deps.StatusRegistry, ), } case ModeCatchup: @@ -118,6 +121,7 @@ func BuildWorkers( deps.Emitter, deps.Pubkey, deps.FailedChan, + deps.StatusRegistry, ), } case ModeRescanner: @@ -131,6 +135,7 @@ func BuildWorkers( deps.Emitter, deps.Pubkey, deps.FailedChan, + deps.StatusRegistry, ), } case ModeManual: @@ -145,6 +150,7 @@ func BuildWorkers( deps.Emitter, deps.Pubkey, deps.FailedChan, + deps.StatusRegistry, ), } case ModeMempool: @@ -158,6 +164,7 @@ func BuildWorkers( deps.Emitter, deps.Pubkey, deps.FailedChan, + deps.StatusRegistry, ), } default: @@ -772,8 +779,10 @@ func CreateManagerWithWorkers( // Shared stores blockStore := blockstore.NewBlockStore(kvstore) pubkeyStore := pubkeystore.NewPublicKeyStore(addressBF) + statusRegistry := status.NewRegistry() manager := NewManager(ctx, kvstore, blockStore, emitter, pubkeyStore) + manager.registry = statusRegistry // Loop each chain for _, chainName := range managerCfg.Chains { @@ -805,22 +814,36 @@ func CreateManagerWithWorkers( default: logger.Fatal("Unsupported network type", "chain", chainName, "type", chainCfg.Type) } + statusRegistry.RegisterChain(idxr.GetName(), chainName, chainCfg) + if existingFailed, err := blockStore.GetFailedBlocks(idxr.GetNetworkInternalCode()); err == nil { + statusRegistry.SetFailedBlocks(idxr.GetName(), existingFailed) + } + if existingCatchup, err := blockStore.GetCatchupProgress(idxr.GetNetworkInternalCode()); err == nil { + statusRegistry.SetCatchupRanges(idxr.GetName(), existingCatchup) + } else { + logger.Warn("Failed to load catchup progress for status registry", + "chain", chainName, + "internal_code", idxr.GetNetworkInternalCode(), + "error", err, + ) + } failedChan := make(chan FailedBlockEvent, 100) // Worker deps deps := WorkerDeps{ - Ctx: ctx, - KVStore: kvstore, - BlockStore: blockStore, - Emitter: emitter, - Pubkey: pubkeyStore, - Redis: redisClient, - FailedChan: failedChan, - Observer: 
managerCfg.Observer, + Ctx: ctx, + KVStore: kvstore, + BlockStore: blockStore, + Emitter: emitter, + Pubkey: pubkeyStore, + Redis: redisClient, + FailedChan: failedChan, + Observer: managerCfg.Observer, + StatusRegistry: statusRegistry, } - // Helper: add workers if enabled (all modes share the same indexer and global rate limiter) + // Helper: add workers if enabled (all modes share the same indexer and global rate limiter). addIfEnabled := func(mode WorkerMode, enabled bool) { if enabled { ws := BuildWorkers(idxr, chainCfg, mode, deps) diff --git a/internal/worker/factory_test.go b/internal/worker/factory_test.go index 187d5dc..be6261b 100644 --- a/internal/worker/factory_test.go +++ b/internal/worker/factory_test.go @@ -3,15 +3,18 @@ package worker import ( "context" "errors" + "fmt" "log/slog" "testing" "time" "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/fystack/multichain-indexer/pkg/common/constant" "github.com/fystack/multichain-indexer/pkg/common/enum" commonlogger "github.com/fystack/multichain-indexer/pkg/common/logger" "github.com/fystack/multichain-indexer/pkg/events" "github.com/fystack/multichain-indexer/pkg/infra" + "github.com/fystack/multichain-indexer/pkg/store/blockstore" "github.com/hashicorp/consul/api" "github.com/stretchr/testify/require" ) @@ -116,8 +119,8 @@ func TestRescannerFailedChannelIsolationByChain(t *testing.T) { chA := make(chan FailedBlockEvent, 1) chB := make(chan FailedBlockEvent, 1) - rwA := NewRescannerWorker(ctx, chainA, testChainConfig(), noopKVStore{}, &stubBlockStore{}, events.Emitter(nil), nil, chA) - rwB := NewRescannerWorker(ctx, chainB, testChainConfig(), noopKVStore{}, &stubBlockStore{}, events.Emitter(nil), nil, chB) + rwA := NewRescannerWorker(ctx, chainA, testChainConfig(), noopKVStore{}, &stubBlockStore{}, events.Emitter(nil), nil, chA, nil) + rwB := NewRescannerWorker(ctx, chainB, testChainConfig(), noopKVStore{}, &stubBlockStore{}, events.Emitter(nil), nil, chB, nil) doneA := make(chan 
struct{}) doneB := make(chan struct{}) @@ -145,8 +148,63 @@ func TestRescannerFailedChannelIsolationByChain(t *testing.T) { require.NotContains(t, rwB.failedBlocks, uint64(101)) } +func TestCreateManagerWithWorkersBootstrapsCatchupRangesIntoStatusRegistry(t *testing.T) { + t.Parallel() + initTestLogger() + + cfg := &config.Config{ + Chains: config.Chains{ + "chain-a": { + Name: "chain-a", + NetworkId: "chain-a", + InternalCode: "a", + Type: enum.NetworkTypeEVM, + PollInterval: time.Millisecond, + Client: config.ClientConfig{ + Timeout: time.Second, + }, + Throttle: config.Throttle{ + BatchSize: 1, + RPS: 1, + Burst: 1, + }, + Nodes: []config.NodeConfig{{URL: "http://127.0.0.1:8545"}}, + }, + }, + } + + kv := &listKVStore{ + pairs: []*infra.KVPair{{ + Key: fmt.Sprintf("%s/%s/%s/%d-%d", blockstore.BlockStates, "a", constant.KVPrefixProgressCatchup, 1, 20), + Value: []byte("10"), + }}, + } + + manager := CreateManagerWithWorkers( + context.Background(), + cfg, + kv, + nil, + nil, + events.Emitter(nil), + nil, + ManagerConfig{ + Chains: []string{"chain-a"}, + }, + ) + + resp := manager.StatusSnapshot("1.0.0") + require.Len(t, resp.Networks, 1) + require.Equal(t, 1, resp.Networks[0].CatchupRanges) + require.Equal(t, uint64(10), resp.Networks[0].CatchupPendingBlocks) +} + type noopKVStore struct{} +type listKVStore struct { + pairs []*infra.KVPair +} + func initTestLogger() { commonlogger.Init(&commonlogger.Options{ Level: slog.LevelError, @@ -165,3 +223,24 @@ func (noopKVStore) List(string) ([]*infra.KVPair, error) { return nil, nil } func (noopKVStore) Delete(string) error { return nil } func (noopKVStore) BatchSet([]infra.KVPair) error { return nil } func (noopKVStore) Close() error { return nil } + +func (s *listKVStore) GetName() string { return "list" } +func (s *listKVStore) Set(string, string) error { return nil } +func (s *listKVStore) Get(string) (string, error) { return "", errors.New("not found") } +func (s *listKVStore) GetWithOptions(string, 
*api.QueryOptions) (string, error) { + return "", errors.New("not found") +} +func (s *listKVStore) SetAny(string, any) error { return nil } +func (s *listKVStore) GetAny(string, any) (bool, error) { return false, nil } +func (s *listKVStore) Delete(string) error { return nil } +func (s *listKVStore) BatchSet([]infra.KVPair) error { return nil } +func (s *listKVStore) Close() error { return nil } +func (s *listKVStore) List(prefix string) ([]*infra.KVPair, error) { + var out []*infra.KVPair + for _, pair := range s.pairs { + if len(pair.Key) >= len(prefix) && pair.Key[:len(prefix)] == prefix { + out = append(out, pair) + } + } + return out, nil +} diff --git a/internal/worker/manager.go b/internal/worker/manager.go index 59d64bf..bcf6e6a 100644 --- a/internal/worker/manager.go +++ b/internal/worker/manager.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/logger" "github.com/fystack/multichain-indexer/pkg/events" "github.com/fystack/multichain-indexer/pkg/infra" @@ -22,6 +23,7 @@ type Manager struct { blockStore blockstore.Store emitter events.Emitter pubkeyStore pubkeystore.Store + registry *status.Registry } func NewManager( @@ -40,6 +42,10 @@ func NewManager( } } +func (m *Manager) Registry() *status.Registry { + return m.registry +} + // Start launches all injected workers func (m *Manager) Start() { for _, w := range m.workers { @@ -86,16 +92,28 @@ func (m *Manager) Stop() { logger.Info("Manager stopped") } -// closeResource is a helper to close resources with consistent error handling -func (m *Manager) closeResource(name string, resource interface{}, closer func() error) { - if resource != nil { - if err := closer(); err != nil { - logger.Error("Failed to close "+name, "err", err) +// StatusSnapshot returns /status payload from live in-memory registry state. 
+func (m *Manager) StatusSnapshot(version string) status.StatusResponse { + if m.registry == nil { + return status.StatusResponse{ + Timestamp: time.Now().UTC(), + Version: version, + Networks: []status.NetworkStatus{}, } } + return m.registry.Snapshot(version) } // Inject workers into manager func (m *Manager) AddWorkers(workers ...Worker) { m.workers = append(m.workers, workers...) } + +// closeResource is a helper to close resources with consistent error handling +func (m *Manager) closeResource(name string, resource any, closer func() error) { + if resource != nil { + if err := closer(); err != nil { + logger.Error("Failed to close "+name, "err", err) + } + } +} diff --git a/internal/worker/manual.go b/internal/worker/manual.go index 34d4340..64ebe62 100644 --- a/internal/worker/manual.go +++ b/internal/worker/manual.go @@ -5,6 +5,7 @@ import ( "time" "github.com/fystack/multichain-indexer/internal/indexer" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/events" "github.com/fystack/multichain-indexer/pkg/infra" @@ -41,6 +42,7 @@ func NewManualWorker( emitter events.Emitter, pubkeyStore pubkeystore.Store, failedChan chan FailedBlockEvent, + statusRegistry status.StatusRegistry, ) *ManualWorker { return &ManualWorker{ BaseWorker: newWorkerWithMode( @@ -53,6 +55,7 @@ func NewManualWorker( pubkeyStore, ModeManual, failedChan, + statusRegistry, ), mbs: missingblockstore.NewMissingBlocksStore(redisClient), config: DefaultManualConfig, diff --git a/internal/worker/mempool.go b/internal/worker/mempool.go index 7955a41..9bf9f75 100644 --- a/internal/worker/mempool.go +++ b/internal/worker/mempool.go @@ -6,6 +6,7 @@ import ( "time" "github.com/fystack/multichain-indexer/internal/indexer" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/types" 
"github.com/fystack/multichain-indexer/pkg/events" @@ -33,6 +34,7 @@ func NewMempoolWorker( emitter events.Emitter, pubkeyStore pubkeystore.Store, failedChan chan FailedBlockEvent, + statusRegistry status.StatusRegistry, ) *MempoolWorker { worker := newWorkerWithMode( ctx, @@ -44,6 +46,7 @@ func NewMempoolWorker( pubkeyStore, ModeMempool, failedChan, + statusRegistry, ) // Cast to Bitcoin indexer (mempool is Bitcoin-specific) @@ -156,7 +159,7 @@ func (mw *MempoolWorker) processMempool() error { if mw.config.IndexUTXO { for i := range utxoEvents { event := &utxoEvents[i] - + if mw.seenTxs[event.TxHash+":utxo"] { continue } diff --git a/internal/worker/regular.go b/internal/worker/regular.go index e99646e..4d35d76 100644 --- a/internal/worker/regular.go +++ b/internal/worker/regular.go @@ -7,6 +7,7 @@ import ( "time" "github.com/fystack/multichain-indexer/internal/indexer" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/constant" "github.com/fystack/multichain-indexer/pkg/common/enum" @@ -16,7 +17,6 @@ import ( "github.com/fystack/multichain-indexer/pkg/store/pubkeystore" ) - const ( MaxBlockHashSize = 50 regularGapRetryAttempts = 2 @@ -31,7 +31,7 @@ type RegularWorker struct { *BaseWorker currentBlock uint64 blockHashes []blockstore.BlockHashEntry - hashesModified bool + hashesModified bool persistTicker *time.Ticker } @@ -44,6 +44,7 @@ func NewRegularWorker( emitter events.Emitter, pubkeyStore pubkeystore.Store, failedChan chan FailedBlockEvent, + statusRegistry status.StatusRegistry, ) *RegularWorker { worker := newWorkerWithMode( ctx, @@ -55,6 +56,7 @@ func NewRegularWorker( pubkeyStore, ModeRegular, failedChan, + statusRegistry, ) rw := &RegularWorker{BaseWorker: worker} rw.currentBlock = rw.determineStartingBlock() @@ -94,17 +96,20 @@ func (rw *RegularWorker) processRegularBlocks() error { if err != nil { return fmt.Errorf("get latest block: 
%w", err) } + rw.updateHeadStatus(latest, time.Time{}) rw.logger.Info("Got latest block", "latest", latest, "current", rw.currentBlock) // Lag detection: if we're too far behind, jump to chain head and queue skipped range for catchup if rw.skipAheadIfLagging(latest) { + rw.updateHeadStatus(latest, time.Time{}) return nil } if rw.currentBlock > latest { rw.logger.Info("Waiting for new blocks...", "current", rw.currentBlock, "latest", latest) time.Sleep(rw.config.PollInterval) + rw.updateHeadStatus(latest, time.Time{}) return nil } @@ -152,14 +157,17 @@ func (rw *RegularWorker) processRegularBlocks() error { return nil } + indexedAt := time.Time{} if lastSuccess >= rw.currentBlock { rw.currentBlock = lastSuccess + 1 _ = rw.blockStore.SaveLatestBlock(rw.chain.GetNetworkInternalCode(), lastSuccess) + indexedAt = time.Now().UTC() if lastSuccessHash != "" { rw.addBlockHash(lastSuccess, lastSuccessHash) } } + rw.updateHeadStatus(latest, indexedAt) rw.logger.Info("Processed latest blocks", "chain", rw.chain.GetName(), @@ -174,6 +182,7 @@ func (rw *RegularWorker) processRegularBlocks() error { } func (rw *RegularWorker) determineStartingBlock() uint64 { + registry := status.EnsureStatusRegistry(rw.statusRegistry) chainLatest, err1 := rw.chain.GetLatestBlockNumber(rw.ctx) kvLatest, err2 := rw.blockStore.GetLatestBlock(rw.chain.GetNetworkInternalCode()) @@ -216,6 +225,8 @@ func (rw *RegularWorker) determineStartingBlock() uint64 { "count", len(ranges), "error", err, ) + } else { + registry.UpsertCatchupRanges(rw.chain.GetName(), ranges) } rw.logger.Info("Queued catchup ranges", @@ -343,6 +354,7 @@ func (rw *RegularWorker) flushBlockHashes() { // skipAheadIfLagging checks if the regular worker is too far behind the chain head. // If so, it queues the skipped range for catchup and jumps currentBlock to chain head. 
func (rw *RegularWorker) skipAheadIfLagging(latest uint64) bool { + registry := status.EnsureStatusRegistry(rw.statusRegistry) maxLag := rw.config.MaxLag if maxLag == 0 { maxLag = constant.DefaultMaxLag @@ -377,6 +389,8 @@ func (rw *RegularWorker) skipAheadIfLagging(latest uint64) bool { "count", len(ranges), "error", err, ) + } else { + registry.UpsertCatchupRanges(rw.chain.GetName(), ranges) } rw.currentBlock = latest @@ -391,6 +405,23 @@ func (rw *RegularWorker) skipAheadIfLagging(latest uint64) bool { return true } +func (rw *RegularWorker) currentIndexedBlock() uint64 { + if rw.currentBlock == 0 { + return 0 + } + return rw.currentBlock - 1 +} + +func (rw *RegularWorker) updateHeadStatus(latest uint64, indexedAt time.Time) { + registry := status.EnsureStatusRegistry(rw.statusRegistry) + registry.UpdateHead( + rw.chain.GetName(), + latest, + rw.currentIndexedBlock(), + indexedAt, + ) +} + func (rw *RegularWorker) processReorgCheckedBatch( results []indexer.BlockResult, end uint64, diff --git a/internal/worker/regular_test.go b/internal/worker/regular_test.go index 2156116..73573f1 100644 --- a/internal/worker/regular_test.go +++ b/internal/worker/regular_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/fystack/multichain-indexer/internal/indexer" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/enum" "github.com/fystack/multichain-indexer/pkg/common/types" @@ -144,6 +145,81 @@ func TestBaseWorkerExecuteRecoverableConvertsPanicToError(t *testing.T) { require.Equal(t, "test panic panic: boom", err.Error()) } +func TestRegularWorkerDetermineStartingBlockUpdatesCatchupRegistry(t *testing.T) { + t.Parallel() + + chain := &stubIndexer{ + name: "ethereum", + internalCode: "ETH", + networkType: enum.NetworkTypeEVM, + latest: 25, + } + store := &stubBlockStore{latestBlock: 20} + statusRegistry := status.NewRegistry() + 
statusRegistry.RegisterChain("ethereum", "ethereum", config.ChainConfig{ + NetworkId: "eth-mainnet", + InternalCode: "ETH", + Type: enum.NetworkTypeEVM, + }) + rw := newTestRegularWorker(chain, store, 20, 2) + rw.statusRegistry = statusRegistry + + start := rw.determineStartingBlock() + require.Equal(t, uint64(25), start) + + resp := statusRegistry.Snapshot("1.0.0") + require.Len(t, resp.Networks, 1) + require.Equal(t, 1, resp.Networks[0].CatchupRanges) + require.Equal(t, uint64(5), resp.Networks[0].CatchupPendingBlocks) +} + +func TestCatchupWorkerUpdatesCatchupRegistryOnProgressAndCompletion(t *testing.T) { + t.Parallel() + + statusRegistry := status.NewRegistry() + statusRegistry.RegisterChain("ethereum", "ethereum", config.ChainConfig{ + NetworkId: "eth-mainnet", + InternalCode: "ETH", + Type: enum.NetworkTypeEVM, + }) + statusRegistry.SetCatchupRanges("ethereum", []blockstore.CatchupRange{{ + Start: 1, + End: 10, + Current: 0, + }}) + + store := &stubBlockStore{} + cw := &CatchupWorker{ + BaseWorker: &BaseWorker{ + ctx: context.Background(), + cancel: func() {}, + logger: slog.New(slog.NewTextHandler(io.Discard, nil)), + chain: &stubIndexer{name: "ethereum", internalCode: "ETH", networkType: enum.NetworkTypeEVM}, + blockStore: store, + statusRegistry: statusRegistry, + }, + blockRanges: []blockstore.CatchupRange{{ + Start: 1, + End: 10, + Current: 0, + }}, + } + + cw.saveProgress(blockstore.CatchupRange{Start: 1, End: 10, Current: 0}, 5) + + resp := statusRegistry.Snapshot("1.0.0") + require.Len(t, resp.Networks, 1) + require.Equal(t, 1, resp.Networks[0].CatchupRanges) + require.Equal(t, uint64(5), resp.Networks[0].CatchupPendingBlocks) + + err := cw.completeRange(blockstore.CatchupRange{Start: 1, End: 10, Current: 5}) + require.NoError(t, err) + + resp = statusRegistry.Snapshot("1.0.0") + require.Equal(t, 0, resp.Networks[0].CatchupRanges) + require.Equal(t, uint64(0), resp.Networks[0].CatchupPendingBlocks) +} + func testChainConfig() config.ChainConfig { 
return config.ChainConfig{ PollInterval: time.Millisecond, @@ -222,12 +298,27 @@ func (s *stubIndexer) IsHealthy() bool { } type stubBlockStore struct { - savedLatest []uint64 - failedBlocks []uint64 + latestBlock uint64 + savedLatest []uint64 + failedBlocks []uint64 + savedCatchupRanges []blockstore.CatchupRange + catchupProgress []blockstore.CatchupRange + deleteCatchupCalls []blockstore.CatchupRange + getLatestBlockErr error + getCatchupProgressErr error + saveCatchupRangesErr error + saveCatchupProgressErr error + deleteCatchupErr error } func (s *stubBlockStore) GetLatestBlock(string) (uint64, error) { - return 0, errors.New("not found") + if s.getLatestBlockErr != nil { + return 0, s.getLatestBlockErr + } + if s.latestBlock == 0 { + return 0, errors.New("not found") + } + return s.latestBlock, nil } func (s *stubBlockStore) SaveLatestBlock(_ string, blockNumber uint64) error { @@ -252,19 +343,55 @@ func (s *stubBlockStore) RemoveFailedBlocks(string, []uint64) error { return nil } -func (s *stubBlockStore) SaveCatchupRanges(string, []blockstore.CatchupRange) error { +func (s *stubBlockStore) SaveCatchupRanges(_ string, ranges []blockstore.CatchupRange) error { + if s.saveCatchupRangesErr != nil { + return s.saveCatchupRangesErr + } + s.savedCatchupRanges = append(s.savedCatchupRanges, ranges...) 
return nil } -func (s *stubBlockStore) SaveCatchupProgress(string, uint64, uint64, uint64) error { +func (s *stubBlockStore) SaveCatchupProgress(_ string, start, end, current uint64) error { + if s.saveCatchupProgressErr != nil { + return s.saveCatchupProgressErr + } + for i := range s.catchupProgress { + if s.catchupProgress[i].Start == start && s.catchupProgress[i].End == end { + s.catchupProgress[i].Current = current + return nil + } + } + s.catchupProgress = append(s.catchupProgress, blockstore.CatchupRange{ + Start: start, + End: end, + Current: current, + }) return nil } func (s *stubBlockStore) GetCatchupProgress(string) ([]blockstore.CatchupRange, error) { - return nil, nil + if s.getCatchupProgressErr != nil { + return nil, s.getCatchupProgressErr + } + return append([]blockstore.CatchupRange(nil), s.catchupProgress...), nil } -func (s *stubBlockStore) DeleteCatchupRange(string, uint64, uint64) error { +func (s *stubBlockStore) DeleteCatchupRange(_ string, start, end uint64) error { + if s.deleteCatchupErr != nil { + return s.deleteCatchupErr + } + s.deleteCatchupCalls = append(s.deleteCatchupCalls, blockstore.CatchupRange{ + Start: start, + End: end, + }) + filtered := s.catchupProgress[:0] + for _, rng := range s.catchupProgress { + if rng.Start == start && rng.End == end { + continue + } + filtered = append(filtered, rng) + } + s.catchupProgress = filtered return nil } diff --git a/internal/worker/rescanner.go b/internal/worker/rescanner.go index adcfd8d..72ff67e 100644 --- a/internal/worker/rescanner.go +++ b/internal/worker/rescanner.go @@ -7,6 +7,7 @@ import ( "time" "github.com/fystack/multichain-indexer/internal/indexer" + "github.com/fystack/multichain-indexer/internal/status" "github.com/fystack/multichain-indexer/pkg/common/config" "github.com/fystack/multichain-indexer/pkg/common/enum" "github.com/fystack/multichain-indexer/pkg/events" @@ -45,6 +46,7 @@ func NewRescannerWorker( emitter events.Emitter, pubkeyStore pubkeystore.Store, failedChan 
chan FailedBlockEvent, + statusRegistry status.StatusRegistry, ) *RescannerWorker { return &RescannerWorker{ BaseWorker: newWorkerWithMode( @@ -57,6 +59,7 @@ pubkeyStore, ModeRescanner, failedChan, + statusRegistry, ), failedBlocks: make(map[uint64]uint8), maxRetries: RescannerMaxRetries, @@ -112,6 +115,7 @@ func (rw *RescannerWorker) addFailedBlock(block uint64, errMsg string) { if _, exists := rw.failedBlocks[block]; !exists { rw.failedBlocks[block] = 0 rw.addSave(block) + rw.statusRegistry.MarkFailedBlock(rw.chain.GetName(), block) rw.logger.Info("Added failed block", "block", block, "error", errMsg) } } @@ -121,6 +125,7 @@ func (rw *RescannerWorker) syncFromKV() error { if err != nil { return err } + rw.statusRegistry.SetFailedBlocks(rw.chain.GetName(), blocks) rw.mu.Lock() defer rw.mu.Unlock() for _, num := range blocks { @@ -141,6 +146,7 @@ func (rw *RescannerWorker) removeBlocks(blocks []uint64) { } rw.mu.Unlock() rw.addRemove(blocks...) + rw.statusRegistry.ClearFailedBlocks(rw.chain.GetName(), blocks) } func (rw *RescannerWorker) incrementRetry(block uint64) { diff --git a/pkg/common/config/chains.go b/pkg/common/config/chains.go index 0f44d3d..505e032 100644 --- a/pkg/common/config/chains.go +++ b/pkg/common/config/chains.go @@ -7,6 +7,12 @@ import ( "dario.cat/mergo" ) +// CanonicalChainKey returns the normalized chain identifier used for registry keys, indexer GetName(), +// and ChainConfig.Name after Load (uppercase + trim). Callers should pass raw YAML keys or CLI names. +func CanonicalChainKey(s string) string { + return strings.ToUpper(strings.TrimSpace(s)) +} + +// GetChain returns a chain config by name.
func (c Chains) GetChain(name string) (ChainConfig, error) { chain, ok := c[name] @@ -72,6 +78,10 @@ func (c Chains) ApplyDefaults(def Defaults) error { if err := mergo.Merge(&chain.Throttle, def.Throttle); err != nil { return fmt.Errorf("merge throttle defaults for %s: %w", name, err) } + if err := mergo.Merge(&chain.Status, def.Status); err != nil { + return fmt.Errorf("merge status defaults for %s: %w", name, err) + } + chain.Status = chain.Status.Normalize() c[name] = chain } return nil diff --git a/pkg/common/config/chains_test.go b/pkg/common/config/chains_test.go new file mode 100644 index 0000000..fb9fea3 --- /dev/null +++ b/pkg/common/config/chains_test.go @@ -0,0 +1,38 @@ +package config + +import ( + "testing" + "time" + + "github.com/fystack/multichain-indexer/pkg/common/enum" + "github.com/stretchr/testify/require" +) + +func TestApplyDefaults_MergesStatusThresholds(t *testing.T) { + t.Parallel() + + chains := Chains{ + "ethereum_mainnet": { + Type: enum.NetworkTypeEVM, + PollInterval: time.Second, + Nodes: []NodeConfig{{URL: "https://example.com"}}, + Status: StatusConfig{ + SlowMaxPendingBlocks: 120, + }, + }, + } + + err := chains.ApplyDefaults(Defaults{ + PollInterval: time.Second, + ReorgRollbackWindow: 20, + Status: StatusConfig{ + HealthyMaxPendingBlocks: 30, + SlowMaxPendingBlocks: 250, + }, + }) + require.NoError(t, err) + + chain := chains["ethereum_mainnet"] + require.Equal(t, uint64(30), chain.Status.HealthyMaxPendingBlocks) + require.Equal(t, uint64(120), chain.Status.SlowMaxPendingBlocks) +} diff --git a/pkg/common/config/load.go b/pkg/common/config/load.go index e2a764a..d1fc7e6 100644 --- a/pkg/common/config/load.go +++ b/pkg/common/config/load.go @@ -45,7 +45,7 @@ func Load(path string) (*Config, error) { for name, chain := range cfg.Chains { // apply name to struct name - chain.Name = strings.ToUpper(name) + chain.Name = CanonicalChainKey(name) chain.NativeDenom = strings.TrimSpace(chain.NativeDenom) if err := validate.Struct(chain); 
err != nil { return nil, fmt.Errorf("chain %s validation failed: %w", name, err) diff --git a/pkg/common/config/types.go b/pkg/common/config/types.go index 42a002b..510c102 100644 --- a/pkg/common/config/types.go +++ b/pkg/common/config/types.go @@ -14,6 +14,11 @@ const ( ProdEnv Env = "production" ) +const ( + DefaultStatusHealthyMaxPendingBlocks uint64 = 50 + DefaultStatusSlowMaxPendingBlocks uint64 = 250 +) + type Config struct { Version string `yaml:"version"` Environment Env `yaml:"env" validate:"required,oneof=development production"` @@ -27,6 +32,7 @@ type Defaults struct { TwoWayIndexing bool `yaml:"two_way_indexing"` PollInterval time.Duration `yaml:"poll_interval" validate:"required"` ReorgRollbackWindow int `yaml:"reorg_rollback_window" validate:"required,min=1"` + Status StatusConfig `yaml:"status"` Client ClientConfig `yaml:"client"` Throttle Throttle `yaml:"throttle"` Failover rpc.FailoverConfig `yaml:"failover"` @@ -50,12 +56,29 @@ type ChainConfig struct { IndexUTXO bool `yaml:"index_utxo"` DebugTrace bool `yaml:"debug_trace"` TraceThrottle TraceThrottle `yaml:"trace_throttle"` + Status StatusConfig `yaml:"status"` Client ClientConfig `yaml:"client"` Throttle Throttle `yaml:"throttle"` Ton TonConfig `yaml:"ton"` Nodes []NodeConfig `yaml:"nodes" validate:"required,min=1"` } +type StatusConfig struct { + HealthyMaxPendingBlocks uint64 `yaml:"healthy_max_pending_blocks"` + SlowMaxPendingBlocks uint64 `yaml:"slow_max_pending_blocks"` +} + +func (s StatusConfig) Normalize() StatusConfig { + normalized := s + if normalized.HealthyMaxPendingBlocks == 0 { + normalized.HealthyMaxPendingBlocks = DefaultStatusHealthyMaxPendingBlocks + } + if normalized.SlowMaxPendingBlocks == 0 { + normalized.SlowMaxPendingBlocks = DefaultStatusSlowMaxPendingBlocks + } + return normalized +} + type ClientConfig struct { Timeout time.Duration `yaml:"timeout"` MaxRetries int `yaml:"max_retries" validate:"min=0"` diff --git a/pkg/common/config/validate.go 
b/pkg/common/config/validate.go index db300b5..e00b080 100644 --- a/pkg/common/config/validate.go +++ b/pkg/common/config/validate.go @@ -10,5 +10,13 @@ func validateChainConfig(chain ChainConfig) error { if chain.Type == enum.NetworkTypeCosmos && chain.NativeDenom == "" { return fmt.Errorf("native_denom is required for cosmos chains") } + statusCfg := chain.Status.Normalize() + if statusCfg.HealthyMaxPendingBlocks >= statusCfg.SlowMaxPendingBlocks { + return fmt.Errorf( + "status thresholds invalid: healthy_max_pending_blocks (%d) must be less than slow_max_pending_blocks (%d)", + statusCfg.HealthyMaxPendingBlocks, + statusCfg.SlowMaxPendingBlocks, + ) + } return nil } diff --git a/pkg/common/config/validate_test.go b/pkg/common/config/validate_test.go index b074050..25ff2f2 100644 --- a/pkg/common/config/validate_test.go +++ b/pkg/common/config/validate_test.go @@ -30,3 +30,33 @@ func TestValidateChainConfig_DoesNotRequireNativeDenomForNonCosmos(t *testing.T) }) require.NoError(t, err) } + +func TestValidateChainConfig_AllowsDefaultStatusThresholds(t *testing.T) { + err := validateChainConfig(ChainConfig{ + Type: enum.NetworkTypeEVM, + }) + require.NoError(t, err) +} + +func TestValidateChainConfig_RejectsInvalidStatusThresholds(t *testing.T) { + err := validateChainConfig(ChainConfig{ + Type: enum.NetworkTypeEVM, + Status: StatusConfig{ + HealthyMaxPendingBlocks: 300, + SlowMaxPendingBlocks: 200, + }, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "status thresholds invalid") +} + +func TestValidateChainConfig_RejectsSlowLowerThanDefaultHealthy(t *testing.T) { + err := validateChainConfig(ChainConfig{ + Type: enum.NetworkTypeEVM, + Status: StatusConfig{ + SlowMaxPendingBlocks: 40, + }, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "status thresholds invalid") +} diff --git a/pkg/store/blockstore/store.go b/pkg/store/blockstore/store.go index e946752..69c2729 100644 --- a/pkg/store/blockstore/store.go +++ 
b/pkg/store/blockstore/store.go @@ -29,6 +29,24 @@ type CatchupRange struct { Current uint64 `json:"current"` } +// CatchupPendingBlocks returns how many blocks remain across all active catchup ranges. +func CatchupPendingBlocks(ranges []CatchupRange) uint64 { + var pending uint64 + for _, r := range ranges { + if r.End < r.Start { + continue + } + if r.Current < r.Start { + pending += r.End - r.Start + 1 + continue + } + if r.Current < r.End { + pending += r.End - r.Current + } + } + return pending +} + func latestBlockKey(chainName string) string { return fmt.Sprintf("%s/%s/%s", BlockStates, chainName, constant.KVPrefixLatestBlock) }