Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions cmd/indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/alecthomas/kong"
"gorm.io/gorm"

"github.com/fystack/multichain-indexer/internal/status"
"github.com/fystack/multichain-indexer/internal/worker"
"github.com/fystack/multichain-indexer/pkg/addressbloomfilter"
"github.com/fystack/multichain-indexer/pkg/common/config"
Expand Down Expand Up @@ -212,7 +213,7 @@ func runIndexer(chains []string, configPath string, debug, manual, catchup, from
managerCfg,
)

healthServer := startHealthServer(cfg.Services.Port, cfg)
healthServer := startHealthServer(cfg.Services.Port, cfg, manager)

// Start all workers
logger.Info("Starting all workers")
Expand Down Expand Up @@ -246,7 +247,7 @@ type HealthResponse struct {
Version string `json:"version"`
}

func startHealthServer(port int, cfg *config.Config) *http.Server {
func startHealthServer(port int, cfg *config.Config, manager *worker.Manager) *http.Server {
mux := http.NewServeMux()

version := cfg.Version
Expand All @@ -266,13 +267,33 @@ func startHealthServer(port int, cfg *config.Config) *http.Server {
json.NewEncoder(w).Encode(response)
})

mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

response := status.StatusResponse{
Timestamp: time.Now().UTC(),
Version: version,
Networks: []status.NetworkStatus{},
}
if manager != nil {
response = manager.StatusSnapshot(version)
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(response)
})

server := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
}

go func() {
logger.Info("Health check server started", "port", port, "endpoint", "/health")
logger.Info("Health check server started", "port", port, "endpoints", []string{"/health", "/status"})
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Error("Health server failed to start", "error", err)
}
Expand Down
12 changes: 11 additions & 1 deletion configs/config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ defaults:
poll_interval: "5s" # how often to poll for new blocks
reorg_rollback_window: 20 # number of blocks to roll back on reorg
two_way_indexing: false # enable two-way indexing for all chains
status:
healthy_max_pending_blocks: 50
slow_max_pending_blocks: 250
client:
timeout: "20s" # RPC timeout per request
max_retries: 3 # max retries per RPC call
Expand Down Expand Up @@ -38,6 +41,10 @@ chains:
throttle:
rps: 5
burst: 8
status:
# Optional per-chain override for /status health condition.
healthy_max_pending_blocks: 80
slow_max_pending_blocks: 400

ethereum_mainnet:
network_id: "ethereum_mainnet"
Expand Down Expand Up @@ -239,7 +246,10 @@ chains:
batch_size: 200
concurrency: 12
parallel: true

status:
healthy_max_pending_blocks: 50
slow_max_pending_blocks: 250

osmosis_mainnet:
network_id: "osmosis-1"
internal_code: "OSMO_MAINNET"
Expand Down
302 changes: 302 additions & 0 deletions internal/status/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
package status

import (
"sort"
"strings"
"sync"
"time"

"github.com/fystack/multichain-indexer/pkg/common/config"
"github.com/fystack/multichain-indexer/pkg/store/blockstore"
)

// Registry aggregates per-chain indexing progress (head positions, failed
// blocks, catch-up ranges) so an HTTP handler can expose a point-in-time
// status snapshot. All access to chains is guarded by mu.
type Registry struct {
	mu     sync.RWMutex
	chains map[string]*chainState // keyed by config.CanonicalChainKey output
}

// NewRegistry returns an empty Registry ready to track chains.
func NewRegistry() *Registry {
	r := &Registry{chains: map[string]*chainState{}}
	return r
}

// RegisterChain records static metadata and health thresholds for a chain.
// An empty canonical key is ignored. Registering the same key again
// refreshes the metadata while keeping accumulated runtime counters.
func (r *Registry) RegisterChain(chainKey, chainName string, chainCfg config.ChainConfig) {
	canonical := config.CanonicalChainKey(chainKey)
	if canonical == "" {
		return
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	cs, ok := r.chains[canonical]
	if !ok {
		cs = &chainState{
			failedBlocks:  make(map[uint64]struct{}),
			catchupRanges: make(map[catchupRangeKey]uint64),
		}
		r.chains[canonical] = cs
	}

	// Prefer the configured network ID; fall back to the display name.
	cs.networkID = chainName
	if strings.TrimSpace(chainCfg.NetworkId) != "" {
		cs.networkID = chainCfg.NetworkId
	}
	cs.chainName = chainName
	cs.internalCode = chainCfg.InternalCode
	cs.networkType = string(chainCfg.Type)
	cs.thresholds = chainCfg.Status.Normalize()
}

// UpdateHead records the chain tip and the highest indexed block. A zero
// indexedAt timestamp leaves the previous lastIndexedAt untouched.
func (r *Registry) UpdateHead(chainKey string, latestBlock, indexedBlock uint64, indexedAt time.Time) {
	canonical := config.CanonicalChainKey(chainKey)
	if canonical == "" {
		return
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	cs := r.ensureStateLocked(canonical)
	cs.latestBlock, cs.indexedBlock = latestBlock, indexedBlock
	if !indexedAt.IsZero() {
		cs.lastIndexedAt = indexedAt.UTC()
	}
}

// MarkFailedBlock adds one block to the chain's failed set. Block number
// zero and unresolvable chain keys are ignored.
func (r *Registry) MarkFailedBlock(chainKey string, blockNumber uint64) {
	canonical := config.CanonicalChainKey(chainKey)
	if canonical == "" || blockNumber == 0 {
		return
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	r.ensureStateLocked(canonical).failedBlocks[blockNumber] = struct{}{}
}

// ClearFailedBlocks removes the given block numbers from the chain's
// failed set; missing entries are a no-op.
func (r *Registry) ClearFailedBlocks(chainKey string, blockNumbers []uint64) {
	canonical := config.CanonicalChainKey(chainKey)
	if canonical == "" || len(blockNumbers) == 0 {
		return
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	cs := r.ensureStateLocked(canonical)
	for _, n := range blockNumbers {
		delete(cs.failedBlocks, n)
	}
}

// SetFailedBlocks replaces the chain's failed-block set wholesale,
// silently dropping any zero block numbers.
func (r *Registry) SetFailedBlocks(chainKey string, blockNumbers []uint64) {
	canonical := config.CanonicalChainKey(chainKey)
	if canonical == "" {
		return
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	fresh := make(map[uint64]struct{}, len(blockNumbers))
	for _, n := range blockNumbers {
		if n != 0 {
			fresh[n] = struct{}{}
		}
	}
	r.ensureStateLocked(canonical).failedBlocks = fresh
}

// SetCatchupRanges replaces the chain's catch-up bookkeeping with the
// given ranges, recomputing the pending-block total from scratch.
func (r *Registry) SetCatchupRanges(chainKey string, ranges []blockstore.CatchupRange) {
	canonical := config.CanonicalChainKey(chainKey)
	if canonical == "" {
		return
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	cs := r.ensureStateLocked(canonical)
	cs.catchupPendingBlocks = 0
	cs.catchupRanges = make(map[catchupRangeKey]uint64, len(ranges))
	for i := range ranges {
		upsertCatchupRangeLocked(cs, ranges[i])
	}
}

// UpsertCatchupRanges merges the given ranges into the chain's catch-up
// bookkeeping, adjusting the pending-block total incrementally.
func (r *Registry) UpsertCatchupRanges(chainKey string, ranges []blockstore.CatchupRange) {
	canonical := config.CanonicalChainKey(chainKey)
	if canonical == "" || len(ranges) == 0 {
		return
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	cs := r.ensureStateLocked(canonical)
	for i := range ranges {
		upsertCatchupRangeLocked(cs, ranges[i])
	}
}

// DeleteCatchupRange drops one tracked catch-up range and subtracts its
// remaining pending blocks from the chain total (clamped at zero so an
// inconsistent counter cannot underflow).
func (r *Registry) DeleteCatchupRange(chainKey string, start, end uint64) {
	canonical := config.CanonicalChainKey(chainKey)
	if canonical == "" {
		return
	}
	rk, ok := makeCatchupRangeKey(start, end)
	if !ok {
		return
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	cs := r.ensureStateLocked(canonical)
	pending, tracked := cs.catchupRanges[rk]
	if !tracked {
		return
	}
	if cs.catchupPendingBlocks < pending {
		cs.catchupPendingBlocks = 0
	} else {
		cs.catchupPendingBlocks -= pending
	}
	delete(cs.catchupRanges, rk)
}

// Snapshot builds a point-in-time StatusResponse covering every
// registered chain. Raw per-chain fields are copied while holding the
// read lock; derived values (head gap, pending blocks, health) are
// computed afterwards so the lock is held only briefly.
func (r *Registry) Snapshot(version string) StatusResponse {
	r.mu.RLock()
	states := make([]chainSnapshot, 0, len(r.chains))
	for _, state := range r.chains {
		states = append(states, chainSnapshot{
			networkID:            state.networkID,
			chainName:            state.chainName,
			internalCode:         state.internalCode,
			networkType:          state.networkType,
			thresholds:           state.thresholds,
			latestBlock:          state.latestBlock,
			indexedBlock:         state.indexedBlock,
			lastIndexedAt:        state.lastIndexedAt,
			failedBlocksCount:    len(state.failedBlocks),
			catchupRangesCount:   len(state.catchupRanges),
			catchupPendingBlocks: state.catchupPendingBlocks,
		})
	}
	r.mu.RUnlock()

	networks := make([]NetworkStatus, 0, len(states))
	for _, state := range states {
		// Guard against indexedBlock running ahead of latestBlock
		// (stale tip reading) — the gap must not underflow.
		headGap := uint64(0)
		if state.latestBlock > state.indexedBlock {
			headGap = state.latestBlock - state.indexedBlock
		}
		pending := headGap + state.catchupPendingBlocks

		item := NetworkStatus{
			NetworkID:            state.networkID,
			ChainName:            state.chainName,
			InternalCode:         state.internalCode,
			NetworkType:          state.networkType,
			Health:               deriveHealth(pending, state.thresholds),
			LatestBlock:          state.latestBlock,
			IndexedBlock:         state.indexedBlock,
			PendingBlocks:        pending,
			HeadGap:              headGap,
			CatchupPendingBlocks: state.catchupPendingBlocks,
			CatchupRanges:        state.catchupRangesCount,
			FailedBlocks:         state.failedBlocksCount,
		}
		if !state.lastIndexedAt.IsZero() {
			t := state.lastIndexedAt
			item.LastIndexedAt = &t
		}
		networks = append(networks, item)
	}

	// Map iteration order is random and sort.Slice is not stable, so sort
	// with NetworkID as a tie-breaker to keep the response ordering
	// deterministic even when two chains share a display name.
	sort.Slice(networks, func(i, j int) bool {
		if networks[i].ChainName != networks[j].ChainName {
			return networks[i].ChainName < networks[j].ChainName
		}
		return networks[i].NetworkID < networks[j].NetworkID
	})

	return StatusResponse{
		Timestamp: time.Now().UTC(),
		Version:   version,
		Networks:  networks,
	}
}

// ensureStateLocked returns the state for key, lazily creating a
// placeholder entry with default thresholds and lowercased identity
// fields. The caller must hold r.mu for writing.
func (r *Registry) ensureStateLocked(key string) *chainState {
	if cs, ok := r.chains[key]; ok {
		return cs
	}

	cs := &chainState{
		networkID:     strings.ToLower(key),
		chainName:     strings.ToLower(key),
		internalCode:  key,
		thresholds:    config.StatusConfig{}.Normalize(),
		failedBlocks:  map[uint64]struct{}{},
		catchupRanges: map[catchupRangeKey]uint64{},
	}
	r.chains[key] = cs
	return cs
}

// makeCatchupRangeKey validates a [start, end] pair and turns it into a
// map key; it reports false for start == 0 or an inverted range.
func makeCatchupRangeKey(start, end uint64) (catchupRangeKey, bool) {
	if start == 0 || end < start {
		var zero catchupRangeKey
		return zero, false
	}
	return catchupRangeKey{start: start, end: end}, true
}

// catchupPendingForRange reports how many blocks of the inclusive
// [Start, End] range are still outstanding. Invalid ranges and
// fully-processed ranges yield zero.
// NOTE(review): the arithmetic assumes rng.Current is the last completed
// block within the range — confirm against blockstore.CatchupRange.
func catchupPendingForRange(rng blockstore.CatchupRange) uint64 {
	switch {
	case rng.Start == 0 || rng.End < rng.Start:
		return 0
	case rng.Current < rng.Start:
		// Nothing processed yet: the whole inclusive range is pending.
		return rng.End - rng.Start + 1
	case rng.Current < rng.End:
		return rng.End - rng.Current
	default:
		return 0
	}
}

// upsertCatchupRangeLocked records rng's pending count, replacing any
// previous figure for the same range and keeping the chain-wide total in
// sync (clamped at zero to survive inconsistent counters). The caller
// must hold the registry lock.
func upsertCatchupRangeLocked(state *chainState, rng blockstore.CatchupRange) {
	rk, ok := makeCatchupRangeKey(rng.Start, rng.End)
	if !ok {
		return
	}
	if state.catchupRanges == nil {
		state.catchupRanges = map[catchupRangeKey]uint64{}
	}

	prev := state.catchupRanges[rk]
	next := catchupPendingForRange(rng)
	state.catchupRanges[rk] = next

	if state.catchupPendingBlocks < prev {
		state.catchupPendingBlocks = 0
	} else {
		state.catchupPendingBlocks -= prev
	}
	state.catchupPendingBlocks += next
}

// deriveHealth maps a pending-block count onto a health bucket using the
// chain's configured thresholds. Comparisons are strictly-less-than, so
// a count equal to a threshold falls into the worse bucket.
func deriveHealth(pendingBlocks uint64, thresholds config.StatusConfig) HealthStatus {
	if pendingBlocks < thresholds.HealthyMaxPendingBlocks {
		return HealthHealthy
	}
	if pendingBlocks < thresholds.SlowMaxPendingBlocks {
		return HealthSlow
	}
	return HealthDegraded
}
Loading
Loading