diff --git a/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py b/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py index 5633a69b..a89a33f7 100644 --- a/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py +++ b/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py @@ -239,7 +239,7 @@ def make_fixture(self) -> Self: # Time advancement may trigger slot boundaries. # At slot boundaries, pending attestations may become active. # Always act as aggregator to ensure gossip signatures are aggregated - store = store.on_tick( + store, _ = store.on_tick( Uint64(step.time), has_proposal=False, is_aggregator=True ) @@ -270,7 +270,7 @@ def make_fixture(self) -> Self: # Always act as aggregator to ensure gossip signatures are aggregated slot_duration_seconds = block.slot * SECONDS_PER_SLOT block_time = store.config.genesis_time + slot_duration_seconds - store = store.on_tick(block_time, has_proposal=True, is_aggregator=True) + store, _ = store.on_tick(block_time, has_proposal=True, is_aggregator=True) # Process the block through Store. # This validates, applies state transition, and updates head. @@ -399,7 +399,7 @@ def _build_block_from_spec( # Aggregate gossip signatures and merge into known payloads. # This makes recently gossiped attestations available for block construction. - aggregation_store = working_store.aggregate_committee_signatures() + aggregation_store, _ = working_store.aggregate_committee_signatures() merged_store = aggregation_store.accept_new_attestations() # Two sources of attestations: diff --git a/src/lean_spec/subspecs/chain/service.py b/src/lean_spec/subspecs/chain/service.py index 17a86e20..263db37d 100644 --- a/src/lean_spec/subspecs/chain/service.py +++ b/src/lean_spec/subspecs/chain/service.py @@ -125,9 +125,10 @@ async def run(self) -> None: # # This minimal service does not produce blocks. # Block production requires validator keys. - new_store = self.sync_service.store.on_tick( + new_store, new_aggregated_attestations = self.sync_service.store.on_tick( time=current_time, has_proposal=False, + is_aggregator=self.sync_service.is_aggregator, ) # Update sync service's store reference. @@ -137,6 +138,11 @@ async def run(self) -> None: # the updated time. self.sync_service.store = new_store + # Publish any new aggregated attestations produced this tick + if new_aggregated_attestations: + for agg in new_aggregated_attestations: + await self.sync_service.publish_aggregated_attestation(agg) + logger.info( "Tick: slot=%d interval=%d time=%d head=%s finalized=slot%d", self.clock.current_slot(), @@ -162,11 +168,17 @@ async def _initial_tick(self) -> Interval | None: # Only tick if we're past genesis. if current_time >= self.clock.genesis_time: - new_store = self.sync_service.store.on_tick( + new_store, _ = self.sync_service.store.on_tick( time=current_time, has_proposal=False, + is_aggregator=self.sync_service.is_aggregator, ) self.sync_service.store = new_store + + # Discard aggregated attestations from catch-up. + # During initial sync we may be many slots behind. + # Publishing stale aggregations would spam the network. 
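# [Editor's illustrative sketch, not part of the patch] The new on_tick
# contract used by ChainService.run above: callers unpack (store, aggregates)
# and forward each aggregate to the aggregation gossip topic. Import paths are
# assumed from the diff file headers; `publish_fn` is a hypothetical callback
# standing in for SyncService.publish_aggregated_attestation.
from collections.abc import Awaitable, Callable

from lean_spec.subspecs.containers import SignedAggregatedAttestation
from lean_spec.subspecs.forkchoice.store import Store
from lean_spec.types import Uint64


async def tick_and_publish(
    store: Store,
    now: Uint64,
    is_aggregator: bool,
    publish_fn: Callable[[SignedAggregatedAttestation], Awaitable[None]],
) -> Store:
    """Advance the store clock and publish any aggregates produced this tick."""
    store, new_aggregates = store.on_tick(
        time=now, has_proposal=False, is_aggregator=is_aggregator
    )
    for aggregate in new_aggregates:
        await publish_fn(aggregate)
    return store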
+ return self.clock.total_intervals() return None diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index 4697d54e..b23c8029 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -494,7 +494,6 @@ def on_gossip_aggregated_attestation( new_attestation_data_by_root = dict(self.attestation_data_by_root) new_attestation_data_by_root[data_root] = data - store = self for vid in validator_ids: # Update Proof Map # @@ -503,7 +502,7 @@ def on_gossip_aggregated_attestation( new_aggregated_payloads.setdefault(key, []).append(proof) # Return store with updated aggregated payloads and attestation data - return store.model_copy( + return self.model_copy( update={ "latest_new_aggregated_payloads": new_aggregated_payloads, "attestation_data_by_root": new_attestation_data_by_root, @@ -943,7 +942,7 @@ def update_safe_target(self) -> "Store": return self.model_copy(update={"safe_target": safe_target}) - def aggregate_committee_signatures(self) -> "Store": + def aggregate_committee_signatures(self) -> tuple["Store", list[SignedAggregatedAttestation]]: """ Aggregate committee signatures for attestations in committee_signatures. @@ -951,7 +950,7 @@ def aggregate_committee_signatures(self) -> "Store": Attestations are reconstructed from gossip_signatures using attestation_data_by_root. Returns: - New Store with updated latest_new_aggregated_payloads. + Tuple of (new Store with updated payloads, list of new SignedAggregatedAttestation). """ new_aggregated_payloads = dict(self.latest_new_aggregated_payloads) @@ -976,13 +975,15 @@ def aggregate_committee_signatures(self) -> "Store": committee_signatures, ) - # iterate to broadcast aggregated attestations + # Create list of aggregated attestations for broadcasting + new_aggregates: list[SignedAggregatedAttestation] = [] for aggregated_attestation, aggregated_signature in aggregated_results: - _ = SignedAggregatedAttestation( - data=aggregated_attestation.data, - proof=aggregated_signature, + new_aggregates.append( + SignedAggregatedAttestation( + data=aggregated_attestation.data, + proof=aggregated_signature, + ) ) - # Note: here we should broadcast the aggregated signature to committee_aggregators topic # Compute new aggregated payloads new_gossip_sigs = dict(self.gossip_signatures) @@ -1004,9 +1005,11 @@ def aggregate_committee_signatures(self) -> "Store": "latest_new_aggregated_payloads": new_aggregated_payloads, "gossip_signatures": new_gossip_sigs, } - ) + ), new_aggregates - def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Store": + def tick_interval( + self, has_proposal: bool, is_aggregator: bool = False + ) -> tuple["Store", list[SignedAggregatedAttestation]]: """ Advance store time by one interval and perform interval-specific actions. @@ -1048,11 +1051,12 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto is_aggregator: Whether the node is an aggregator. Returns: - New Store with advanced time and interval-specific updates applied. + Tuple of (new store with advanced time, list of new signed aggregated attestation). 
""" # Advance time by one interval store = self.model_copy(update={"time": self.time + Uint64(1)}) current_interval = store.time % INTERVALS_PER_SLOT + new_aggregates: list[SignedAggregatedAttestation] = [] if current_interval == Uint64(0): # Start of slot - process attestations if proposal exists @@ -1061,7 +1065,7 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto elif current_interval == Uint64(2): # Aggregation interval - aggregators create proofs if is_aggregator: - store = store.aggregate_committee_signatures() + store, new_aggregates = store.aggregate_committee_signatures() elif current_interval == Uint64(3): # Fast confirm - update safe target based on received proofs store = store.update_safe_target() @@ -1069,9 +1073,11 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto # End of slot - accept accumulated attestations store = store.accept_new_attestations() - return store + return store, new_aggregates - def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) -> "Store": + def on_tick( + self, time: Uint64, has_proposal: bool, is_aggregator: bool = False + ) -> tuple["Store", list[SignedAggregatedAttestation]]: """ Advance forkchoice store time to given timestamp. @@ -1085,7 +1091,8 @@ def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) is_aggregator: Whether the node is an aggregator. Returns: - New Store with time advanced and all interval actions performed. + Tuple of (new store with time advanced, + list of all produced signed aggregated attestation). """ # Calculate target time in intervals time_delta_ms = (time - self.config.genesis_time) * Uint64(1000) @@ -1093,14 +1100,16 @@ def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) # Tick forward one interval at a time store = self + all_new_aggregates: list[SignedAggregatedAttestation] = [] while store.time < tick_interval_time: # Check if proposal should be signaled for next interval should_signal_proposal = has_proposal and (store.time + Uint64(1)) == tick_interval_time # Advance by one interval with appropriate signaling - store = store.tick_interval(should_signal_proposal, is_aggregator) + store, new_aggregates = store.tick_interval(should_signal_proposal, is_aggregator) + all_new_aggregates.extend(new_aggregates) - return store + return store, all_new_aggregates def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]: """ @@ -1128,7 +1137,7 @@ def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]: slot_time = self.config.genesis_time + slot_duration_seconds # Advance time to current slot (ticking intervals) - store = self.on_tick(slot_time, True) + store, _ = self.on_tick(slot_time, True) # Process any pending attestations before proposal store = store.accept_new_attestations() @@ -1192,7 +1201,7 @@ def get_attestation_target(self) -> Checkpoint: # Create checkpoint from selected target block target_block = self.blocks[target_block_root] - return Checkpoint(root=hash_tree_root(target_block), slot=target_block.slot) + return Checkpoint(root=target_block_root, slot=target_block.slot) def produce_attestation_data(self, slot: Slot) -> AttestationData: """ @@ -1299,7 +1308,7 @@ def produce_block_with_signatures( # # The builder iteratively collects valid attestations. # It returns the final block, post-state, and signature proofs. 
- final_block, final_post_state, _, signatures = head_state.build_block( + final_block, final_post_state, collected_attestations, signatures = head_state.build_block( slot=slot, proposer_index=validator_index, parent_root=head_root, diff --git a/src/lean_spec/subspecs/networking/client/event_source.py b/src/lean_spec/subspecs/networking/client/event_source.py index 9cc62ec0..fdc7e671 100644 --- a/src/lean_spec/subspecs/networking/client/event_source.py +++ b/src/lean_spec/subspecs/networking/client/event_source.py @@ -106,7 +106,7 @@ from lean_spec.snappy import SnappyDecompressionError, frame_decompress from lean_spec.subspecs.containers import SignedBlockWithAttestation -from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation from lean_spec.subspecs.networking.config import ( GOSSIPSUB_DEFAULT_PROTOCOL_ID, GOSSIPSUB_PROTOCOL_ID_V12, @@ -130,6 +130,7 @@ ) from lean_spec.subspecs.networking.reqresp.message import Status from lean_spec.subspecs.networking.service.events import ( + GossipAggregatedAttestationEvent, GossipAttestationEvent, GossipBlockEvent, NetworkEvent, @@ -235,7 +236,7 @@ def decode_message( self, topic_str: str, compressed_data: bytes, - ) -> SignedBlockWithAttestation | SignedAttestation | None: + ) -> SignedBlockWithAttestation | SignedAttestation | SignedAggregatedAttestation | None: """ Decode a gossip message from topic and compressed data. @@ -303,6 +304,8 @@ def decode_message( return SignedBlockWithAttestation.decode_bytes(ssz_bytes) case TopicKind.ATTESTATION_SUBNET: return SignedAttestation.decode_bytes(ssz_bytes) + case TopicKind.AGGREGATED_ATTESTATION: + return SignedAggregatedAttestation.decode_bytes(ssz_bytes) except SSZSerializationError as e: raise GossipMessageError(f"SSZ decode failed: {e}") from e @@ -726,6 +729,9 @@ async def _handle_gossipsub_message(self, event: GossipsubMessageEvent) -> None: case TopicKind.ATTESTATION_SUBNET: if isinstance(message, SignedAttestation): await self._emit_gossip_attestation(message, event.peer_id) + case TopicKind.AGGREGATED_ATTESTATION: + if isinstance(message, SignedAggregatedAttestation): + await self._emit_gossip_aggregated_attestation(message, event.peer_id) logger.debug("Processed gossipsub message %s from %s", topic.kind.value, event.peer_id) @@ -1078,6 +1084,25 @@ async def _emit_gossip_attestation( GossipAttestationEvent(attestation=attestation, peer_id=peer_id, topic=topic) ) + async def _emit_gossip_aggregated_attestation( + self, + signed_attestation: SignedAggregatedAttestation, + peer_id: PeerId, + ) -> None: + """ + Emit a gossip aggregated attestation event. + + Args: + signed_attestation: Aggregated attestation received from gossip. + peer_id: Peer that sent it. + """ + topic = GossipTopic(kind=TopicKind.AGGREGATED_ATTESTATION, fork_digest=self._fork_digest) + await self._events.put( + GossipAggregatedAttestationEvent( + signed_attestation=signed_attestation, peer_id=peer_id, topic=topic + ) + ) + async def _accept_streams(self, peer_id: PeerId, conn: QuicConnection) -> None: """ Accept incoming streams from a connection. @@ -1370,6 +1395,15 @@ async def _handle_gossip_stream(self, peer_id: PeerId, stream: InboundStreamProt # Type mismatch indicates a bug in decode_message. 
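# [Editor's illustrative sketch, not part of the patch] Round-trip shape for
# the new aggregation topic handled in decode_message above: payloads are
# snappy-framed SSZ, so decoding is frame_decompress followed by
# SignedAggregatedAttestation.decode_bytes. The helper name is hypothetical;
# the calls mirror those already used in event_source.py.
from lean_spec.snappy import frame_decompress
from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation


def decode_aggregated_attestation(compressed_data: bytes) -> SignedAggregatedAttestation:
    """Decompress a gossip payload and decode it as a signed aggregate."""
    ssz_bytes = frame_decompress(compressed_data)
    return SignedAggregatedAttestation.decode_bytes(ssz_bytes)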
logger.warning("Attestation topic but got %s", type(message).__name__) + case TopicKind.AGGREGATED_ATTESTATION: + if isinstance(message, SignedAggregatedAttestation): + await self._emit_gossip_aggregated_attestation(message, peer_id) + else: + logger.warning( + "Aggregated attestation topic but got %s", + type(message).__name__, + ) + logger.debug("Received gossip %s from %s", topic.kind.value, peer_id) except GossipMessageError as e: diff --git a/src/lean_spec/subspecs/networking/service/events.py b/src/lean_spec/subspecs/networking/service/events.py index 4a18eeb6..9095cfca 100644 --- a/src/lean_spec/subspecs/networking/service/events.py +++ b/src/lean_spec/subspecs/networking/service/events.py @@ -24,7 +24,7 @@ from dataclasses import dataclass from lean_spec.subspecs.containers import SignedBlockWithAttestation -from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic from lean_spec.subspecs.networking.reqresp.message import Status from lean_spec.subspecs.networking.transport import PeerId @@ -67,6 +67,25 @@ class GossipAttestationEvent: """Topic the attestation was received on (includes fork digest).""" +@dataclass(frozen=True, slots=True) +class GossipAggregatedAttestationEvent: + """ + Aggregated attestation received via gossip subscription. + + Fired when a signed aggregated attestation arrives from the gossipsub network. + Aggregates contain multiple validator votes combined into a single proof. + """ + + signed_attestation: SignedAggregatedAttestation + """The signed aggregated attestation.""" + + peer_id: PeerId + """Peer that propagated this aggregated attestation to us.""" + + topic: GossipTopic + """Topic the aggregated attestation was received on.""" + + @dataclass(frozen=True, slots=True) class PeerStatusEvent: """ @@ -113,6 +132,7 @@ class PeerDisconnectedEvent: NetworkEvent = ( GossipBlockEvent | GossipAttestationEvent + | GossipAggregatedAttestationEvent | PeerStatusEvent | PeerConnectedEvent | PeerDisconnectedEvent diff --git a/src/lean_spec/subspecs/networking/service/service.py b/src/lean_spec/subspecs/networking/service/service.py index 57f8fb1a..2a0f020d 100644 --- a/src/lean_spec/subspecs/networking/service/service.py +++ b/src/lean_spec/subspecs/networking/service/service.py @@ -28,13 +28,14 @@ from lean_spec.snappy import frame_compress from lean_spec.subspecs.containers import SignedBlockWithAttestation -from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation from lean_spec.subspecs.networking.client.event_source import LiveNetworkEventSource from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic from lean_spec.subspecs.networking.peer.info import PeerInfo from lean_spec.subspecs.networking.types import ConnectionState from .events import ( + GossipAggregatedAttestationEvent, GossipAttestationEvent, GossipBlockEvent, NetworkEvent, @@ -146,13 +147,17 @@ async def _handle_event(self, event: NetworkEvent) -> None: ) await self.sync_service.on_gossip_block(block, peer_id) - case GossipAttestationEvent(attestation=attestation, peer_id=peer_id): + case GossipAttestationEvent(attestation=attestation): # # SyncService will validate signature and update forkchoice. 
- await self.sync_service.on_gossip_attestation( - attestation=attestation, - peer_id=peer_id, - ) + await self.sync_service.on_gossip_attestation(attestation) + + case GossipAggregatedAttestationEvent(signed_attestation=att): + # Route aggregated attestations to sync service. + # + # Aggregates contain multiple validator votes and are used + # to advance justification and finalization. + await self.sync_service.on_gossip_aggregated_attestation(att) case PeerStatusEvent(peer_id=peer_id, status=status): # Route peer status updates to sync service. @@ -234,3 +239,19 @@ async def publish_attestation(self, attestation: SignedAttestation, subnet_id: i await self.event_source.publish(str(topic), compressed) logger.debug("Published attestation for slot %s", attestation.message.slot) + + async def publish_aggregated_attestation( + self, signed_attestation: SignedAggregatedAttestation + ) -> None: + """ + Publish an aggregated attestation to the aggregation gossip topic. + + Args: + signed_attestation: Aggregated attestation to publish. + """ + topic = GossipTopic.committee_aggregation(self.fork_digest) + ssz_bytes = signed_attestation.encode_bytes() + compressed = frame_compress(ssz_bytes) + + await self.event_source.publish(str(topic), compressed) + logger.debug("Published aggregated attestation for slot %s", signed_attestation.data.slot) diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index 55bc09c7..82178da2 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -21,7 +21,7 @@ from lean_spec.subspecs.chain import SlotClock from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT, INTERVALS_PER_SLOT from lean_spec.subspecs.chain.service import ChainService -from lean_spec.subspecs.containers import Block, BlockBody, State +from lean_spec.subspecs.containers import Block, BlockBody, SignedBlockWithAttestation, State from lean_spec.subspecs.containers.attestation import SignedAttestation from lean_spec.subspecs.containers.block.types import AggregatedAttestations from lean_spec.subspecs.containers.slot import Slot @@ -238,6 +238,12 @@ def from_genesis(cls, config: NodeConfig) -> Node: is_aggregator=config.is_aggregator, ) + # Wire up aggregated attestation publishing. + # + # SyncService delegates aggregate publishing to NetworkService + # via a callback, avoiding a circular dependency. 
+ sync_service._publish_agg_fn = network_service.publish_aggregated_attestation + # Create API server if configured api_server: ApiServer | None = None if config.api_config is not None: @@ -260,12 +266,19 @@ def from_genesis(cls, config: NodeConfig) -> Node: async def publish_attestation_wrapper(attestation: SignedAttestation) -> None: subnet_id = attestation.validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) await network_service.publish_attestation(attestation, subnet_id) + # Also route locally so we can aggregate our own attestation + await sync_service.on_gossip_attestation(attestation) + + async def publish_block_wrapper(block: SignedBlockWithAttestation) -> None: + await network_service.publish_block(block) + # Also route locally so we update our own store + await sync_service.on_gossip_block(block, peer_id=None) validator_service = ValidatorService( sync_service=sync_service, clock=clock, registry=config.validator_registry, - on_block=network_service.publish_block, + on_block=publish_block_wrapper, on_attestation=publish_attestation_wrapper, ) diff --git a/src/lean_spec/subspecs/sync/block_cache.py b/src/lean_spec/subspecs/sync/block_cache.py index 2b8de0b0..fc803df7 100644 --- a/src/lean_spec/subspecs/sync/block_cache.py +++ b/src/lean_spec/subspecs/sync/block_cache.py @@ -103,7 +103,7 @@ class PendingBlock: slot order to ensure parents are processed before children. """ - received_from: PeerId + received_from: PeerId | None """ Peer that sent this block. @@ -112,6 +112,7 @@ class PendingBlock: - If invalid, they get penalized. This creates incentives for good behavior. + None for self-produced blocks. """ received_at: float = field(default_factory=time) @@ -165,7 +166,7 @@ def __contains__(self, root: Bytes32) -> bool: def add( self, block: SignedBlockWithAttestation, - peer: PeerId, + peer: PeerId | None, backfill_depth: int = 0, ) -> PendingBlock: """ diff --git a/src/lean_spec/subspecs/sync/head_sync.py b/src/lean_spec/subspecs/sync/head_sync.py index b14d0592..96610c9e 100644 --- a/src/lean_spec/subspecs/sync/head_sync.py +++ b/src/lean_spec/subspecs/sync/head_sync.py @@ -143,7 +143,7 @@ class HeadSync: async def on_gossip_block( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, store: Store, ) -> tuple[HeadSyncResult, Store]: """ @@ -217,7 +217,7 @@ async def on_gossip_block( async def _process_block_with_descendants( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, store: Store, ) -> tuple[HeadSyncResult, Store]: """ @@ -283,7 +283,7 @@ async def _process_cached_descendants( self, parent_root: Bytes32, store: Store, - peer_id: PeerId, + peer_id: PeerId | None, ) -> int: """ Process any cached blocks that descend from the given parent. 
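# [Editor's illustrative sketch, not part of the patch] The PeerId | None
# relaxation in block_cache.py and head_sync.py above exists so a node can
# feed its own blocks through the same path it uses for remote ones, since
# gossipsub does not echo our own publishes back to us. A minimal wrapper in
# the style of node.py's publish_block_wrapper; the two service arguments are
# left untyped and the standalone function itself is hypothetical.
from lean_spec.subspecs.containers import SignedBlockWithAttestation


async def publish_and_route_locally(
    network_service, sync_service, block: SignedBlockWithAttestation
) -> None:
    """Broadcast a self-produced block, then apply it to our own store."""
    await network_service.publish_block(block)
    # peer_id=None marks a self-produced block (see PendingBlock.received_from).
    await sync_service.on_gossip_block(block, peer_id=None)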
@@ -351,7 +351,7 @@ async def _process_cached_descendants( async def _cache_and_backfill( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, store: Store, ) -> tuple[HeadSyncResult, Store]: """ diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index 299791f6..ae9498a7 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -38,14 +38,15 @@ import asyncio import logging -from collections.abc import Callable +from collections.abc import Callable, Coroutine from dataclasses import dataclass, field -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from lean_spec.subspecs import metrics from lean_spec.subspecs.chain.clock import SlotClock from lean_spec.subspecs.containers import ( Block, + SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation, ) @@ -67,6 +68,8 @@ BlockProcessor = Callable[[Store, SignedBlockWithAttestation], Store] +PublishAggFn = Callable[[SignedAggregatedAttestation], Coroutine[Any, Any, None]] + def default_block_processor( store: Store, @@ -76,6 +79,10 @@ def default_block_processor( return store.on_block(block) +async def _noop_publish_agg(signed_attestation: SignedAggregatedAttestation) -> None: + """No-op default for aggregated attestation publishing.""" + + @dataclass(slots=True) class SyncProgress: """ @@ -156,6 +163,9 @@ class SyncService: process_block: BlockProcessor = field(default=default_block_processor) """Block processor function. Defaults to Store.on_block().""" + _publish_agg_fn: PublishAggFn = field(default=_noop_publish_agg) + """Callback for publishing aggregated attestations to the network.""" + _state: SyncState = field(default=SyncState.IDLE) """Current sync state.""" @@ -346,7 +356,7 @@ async def on_peer_status(self, peer_id: PeerId, status: Status) -> None: async def on_gossip_block( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, ) -> None: """ Handle block received via gossip. @@ -402,7 +412,6 @@ async def on_gossip_block( async def on_gossip_attestation( self, attestation: SignedAttestation, - peer_id: PeerId, # noqa: ARG002 ) -> None: """ Handle attestation received via gossip. @@ -416,7 +425,6 @@ async def on_gossip_attestation( Args: attestation: The signed attestation received. - peer_id: The peer that propagated the attestation (unused for now). """ # Guard: Only process gossip in states that accept it. # @@ -452,6 +460,42 @@ async def on_gossip_attestation( # These are expected during normal operation and don't indicate bugs. pass + async def on_gossip_aggregated_attestation( + self, + signed_attestation: SignedAggregatedAttestation, + ) -> None: + """ + Handle aggregated attestation received via gossip. + + Aggregated attestations are collections of individual votes for the same + target, signed by an aggregator. They provide efficient propagation of + consensus weight. + + Args: + signed_attestation: The signed aggregated attestation received. + """ + if not self._state.accepts_gossip: + return + + try: + self.store = self.store.on_gossip_aggregated_attestation(signed_attestation) + except (AssertionError, KeyError) as e: + logger.warning("Aggregated attestation validation failed: %s", e) + + async def publish_aggregated_attestation( + self, + signed_attestation: SignedAggregatedAttestation, + ) -> None: + """ + Publish an aggregated attestation to the network. + + Called by the chain service when this node acts as an aggregator. 
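# [Editor's illustrative sketch, not part of the patch] Because the publish
# hook defaults to a no-op, SyncService stays usable without a network layer;
# a test can swap in a collector instead of the real publisher. The
# `collected` / `capture` names are hypothetical.
from lean_spec.subspecs.containers import SignedAggregatedAttestation

collected: list[SignedAggregatedAttestation] = []


async def capture(signed_attestation: SignedAggregatedAttestation) -> None:
    """Record aggregates instead of gossiping them."""
    collected.append(signed_attestation)


# Wire it in place of NetworkService.publish_aggregated_attestation:
# sync_service._publish_agg_fn = capture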
+ + Args: + signed_attestation: The aggregate to publish. + """ + await self._publish_agg_fn(signed_attestation) + async def start_sync(self) -> None: """ Start or resume synchronization. diff --git a/src/lean_spec/subspecs/validator/service.py b/src/lean_spec/subspecs/validator/service.py index 217ed724..3968fbc4 100644 --- a/src/lean_spec/subspecs/validator/service.py +++ b/src/lean_spec/subspecs/validator/service.py @@ -39,6 +39,7 @@ from lean_spec.subspecs import metrics from lean_spec.subspecs.chain.clock import Interval, SlotClock +from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT from lean_spec.subspecs.containers import ( Attestation, AttestationData, @@ -54,7 +55,7 @@ ) from lean_spec.subspecs.containers.slot import Slot from lean_spec.subspecs.xmss import TARGET_SIGNATURE_SCHEME, GeneralizedXmssScheme -from lean_spec.subspecs.xmss.aggregation import AggregatedSignatureProof +from lean_spec.subspecs.xmss.aggregation import AggregatedSignatureProof, SignatureKey from lean_spec.types import Uint64 from .registry import ValidatorEntry, ValidatorRegistry @@ -286,10 +287,14 @@ async def _maybe_produce_block(self, slot: Slot) -> None: # This adds our attestation and signatures to the block. signed_block = self._sign_block(block, validator_index, signatures) - # The proposer's attestation is already stored in the block. - # When the block is broadcast, the proposer signature is tracked - # in gossip_signatures for future aggregation. - # No need to separately process the proposer attestation. + # Store proposer's attestation signature locally for aggregation. + # + # The proposer's block is already in the store from produce_block_with_signatures. + # When on_gossip_block is called locally, it returns early (duplicate check). + # So the proposer's attestation signature never reaches gossip_signatures + # via on_block. We must store it explicitly here so the aggregator + # (which may be this same node) can include it in aggregation. + self._store_proposer_attestation_signature(signed_block, validator_index) self._blocks_produced += 1 metrics.blocks_proposed.inc() @@ -323,7 +328,25 @@ async def _produce_attestations(self, slot: Slot) -> None: Args: slot: Current slot number. """ + # Wait briefly for the current slot's block to arrive via gossip. + # + # At interval 1 (800ms after slot start), the slot's block may not + # have arrived yet from the proposer node (production + gossip + verification + # can exceed 800ms on slow machines). Without the block, attestations + # would reference an old head, causing safe_target to stall. + store = self.sync_service.store + current_slot_has_block = any(block.slot == slot for block in store.blocks.values()) + if not current_slot_has_block: + for _ in range(8): + await asyncio.sleep(0.05) + store = self.sync_service.store + if any(block.slot == slot for block in store.blocks.values()): + break + + # Ensure we are attesting to the latest known head + self.sync_service.store = self.sync_service.store.update_head() store = self.sync_service.store + head_state = store.states.get(store.head) if head_state is None: return @@ -459,6 +482,55 @@ def _sign_attestation( signature=signature, ) + def _store_proposer_attestation_signature( + self, + signed_block: SignedBlockWithAttestation, + validator_index: ValidatorIndex, + ) -> None: + """ + Store the proposer's attestation signature in gossip_signatures. + + When the proposer produces a block, the block is added to the store + immediately. 
The subsequent local on_gossip_block call returns early + because the block is already in the store (duplicate check). This means + the proposer's attestation signature never reaches gossip_signatures + via the normal on_block path. + + This method explicitly stores the signature so aggregation can include it. + + Args: + signed_block: The signed block containing the proposer attestation. + validator_index: The proposer's validator index. + """ + store = self.sync_service.store + if store.validator_id is None: + return + + # Only store if the proposer is in the same subnet as the aggregator. + proposer_subnet = validator_index.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) + current_subnet = store.validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) + if proposer_subnet != current_subnet: + return + + proposer_attestation = signed_block.message.proposer_attestation + proposer_signature = signed_block.signature.proposer_signature + data_root = proposer_attestation.data.data_root_bytes() + + sig_key = SignatureKey(validator_index, data_root) + new_gossip_sigs = dict(store.gossip_signatures) + new_gossip_sigs[sig_key] = proposer_signature + + # Also store the attestation data for later extraction during aggregation. + new_attestation_data_by_root = dict(store.attestation_data_by_root) + new_attestation_data_by_root[data_root] = proposer_attestation.data + + self.sync_service.store = store.model_copy( + update={ + "gossip_signatures": new_gossip_sigs, + "attestation_data_by_root": new_attestation_data_by_root, + } + ) + def _ensure_prepared_for_epoch( self, entry: ValidatorEntry, diff --git a/tests/interop/helpers/__init__.py b/tests/interop/helpers/__init__.py index ec534d61..30a27b16 100644 --- a/tests/interop/helpers/__init__.py +++ b/tests/interop/helpers/__init__.py @@ -8,6 +8,7 @@ assert_peer_connections, assert_same_finalized_checkpoint, ) +from .diagnostics import PipelineDiagnostics from .node_runner import NodeCluster, TestNode from .port_allocator import PortAllocator from .topology import chain, full_mesh, mesh_2_2_2, star @@ -20,6 +21,8 @@ "assert_block_propagated", "assert_chain_progressing", "assert_same_finalized_checkpoint", + # Diagnostics + "PipelineDiagnostics", # Node management "TestNode", "NodeCluster", diff --git a/tests/interop/helpers/diagnostics.py b/tests/interop/helpers/diagnostics.py new file mode 100644 index 00000000..957e1e1f --- /dev/null +++ b/tests/interop/helpers/diagnostics.py @@ -0,0 +1,58 @@ +""" +Pipeline diagnostics for interop tests. + +Captures attestation pipeline state from test nodes for debugging +and assertions. Provides a snapshot of the full pipeline: +block production -> attestation -> aggregation -> safe target -> justification. 
+""" + +from __future__ import annotations + +from dataclasses import dataclass + +from .node_runner import TestNode + + +@dataclass(frozen=True, slots=True) +class PipelineDiagnostics: + """Snapshot of a node's attestation pipeline state.""" + + head_slot: int + """Current head slot.""" + + safe_target_slot: int + """Current safe target slot.""" + + finalized_slot: int + """Latest finalized slot.""" + + justified_slot: int + """Latest justified slot.""" + + gossip_signatures_count: int + """Number of pending gossip signatures (pre-aggregation).""" + + new_aggregated_count: int + """Number of entries in latest_new_aggregated_payloads.""" + + known_aggregated_count: int + """Number of entries in latest_known_aggregated_payloads.""" + + block_count: int + """Total blocks in the store.""" + + @classmethod + def from_node(cls, node: TestNode) -> PipelineDiagnostics: + """Capture diagnostics from a test node.""" + store = node._store + safe_block = store.blocks.get(store.safe_target) + return cls( + head_slot=node.head_slot, + safe_target_slot=int(safe_block.slot) if safe_block else 0, + finalized_slot=node.finalized_slot, + justified_slot=node.justified_slot, + gossip_signatures_count=len(store.gossip_signatures), + new_aggregated_count=len(store.latest_new_aggregated_payloads), + known_aggregated_count=len(store.latest_known_aggregated_payloads), + block_count=len(store.blocks), + ) diff --git a/tests/interop/helpers/node_runner.py b/tests/interop/helpers/node_runner.py index 71bc288a..e38b4e60 100644 --- a/tests/interop/helpers/node_runner.py +++ b/tests/interop/helpers/node_runner.py @@ -12,6 +12,7 @@ from dataclasses import dataclass, field from typing import cast +from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT from lean_spec.subspecs.containers import Checkpoint, Validator from lean_spec.subspecs.containers.state import Validators from lean_spec.subspecs.containers.validator import ValidatorIndex @@ -201,6 +202,8 @@ class NodeCluster: def __post_init__(self) -> None: """Initialize validators and keys.""" self._generate_validators() + # Default genesis time for single-node starts. + # start_all() overrides this to align with service start. self._genesis_time = int(time.time()) def _generate_validators(self) -> None: @@ -234,6 +237,7 @@ async def start_node( self, node_index: int, validator_indices: list[int] | None = None, + is_aggregator: bool = False, bootnodes: list[str] | None = None, *, start_services: bool = True, @@ -244,6 +248,7 @@ async def start_node( Args: node_index: Index for this node (for logging/identification). validator_indices: Which validators this node controls. + is_aggregator: Whether this node is aggregator bootnodes: Addresses to connect to on startup. start_services: If True, start the node's services immediately. If False, call test_node.start() manually after mesh is stable. 
@@ -282,6 +287,7 @@ async def start_node( api_config=None, # Disable API server for interop tests (not needed for P2P testing) validator_registry=validator_registry, fork_digest=self.fork_digest, + is_aggregator=is_aggregator, ) node = Node.from_genesis(config) @@ -350,9 +356,19 @@ async def start_node( await event_source.start_gossipsub() block_topic = f"/leanconsensus/{self.fork_digest}/block/ssz_snappy" - attestation_topic = f"/leanconsensus/{self.fork_digest}/attestation/ssz_snappy" + aggregation_topic = f"/leanconsensus/{self.fork_digest}/aggregation/ssz_snappy" event_source.subscribe_gossip_topic(block_topic) - event_source.subscribe_gossip_topic(attestation_topic) + event_source.subscribe_gossip_topic(aggregation_topic) + + # Determine subnets for our validators and subscribe. + # + # Validators only subscribe to the subnets they are assigned to. + # This matches the Ethereum gossip specification. + if validator_indices: + for idx in validator_indices: + subnet_id = idx % int(ATTESTATION_COMMITTEE_COUNT) + topic = f"/leanconsensus/{self.fork_digest}/attestation_{subnet_id}/ssz_snappy" + event_source.subscribe_gossip_topic(topic) # Optionally start the node's services. # @@ -403,14 +419,32 @@ async def start_all( if validators_per_node is None: validators_per_node = self._distribute_validators(num_nodes) + # Set genesis time to coincide with service start. + # + # Phases 1-3 (node creation, connection, mesh stabilization) take ~10s. + # Setting genesis in the future prevents wasting slots during setup. + # The first block will be produced at slot 1, shortly after services start. + self._genesis_time = int(time.time()) + 10 + # Phase 1: Create nodes with networking ready but services not running. # # This allows the gossipsub mesh to form before validators start # producing blocks and attestations. Otherwise, early blocks/attestations # would be "Published message to 0 peers" because the mesh is empty. + aggregator_indices = set(range(int(ATTESTATION_COMMITTEE_COUNT))) for i in range(num_nodes): validator_indices = validators_per_node[i] if i < len(validators_per_node) else [] - await self.start_node(i, validator_indices, start_services=False) + + # A node is an aggregator if it controls any of the first + # ATTESTATION_COMMITTEE_COUNT validators. + is_node_aggregator = any(vid in aggregator_indices for vid in validator_indices) + + await self.start_node( + i, + validator_indices, + is_aggregator=is_node_aggregator, + start_services=False, + ) # Stagger node startup like Ream does. # @@ -439,7 +473,7 @@ async def start_all( # 2. Subscription RPCs to be exchanged # 3. GRAFT messages to be sent and processed # - # A longer delay ensures proper mesh formation before block production. + # 5s allows ~7 heartbeats which is sufficient for mesh formation. await asyncio.sleep(5.0) # Phase 4: Start node services (validators, chain service, etc). diff --git a/tests/interop/test_attestation_pipeline.py b/tests/interop/test_attestation_pipeline.py new file mode 100644 index 00000000..e97073a5 --- /dev/null +++ b/tests/interop/test_attestation_pipeline.py @@ -0,0 +1,154 @@ +""" +Attestation production and delivery pipeline tests. + +Verifies that validators produce attestations referencing the correct +head and that attestations are delivered to the aggregator. 
+""" + +from __future__ import annotations + +import asyncio +import logging +import time + +import pytest + +from .helpers import ( + NodeCluster, + PipelineDiagnostics, + assert_peer_connections, + full_mesh, +) + +logger = logging.getLogger(__name__) + +pytestmark = pytest.mark.interop + + +@pytest.mark.timeout(60) +@pytest.mark.num_validators(3) +async def test_attestation_head_references(node_cluster: NodeCluster) -> None: + """ + Verify attestations reference the current slot's block, not genesis. + + After the first block is produced and propagated, attestations from + non-proposer validators should point to that block as their head. + """ + topology = full_mesh(3) + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + + # Wait for ~3 slots so attestations have been produced. + await asyncio.sleep(16) + + # Check that gossip_signatures or aggregated payloads exist. + # If attestations reference genesis with target==source, they'd be skipped. + # So the presence of valid aggregated payloads indicates correct head references. + for node in node_cluster.nodes: + diag = PipelineDiagnostics.from_node(node) + logger.info( + "Node %d: head=%d safe_target=%d gossip_sigs=%d new_agg=%d known_agg=%d", + node.index, + diag.head_slot, + diag.safe_target_slot, + diag.gossip_signatures_count, + diag.new_aggregated_count, + diag.known_aggregated_count, + ) + + # At least one node should have aggregated payloads (the aggregator). + total_agg = sum( + PipelineDiagnostics.from_node(n).new_aggregated_count + + PipelineDiagnostics.from_node(n).known_aggregated_count + for n in node_cluster.nodes + ) + assert total_agg > 0, "No aggregated attestation payloads found on any node" + + +@pytest.mark.timeout(60) +@pytest.mark.num_validators(3) +async def test_attestation_gossip_delivery(node_cluster: NodeCluster) -> None: + """ + Verify attestations reach the aggregator node via gossip. + + The aggregator collects gossip signatures from subnet attestation topics. + After a few slots, the aggregator should have collected signatures from + multiple validators. + """ + topology = full_mesh(3) + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + + # Wait for ~2 slots for attestations to be produced and gossiped. + await asyncio.sleep(12) + + # Find aggregator nodes (those with gossip_signatures). + for node in node_cluster.nodes: + diag = PipelineDiagnostics.from_node(node) + if diag.gossip_signatures_count > 0 or diag.new_aggregated_count > 0: + logger.info( + "Node %d has pipeline activity: gossip_sigs=%d new_agg=%d", + node.index, + diag.gossip_signatures_count, + diag.new_aggregated_count, + ) + + # At least one aggregator should have received signatures. + max_sigs = max( + PipelineDiagnostics.from_node(n).gossip_signatures_count + + PipelineDiagnostics.from_node(n).new_aggregated_count + for n in node_cluster.nodes + ) + assert max_sigs > 0, "No gossip signatures or aggregated payloads found on any node" + + +@pytest.mark.timeout(90) +@pytest.mark.num_validators(3) +async def test_safe_target_advancement(node_cluster: NodeCluster) -> None: + """ + Verify safe_target advances beyond genesis after aggregation. + + After aggregation at interval 2 and safe target update at interval 3, + the safe_target should point to a non-genesis block. 
This is a + prerequisite for meaningful attestation targets. + """ + topology = full_mesh(3) + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + + # Wait for enough slots for safe_target to advance. + # Needs: block production -> attestation -> aggregation -> safe target update. + start = time.monotonic() + timeout = 60.0 + + while time.monotonic() - start < timeout: + diags = [PipelineDiagnostics.from_node(n) for n in node_cluster.nodes] + safe_targets = [d.safe_target_slot for d in diags] + + if any(st > 0 for st in safe_targets): + logger.info("Safe target advanced: %s", safe_targets) + return + + logger.debug("Safe targets still at genesis: %s", safe_targets) + await asyncio.sleep(2.0) + + diags = [PipelineDiagnostics.from_node(n) for n in node_cluster.nodes] + for i, d in enumerate(diags): + logger.error( + "Node %d: head=%d safe=%d fin=%d just=%d gsigs=%d nagg=%d kagg=%d", + i, + d.head_slot, + d.safe_target_slot, + d.finalized_slot, + d.justified_slot, + d.gossip_signatures_count, + d.new_aggregated_count, + d.known_aggregated_count, + ) + raise AssertionError(f"Safe target never advanced beyond genesis: {safe_targets}") diff --git a/tests/interop/test_block_pipeline.py b/tests/interop/test_block_pipeline.py new file mode 100644 index 00000000..b76006a4 --- /dev/null +++ b/tests/interop/test_block_pipeline.py @@ -0,0 +1,87 @@ +""" +Block production and propagation pipeline tests. + +Verifies that blocks are produced, propagated via gossip, +and integrated into all nodes' stores. +""" + +from __future__ import annotations + +import asyncio +import logging + +import pytest + +from .helpers import ( + NodeCluster, + PipelineDiagnostics, + assert_peer_connections, + full_mesh, +) + +logger = logging.getLogger(__name__) + +pytestmark = pytest.mark.interop + + +@pytest.mark.timeout(60) +@pytest.mark.num_validators(3) +async def test_block_production_single_slot(node_cluster: NodeCluster) -> None: + """ + Verify that a block is produced and reaches all nodes within one slot. + + After mesh stabilization and service start, the proposer for slot 1 + should produce a block that propagates to all 3 nodes. + """ + topology = full_mesh(3) + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + + # Wait for one slot (4s) plus propagation margin. + await asyncio.sleep(8) + + for node in node_cluster.nodes: + diag = PipelineDiagnostics.from_node(node) + logger.info("Node %d: head_slot=%d blocks=%d", node.index, diag.head_slot, diag.block_count) + assert diag.head_slot >= 1, ( + f"Node {node.index} stuck at slot {diag.head_slot}, expected >= 1" + ) + + +@pytest.mark.timeout(60) +@pytest.mark.num_validators(3) +async def test_consecutive_blocks(node_cluster: NodeCluster) -> None: + """ + Verify blocks at consecutive slots reference correct parents. + + After several slots, each non-genesis block should have a parent_root + that points to the previous slot's block. + """ + topology = full_mesh(3) + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + + # Wait for ~3 slots. + await asyncio.sleep(16) + + # Check parent chain on node 0. 
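# [Editor's illustrative sketch, not part of the patch] The parent-chain walk
# performed below, factored into a reusable helper; the Store import path is
# assumed from the diff file headers.
from lean_spec.subspecs.forkchoice.store import Store


def chain_length_from_head(store: Store) -> int:
    """Count ancestors reachable from the head by following parent_root."""
    visited = 0
    current = store.blocks[store.head]
    while current.parent_root in store.blocks:
        parent = store.blocks[current.parent_root]
        assert current.slot > parent.slot, "parent chain must strictly decrease in slot"
        current = parent
        visited += 1
    return visited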
+ store = node_cluster.nodes[0]._store + head_block = store.blocks[store.head] + + # Walk back from head to genesis, verifying parent chain. + visited = 0 + current = head_block + while current.parent_root in store.blocks: + parent = store.blocks[current.parent_root] + assert current.slot > parent.slot, ( + f"Block at slot {current.slot} has parent at slot {parent.slot} (not decreasing)" + ) + current = parent + visited += 1 + + logger.info("Walked %d blocks in parent chain from head slot %d", visited, head_block.slot) + assert visited >= 2, f"Expected at least 2 blocks in chain, found {visited}" diff --git a/tests/interop/test_justification.py b/tests/interop/test_justification.py new file mode 100644 index 00000000..941d5f9e --- /dev/null +++ b/tests/interop/test_justification.py @@ -0,0 +1,134 @@ +""" +Justification and finalization pipeline tests. + +Verifies the full consensus lifecycle from block production through +checkpoint justification and finalization. +""" + +from __future__ import annotations + +import asyncio +import logging +import time + +import pytest + +from .helpers import ( + NodeCluster, + PipelineDiagnostics, + assert_all_finalized_to, + assert_heads_consistent, + assert_peer_connections, + assert_same_finalized_checkpoint, + full_mesh, + mesh_2_2_2, +) + +logger = logging.getLogger(__name__) + +pytestmark = pytest.mark.interop + + +@pytest.mark.timeout(120) +@pytest.mark.num_validators(3) +async def test_first_justification(node_cluster: NodeCluster) -> None: + """ + Verify that the first justification event occurs. + + Justification requires 2/3+ attestation weight on a target checkpoint. + With 3 validators, 2 must attest to the same target. This test waits + for the justified_slot to advance beyond genesis on any node. + """ + topology = full_mesh(3) + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + + start = time.monotonic() + timeout = 90.0 + + while time.monotonic() - start < timeout: + justified_slots = [n.justified_slot for n in node_cluster.nodes] + + if any(js > 0 for js in justified_slots): + logger.info("First justification achieved: %s", justified_slots) + return + + # Log pipeline state periodically for diagnostics. + if int(time.monotonic() - start) % 10 == 0: + for node in node_cluster.nodes: + diag = PipelineDiagnostics.from_node(node) + logger.info( + "Node %d: head=%d safe=%d just=%d fin=%d", + node.index, + diag.head_slot, + diag.safe_target_slot, + diag.justified_slot, + diag.finalized_slot, + ) + + await asyncio.sleep(2.0) + + diags = [PipelineDiagnostics.from_node(n) for n in node_cluster.nodes] + for i, d in enumerate(diags): + logger.error( + "Node %d: head=%d safe=%d fin=%d just=%d gsigs=%d nagg=%d kagg=%d", + i, + d.head_slot, + d.safe_target_slot, + d.finalized_slot, + d.justified_slot, + d.gossip_signatures_count, + d.new_aggregated_count, + d.known_aggregated_count, + ) + raise AssertionError(f"No justification after {timeout}s: {[d.justified_slot for d in diags]}") + + +@pytest.mark.timeout(150) +@pytest.mark.num_validators(3) +async def test_finalization_full_mesh(node_cluster: NodeCluster) -> None: + """ + Verify chain finalization in a fully connected network. 
+ + Tests the complete consensus lifecycle: + + - Block production and gossip propagation + - Attestation aggregation across validators + - Checkpoint justification (2/3+ votes) + - Checkpoint finalization (justified child of justified parent) + + Network topology: Full mesh (every node connected to every other). + """ + topology = full_mesh(3) + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) + + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=90) + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=15) + await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=15) + + +@pytest.mark.timeout(150) +@pytest.mark.num_validators(3) +async def test_finalization_hub_spoke(node_cluster: NodeCluster) -> None: + """ + Verify finalization with hub-and-spoke topology. + + Node 0 is the hub; nodes 1 and 2 are spokes that only connect to the hub. + Messages between spokes must route through the hub. + """ + topology = mesh_2_2_2() + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=1, timeout=15) + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) + + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=90) + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=15) + await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=15) diff --git a/tests/interop/test_late_joiner.py b/tests/interop/test_late_joiner.py index 343a1f63..53c7c919 100644 --- a/tests/interop/test_late_joiner.py +++ b/tests/interop/test_late_joiner.py @@ -24,7 +24,6 @@ pytestmark = pytest.mark.interop -@pytest.mark.skip(reason="Interop test not passing - needs update (#359)") @pytest.mark.timeout(240) @pytest.mark.num_validators(3) async def test_late_joiner_sync(node_cluster: NodeCluster) -> None: @@ -36,7 +35,7 @@ async def test_late_joiner_sync(node_cluster: NodeCluster) -> None: """ validators_per_node = [[0], [1], [2]] - await node_cluster.start_node(0, validators_per_node[0]) + await node_cluster.start_node(0, validators_per_node[0], is_aggregator=True) await node_cluster.start_node(1, validators_per_node[1]) node0 = node_cluster.nodes[0] diff --git a/tests/interop/test_multi_node.py b/tests/interop/test_multi_node.py index d2edcfe5..64b22cff 100644 --- a/tests/interop/test_multi_node.py +++ b/tests/interop/test_multi_node.py @@ -24,14 +24,15 @@ import asyncio import logging -import time import pytest from .helpers import ( NodeCluster, + assert_all_finalized_to, assert_heads_consistent, assert_peer_connections, + assert_same_finalized_checkpoint, full_mesh, mesh_2_2_2, ) @@ -44,8 +45,7 @@ pytestmark = pytest.mark.interop -@pytest.mark.skip(reason="Interop test not passing - needs update (#359)") -@pytest.mark.timeout(120) +@pytest.mark.timeout(150) @pytest.mark.num_validators(3) async def test_mesh_finalization(node_cluster: NodeCluster) -> None: """ @@ -61,241 +61,43 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: - Checkpoint finalization (justified child of justified parent) Network topology: Full mesh (every node connected to every other). - This maximizes connectivity and minimizes propagation latency. 
- - Timing rationale: - - - 60s timeout: allows ~15 slots at 4s each, plenty for finalization - - 30s run duration: ~7-8 slots, enough for 2 epochs of justification - - 15s peer timeout: sufficient for QUIC handshake - - The Ream project uses similar parameters for compatibility testing. """ - # Build the network topology. - # - # Full mesh with 3 nodes creates 3 bidirectional connections: - # - Node 0 <-> Node 1 - # - Node 0 <-> Node 2 - # - Node 1 <-> Node 2 topology = full_mesh(3) - - # Assign exactly one validator to each node. - # - # Validator indices match node indices for clarity. - # With 3 validators total, each controls 1/3 of voting power. validators_per_node = [[0], [1], [2]] - # Start all nodes with the configured topology. - # - # Each node begins: - # - # - Listening on a unique port - # - Connecting to peers per topology - # - Running the block production loop - # - Subscribing to gossip topics await node_cluster.start_all(topology, validators_per_node) - # Wait for peer connections before proceeding. - # - # Each node needs at least 2 peers (the other two nodes). - # This ensures gossip will reach all nodes. - # The 15s timeout handles slow handshakes. await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) - # Let the chain run for a fixed duration. - # - # Timing calculation: - # - # - Slot duration: 4 seconds - # - Slots in 70s: ~17 slots - # - Finalization requires: 2 consecutive justified epochs - # - With 3 validators: justification needs 2/3 = 2 attestations per slot - # - # This duration allows enough time for validators to: - # - # 1. Produce blocks (one per slot, round-robin) - # 2. Broadcast attestations (all validators each slot) - # 3. Accumulate justification (2+ matching attestations) - # 4. Finalize (justified epoch becomes finalized) - run_duration = 70 - poll_interval = 5 - - logger.info("Running chain for %d seconds...", run_duration) - - # Poll the chain state periodically. - # - # This provides visibility into consensus progress during the test. - # The logged metrics help debug failures. - start = time.monotonic() - while time.monotonic() - start < run_duration: - # Collect current state from each node. - # - # Head slot: the highest slot block each node has seen. - # Finalized slot: the most recent finalized checkpoint slot. - # Justified slot: the most recent justified checkpoint slot. - slots = [node.head_slot for node in node_cluster.nodes] - finalized = [node.finalized_slot for node in node_cluster.nodes] - justified = [node.justified_slot for node in node_cluster.nodes] - - # Track attestation counts for debugging. - # - # New attestations: received but not yet processed by fork choice. - # Known attestations: already incorporated into the store. - # - # These counts reveal if gossip is working: - # - # - High new_atts, low known_atts = processing bottleneck - # - Low counts everywhere = gossip not propagating - new_atts = [len(node._store.latest_new_attestations) for node in node_cluster.nodes] - known_atts = [len(node._store.latest_known_attestations) for node in node_cluster.nodes] - - logger.info( - "Progress: head=%s justified=%s finalized=%s new_atts=%s known_atts=%s", - slots, - justified, - finalized, - new_atts, - known_atts, - ) - await asyncio.sleep(poll_interval) - - # Capture final state for assertions. 
- head_slots = [node.head_slot for node in node_cluster.nodes] - finalized_slots = [node.finalized_slot for node in node_cluster.nodes] - - logger.info("FINAL: head_slots=%s finalized=%s", head_slots, finalized_slots) - - # Verify the chain advanced sufficiently. - # - # Minimum 5 slots ensures: - # - # - Block production is working (at least 5 blocks created) - # - Gossip is propagating (all nodes see the same progress) - # - No single node is stuck or partitioned - assert all(slot >= 5 for slot in head_slots), ( - f"Chain did not advance enough. Head slots: {head_slots}" - ) - - # Verify heads are consistent across nodes. - # - # In a healthy network, all nodes should converge to similar head slots. - # A difference > 2 slots indicates gossip or fork choice issues. - head_diff = max(head_slots) - min(head_slots) - assert head_diff <= 2, f"Head slots diverged too much. Slots: {head_slots}, diff: {head_diff}" - - # Verify ALL nodes finalized. - # - # With 70s runtime (~17 slots) and working gossip, every node - # should have finalized at least one checkpoint. - assert all(slot > 0 for slot in finalized_slots), ( - f"Not all nodes finalized. Finalized slots: {finalized_slots}" - ) + # With aligned genesis time, finalization typically occurs ~40s after service start. + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=90) - # Verify finalized checkpoints are consistent. - # - # All nodes must agree on the finalized checkpoint. - # Finalization is irreversible - divergent finalization would be catastrophic. - assert len(set(finalized_slots)) == 1, ( - f"Finalized slots inconsistent across nodes: {finalized_slots}" - ) + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=15) + await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=15) -@pytest.mark.skip(reason="Interop test not passing - needs update (#359)") -@pytest.mark.timeout(120) +@pytest.mark.timeout(150) @pytest.mark.num_validators(3) async def test_mesh_2_2_2_finalization(node_cluster: NodeCluster) -> None: """ Verify finalization with hub-and-spoke topology. - This tests consensus under restricted connectivity: - - - Node 0 is the hub (receives all connections) - - Nodes 1 and 2 are spokes (only connect to hub) - - Spokes cannot communicate directly - - Topology diagram:: - - Node 1 ---> Node 0 <--- Node 2 - - This is harder than full mesh because: - - - Messages between spokes must route through the hub - - Hub failure would partition the network - - Gossip takes two hops instead of one - - The test verifies that even with indirect connectivity, - the protocol achieves finalization. This matches the - Ream project's `test_lean_node_finalizes_mesh_2_2_2` test. + Node 0 is the hub; nodes 1 and 2 are spokes that only connect to the hub. + Messages between spokes must route through the hub. """ - # Build hub-and-spoke topology. - # - # Returns [(1, 0), (2, 0)]: nodes 1 and 2 dial node 0. - # Node 0 acts as the central hub. topology = mesh_2_2_2() - - # Same validator assignment as full mesh test. validators_per_node = [[0], [1], [2]] await node_cluster.start_all(topology, validators_per_node) - # Lower peer requirement than full mesh. - # - # Hub (node 0) has 2 peers; spokes have 1 peer each. - # Using min_peers=1 ensures spokes pass the check. await assert_peer_connections(node_cluster, min_peers=1, timeout=15) + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) - # Match Ream's 70 second test duration. 
-    #
-    # Finalization requires sufficient time for:
-    # - Multiple slots to pass (4s each)
-    # - Attestations to accumulate
-    # - Justification and finalization to occur
-    run_duration = 70
-    poll_interval = 5
-
-    logger.info("Running chain for %d seconds (mesh_2_2_2)...", run_duration)
-
-    # Poll chain progress.
-    start = time.monotonic()
-    while time.monotonic() - start < run_duration:
-        slots = [node.head_slot for node in node_cluster.nodes]
-        finalized = [node.finalized_slot for node in node_cluster.nodes]
-        logger.info("Progress: head_slots=%s finalized=%s", slots, finalized)
-        await asyncio.sleep(poll_interval)
+    await assert_all_finalized_to(node_cluster, target_slot=1, timeout=90)

-    # Final state capture.
-    head_slots = [node.head_slot for node in node_cluster.nodes]
-    finalized_slots = [node.finalized_slot for node in node_cluster.nodes]
-
-    logger.info("FINAL: head_slots=%s finalized=%s", head_slots, finalized_slots)
-
-    # Same assertions as full mesh.
-    #
-    # Despite reduced connectivity (messages route through hub),
-    # the protocol should still achieve full consensus.
-
-    # Chain must advance sufficiently.
-    assert all(slot >= 5 for slot in head_slots), (
-        f"Chain did not advance enough. Head slots: {head_slots}"
-    )
-
-    # Heads must be consistent across nodes.
-    #
-    # Hub-and-spoke adds latency but should not cause divergence.
-    head_diff = max(head_slots) - min(head_slots)
-    assert head_diff <= 2, f"Head slots diverged too much. Slots: {head_slots}, diff: {head_diff}"
-
-    # ALL nodes must finalize.
-    assert all(slot > 0 for slot in finalized_slots), (
-        f"Not all nodes finalized. Finalized slots: {finalized_slots}"
-    )
-
-    # Finalized checkpoints must be identical.
-    #
-    # Even with indirect connectivity, finalization must be consistent.
-    assert len(set(finalized_slots)) == 1, (
-        f"Finalized slots inconsistent across nodes: {finalized_slots}"
-    )
+    await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=15)
+    await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=15)


 @pytest.mark.timeout(30)
@@ -351,7 +153,7 @@ async def test_two_node_connection(node_cluster: NodeCluster) -> None:
     await assert_heads_consistent(node_cluster, max_slot_diff=2)


-@pytest.mark.timeout(45)
+@pytest.mark.timeout(60)
 @pytest.mark.num_validators(3)
 async def test_block_gossip_propagation(node_cluster: NodeCluster) -> None:
     """
@@ -417,7 +219,7 @@ async def test_block_gossip_propagation(node_cluster: NodeCluster) -> None:


 @pytest.mark.xfail(reason="Sync service doesn't pull missing blocks for isolated nodes")
-@pytest.mark.timeout(180)
+@pytest.mark.timeout(120)
 @pytest.mark.num_validators(3)
 async def test_partition_recovery(node_cluster: NodeCluster) -> None:
     """
diff --git a/tests/lean_spec/helpers/builders.py b/tests/lean_spec/helpers/builders.py
index ae557bfc..594aae54 100644
--- a/tests/lean_spec/helpers/builders.py
+++ b/tests/lean_spec/helpers/builders.py
@@ -520,7 +520,7 @@ def make_signed_block_from_store(
     slot_duration = block.slot * SECONDS_PER_SLOT
     block_time = store.config.genesis_time + slot_duration

-    advanced_store = store.on_tick(block_time, has_proposal=True)
+    advanced_store, _ = store.on_tick(block_time, has_proposal=True)

     return advanced_store, signed_block

diff --git a/tests/lean_spec/subspecs/chain/test_service.py b/tests/lean_spec/subspecs/chain/test_service.py
index 8e2e9ac4..1406bf26 100644
--- a/tests/lean_spec/subspecs/chain/test_service.py
+++ b/tests/lean_spec/subspecs/chain/test_service.py
@@ -28,7 +28,9 @@ class MockStore:
     head: Bytes32 = field(default_factory=lambda: ZERO_HASH)
     latest_finalized: MockCheckpoint = field(default_factory=MockCheckpoint)

-    def on_tick(self, time: Uint64, has_proposal: bool) -> MockStore:
+    def on_tick(
+        self, time: Uint64, has_proposal: bool, is_aggregator: bool = False
+    ) -> tuple[MockStore, list]:
         """Record the tick call and return a new store."""
         new_store = MockStore(
             time=time,
@@ -37,7 +39,7 @@ def on_tick(self, time: Uint64, has_proposal: bool) -> MockStore:
             latest_finalized=self.latest_finalized,
         )
         new_store.tick_calls.append((time, has_proposal))
-        return new_store
+        return new_store, []


 @dataclass
@@ -45,6 +47,7 @@ class MockSyncService:
     """Mock sync service for testing ChainService."""

     store: MockStore = field(default_factory=MockStore)
+    is_aggregator: bool = False


 class TestChainServiceLifecycle:
diff --git a/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py b/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py
index 2cbc306f..c85721c8 100644
--- a/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py
+++ b/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py
@@ -163,7 +163,7 @@ def test_safe_target_requires_supermajority(
         store = store.on_gossip_attestation(signed_attestation, is_aggregator=True)

     # Aggregate the signatures
-    store = store.aggregate_committee_signatures()
+    store, _ = store.aggregate_committee_signatures()

     # Update safe target (uses latest_new_aggregated_payloads)
     store = store.update_safe_target()
@@ -207,7 +207,7 @@ def test_safe_target_advances_with_supermajority(
         store = store.on_gossip_attestation(signed_attestation, is_aggregator=True)

     # Aggregate the signatures
-    store = store.aggregate_committee_signatures()
+    store, _ = store.aggregate_committee_signatures()

     # Update safe target
     store = store.update_safe_target()
@@ -247,7 +247,7 @@ def test_update_safe_target_uses_new_attestations(
         store = store.on_gossip_attestation(signed_attestation, is_aggregator=True)

     # Aggregate into new payloads
-    store = store.aggregate_committee_signatures()
+    store, _ = store.aggregate_committee_signatures()

     # Update safe target should use new aggregated payloads
     store = store.update_safe_target()
@@ -301,7 +301,7 @@ def test_justification_with_supermajority_attestations(
         store = store.on_gossip_attestation(signed_attestation, is_aggregator=True)

     # Aggregate signatures before producing the next block
-    store = store.aggregate_committee_signatures()
+    store, _ = store.aggregate_committee_signatures()

     # Produce block 2 which includes these attestations
     store, block_2, signatures = store.produce_block_with_signatures(slot_2, proposer_2)
@@ -382,7 +382,7 @@ def test_justification_tracking_with_multiple_targets(
         )
         store = store.on_gossip_attestation(signed_attestation, is_aggregator=True)

-    store = store.aggregate_committee_signatures()
+    store, _ = store.aggregate_committee_signatures()
     store = store.update_safe_target()

     # Neither target should be justified with only half validators
@@ -527,7 +527,7 @@ def test_full_attestation_cycle(
         store = store.on_gossip_attestation(signed_attestation, is_aggregator=True)

     # Phase 3: Aggregate signatures into payloads
-    store = store.aggregate_committee_signatures()
+    store, _ = store.aggregate_committee_signatures()

     # Phase 4: Update safe target
     store = store.update_safe_target()
@@ -589,7 +589,7 @@ def test_attestation_target_after_on_block(
     # Process block via on_block on a fresh consumer store
     consumer_store = observer_store
     block_time = consumer_store.config.genesis_time + block.slot * Uint64(SECONDS_PER_SLOT)
-    consumer_store = consumer_store.on_tick(block_time, has_proposal=True)
+    consumer_store, _ = consumer_store.on_tick(block_time, has_proposal=True)
     consumer_store = consumer_store.on_block(signed_block)

     # Get attestation target after on_block
diff --git a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py
index 904c8a62..ec1a5868 100644
--- a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py
+++ b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py
@@ -509,7 +509,7 @@ def test_aggregates_gossip_signatures_into_proof(self, key_manager: XmssKeyManag
         )

         # Perform aggregation
-        updated_store = store.aggregate_committee_signatures()
+        updated_store, _ = store.aggregate_committee_signatures()

         # Verify proofs were created and stored
         data_root = attestation_data.data_root_bytes()
@@ -537,7 +537,7 @@ def test_aggregated_proof_is_valid(self, key_manager: XmssKeyManager) -> None:
             attesting_validators=attesting_validators,
         )

-        updated_store = store.aggregate_committee_signatures()
+        updated_store, _ = store.aggregate_committee_signatures()

         data_root = attestation_data.data_root_bytes()
         sig_key = SignatureKey(ValidatorIndex(1), data_root)
@@ -567,7 +567,7 @@ def test_empty_gossip_signatures_produces_no_proofs(self, key_manager: XmssKeyMa
             attesting_validators=[],  # No attesters
         )

-        updated_store = store.aggregate_committee_signatures()
+        updated_store, _ = store.aggregate_committee_signatures()

         # Verify no proofs were created
         assert len(updated_store.latest_new_aggregated_payloads) == 0
@@ -619,7 +619,7 @@ def test_multiple_attestation_data_grouped_separately(
             }
         )

-        updated_store = store.aggregate_committee_signatures()
+        updated_store, _ = store.aggregate_committee_signatures()

         # Verify both validators have separate proofs
         sig_key_1 = SignatureKey(ValidatorIndex(1), data_root_1)
@@ -661,7 +661,7 @@ def test_interval_2_triggers_aggregation_for_aggregator(
         store = store.model_copy(update={"time": Uint64(1)})

         # Tick to interval 2 as aggregator
-        updated_store = store.tick_interval(has_proposal=False, is_aggregator=True)
+        updated_store, _ = store.tick_interval(has_proposal=False, is_aggregator=True)

         # Verify aggregation was performed
         data_root = attestation_data.data_root_bytes()
@@ -692,7 +692,7 @@ def test_interval_2_skips_aggregation_for_non_aggregator(
         store = store.model_copy(update={"time": Uint64(1)})

         # Tick to interval 2 as NON-aggregator
-        updated_store = store.tick_interval(has_proposal=False, is_aggregator=False)
+        updated_store, _ = store.tick_interval(has_proposal=False, is_aggregator=False)

         # Verify aggregation was NOT performed
         data_root = attestation_data.data_root_bytes()
@@ -731,7 +731,7 @@ def test_other_intervals_do_not_trigger_aggregation(self, key_manager: XmssKeyMa
             pre_tick_time = (target_interval - 1) % int(INTERVALS_PER_SLOT)
             test_store = store.model_copy(update={"time": Uint64(pre_tick_time)})

-            updated_store = test_store.tick_interval(has_proposal=False, is_aggregator=True)
+            updated_store, _ = test_store.tick_interval(has_proposal=False, is_aggregator=True)

             assert sig_key not in updated_store.latest_new_aggregated_payloads, (
                 f"Aggregation should NOT occur at interval {target_interval}"
@@ -754,7 +754,7 @@ def test_interval_0_accepts_attestations_with_proposal(
         store = store.model_copy(update={"time": Uint64(4)})

         # Tick to interval 0 with proposal
-        updated_store = store.tick_interval(has_proposal=True, is_aggregator=True)
+        updated_store, _ = store.tick_interval(has_proposal=True, is_aggregator=True)

         # Verify time advanced
         assert updated_store.time == Uint64(5)
@@ -812,7 +812,7 @@ def test_gossip_to_aggregation_to_storage(self, key_manager: XmssKeyManager) ->
         # Step 2: Advance to interval 2 (aggregation interval)
         store = store.model_copy(update={"time": Uint64(1)})

-        store = store.tick_interval(has_proposal=False, is_aggregator=True)
+        store, _ = store.tick_interval(has_proposal=False, is_aggregator=True)

         # Step 3: Verify aggregated proofs were created
         for vid in attesting_validators:
diff --git a/tests/lean_spec/subspecs/forkchoice/test_time_management.py b/tests/lean_spec/subspecs/forkchoice/test_time_management.py
index 36df3b51..c80eafb3 100644
--- a/tests/lean_spec/subspecs/forkchoice/test_time_management.py
+++ b/tests/lean_spec/subspecs/forkchoice/test_time_management.py
@@ -59,7 +59,7 @@ def test_on_tick_basic(self, sample_store: Store) -> None:
         initial_time = sample_store.time
         target_time = sample_store.config.genesis_time + Uint64(200)  # Much later time

-        sample_store = sample_store.on_tick(target_time, has_proposal=True)
+        sample_store, _ = sample_store.on_tick(target_time, has_proposal=True)

         # Time should advance
         assert sample_store.time > initial_time
@@ -69,7 +69,7 @@ def test_on_tick_no_proposal(self, sample_store: Store) -> None:
         initial_time = sample_store.time
         target_time = sample_store.config.genesis_time + Uint64(100)

-        sample_store = sample_store.on_tick(target_time, has_proposal=False)
+        sample_store, _ = sample_store.on_tick(target_time, has_proposal=False)

         # Time should still advance
         assert sample_store.time >= initial_time
@@ -80,7 +80,7 @@ def test_on_tick_already_current(self, sample_store: Store) -> None:
         current_target = sample_store.config.genesis_time + initial_time

         # Try to advance to current time (should be no-op)
-        sample_store = sample_store.on_tick(current_target, has_proposal=True)
+        sample_store, _ = sample_store.on_tick(current_target, has_proposal=True)

         # Should not change significantly (time can only increase)
         # Tolerance increased for 5-interval per slot system
@@ -91,7 +91,7 @@ def test_on_tick_small_increment(self, sample_store: Store) -> None:
         initial_time = sample_store.time
         target_time = sample_store.config.genesis_time + initial_time + Uint64(1)

-        sample_store = sample_store.on_tick(target_time, has_proposal=False)
+        sample_store, _ = sample_store.on_tick(target_time, has_proposal=False)

         # Should advance by small amount
         assert sample_store.time >= initial_time
@@ -105,7 +105,7 @@ def test_tick_interval_basic(self, sample_store: Store) -> None:
         initial_time = sample_store.time

         # Tick one interval forward
-        sample_store = sample_store.tick_interval(has_proposal=False)
+        sample_store, _ = sample_store.tick_interval(has_proposal=False)

         # Time should advance by one interval
         assert sample_store.time == initial_time + Uint64(1)
@@ -114,7 +114,7 @@ def test_tick_interval_with_proposal(self, sample_store: Store) -> None:
         """Test interval ticking with proposal."""
         initial_time = sample_store.time

-        sample_store = sample_store.tick_interval(has_proposal=True)
+        sample_store, _ = sample_store.tick_interval(has_proposal=True)

         # Time should advance
         assert sample_store.time == initial_time + Uint64(1)
@@ -125,7 +125,7 @@ def test_tick_interval_sequence(self, sample_store: Store) -> None:

         # Tick multiple intervals
         for i in range(5):
-            sample_store = sample_store.tick_interval(has_proposal=(i % 2 == 0))
+            sample_store, _ = sample_store.tick_interval(has_proposal=(i % 2 == 0))

         # Should have advanced by 5 intervals
         assert sample_store.time == initial_time + Uint64(5)
@@ -139,7 +139,7 @@ def test_tick_interval_actions_by_phase(self, sample_store: Store) -> None:
         # Tick through a complete slot cycle
         for interval in range(INTERVALS_PER_SLOT):
             has_proposal = interval == 0  # Proposal only in first interval
-            sample_store = sample_store.tick_interval(has_proposal=has_proposal)
+            sample_store, _ = sample_store.tick_interval(has_proposal=has_proposal)

             current_interval = sample_store.time % INTERVALS_PER_SLOT
             expected_interval = Uint64((interval + 1)) % INTERVALS_PER_SLOT