-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsui_monitor.py
More file actions
101 lines (86 loc) · 3.36 KB
/
sui_monitor.py
File metadata and controls
101 lines (86 loc) · 3.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import asyncio
import logging
from dataclasses import dataclass
from typing import Dict, List, Set
import httpx
from config import Config
from state import State
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
@dataclass
class SuiValidator:
    """Snapshot of a single validator taken from the Sui system state.

    Instances are built by ``SuiMonitor.parse_validators`` from entries of
    the ``activeValidators`` list, and synthesized by
    ``SuiMonitor.process_validators`` for validators that left the active set.
    """

    # Value of the entry's "name" field.
    name: str
    # Value of the entry's "suiAddress" field; used as the key when
    # comparing one poll against the previous one.
    sui_address: str
    # Integer parsed from the entry's "stakingPoolSuiBalance" field.
    stake_amount: int
    # Integer parsed from the entry's "nextEpochStake" field.
    next_epoch_stake: int
class SuiMonitor:
    """Poll a Sui RPC endpoint and alert on validator-set or stake changes.

    Each poll fetches the latest system state, compares the active validators
    against the snapshot kept in ``state``, and sends an alert through
    ``alerter`` for every validator that left the active set or whose
    next-epoch stake fell by at least ``config.sui_stake_drop_threshold``
    relative to the previously recorded stake.
    """

    def __init__(
        self,
        config: Config,
        state: State,
        alerter,
        client: httpx.AsyncClient,
    ) -> None:
        """Store the collaborators used by the monitor.

        Args:
            config: Supplies ``sui_rpc_url``, ``sui_stake_drop_threshold`` and
                ``poll_interval_sui``.
            state: Store holding the previous poll's addresses and stakes.
            alerter: Object exposing an awaitable ``alert_sui_drop(validator)``.
            client: HTTP client used to send the JSON-RPC request.
        """
        self.config = config
        self.state = state
        self.alerter = alerter
        self.client = client

    def parse_validators(self, data: dict) -> List[SuiValidator]:
        """Convert a system-state RPC response into ``SuiValidator`` objects.

        Args:
            data: Decoded JSON-RPC response; validators are read from
                ``data["result"]["activeValidators"]``.

        Returns:
            One ``SuiValidator`` per active-validator entry. An empty list is
            returned when ``result`` or ``activeValidators`` is missing or
            null.

        Raises:
            KeyError: If an entry lacks one of the expected fields.
            ValueError: If a stake field cannot be converted to ``int``.
        """
        # ``or`` (rather than a ``.get`` default) also covers keys that are
        # present but explicitly null.
        result = data.get("result") or {}
        entries = result.get("activeValidators") or []
        return [
            SuiValidator(
                name=item["name"],
                sui_address=item["suiAddress"],
                stake_amount=int(item["stakingPoolSuiBalance"]),
                next_epoch_stake=int(item["nextEpochStake"]),
            )
            for item in entries
        ]

    def _stake_drop_fraction(self, previous_stake: int, next_epoch_stake: int) -> float:
        """Return the fractional stake decrease relative to ``previous_stake``.

        The result is positive when stake decreased and negative when it
        increased. A ``previous_stake`` of zero yields ``0.0`` so the division
        is never attempted with a zero denominator.
        """
        if previous_stake == 0:
            return 0.0
        return (previous_stake - next_epoch_stake) / previous_stake

    async def process_validators(self, validators: List[SuiValidator], state: State) -> None:
        """Alert on dropped validators and stake drops, then persist the snapshot.

        Args:
            validators: Validators parsed from the current poll.
            state: Store providing the previous snapshot and receiving the
                new one.
        """
        previous_addresses = state.get_previous_sui_addresses()
        previous_stakes = state.get_previous_sui_stakes()
        current_addresses = {v.sui_address for v in validators}

        # Validators known from the previous poll that are no longer active.
        # Sorted so alerts are emitted in a deterministic order.
        for address in sorted(previous_addresses - current_addresses):
            # Only the address and last recorded stake are available here,
            # so the name is a placeholder built from the address.
            dropped = SuiValidator(
                name=f"Unknown ({address})",
                sui_address=address,
                stake_amount=previous_stakes.get(address, 0),
                next_epoch_stake=0,
            )
            await self.alerter.alert_sui_drop(dropped)

        # Validators still active whose next-epoch stake fell far enough
        # below the stake recorded on the previous poll.
        threshold = self.config.sui_stake_drop_threshold
        for validator in validators:
            if validator.sui_address not in previous_stakes:
                # First time this validator is seen: nothing to compare with.
                continue
            prev_stake = previous_stakes[validator.sui_address]
            drop = self._stake_drop_fraction(prev_stake, validator.next_epoch_stake)
            if drop >= threshold:
                await self.alerter.alert_sui_drop(validator)

        # The snapshot is saved only after all alerts were sent, so an alert
        # failure leaves the old snapshot in place for the next poll.
        state.set_previous_sui_addresses(current_addresses)
        state.set_previous_sui_stakes({v.sui_address: v.stake_amount for v in validators})
        state.save()

    @staticmethod
    def _rpc_error(data):
        """Return the ``error`` member of a JSON-RPC response, or ``None``.

        ``None`` is returned when ``data`` is not a dict or carries no
        non-null ``error`` member.
        """
        if isinstance(data, dict):
            return data.get("error")
        return None

    async def fetch_system_state(self) -> dict:
        """Request the latest Sui system state over JSON-RPC.

        Returns:
            The decoded JSON response body.

        Raises:
            httpx.HTTPStatusError: If the HTTP status indicates failure.
            RuntimeError: If the response carries a JSON-RPC ``error`` member.
        """
        payload = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "suix_getLatestSuiSystemState",
            "params": [],
        }
        resp = await self.client.post(self.config.sui_rpc_url, json=payload)
        resp.raise_for_status()
        data = resp.json()
        # A JSON-RPC failure can arrive with a successful HTTP status. If it
        # were passed on, it would parse as "no validators" and every known
        # validator would be reported as dropped.
        rpc_error = self._rpc_error(data)
        if rpc_error is not None:
            raise RuntimeError(f"SUI RPC returned an error: {rpc_error}")
        return data

    async def run(self) -> None:
        """Poll forever, sleeping ``config.poll_interval_sui`` between polls.

        Any ``Exception`` raised during a poll is logged with its traceback
        and the loop continues with the next iteration.
        """
        while True:
            try:
                data = await self.fetch_system_state()
                validators = self.parse_validators(data)
                await self.process_validators(validators, self.state)
            except Exception as e:
                # Top-level boundary of the polling loop: record the failure
                # (with traceback) and keep the monitor alive.
                logger.exception("SUI monitor error: %s", e)
            await asyncio.sleep(self.config.poll_interval_sui)