40 changes: 40 additions & 0 deletions CLAUDE.md
@@ -218,6 +218,46 @@ When shutdown is signaled, stops all services gracefully.
"""
```

### Testing Style (CRITICAL)

**Always use full equality assertions.** Never assert individual fields when you can assert the whole object. This catches more bugs and replaces multiple lines with a single, complete check.

Bad:
```python
assert len(capture.sent) == 1
_, rpc = capture.sent[0]
assert rpc.control is not None
assert len(rpc.control.prune) == 1
```

Good:
```python
assert capture.sent == [
(peer_id, RPC(control=ControlMessage(prune=[ControlPrune(topic_id=topic, backoff=60)])))
]
```

Bad:
```python
event = queue.get_nowait()
assert event.peer_id == peer_id
assert event.topic == "topic"
```

Good:
```python
assert queue.get_nowait() == GossipsubPeerEvent(
peer_id=peer_id, topic="topic", subscribed=True
)
```

When ordering is non-deterministic (e.g. random peer selection), assert the exact RPC shape and the exact peer set separately:
```python
expected_rpc = RPC(control=ControlMessage(graft=[ControlGraft(topic_id=topic)]))
assert {p for p, _ in capture.sent} == expected_peers
assert all(rpc == expected_rpc for _, rpc in capture.sent)
```

## Test Framework Structure

**Two types of tests:**
@@ -11,7 +11,10 @@

from pydantic import model_validator

from lean_spec.subspecs.chain.config import SECONDS_PER_SLOT
from lean_spec.subspecs.chain.config import (
INTERVALS_PER_SLOT,
MILLISECONDS_PER_INTERVAL,
)
from lean_spec.subspecs.containers.attestation import (
Attestation,
AttestationData,
@@ -239,8 +242,13 @@ def make_fixture(self) -> Self:
# Time advancement may trigger slot boundaries.
# At slot boundaries, pending attestations may become active.
# Always act as aggregator to ensure gossip signatures are aggregated
#
# TickStep.time is a Unix timestamp in seconds.
# Convert to intervals since genesis for the store.
delta_ms = (Uint64(step.time) - store.config.genesis_time) * Uint64(1000)
target_interval = delta_ms // MILLISECONDS_PER_INTERVAL
store, _ = store.on_tick(
Uint64(step.time), has_proposal=False, is_aggregator=True
target_interval, has_proposal=False, is_aggregator=True
)

case BlockStep():
@@ -268,9 +276,10 @@ def make_fixture(self) -> Self:
# Store rejects blocks from the future.
# This tick includes a block (has proposal).
# Always act as aggregator to ensure gossip signatures are aggregated
slot_duration_seconds = block.slot * SECONDS_PER_SLOT
block_time = store.config.genesis_time + slot_duration_seconds
store, _ = store.on_tick(block_time, has_proposal=True, is_aggregator=True)
target_interval = block.slot * INTERVALS_PER_SLOT
store, _ = store.on_tick(
target_interval, has_proposal=True, is_aggregator=True
)

# Process the block through Store.
# This validates, applies state transition, and updates head.
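For reference, a minimal standalone sketch of the timestamp-to-interval conversion used in the hunks above, with illustrative constants (`INTERVALS_PER_SLOT = 4`, `MILLISECONDS_PER_INTERVAL = 1000`) standing in for the real values in `lean_spec.subspecs.chain.config`:

```python
# Illustrative constants; the real values come from lean_spec.subspecs.chain.config.
INTERVALS_PER_SLOT = 4
MILLISECONDS_PER_INTERVAL = 1000

genesis_time = 1_700_000_000   # hypothetical genesis, Unix seconds
step_time = genesis_time + 9   # a TickStep 9 seconds after genesis

# TickStep path: Unix seconds -> milliseconds since genesis -> intervals since genesis.
delta_ms = (step_time - genesis_time) * 1000
target_interval = delta_ms // MILLISECONDS_PER_INTERVAL
assert target_interval == 9    # slot 2, second interval within the slot

# BlockStep path: a block at slot 3 maps to the first interval of that slot.
assert 3 * INTERVALS_PER_SLOT == 12
```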
96 changes: 64 additions & 32 deletions src/lean_spec/subspecs/chain/service.py
@@ -27,7 +27,12 @@
import logging
from dataclasses import dataclass, field

from lean_spec.subspecs.chain.config import INTERVALS_PER_SLOT
from lean_spec.subspecs.containers.attestation.attestation import (
SignedAggregatedAttestation,
)
from lean_spec.subspecs.sync import SyncService
from lean_spec.types import Uint64

from .clock import Interval, SlotClock

@@ -112,49 +117,79 @@ async def run(self) -> None:
if total_interval <= last_handled_total_interval:
continue

# Get current wall-clock time as Unix timestamp (may have changed after sleep).
#
# The store expects an absolute timestamp, not intervals.
# It internally converts to intervals.
current_time = self.clock.current_time()

# Tick the store forward to current time.
# Tick the store forward to current interval.
#
# The store advances time interval by interval, performing
# appropriate actions at each interval.
#
# This minimal service does not produce blocks.
# Block production requires validator keys.
new_store, new_aggregated_attestations = self.sync_service.store.on_tick(
time=current_time,
has_proposal=False,
is_aggregator=self.sync_service.is_aggregator,
)
new_aggregated_attestations = await self._tick_to(total_interval)

# Update sync service's store reference.
#
# SyncService owns the authoritative store. After ticking,
# we update its reference so gossip block processing sees
# the updated time.
self.sync_service.store = new_store

# Publish any new aggregated attestations produced this tick
# Publish any new aggregated attestations produced this tick.
if new_aggregated_attestations:
for agg in new_aggregated_attestations:
await self.sync_service.publish_aggregated_attestation(agg)

logger.info(
"Tick: slot=%d interval=%d time=%d head=%s finalized=slot%d",
"Tick: slot=%d interval=%d head=%s finalized=slot%d",
self.clock.current_slot(),
self.clock.total_intervals(),
current_time,
new_store.head.hex(),
new_store.latest_finalized.slot,
total_interval,
self.sync_service.store.head.hex(),
self.sync_service.store.latest_finalized.slot,
)

# Mark this interval as handled.
last_handled_total_interval = total_interval

async def _tick_to(self, target_interval: Interval) -> list[SignedAggregatedAttestation]:
"""
Advance store to target interval with skip and yield.

When the node falls behind by more than one slot, stale intervals
are skipped. Processing every missed interval synchronously would
block the event loop, starving gossip and causing the node to fall
further behind.

Between each remaining interval tick, yields to the event loop so
gossip messages can be processed.

Updates ``self.sync_service.store`` in place after each tick so
concurrent gossip handlers see current time.

Returns aggregated attestations produced during the ticks.
"""
store = self.sync_service.store
all_new_aggregates: list[SignedAggregatedAttestation] = []

# Skip stale intervals when falling behind.
#
# Jump to the last full slot boundary before the target.
# The final slot's worth of intervals still runs normally so that
# aggregation, safe target, and attestation acceptance happen.
gap = target_interval - store.time
if gap > INTERVALS_PER_SLOT:
skip_to = Uint64(target_interval - INTERVALS_PER_SLOT)
store = store.model_copy(update={"time": skip_to})
self.sync_service.store = store

# Tick remaining intervals one at a time.
while store.time < target_interval:
store, new_aggregates = store.tick_interval(
has_proposal=False,
is_aggregator=self.sync_service.is_aggregator,
)
all_new_aggregates.extend(new_aggregates)
self.sync_service.store = store

# Yield to the event loop so gossip handlers can run.
# Re-read store afterward: a gossip handler may have added
# blocks or attestations during the yield.
await asyncio.sleep(0)
store = self.sync_service.store

return all_new_aggregates

async def _initial_tick(self) -> Interval | None:
"""
Perform initial tick to catch up store time to current wall clock.
@@ -168,18 +203,15 @@ async def _initial_tick(self) -> Interval | None:

# Only tick if we're past genesis.
if current_time >= self.clock.genesis_time:
new_store, _ = self.sync_service.store.on_tick(
time=current_time,
has_proposal=False,
is_aggregator=self.sync_service.is_aggregator,
)
self.sync_service.store = new_store
target_interval = self.clock.total_intervals()

# Use _tick_to for skip + yield during catch-up.
# Discard aggregated attestations from catch-up.
# During initial sync we may be many slots behind.
# Publishing stale aggregations would spam the network.
await self._tick_to(target_interval)

return self.clock.total_intervals()
return target_interval

return None

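To make the catch-up behaviour of `_tick_to` concrete, here is a small self-contained sketch of the skip arithmetic only, assuming `INTERVALS_PER_SLOT = 4` and that each `tick_interval` call advances store time by exactly one interval:

```python
INTERVALS_PER_SLOT = 4  # assumed for illustration

def plan_ticks(store_time: int, target_interval: int) -> list[int]:
    """Intervals that would be ticked one by one after the stale-interval skip."""
    # More than one slot behind: jump so only the final slot's worth of
    # intervals is processed normally (aggregation, safe target, etc.).
    if target_interval - store_time > INTERVALS_PER_SLOT:
        store_time = target_interval - INTERVALS_PER_SLOT
    return list(range(store_time + 1, target_interval + 1))

# Ten intervals behind: skip to interval 8, then tick the last slot normally.
assert plan_ticks(store_time=2, target_interval=12) == [9, 10, 11, 12]
# Within one slot of the target: every missed interval is ticked.
assert plan_ticks(store_time=10, target_interval=12) == [11, 12]
```

In the real service each ticked interval also yields to the event loop, so the list above bounds the number of synchronous `tick_interval` calls per wake-up.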