From 359e4381f4c750c199a781a5ee21ea3ad235ce0e Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 11 Feb 2026 17:54:30 +0500 Subject: [PATCH 01/16] Update consensus logic: get_attestation_target, aggregation broadcast, and attestation propagation --- src/lean_spec/subspecs/chain/service.py | 11 ++- .../subspecs/containers/state/state.py | 34 +++++++++ src/lean_spec/subspecs/forkchoice/store.py | 75 +++++++++---------- .../subspecs/networking/service/service.py | 31 +++++++- src/lean_spec/subspecs/node/node.py | 18 ++++- src/lean_spec/subspecs/sync/service.py | 46 +++++++++++- src/lean_spec/subspecs/validator/service.py | 3 + tests/interop/helpers/node_runner.py | 32 +++++++- tests/lean_spec/helpers/builders.py | 2 +- .../lean_spec/subspecs/chain/test_service.py | 7 +- .../forkchoice/test_attestation_target.py | 14 ++-- .../forkchoice/test_store_attestations.py | 38 +++++----- .../forkchoice/test_time_management.py | 16 ++-- 13 files changed, 237 insertions(+), 90 deletions(-) diff --git a/src/lean_spec/subspecs/chain/service.py b/src/lean_spec/subspecs/chain/service.py index 17a86e20..475f3c3d 100644 --- a/src/lean_spec/subspecs/chain/service.py +++ b/src/lean_spec/subspecs/chain/service.py @@ -125,9 +125,10 @@ async def run(self) -> None: # # This minimal service does not produce blocks. # Block production requires validator keys. - new_store = self.sync_service.store.on_tick( + new_store, new_aggregated_attestations = self.sync_service.store.on_tick( time=current_time, has_proposal=False, + is_aggregator=self.sync_service.is_aggregator, ) # Update sync service's store reference. @@ -137,6 +138,11 @@ async def run(self) -> None: # the updated time. self.sync_service.store = new_store + # Publish any new aggregated attestations produced this tick + if new_aggregated_attestations: + for agg in new_aggregated_attestations: + await self.sync_service.publish_aggregated_attestation(agg) + logger.info( "Tick: slot=%d interval=%d time=%d head=%s finalized=slot%d", self.clock.current_slot(), @@ -162,9 +168,10 @@ async def _initial_tick(self) -> Interval | None: # Only tick if we're past genesis. if current_time >= self.clock.genesis_time: - new_store = self.sync_service.store.on_tick( + new_store, _ = self.sync_service.store.on_tick( time=current_time, has_proposal=False, + is_aggregator=self.sync_service.is_aggregator, ) self.sync_service.store = new_store return self.clock.total_intervals() diff --git a/src/lean_spec/subspecs/containers/state/state.py b/src/lean_spec/subspecs/containers/state/state.py index a494f0fe..32c57959 100644 --- a/src/lean_spec/subspecs/containers/state/state.py +++ b/src/lean_spec/subspecs/containers/state/state.py @@ -2,6 +2,7 @@ from __future__ import annotations +import logging from typing import AbstractSet, Collection, Iterable from lean_spec.subspecs.ssz.hash import hash_tree_root @@ -33,6 +34,8 @@ Validators, ) +logger = logging.getLogger(__name__) + class State(Container): """The main consensus state object.""" @@ -449,6 +452,13 @@ def process_attestations( # # The rules below filter out invalid or irrelevant votes. for attestation in attestations: + logger.debug( + "Processing attestation: target=slot%d source=slot%d participants=%s", + attestation.data.target.slot, + attestation.data.source.slot, + attestation.aggregation_bits.to_validator_indices(), + ) + source = attestation.data.source target = attestation.data.target @@ -457,6 +467,7 @@ def process_attestations( # A vote may only originate from a point in history that is already justified. 
# A source that lacks existing justification cannot be used to anchor a new vote. if not justified_slots.is_slot_justified(finalized_slot, source.slot): + logger.debug("Skipping attestation: source slot %d not justified", source.slot) continue # Ignore votes for targets that have already reached consensus. @@ -468,6 +479,7 @@ def process_attestations( # Ignore votes that reference zero-hash slots. if source.root == ZERO_HASH or target.root == ZERO_HASH: + logger.debug("Skipping attestation: zero root in source/target") continue # Ensure the vote refers to blocks that actually exist on our chain. @@ -491,6 +503,11 @@ def process_attestations( ) if not source_matches or not target_matches: + logger.debug( + "Skipping attestation: root mismatch (source_match=%s target_match=%s)", + source_matches, + target_matches, + ) continue # Ensure time flows forward. @@ -498,6 +515,11 @@ def process_attestations( # A target must always lie strictly after its source slot. # Otherwise the vote makes no chronological sense. if target.slot <= source.slot: + logger.debug( + "Skipping attestation: target slot %d <= source slot %d", + target.slot, + source.slot, + ) continue # Ensure the target falls on a slot that can be justified after the finalized one. @@ -514,6 +536,11 @@ def process_attestations( # Any target outside this pattern is not eligible for justification, # so votes for it are simply ignored. if not target.slot.is_justifiable_after(self.latest_finalized.slot): + logger.debug( + "Skipping attestation: target slot %d not justifiable after finalized slot %d", + target.slot, + self.latest_finalized.slot, + ) continue # Record the vote. @@ -542,6 +569,12 @@ def process_attestations( count = sum(bool(justified) for justified in justifications[target.root]) if 3 * count >= (2 * len(self.validators)): + logger.info( + "Supermajority reached for target slot %d: %d votes (threshold: %d)", + target.slot, + count, + (2 * len(self.validators) + 2) // 3, + ) # The block becomes justified # # The chain now considers this block part of its safe head. @@ -573,6 +606,7 @@ def process_attestations( old_finalized_slot = finalized_slot latest_finalized = source finalized_slot = latest_finalized.slot + logger.info("Finalization advanced to slot %d", finalized_slot) # Rebase/prune justification tracking across the new finalized boundary. 
# diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index d54636a2..c567ce2f 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -7,6 +7,7 @@ __all__ = ["Store"] import copy +import logging from collections import defaultdict from lean_spec.subspecs.chain.config import ( @@ -45,6 +46,8 @@ ) from lean_spec.types.container import Container +logger = logging.getLogger(__name__) + class Store(Container): """ @@ -488,7 +491,6 @@ def on_gossip_aggregated_attestation( new_attestation_data_by_root = dict(self.attestation_data_by_root) new_attestation_data_by_root[data_root] = data - store = self for vid in validator_ids: # Update Proof Map # @@ -497,7 +499,7 @@ def on_gossip_aggregated_attestation( new_aggregated_payloads.setdefault(key, []).append(proof) # Return store with updated aggregated payloads and attestation data - return store.model_copy( + return self.model_copy( update={ "latest_new_aggregated_payloads": new_aggregated_payloads, "attestation_data_by_root": new_attestation_data_by_root, @@ -856,28 +858,8 @@ def update_head(self) -> "Store": ) def accept_new_attestations(self) -> "Store": - """ - Process pending aggregated payloads and update forkchoice head. - - Moves aggregated payloads from latest_new_aggregated_payloads to - latest_known_aggregated_payloads, making them eligible to contribute to - fork choice weights. This migration happens at specific interval ticks. - - The Interval Tick System - ------------------------- - Aggregated payloads progress through intervals: - - Interval 0: Block proposal - - Interval 1: Validators cast attestations (enter "new") - - Interval 2: Aggregators create proofs & broadcast - - Interval 3: Safe target update - - Interval 4: Process accumulated attestations - - This staged progression ensures proper timing and prevents premature - influence on fork choice decisions. + """Process pending aggregated payloads and update forkchoice head.""" - Returns: - New Store with migrated aggregated payloads and updated head. - """ # Merge new aggregated payloads into known aggregated payloads merged_aggregated_payloads = dict(self.latest_known_aggregated_payloads) for sig_key, proofs in self.latest_new_aggregated_payloads.items(): @@ -937,7 +919,7 @@ def update_safe_target(self) -> "Store": return self.model_copy(update={"safe_target": safe_target}) - def aggregate_committee_signatures(self) -> "Store": + def aggregate_committee_signatures(self) -> tuple["Store", list[SignedAggregatedAttestation]]: """ Aggregate committee signatures for attestations in committee_signatures. @@ -945,7 +927,7 @@ def aggregate_committee_signatures(self) -> "Store": Attestations are reconstructed from gossip_signatures using attestation_data_by_root. Returns: - New Store with updated latest_new_aggregated_payloads. + Tuple of (new Store with updated payloads, list of new SignedAggregatedAttestation). 
""" new_aggregated_payloads = dict(self.latest_new_aggregated_payloads) @@ -970,13 +952,14 @@ def aggregate_committee_signatures(self) -> "Store": committee_signatures, ) - # iterate to broadcast aggregated attestations + # Create list for broadcasting + new_aggregates: list[SignedAggregatedAttestation] = [] for aggregated_attestation, aggregated_signature in aggregated_results: - _ = SignedAggregatedAttestation( + agg = SignedAggregatedAttestation( data=aggregated_attestation.data, proof=aggregated_signature, ) - # Note: here we should broadcast the aggregated signature to committee_aggregators topic + new_aggregates.append(agg) # Compute new aggregated payloads new_gossip_sigs = dict(self.gossip_signatures) @@ -998,9 +981,11 @@ def aggregate_committee_signatures(self) -> "Store": "latest_new_aggregated_payloads": new_aggregated_payloads, "gossip_signatures": new_gossip_sigs, } - ) + ), new_aggregates - def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Store": + def tick_interval( + self, has_proposal: bool, is_aggregator: bool = False + ) -> tuple["Store", list[SignedAggregatedAttestation]]: """ Advance store time by one interval and perform interval-specific actions. @@ -1042,11 +1027,12 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto is_aggregator: Whether the node is an aggregator. Returns: - New Store with advanced time and interval-specific updates applied. + Tuple of (new Store with advanced time, list of new SignedAggregatedAttestation). """ # Advance time by one interval store = self.model_copy(update={"time": self.time + Uint64(1)}) current_interval = store.time % INTERVALS_PER_SLOT + new_aggregates: list[SignedAggregatedAttestation] = [] if current_interval == Uint64(0): # Start of slot - process attestations if proposal exists @@ -1055,7 +1041,7 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto elif current_interval == Uint64(2): # Aggregation interval - aggregators create proofs if is_aggregator: - store = store.aggregate_committee_signatures() + store, new_aggregates = store.aggregate_committee_signatures() elif current_interval == Uint64(3): # Fast confirm - update safe target based on received proofs store = store.update_safe_target() @@ -1063,9 +1049,11 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto # End of slot - accept accumulated attestations store = store.accept_new_attestations() - return store + return store, new_aggregates - def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) -> "Store": + def on_tick( + self, time: Uint64, has_proposal: bool, is_aggregator: bool = False + ) -> tuple["Store", list[SignedAggregatedAttestation]]: """ Advance forkchoice store time to given timestamp. @@ -1079,7 +1067,7 @@ def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) is_aggregator: Whether the node is an aggregator. Returns: - New Store with time advanced and all interval actions performed. + Tuple of (new Store with time advanced, list of all produced SignedAggregatedAttestation). 
""" # Calculate target time in intervals time_delta_ms = (time - self.config.genesis_time) * Uint64(1000) @@ -1087,14 +1075,16 @@ def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) # Tick forward one interval at a time store = self + all_new_aggregates: list[SignedAggregatedAttestation] = [] while store.time < tick_interval_time: # Check if proposal should be signaled for next interval should_signal_proposal = has_proposal and (store.time + Uint64(1)) == tick_interval_time # Advance by one interval with appropriate signaling - store = store.tick_interval(should_signal_proposal, is_aggregator) + store, new_aggregates = store.tick_interval(should_signal_proposal, is_aggregator) + all_new_aggregates.extend(new_aggregates) - return store + return store, all_new_aggregates def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]: """ @@ -1122,7 +1112,7 @@ def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]: slot_time = self.config.genesis_time + slot_duration_seconds # Advance time to current slot (ticking intervals) - store = self.on_tick(slot_time, True) + store, _ = self.on_tick(slot_time, True) # Process any pending attestations before proposal store = store.accept_new_attestations() @@ -1168,8 +1158,11 @@ def get_attestation_target(self) -> Checkpoint: # # This ensures the target doesn't advance too far ahead of safe target, # providing a balance between liveness and safety. + # + # MODIFIED: We allow the target to be up to 1 slot ahead of safe_target + # to ensure the chain can actually start advancing from genesis. for _ in range(JUSTIFICATION_LOOKBACK_SLOTS): - if self.blocks[target_block_root].slot > self.blocks[self.safe_target].slot: + if self.blocks[target_block_root].slot > self.blocks[self.safe_target].slot + Slot(1): target_block_root = self.blocks[target_block_root].parent_root else: break @@ -1186,7 +1179,7 @@ def get_attestation_target(self) -> Checkpoint: # Create checkpoint from selected target block target_block = self.blocks[target_block_root] - return Checkpoint(root=hash_tree_root(target_block), slot=target_block.slot) + return Checkpoint(root=target_block_root, slot=target_block.slot) def produce_attestation_data(self, slot: Slot) -> AttestationData: """ @@ -1293,7 +1286,7 @@ def produce_block_with_signatures( # # The builder iteratively collects valid attestations. # It returns the final block, post-state, and signature proofs. 
- final_block, final_post_state, _, signatures = head_state.build_block( + final_block, final_post_state, collected_attestations, signatures = head_state.build_block( slot=slot, proposer_index=validator_index, parent_root=head_root, diff --git a/src/lean_spec/subspecs/networking/service/service.py b/src/lean_spec/subspecs/networking/service/service.py index 193c64f1..4f617ae6 100644 --- a/src/lean_spec/subspecs/networking/service/service.py +++ b/src/lean_spec/subspecs/networking/service/service.py @@ -28,12 +28,13 @@ from lean_spec.snappy import frame_compress from lean_spec.subspecs.containers import SignedBlockWithAttestation -from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic from lean_spec.subspecs.networking.peer.info import PeerInfo from lean_spec.subspecs.networking.types import ConnectionState from .events import ( + GossipAggregatedAttestationEvent, GossipAttestationEvent, GossipBlockEvent, NetworkEvent, @@ -146,14 +147,22 @@ async def _handle_event(self, event: NetworkEvent) -> None: ) await self.sync_service.on_gossip_block(block, peer_id) - case GossipAttestationEvent(attestation=attestation, peer_id=peer_id): + case GossipAttestationEvent(attestation=attestation, peer_id=peer_id, topic=topic): # # SyncService will validate signature and update forkchoice. await self.sync_service.on_gossip_attestation( attestation=attestation, + subnet_id=topic.subnet_id or 0, peer_id=peer_id, ) + case GossipAggregatedAttestationEvent(signed_attestation=att, peer_id=peer_id): + # Route aggregated attestations to sync service. + # + # Aggregates contain multiple validator votes and are used + # to advance justification and finalization. + await self.sync_service.on_gossip_aggregated_attestation(att, peer_id) + case PeerStatusEvent(peer_id=peer_id, status=status): # Route peer status updates to sync service. # @@ -234,3 +243,21 @@ async def publish_attestation(self, attestation: SignedAttestation, subnet_id: i await self.event_source.publish(str(topic), compressed) logger.debug("Published attestation for slot %s", attestation.message.slot) + + async def publish_aggregated_attestation( + self, signed_attestation: SignedAggregatedAttestation + ) -> None: + """ + Publish an aggregated attestation to the aggregation gossip topic. + + Args: + signed_attestation: Aggregated attestation to publish. 
+ """ + topic = GossipTopic.committee_aggregation(self.fork_digest) + ssz_bytes = signed_attestation.encode_bytes() + compressed = frame_compress(ssz_bytes) + + await self.event_source.publish(str(topic), compressed) + logger.debug( + "Published aggregated attestation for slot %s", signed_attestation.data.slot + ) diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index eb245c1f..1a339ead 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -22,7 +22,7 @@ from lean_spec.subspecs.chain import SlotClock from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT, INTERVALS_PER_SLOT from lean_spec.subspecs.chain.service import ChainService -from lean_spec.subspecs.containers import Block, BlockBody, State +from lean_spec.subspecs.containers import Block, BlockBody, SignedBlockWithAttestation, State from lean_spec.subspecs.containers.attestation import SignedAttestation from lean_spec.subspecs.containers.block.types import AggregatedAttestations from lean_spec.subspecs.containers.slot import Slot @@ -240,6 +240,13 @@ def from_genesis(cls, config: NodeConfig) -> Node: is_aggregator=config.is_aggregator, ) + # Wire up aggregated attestation publishing. + # + # ReqRespClient implements NetworkRequester which SyncService uses + # to publish aggregates. We route these to NetworkService. + if hasattr(config.network, "set_publish_agg_fn"): + config.network.set_publish_agg_fn(network_service.publish_aggregated_attestation) + # Create API server if configured api_server: ApiServer | None = None if config.api_config is not None: @@ -262,12 +269,19 @@ def from_genesis(cls, config: NodeConfig) -> Node: async def publish_attestation_wrapper(attestation: SignedAttestation) -> None: subnet_id = attestation.validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) await network_service.publish_attestation(attestation, subnet_id) + # Also route locally so we can aggregate our own attestation + await sync_service.on_gossip_attestation(attestation, subnet_id, peer_id=None) + + async def publish_block_wrapper(block: SignedBlockWithAttestation) -> None: + await network_service.publish_block(block) + # Also route locally so we update our own store + await sync_service.on_gossip_block(block, peer_id=None) validator_service = ValidatorService( sync_service=sync_service, clock=clock, registry=config.validator_registry, - on_block=network_service.publish_block, + on_block=publish_block_wrapper, on_attestation=publish_attestation_wrapper, ) diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index e210d16f..5ba35002 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -46,6 +46,7 @@ from lean_spec.subspecs.chain.clock import SlotClock from lean_spec.subspecs.containers import ( Block, + SignedAggregatedAttestation, SignedAttestation, SignedBlockWithAttestation, ) @@ -402,7 +403,8 @@ async def on_gossip_block( async def on_gossip_attestation( self, attestation: SignedAttestation, - peer_id: PeerId, # noqa: ARG002 + subnet_id: int, + peer_id: PeerId | None = None, ) -> None: """ Handle attestation received via gossip. @@ -416,7 +418,8 @@ async def on_gossip_attestation( Args: attestation: The signed attestation received. - peer_id: The peer that propagated the attestation (unused for now). + subnet_id: Subnet ID the attestation was received on. + peer_id: The peer that propagated the attestation (optional). 
""" # Guard: Only process gossip in states that accept it. # @@ -454,6 +457,45 @@ async def on_gossip_attestation( # These are expected during normal operation and don't indicate bugs. pass + async def on_gossip_aggregated_attestation( + self, + signed_attestation: SignedAggregatedAttestation, + peer_id: PeerId, # noqa: ARG002 + ) -> None: + """ + Handle aggregated attestation received via gossip. + + Aggregated attestations are collections of individual votes for the same + target, signed by an aggregator. They provide efficient propagation of + consensus weight. + + Args: + signed_attestation: The signed aggregated attestation received. + peer_id: The peer that propagated the aggregate (unused for now). + """ + if not self._state.accepts_gossip: + return + + try: + self.store = self.store.on_gossip_aggregated_attestation(signed_attestation) + except (AssertionError, KeyError): + # Aggregation validation failed. + pass + + async def publish_aggregated_attestation( + self, + signed_attestation: SignedAggregatedAttestation, + ) -> None: + """ + Publish an aggregated attestation to the network. + + Called by the chain service when this node acts as an aggregator. + + Args: + signed_attestation: The aggregate to publish. + """ + await self.network.publish_aggregated_attestation(signed_attestation) + async def start_sync(self) -> None: """ Start or resume synchronization. diff --git a/src/lean_spec/subspecs/validator/service.py b/src/lean_spec/subspecs/validator/service.py index 217ed724..e01f3c01 100644 --- a/src/lean_spec/subspecs/validator/service.py +++ b/src/lean_spec/subspecs/validator/service.py @@ -323,7 +323,10 @@ async def _produce_attestations(self, slot: Slot) -> None: Args: slot: Current slot number. """ + # Ensure we are attesting to the latest known head + self.sync_service.store = self.sync_service.store.update_head() store = self.sync_service.store + head_state = store.states.get(store.head) if head_state is None: return diff --git a/tests/interop/helpers/node_runner.py b/tests/interop/helpers/node_runner.py index 691cc2af..eebe910b 100644 --- a/tests/interop/helpers/node_runner.py +++ b/tests/interop/helpers/node_runner.py @@ -12,6 +12,7 @@ from dataclasses import dataclass, field from typing import TYPE_CHECKING, cast +from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT from lean_spec.subspecs.containers import Checkpoint, Validator from lean_spec.subspecs.containers.state import Validators from lean_spec.subspecs.containers.validator import ValidatorIndex @@ -237,6 +238,7 @@ async def start_node( self, node_index: int, validator_indices: list[int] | None = None, + is_aggregator: bool = False, bootnodes: list[str] | None = None, *, start_services: bool = True, @@ -247,6 +249,7 @@ async def start_node( Args: node_index: Index for this node (for logging/identification). validator_indices: Which validators this node controls. + is_aggregator: Whether this node is aggregator bootnodes: Addresses to connect to on startup. start_services: If True, start the node's services immediately. If False, call test_node.start() manually after mesh is stable. 
@@ -285,6 +288,7 @@ async def start_node( api_config=None, # Disable API server for interop tests (not needed for P2P testing) validator_registry=validator_registry, fork_digest=self.fork_digest, + is_aggregator=is_aggregator, ) node = Node.from_genesis(config) @@ -353,9 +357,20 @@ async def start_node( await event_source.start_gossipsub() block_topic = f"/leanconsensus/{self.fork_digest}/block/ssz_snappy" - attestation_topic = f"/leanconsensus/{self.fork_digest}/attestation/ssz_snappy" + aggregation_topic = f"/leanconsensus/{self.fork_digest}/aggregation/ssz_snappy" event_source.subscribe_gossip_topic(block_topic) - event_source.subscribe_gossip_topic(attestation_topic) + event_source.subscribe_gossip_topic(aggregation_topic) + + # Determine subnets for our validators and subscribe. + # + # Validators only subscribe to the subnets they are assigned to. + # This matches the Ethereum gossip specification. + if validator_indices: + from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT + for idx in validator_indices: + subnet_id = idx % int(ATTESTATION_COMMITTEE_COUNT) + topic = f"/leanconsensus/{self.fork_digest}/attestation_{subnet_id}/ssz_snappy" + event_source.subscribe_gossip_topic(topic) # Optionally start the node's services. # @@ -411,9 +426,20 @@ async def start_all( # This allows the gossipsub mesh to form before validators start # producing blocks and attestations. Otherwise, early blocks/attestations # would be "Published message to 0 peers" because the mesh is empty. + aggregator_indices = set(range(int(ATTESTATION_COMMITTEE_COUNT))) for i in range(num_nodes): validator_indices = validators_per_node[i] if i < len(validators_per_node) else [] - await self.start_node(i, validator_indices, start_services=False) + + # A node is an aggregator if it controls any of the first + # ATTESTATION_COMMITTEE_COUNT validators. + is_node_aggregator = any(vid in aggregator_indices for vid in validator_indices) + + await self.start_node( + i, + validator_indices, + is_aggregator=is_node_aggregator, + start_services=False, + ) # Stagger node startup like Ream does. 
# diff --git a/tests/lean_spec/helpers/builders.py b/tests/lean_spec/helpers/builders.py index 9c7e2f56..5766379b 100644 --- a/tests/lean_spec/helpers/builders.py +++ b/tests/lean_spec/helpers/builders.py @@ -526,7 +526,7 @@ def make_signed_block_from_store( slot_duration = block.slot * SECONDS_PER_SLOT block_time = store.config.genesis_time + slot_duration - advanced_store = store.on_tick(block_time, has_proposal=True) + advanced_store, _ = store.on_tick(block_time, has_proposal=True) return advanced_store, signed_block diff --git a/tests/lean_spec/subspecs/chain/test_service.py b/tests/lean_spec/subspecs/chain/test_service.py index 8e2e9ac4..1406bf26 100644 --- a/tests/lean_spec/subspecs/chain/test_service.py +++ b/tests/lean_spec/subspecs/chain/test_service.py @@ -28,7 +28,9 @@ class MockStore: head: Bytes32 = field(default_factory=lambda: ZERO_HASH) latest_finalized: MockCheckpoint = field(default_factory=MockCheckpoint) - def on_tick(self, time: Uint64, has_proposal: bool) -> MockStore: + def on_tick( + self, time: Uint64, has_proposal: bool, is_aggregator: bool = False + ) -> tuple[MockStore, list]: """Record the tick call and return a new store.""" new_store = MockStore( time=time, @@ -37,7 +39,7 @@ def on_tick(self, time: Uint64, has_proposal: bool) -> MockStore: latest_finalized=self.latest_finalized, ) new_store.tick_calls.append((time, has_proposal)) - return new_store + return new_store, [] @dataclass @@ -45,6 +47,7 @@ class MockSyncService: """Mock sync service for testing ChainService.""" store: MockStore = field(default_factory=MockStore) + is_aggregator: bool = False class TestChainServiceLifecycle: diff --git a/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py b/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py index 43f72e05..15c7ceef 100644 --- a/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py +++ b/tests/lean_spec/subspecs/forkchoice/test_attestation_target.py @@ -162,7 +162,7 @@ def test_safe_target_requires_supermajority( store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) # Aggregate the signatures - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() # Update safe target (uses latest_new_aggregated_payloads) store = store.update_safe_target() @@ -206,7 +206,7 @@ def test_safe_target_advances_with_supermajority( store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) # Aggregate the signatures - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() # Update safe target store = store.update_safe_target() @@ -246,7 +246,7 @@ def test_update_safe_target_uses_new_attestations( store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) # Aggregate into new payloads - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() # Update safe target should use new aggregated payloads store = store.update_safe_target() @@ -300,7 +300,7 @@ def test_justification_with_supermajority_attestations( store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) # Aggregate signatures before producing the next block - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() # Produce block 2 which includes these attestations store, block_2, signatures = store.produce_block_with_signatures(slot_2, proposer_2) @@ -381,7 +381,7 @@ def test_justification_tracking_with_multiple_targets( ) store = 
store.on_gossip_attestation(signed_attestation, is_aggregator=True) - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() store = store.update_safe_target() # Neither target should be justified with only half validators @@ -526,7 +526,7 @@ def test_full_attestation_cycle( store = store.on_gossip_attestation(signed_attestation, is_aggregator=True) # Phase 3: Aggregate signatures into payloads - store = store.aggregate_committee_signatures() + store, _ = store.aggregate_committee_signatures() # Phase 4: Update safe target store = store.update_safe_target() @@ -591,7 +591,7 @@ def test_attestation_target_after_on_block( # Process block via on_block on a fresh consumer store consumer_store = observer_store block_time = consumer_store.config.genesis_time + block.slot * Uint64(SECONDS_PER_SLOT) - consumer_store = consumer_store.on_tick(block_time, has_proposal=True) + consumer_store, _ = consumer_store.on_tick(block_time, has_proposal=True) consumer_store = consumer_store.on_block(signed_block) # Get attestation target after on_block diff --git a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py index f02f73d4..e0044c3e 100644 --- a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py +++ b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py @@ -509,7 +509,7 @@ def test_aggregates_gossip_signatures_into_proof(self, key_manager: XmssKeyManag ) # Perform aggregation - updated_store = store.aggregate_committee_signatures() + updated_store, _ = store.aggregate_committee_signatures() # Verify proofs were created and stored data_root = attestation_data.data_root_bytes() @@ -537,7 +537,7 @@ def test_aggregated_proof_is_valid(self, key_manager: XmssKeyManager) -> None: attesting_validators=attesting_validators, ) - updated_store = store.aggregate_committee_signatures() + updated_store, _ = store.aggregate_committee_signatures() data_root = attestation_data.data_root_bytes() sig_key = SignatureKey(ValidatorIndex(1), data_root) @@ -567,7 +567,7 @@ def test_empty_gossip_signatures_produces_no_proofs(self, key_manager: XmssKeyMa attesting_validators=[], # No attesters ) - updated_store = store.aggregate_committee_signatures() + updated_store, _ = store.aggregate_committee_signatures() # Verify no proofs were created assert len(updated_store.latest_new_aggregated_payloads) == 0 @@ -619,7 +619,7 @@ def test_multiple_attestation_data_grouped_separately( } ) - updated_store = store.aggregate_committee_signatures() + updated_store, _ = store.aggregate_committee_signatures() # Verify both validators have separate proofs sig_key_1 = SignatureKey(ValidatorIndex(1), data_root_1) @@ -661,7 +661,7 @@ def test_interval_2_triggers_aggregation_for_aggregator( store = store.model_copy(update={"time": Uint64(1)}) # Tick to interval 2 as aggregator - updated_store = store.tick_interval(has_proposal=False, is_aggregator=True) + updated_store, _ = store.tick_interval(has_proposal=False, is_aggregator=True) # Verify aggregation was performed data_root = attestation_data.data_root_bytes() @@ -692,7 +692,7 @@ def test_interval_2_skips_aggregation_for_non_aggregator( store = store.model_copy(update={"time": Uint64(1)}) # Tick to interval 2 as NON-aggregator - updated_store = store.tick_interval(has_proposal=False, is_aggregator=False) + updated_store, _ = store.tick_interval(has_proposal=False, is_aggregator=False) # Verify aggregation was NOT performed data_root = 
attestation_data.data_root_bytes() @@ -728,13 +728,12 @@ def test_other_intervals_do_not_trigger_aggregation(self, key_manager: XmssKeyMa # After tick, time becomes time+1, and interval = (time+1) % 5 # So we need time+1 % 5 == target_interval # Therefore time = target_interval - 1 (mod 5) - pre_tick_time = (target_interval - 1) % int(INTERVALS_PER_SLOT) - test_store = store.model_copy(update={"time": Uint64(pre_tick_time)}) - - updated_store = test_store.tick_interval(has_proposal=False, is_aggregator=True) - - assert sig_key not in updated_store.latest_new_aggregated_payloads, ( - f"Aggregation should NOT occur at interval {target_interval}" + pre_tick_time = (target_interval - 1) % int(INTERVALS_PER_SLOT) + test_store = store.model_copy(update={"time": Uint64(pre_tick_time)}) + + updated_store, _ = test_store.tick_interval(has_proposal=False, is_aggregator=True) + + assert sig_key not in updated_store.latest_new_aggregated_payloads, ( f"Aggregation should NOT occur at interval {target_interval}" ) def test_interval_0_accepts_attestations_with_proposal( @@ -754,7 +753,7 @@ def test_interval_0_accepts_attestations_with_proposal( store = store.model_copy(update={"time": Uint64(4)}) # Tick to interval 0 with proposal - updated_store = store.tick_interval(has_proposal=True, is_aggregator=True) + updated_store, _ = store.tick_interval(has_proposal=True, is_aggregator=True) # Verify time advanced assert updated_store.time == Uint64(5) @@ -810,12 +809,11 @@ def test_gossip_to_aggregation_to_storage(self, key_manager: XmssKeyManager) -> sig_key = SignatureKey(vid, data_root) assert sig_key in store.gossip_signatures, f"Signature for {vid} should be stored" - # Step 2: Advance to interval 2 (aggregation interval) - store = store.model_copy(update={"time": Uint64(1)}) - store = store.tick_interval(has_proposal=False, is_aggregator=True) - - # Step 3: Verify aggregated proofs were created - for vid in attesting_validators: + # Step 2: Advance to interval 2 (aggregation interval) + store = store.model_copy(update={"time": Uint64(1)}) + store, _ = store.tick_interval(has_proposal=False, is_aggregator=True) + + # Step 3: Verify aggregated proofs were created for vid in attesting_validators: sig_key = SignatureKey(vid, data_root) assert sig_key in store.latest_new_aggregated_payloads, ( f"Aggregated proof for {vid} should exist after interval 2" diff --git a/tests/lean_spec/subspecs/forkchoice/test_time_management.py b/tests/lean_spec/subspecs/forkchoice/test_time_management.py index 228f1b82..399739f1 100644 --- a/tests/lean_spec/subspecs/forkchoice/test_time_management.py +++ b/tests/lean_spec/subspecs/forkchoice/test_time_management.py @@ -54,7 +54,7 @@ def test_on_tick_basic(self, sample_store: Store) -> None: initial_time = sample_store.time target_time = sample_store.config.genesis_time + Uint64(200) # Much later time - sample_store = sample_store.on_tick(target_time, has_proposal=True) + sample_store, _ = sample_store.on_tick(target_time, has_proposal=True) # Time should advance assert sample_store.time > initial_time @@ -64,7 +64,7 @@ def test_on_tick_no_proposal(self, sample_store: Store) -> None: initial_time = sample_store.time target_time = sample_store.config.genesis_time + Uint64(100) - sample_store = sample_store.on_tick(target_time, has_proposal=False) + sample_store, _ = sample_store.on_tick(target_time, has_proposal=False) # Time should still advance assert sample_store.time >= initial_time @@ -75,7 +75,7 @@ def test_on_tick_already_current(self, sample_store: Store) -> None: 
current_target = sample_store.config.genesis_time + initial_time # Try to advance to current time (should be no-op) - sample_store = sample_store.on_tick(current_target, has_proposal=True) + sample_store, _ = sample_store.on_tick(current_target, has_proposal=True) # Should not change significantly (time can only increase) # Tolerance increased for 5-interval per slot system @@ -86,7 +86,7 @@ def test_on_tick_small_increment(self, sample_store: Store) -> None: initial_time = sample_store.time target_time = sample_store.config.genesis_time + initial_time + Uint64(1) - sample_store = sample_store.on_tick(target_time, has_proposal=False) + sample_store, _ = sample_store.on_tick(target_time, has_proposal=False) # Should advance by small amount assert sample_store.time >= initial_time @@ -100,7 +100,7 @@ def test_tick_interval_basic(self, sample_store: Store) -> None: initial_time = sample_store.time # Tick one interval forward - sample_store = sample_store.tick_interval(has_proposal=False) + sample_store, _ = sample_store.tick_interval(has_proposal=False) # Time should advance by one interval assert sample_store.time == initial_time + Uint64(1) @@ -109,7 +109,7 @@ def test_tick_interval_with_proposal(self, sample_store: Store) -> None: """Test interval ticking with proposal.""" initial_time = sample_store.time - sample_store = sample_store.tick_interval(has_proposal=True) + sample_store, _ = sample_store.tick_interval(has_proposal=True) # Time should advance assert sample_store.time == initial_time + Uint64(1) @@ -120,7 +120,7 @@ def test_tick_interval_sequence(self, sample_store: Store) -> None: # Tick multiple intervals for i in range(5): - sample_store = sample_store.tick_interval(has_proposal=(i % 2 == 0)) + sample_store, _ = sample_store.tick_interval(has_proposal=(i % 2 == 0)) # Should have advanced by 5 intervals assert sample_store.time == initial_time + Uint64(5) @@ -136,7 +136,7 @@ def test_tick_interval_actions_by_phase(self, sample_store: Store) -> None: # Tick through a complete slot cycle for interval in range(INTERVALS_PER_SLOT): has_proposal = interval == 0 # Proposal only in first interval - sample_store = sample_store.tick_interval(has_proposal=has_proposal) + sample_store, _ = sample_store.tick_interval(has_proposal=has_proposal) current_interval = sample_store.time % INTERVALS_PER_SLOT expected_interval = Uint64((interval + 1)) % INTERVALS_PER_SLOT From a72c8de2d6b0efb3ca7da49ca9de4c3826254d28 Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 11 Feb 2026 18:40:07 +0500 Subject: [PATCH 02/16] fix: wire aggregated attestation broadcast through network pipeline The aggregated attestation pipeline was broken at multiple points, preventing finalization in multi-node setups: - Add missing GossipAggregatedAttestationEvent to network events - Add AGGREGATED_ATTESTATION decoding and dispatch in event sources - Fix SyncService.publish_aggregated_attestation to use a callback instead of a missing method on NetworkRequester - Wire publish callback directly in Node.from_genesis - Publish aggregates from ChainService._initial_tick (was discarded) - Enable test_late_joiner_sync with is_aggregator=True on node 0 --- src/lean_spec/subspecs/chain/service.py | 8 +++- .../networking/client/event_source.py | 38 ++++++++++++++++++- .../subspecs/networking/service/events.py | 22 ++++++++++- src/lean_spec/subspecs/node/node.py | 7 ++-- src/lean_spec/subspecs/sync/service.py | 15 ++++++-- tests/interop/test_late_joiner.py | 3 +- 6 files changed, 80 insertions(+), 13 deletions(-) diff --git 
a/src/lean_spec/subspecs/chain/service.py b/src/lean_spec/subspecs/chain/service.py index 475f3c3d..16ab4d3f 100644 --- a/src/lean_spec/subspecs/chain/service.py +++ b/src/lean_spec/subspecs/chain/service.py @@ -168,12 +168,18 @@ async def _initial_tick(self) -> Interval | None: # Only tick if we're past genesis. if current_time >= self.clock.genesis_time: - new_store, _ = self.sync_service.store.on_tick( + new_store, new_aggregated_attestations = self.sync_service.store.on_tick( time=current_time, has_proposal=False, is_aggregator=self.sync_service.is_aggregator, ) self.sync_service.store = new_store + + # Publish any aggregated attestations produced during catch-up. + if new_aggregated_attestations: + for agg in new_aggregated_attestations: + await self.sync_service.publish_aggregated_attestation(agg) + return self.clock.total_intervals() return None diff --git a/src/lean_spec/subspecs/networking/client/event_source.py b/src/lean_spec/subspecs/networking/client/event_source.py index a4973742..8be72b9f 100644 --- a/src/lean_spec/subspecs/networking/client/event_source.py +++ b/src/lean_spec/subspecs/networking/client/event_source.py @@ -106,7 +106,7 @@ from lean_spec.snappy import SnappyDecompressionError, frame_decompress from lean_spec.subspecs.containers import SignedBlockWithAttestation -from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation from lean_spec.subspecs.networking.config import ( GOSSIPSUB_DEFAULT_PROTOCOL_ID, GOSSIPSUB_PROTOCOL_ID_V12, @@ -130,6 +130,7 @@ ) from lean_spec.subspecs.networking.reqresp.message import Status from lean_spec.subspecs.networking.service.events import ( + GossipAggregatedAttestationEvent, GossipAttestationEvent, GossipBlockEvent, NetworkEvent, @@ -324,7 +325,7 @@ def decode_message( self, topic_str: str, compressed_data: bytes, - ) -> SignedBlockWithAttestation | SignedAttestation | None: + ) -> SignedBlockWithAttestation | SignedAttestation | SignedAggregatedAttestation | None: """ Decode a gossip message from topic and compressed data. @@ -392,6 +393,8 @@ def decode_message( return SignedBlockWithAttestation.decode_bytes(ssz_bytes) case TopicKind.ATTESTATION_SUBNET: return SignedAttestation.decode_bytes(ssz_bytes) + case TopicKind.AGGREGATED_ATTESTATION: + return SignedAggregatedAttestation.decode_bytes(ssz_bytes) except SSZSerializationError as e: raise GossipMessageError(f"SSZ decode failed: {e}") from e @@ -818,6 +821,9 @@ async def _handle_gossipsub_message(self, event: GossipsubMessageEvent) -> None: case TopicKind.ATTESTATION_SUBNET: if isinstance(message, SignedAttestation): await self._emit_gossip_attestation(message, event.peer_id) + case TopicKind.AGGREGATED_ATTESTATION: + if isinstance(message, SignedAggregatedAttestation): + await self._emit_gossip_aggregated_attestation(message, event.peer_id) logger.debug("Processed gossipsub message %s from %s", topic.kind.value, event.peer_id) @@ -1170,6 +1176,25 @@ async def _emit_gossip_attestation( GossipAttestationEvent(attestation=attestation, peer_id=peer_id, topic=topic) ) + async def _emit_gossip_aggregated_attestation( + self, + signed_attestation: SignedAggregatedAttestation, + peer_id: PeerId, + ) -> None: + """ + Emit a gossip aggregated attestation event. + + Args: + signed_attestation: Aggregated attestation received from gossip. + peer_id: Peer that sent it. 
+ """ + topic = GossipTopic(kind=TopicKind.AGGREGATED_ATTESTATION, fork_digest=self._fork_digest) + await self._events.put( + GossipAggregatedAttestationEvent( + signed_attestation=signed_attestation, peer_id=peer_id, topic=topic + ) + ) + async def _accept_streams(self, peer_id: PeerId, conn: QuicConnection) -> None: """ Accept incoming streams from a connection. @@ -1466,6 +1491,15 @@ async def _handle_gossip_stream(self, peer_id: PeerId, stream: Stream) -> None: # Type mismatch indicates a bug in decode_message. logger.warning("Attestation topic but got %s", type(message).__name__) + case TopicKind.AGGREGATED_ATTESTATION: + if isinstance(message, SignedAggregatedAttestation): + await self._emit_gossip_aggregated_attestation(message, peer_id) + else: + logger.warning( + "Aggregated attestation topic but got %s", + type(message).__name__, + ) + logger.debug("Received gossip %s from %s", topic.kind.value, peer_id) except GossipMessageError as e: diff --git a/src/lean_spec/subspecs/networking/service/events.py b/src/lean_spec/subspecs/networking/service/events.py index 91b6334b..38c4dcb9 100644 --- a/src/lean_spec/subspecs/networking/service/events.py +++ b/src/lean_spec/subspecs/networking/service/events.py @@ -26,7 +26,7 @@ from typing import Protocol, runtime_checkable from lean_spec.subspecs.containers import SignedBlockWithAttestation -from lean_spec.subspecs.containers.attestation import SignedAttestation +from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic from lean_spec.subspecs.networking.reqresp.message import Status from lean_spec.subspecs.networking.transport import PeerId @@ -69,6 +69,25 @@ class GossipAttestationEvent: """Topic the attestation was received on (includes fork digest).""" +@dataclass(frozen=True, slots=True) +class GossipAggregatedAttestationEvent: + """ + Aggregated attestation received via gossip subscription. + + Fired when a signed aggregated attestation arrives from the gossipsub network. + Aggregates contain multiple validator votes combined into a single proof. + """ + + signed_attestation: SignedAggregatedAttestation + """The signed aggregated attestation.""" + + peer_id: PeerId + """Peer that propagated this aggregated attestation to us.""" + + topic: GossipTopic + """Topic the aggregated attestation was received on.""" + + @dataclass(frozen=True, slots=True) class PeerStatusEvent: """ @@ -115,6 +134,7 @@ class PeerDisconnectedEvent: NetworkEvent = ( GossipBlockEvent | GossipAttestationEvent + | GossipAggregatedAttestationEvent | PeerStatusEvent | PeerConnectedEvent | PeerDisconnectedEvent diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index 1a339ead..1007f105 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -242,10 +242,9 @@ def from_genesis(cls, config: NodeConfig) -> Node: # Wire up aggregated attestation publishing. # - # ReqRespClient implements NetworkRequester which SyncService uses - # to publish aggregates. We route these to NetworkService. - if hasattr(config.network, "set_publish_agg_fn"): - config.network.set_publish_agg_fn(network_service.publish_aggregated_attestation) + # SyncService delegates aggregate publishing to NetworkService + # via a callback, avoiding a circular dependency. 
+ sync_service._publish_agg_fn = network_service.publish_aggregated_attestation # Create API server if configured api_server: ApiServer | None = None diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index 5ba35002..b0dcd7a3 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -38,9 +38,9 @@ import asyncio import logging -from collections.abc import Callable +from collections.abc import Callable, Coroutine from dataclasses import dataclass, field -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from lean_spec.subspecs import metrics from lean_spec.subspecs.chain.clock import SlotClock @@ -68,6 +68,8 @@ BlockProcessor = Callable[[Store, SignedBlockWithAttestation], Store] +PublishAggFn = Callable[[SignedAggregatedAttestation], Coroutine[Any, Any, None]] + def default_block_processor( store: Store, @@ -77,6 +79,10 @@ def default_block_processor( return store.on_block(block) +async def _noop_publish_agg(signed_attestation: SignedAggregatedAttestation) -> None: + """No-op default for aggregated attestation publishing.""" + + @dataclass(slots=True) class SyncProgress: """ @@ -157,6 +163,9 @@ class SyncService: process_block: BlockProcessor = field(default=default_block_processor) """Block processor function. Defaults to Store.on_block().""" + _publish_agg_fn: PublishAggFn = field(default=_noop_publish_agg) + """Callback for publishing aggregated attestations to the network.""" + _state: SyncState = field(default=SyncState.IDLE) """Current sync state.""" @@ -494,7 +503,7 @@ async def publish_aggregated_attestation( Args: signed_attestation: The aggregate to publish. """ - await self.network.publish_aggregated_attestation(signed_attestation) + await self._publish_agg_fn(signed_attestation) async def start_sync(self) -> None: """ diff --git a/tests/interop/test_late_joiner.py b/tests/interop/test_late_joiner.py index 343a1f63..53c7c919 100644 --- a/tests/interop/test_late_joiner.py +++ b/tests/interop/test_late_joiner.py @@ -24,7 +24,6 @@ pytestmark = pytest.mark.interop -@pytest.mark.skip(reason="Interop test not passing - needs update (#359)") @pytest.mark.timeout(240) @pytest.mark.num_validators(3) async def test_late_joiner_sync(node_cluster: NodeCluster) -> None: @@ -36,7 +35,7 @@ async def test_late_joiner_sync(node_cluster: NodeCluster) -> None: """ validators_per_node = [[0], [1], [2]] - await node_cluster.start_node(0, validators_per_node[0]) + await node_cluster.start_node(0, validators_per_node[0], is_aggregator=True) await node_cluster.start_node(1, validators_per_node[1]) node0 = node_cluster.nodes[0] From 9aa20e9a98d57ea3e98f276dc97b96de295f8b85 Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 11 Feb 2026 18:50:00 +0500 Subject: [PATCH 03/16] fix: update fill framework and tests for new on_tick tuple return type - Unpack (store, aggregates) tuple from on_tick and aggregate_committee_signatures in fork choice fill framework - Update attestation target selection tests for the +1 safe_target allowance introduced in get_attestation_target --- .../test_fixtures/fork_choice.py | 8 +- .../fc/test_attestation_target_selection.py | 115 +++++++++--------- 2 files changed, 61 insertions(+), 62 deletions(-) diff --git a/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py b/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py index 0a0138b6..f48b74c1 100644 --- a/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py +++ 
b/packages/testing/src/consensus_testing/test_fixtures/fork_choice.py @@ -235,7 +235,9 @@ def make_fixture(self) -> Self: # Time advancement may trigger slot boundaries. # At slot boundaries, pending attestations may become active. # Always act as aggregator to ensure gossip signatures are aggregated - store = store.on_tick(Uint64(step.time), has_proposal=False, is_aggregator=True) + store, _ = store.on_tick( + Uint64(step.time), has_proposal=False, is_aggregator=True + ) elif isinstance(step, BlockStep): # Build a complete signed block from the lightweight spec. @@ -264,7 +266,7 @@ def make_fixture(self) -> Self: # Always act as aggregator to ensure gossip signatures are aggregated slot_duration_seconds = block.slot * SECONDS_PER_SLOT block_time = store.config.genesis_time + slot_duration_seconds - store = store.on_tick(block_time, has_proposal=True, is_aggregator=True) + store, _ = store.on_tick(block_time, has_proposal=True, is_aggregator=True) # Process the block through Store. # This validates, applies state transition, and updates head. @@ -408,7 +410,7 @@ def _build_block_from_spec( # First, aggregate any gossip signatures into payloads # This ensures that signatures from previous blocks (like proposer attestations) # are available for extraction - aggregation_store = working_store.aggregate_committee_signatures() + aggregation_store, _ = working_store.aggregate_committee_signatures() # Now combine aggregated payloads from both sources aggregated_payloads = ( diff --git a/tests/consensus/devnet/fc/test_attestation_target_selection.py b/tests/consensus/devnet/fc/test_attestation_target_selection.py index 8044a7fa..c4f6c994 100644 --- a/tests/consensus/devnet/fc/test_attestation_target_selection.py +++ b/tests/consensus/devnet/fc/test_attestation_target_selection.py @@ -17,24 +17,21 @@ def test_attestation_target_at_genesis_initially( fork_choice_test: ForkChoiceTestFiller, ) -> None: """ - Attestation target starts at genesis before safe target updates. + Attestation target starts near genesis before safe target updates. Scenario -------- Process two blocks at slots 1 and 2. Expected: - - After slot 1: target = slot 0 (genesis/finalized) - - After slot 2: target = slot 0 (genesis/finalized) - - Target root automatically validated against block at slot 0 + - After slot 1: target = slot 1 (1 slot ahead of safe target allowed) + - After slot 2: target = slot 1 (walkback stops at safe_target + 1) Why This Matters ---------------- - Initially, the safe target is at genesis (slot 0), so the attestation - target walks back from head to genesis. - - This conservative behavior ensures validators don't attest too far ahead - before there's sufficient attestation weight to advance the safe target. + Initially, the safe target is at genesis (slot 0). The attestation target + is allowed up to 1 slot ahead of safe target to ensure the chain can + start advancing from genesis. 
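
    Worked example (editor's sketch, not part of the patch): with the safe
    target at genesis and JUSTIFICATION_LOOKBACK_SLOTS = 3, the walkback in
    get_attestation_target behaves like the simplified helper below on a
    gapless chain.

        def walk_back(head_slot: int, safe_target_slot: int, lookback: int = 3) -> int:
            # Step toward the parent at most `lookback` times, stopping once the
            # candidate is within one slot of the safe target (the new +1 allowance).
            slot = head_slot
            for _ in range(lookback):
                if slot > safe_target_slot + 1:
                    slot -= 1  # stand-in for following parent_root on a gapless chain
                else:
                    break
            return slot

        # Heads 1-4 all resolve to slot 1; head 5 resolves to slot 2,
        # matching the expected attestation_target_slot values in these tests.
        assert [walk_back(h, 0) for h in (1, 2, 3, 4, 5)] == [1, 1, 1, 1, 2]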
""" fork_choice_test( steps=[ @@ -42,14 +39,14 @@ def test_attestation_target_at_genesis_initially( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(2)), checks=StoreChecks( head_slot=Slot(2), - attestation_target_slot=Slot(0), # Still genesis + attestation_target_slot=Slot(1), ), ), ], @@ -87,35 +84,35 @@ def test_attestation_target_advances_with_attestations( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(0), # Still at genesis + attestation_target_slot=Slot(1), # 1 slot ahead of safe target allowed ), ), BlockStep( block=BlockSpec(slot=Slot(2)), checks=StoreChecks( head_slot=Slot(2), - attestation_target_slot=Slot(0), # Still at genesis + attestation_target_slot=Slot(1), # Walks back to safe_target + 1 ), ), BlockStep( block=BlockSpec(slot=Slot(3)), checks=StoreChecks( head_slot=Slot(3), - attestation_target_slot=Slot(0), # Still at genesis + attestation_target_slot=Slot(1), # Walks back to safe_target + 1 ), ), BlockStep( block=BlockSpec(slot=Slot(4)), checks=StoreChecks( head_slot=Slot(4), - attestation_target_slot=Slot(1), # Advances to slot 1 + attestation_target_slot=Slot(1), # 3-step walkback from 4 → 1 ), ), BlockStep( block=BlockSpec(slot=Slot(5)), checks=StoreChecks( head_slot=Slot(5), - attestation_target_slot=Slot(2), # Continues advancing + attestation_target_slot=Slot(2), # 3-step walkback from 5 → 2 ), ), ], @@ -150,21 +147,21 @@ def test_attestation_target_with_slot_gaps( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(3)), checks=StoreChecks( head_slot=Slot(3), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(5)), checks=StoreChecks( head_slot=Slot(5), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(1), # Walks back 5→3→1, stops at safe_target+1 ), ), ], @@ -201,56 +198,56 @@ def test_attestation_target_with_extended_chain( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(0), # Genesis + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(2)), checks=StoreChecks( head_slot=Slot(2), - attestation_target_slot=Slot(0), # Still genesis + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(3)), checks=StoreChecks( head_slot=Slot(3), - attestation_target_slot=Slot(0), # Still genesis + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(4)), checks=StoreChecks( head_slot=Slot(4), - attestation_target_slot=Slot(1), # Advances to slot 1 + attestation_target_slot=Slot(1), ), ), BlockStep( block=BlockSpec(slot=Slot(5)), checks=StoreChecks( head_slot=Slot(5), - attestation_target_slot=Slot(2), # Stable at 2 + attestation_target_slot=Slot(2), ), ), BlockStep( block=BlockSpec(slot=Slot(6)), checks=StoreChecks( head_slot=Slot(6), - attestation_target_slot=Slot(3), # Continues to advance + attestation_target_slot=Slot(3), ), ), BlockStep( block=BlockSpec(slot=Slot(7)), checks=StoreChecks( head_slot=Slot(7), - attestation_target_slot=Slot(4), # Continues advancing + attestation_target_slot=Slot(4), ), ), BlockStep( block=BlockSpec(slot=Slot(8)), checks=StoreChecks( head_slot=Slot(8), - attestation_target_slot=Slot(5), # Continues advancing + 
attestation_target_slot=Slot(5), ), ), ], @@ -296,39 +293,39 @@ def test_attestation_target_justifiable_constraint( head_slot=Slot(i), attestation_target_slot=Slot( # Mapping of current slot -> expected target slot - # delta = current_slot - JUSTIFICATION_LOOKBACK_SLOTS - finalized_slot - # delta = current_slot - 3 - 0 + # With +1 allowance: walkback stops at safe_target + 1 + # delta = target_slot - finalized_slot { - 1: 0, # 3-slot walkback reaches safe target at slot 0 - 2: 0, # 3-slot walkback reaches safe target at slot 0 - 3: 0, # 3-slot walkback reaches safe target at slot 0 - 4: 1, # delta = 4 - 3 - 0 = 1, Rule 1: delta 1 ≤ 5 - 5: 2, # delta = 5 - 3 - 0 = 2, Rule 1: delta 2 ≤ 5 - 6: 3, # delta = 6 - 3 - 0 = 3, Rule 1: delta 3 ≤ 5 - 7: 4, # delta = 7 - 3 - 0 = 4, Rule 1: delta 4 ≤ 5 - 8: 5, # delta = 8 - 3 - 0 = 5, Rule 1: delta 5 ≤ 5 - 9: 6, # delta = 6 - 0 = 6, Rule 3: pronic number (2*3) - 10: 6, # delta = 10 - 3 - 0 = 7 - 11: 6, # delta = 11 - 3 - 0 = 8 - 12: 9, # delta = 9 - 0 = 9, Rule 2: perfect square (3^2) - 13: 9, # delta = 13 - 3 - 0 = 10 - 14: 9, # delta = 14 - 3 - 0 = 11 - 15: 12, # delta = 15 - 3 - 0 = 12, Rule 3: pronic number (3*4) - 16: 12, # delta = 16 - 3 - 0 = 13 - 17: 12, # delta = 17 - 3 - 0 = 14 - 18: 12, # delta = 18 - 3 - 0 = 15 - 19: 16, # delta = 19 - 3 - 0 = 16, Rule 2: perfect square (4^2) - 20: 16, # delta = 20 - 3 - 0 = 17 - 21: 16, # delta = 21 - 3 - 0 = 18 - 22: 16, # delta = 22 - 3 - 0 = 19 - 23: 20, # delta = 23 - 3 - 0 = 20, Rule 3: pronic number (4*5) - 24: 20, # delta = 24 - 3 - 0 = 21 - 25: 20, # delta = 25 - 3 - 0 = 22 - 26: 20, # delta = 26 - 3 - 0 = 23 - 27: 20, # delta = 27 - 3 - 0 = 24 - 28: 25, # delta = 28 - 3 - 0 = 25, Rule 2: perfect square (5^2) - 29: 25, # delta = 29 - 3 - 0 = 26 - 30: 25, # delta = 30 - 3 - 0 = 27 + 1: 1, # At safe_target + 1, no walkback needed + 2: 1, # Walks back to safe_target + 1 + 3: 1, # Walks back to safe_target + 1 + 4: 1, # 3-step walkback from 4 → 1 + 5: 2, # 3-step walkback from 5 → 2, delta 2 ≤ 5 + 6: 3, # 3-step walkback from 6 → 3, delta 3 ≤ 5 + 7: 4, # 3-step walkback from 7 → 4, delta 4 ≤ 5 + 8: 5, # 3-step walkback from 8 → 5, delta 5 ≤ 5 + 9: 6, # delta = 6, pronic number (2*3) + 10: 6, # delta = 7, not justifiable → walks to 6 + 11: 6, # delta = 8, not justifiable → walks to 6 + 12: 9, # delta = 9, perfect square (3^2) + 13: 9, # delta = 10, not justifiable → walks to 9 + 14: 9, # delta = 11, not justifiable → walks to 9 + 15: 12, # delta = 12, pronic number (3*4) + 16: 12, # delta = 13, not justifiable → walks to 12 + 17: 12, # delta = 14, not justifiable → walks to 12 + 18: 12, # delta = 15, not justifiable → walks to 12 + 19: 16, # delta = 16, perfect square (4^2) + 20: 16, # delta = 17, not justifiable → walks to 16 + 21: 16, # delta = 18, not justifiable → walks to 16 + 22: 16, # delta = 19, not justifiable → walks to 16 + 23: 20, # delta = 20, pronic number (4*5) + 24: 20, # delta = 21, not justifiable → walks to 20 + 25: 20, # delta = 22, not justifiable → walks to 20 + 26: 20, # delta = 23, not justifiable → walks to 20 + 27: 20, # delta = 24, not justifiable → walks to 20 + 28: 25, # delta = 25, perfect square (5^2) + 29: 25, # delta = 26, not justifiable → walks to 25 + 30: 25, # delta = 27, not justifiable → walks to 25 }[i] ), ), From dc2c45b0560ad60065c4ce20047388ce39fdcffa Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 11 Feb 2026 19:04:21 +0500 Subject: [PATCH 04/16] fix: unskip multi-node interop tests and fix store attribute names - Remove @pytest.mark.skip from 
test_mesh_finalization and test_mesh_2_2_2_finalization - Update store attribute references: latest_new_attestations -> latest_new_aggregated_payloads, latest_known_attestations -> latest_known_aggregated_payloads - test_partition_recovery remains xfail (known sync service limitation) --- tests/interop/test_multi_node.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/interop/test_multi_node.py b/tests/interop/test_multi_node.py index d2edcfe5..2409018a 100644 --- a/tests/interop/test_multi_node.py +++ b/tests/interop/test_multi_node.py @@ -44,7 +44,6 @@ pytestmark = pytest.mark.interop -@pytest.mark.skip(reason="Interop test not passing - needs update (#359)") @pytest.mark.timeout(120) @pytest.mark.num_validators(3) async def test_mesh_finalization(node_cluster: NodeCluster) -> None: @@ -146,8 +145,12 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: # # - High new_atts, low known_atts = processing bottleneck # - Low counts everywhere = gossip not propagating - new_atts = [len(node._store.latest_new_attestations) for node in node_cluster.nodes] - known_atts = [len(node._store.latest_known_attestations) for node in node_cluster.nodes] + new_atts = [ + len(node._store.latest_new_aggregated_payloads) for node in node_cluster.nodes + ] + known_atts = [ + len(node._store.latest_known_aggregated_payloads) for node in node_cluster.nodes + ] logger.info( "Progress: head=%s justified=%s finalized=%s new_atts=%s known_atts=%s", @@ -200,7 +203,6 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: ) -@pytest.mark.skip(reason="Interop test not passing - needs update (#359)") @pytest.mark.timeout(120) @pytest.mark.num_validators(3) async def test_mesh_2_2_2_finalization(node_cluster: NodeCluster) -> None: From a89d33a71dfcf1016f934deda37c53ffeba4254b Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 11 Feb 2026 19:16:23 +0500 Subject: [PATCH 05/16] fix: resolve lint, type, and test issues across tox suite - Fix test_store_attestations indentation: steps 2-3 were inside for loop, causing aggregation to clear signatures before second validator check - Fix store.py lint: remove blank line after docstring, wrap long line - Fix type errors: peer_id accepts None for self-produced blocks throughout sync pipeline (SyncService, HeadSync, BlockCache) - Fix formatting in service.py, node_runner.py, test_multi_node.py --- src/lean_spec/subspecs/forkchoice/store.py | 4 ++-- .../subspecs/networking/service/service.py | 4 +--- src/lean_spec/subspecs/sync/block_cache.py | 5 ++-- src/lean_spec/subspecs/sync/head_sync.py | 8 +++---- src/lean_spec/subspecs/sync/service.py | 2 +- tests/interop/helpers/node_runner.py | 1 + tests/interop/test_multi_node.py | 4 +--- .../forkchoice/test_store_attestations.py | 24 ++++++++++--------- 8 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index c567ce2f..88bc7301 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -859,7 +859,6 @@ def update_head(self) -> "Store": def accept_new_attestations(self) -> "Store": """Process pending aggregated payloads and update forkchoice head.""" - # Merge new aggregated payloads into known aggregated payloads merged_aggregated_payloads = dict(self.latest_known_aggregated_payloads) for sig_key, proofs in self.latest_new_aggregated_payloads.items(): @@ -1067,7 +1066,8 @@ def on_tick( is_aggregator: Whether the node is an 
aggregator. Returns: - Tuple of (new Store with time advanced, list of all produced SignedAggregatedAttestation). + Tuple of (new Store with time advanced, + list of all produced SignedAggregatedAttestation). """ # Calculate target time in intervals time_delta_ms = (time - self.config.genesis_time) * Uint64(1000) diff --git a/src/lean_spec/subspecs/networking/service/service.py b/src/lean_spec/subspecs/networking/service/service.py index 4f617ae6..80beb9e3 100644 --- a/src/lean_spec/subspecs/networking/service/service.py +++ b/src/lean_spec/subspecs/networking/service/service.py @@ -258,6 +258,4 @@ async def publish_aggregated_attestation( compressed = frame_compress(ssz_bytes) await self.event_source.publish(str(topic), compressed) - logger.debug( - "Published aggregated attestation for slot %s", signed_attestation.data.slot - ) + logger.debug("Published aggregated attestation for slot %s", signed_attestation.data.slot) diff --git a/src/lean_spec/subspecs/sync/block_cache.py b/src/lean_spec/subspecs/sync/block_cache.py index 2b8de0b0..fc803df7 100644 --- a/src/lean_spec/subspecs/sync/block_cache.py +++ b/src/lean_spec/subspecs/sync/block_cache.py @@ -103,7 +103,7 @@ class PendingBlock: slot order to ensure parents are processed before children. """ - received_from: PeerId + received_from: PeerId | None """ Peer that sent this block. @@ -112,6 +112,7 @@ class PendingBlock: - If invalid, they get penalized. This creates incentives for good behavior. + None for self-produced blocks. """ received_at: float = field(default_factory=time) @@ -165,7 +166,7 @@ def __contains__(self, root: Bytes32) -> bool: def add( self, block: SignedBlockWithAttestation, - peer: PeerId, + peer: PeerId | None, backfill_depth: int = 0, ) -> PendingBlock: """ diff --git a/src/lean_spec/subspecs/sync/head_sync.py b/src/lean_spec/subspecs/sync/head_sync.py index b14d0592..96610c9e 100644 --- a/src/lean_spec/subspecs/sync/head_sync.py +++ b/src/lean_spec/subspecs/sync/head_sync.py @@ -143,7 +143,7 @@ class HeadSync: async def on_gossip_block( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, store: Store, ) -> tuple[HeadSyncResult, Store]: """ @@ -217,7 +217,7 @@ async def on_gossip_block( async def _process_block_with_descendants( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, store: Store, ) -> tuple[HeadSyncResult, Store]: """ @@ -283,7 +283,7 @@ async def _process_cached_descendants( self, parent_root: Bytes32, store: Store, - peer_id: PeerId, + peer_id: PeerId | None, ) -> int: """ Process any cached blocks that descend from the given parent. @@ -351,7 +351,7 @@ async def _process_cached_descendants( async def _cache_and_backfill( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, store: Store, ) -> tuple[HeadSyncResult, Store]: """ diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index b0dcd7a3..2cd27244 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -356,7 +356,7 @@ async def on_peer_status(self, peer_id: PeerId, status: Status) -> None: async def on_gossip_block( self, block: SignedBlockWithAttestation, - peer_id: PeerId, + peer_id: PeerId | None, ) -> None: """ Handle block received via gossip. 
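The `PeerId | None` widening above lets self-produced blocks travel the same
caching path as gossiped ones while opting out of peer attribution. A minimal
standalone sketch of the distinction (toy types, not the real PendingBlock):

    from dataclasses import dataclass

    @dataclass
    class ToyPendingBlock:
        root: bytes
        received_from: str | None  # None marks a block we produced ourselves

    def blame_target(pending: ToyPendingBlock) -> str | None:
        # Gossiped blocks stay attributed to their sender so an invalid block
        # can count against that peer's score; local blocks blame nobody.
        return pending.received_from

    assert blame_target(ToyPendingBlock(b"\x00" * 32, None)) is None
    assert blame_target(ToyPendingBlock(b"\x00" * 32, "peer-A")) == "peer-A"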
diff --git a/tests/interop/helpers/node_runner.py b/tests/interop/helpers/node_runner.py index eebe910b..26fc3246 100644 --- a/tests/interop/helpers/node_runner.py +++ b/tests/interop/helpers/node_runner.py @@ -367,6 +367,7 @@ async def start_node( # This matches the Ethereum gossip specification. if validator_indices: from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT + for idx in validator_indices: subnet_id = idx % int(ATTESTATION_COMMITTEE_COUNT) topic = f"/leanconsensus/{self.fork_digest}/attestation_{subnet_id}/ssz_snappy" diff --git a/tests/interop/test_multi_node.py b/tests/interop/test_multi_node.py index 2409018a..19362504 100644 --- a/tests/interop/test_multi_node.py +++ b/tests/interop/test_multi_node.py @@ -145,9 +145,7 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: # # - High new_atts, low known_atts = processing bottleneck # - Low counts everywhere = gossip not propagating - new_atts = [ - len(node._store.latest_new_aggregated_payloads) for node in node_cluster.nodes - ] + new_atts = [len(node._store.latest_new_aggregated_payloads) for node in node_cluster.nodes] known_atts = [ len(node._store.latest_known_aggregated_payloads) for node in node_cluster.nodes ] diff --git a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py index e0044c3e..982aa905 100644 --- a/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py +++ b/tests/lean_spec/subspecs/forkchoice/test_store_attestations.py @@ -728,12 +728,13 @@ def test_other_intervals_do_not_trigger_aggregation(self, key_manager: XmssKeyMa # After tick, time becomes time+1, and interval = (time+1) % 5 # So we need time+1 % 5 == target_interval # Therefore time = target_interval - 1 (mod 5) - pre_tick_time = (target_interval - 1) % int(INTERVALS_PER_SLOT) - test_store = store.model_copy(update={"time": Uint64(pre_tick_time)}) - - updated_store, _ = test_store.tick_interval(has_proposal=False, is_aggregator=True) - - assert sig_key not in updated_store.latest_new_aggregated_payloads, ( f"Aggregation should NOT occur at interval {target_interval}" + pre_tick_time = (target_interval - 1) % int(INTERVALS_PER_SLOT) + test_store = store.model_copy(update={"time": Uint64(pre_tick_time)}) + + updated_store, _ = test_store.tick_interval(has_proposal=False, is_aggregator=True) + + assert sig_key not in updated_store.latest_new_aggregated_payloads, ( + f"Aggregation should NOT occur at interval {target_interval}" ) def test_interval_0_accepts_attestations_with_proposal( @@ -809,11 +810,12 @@ def test_gossip_to_aggregation_to_storage(self, key_manager: XmssKeyManager) -> sig_key = SignatureKey(vid, data_root) assert sig_key in store.gossip_signatures, f"Signature for {vid} should be stored" - # Step 2: Advance to interval 2 (aggregation interval) - store = store.model_copy(update={"time": Uint64(1)}) - store, _ = store.tick_interval(has_proposal=False, is_aggregator=True) - - # Step 3: Verify aggregated proofs were created for vid in attesting_validators: + # Step 2: Advance to interval 2 (aggregation interval) + store = store.model_copy(update={"time": Uint64(1)}) + store, _ = store.tick_interval(has_proposal=False, is_aggregator=True) + + # Step 3: Verify aggregated proofs were created + for vid in attesting_validators: sig_key = SignatureKey(vid, data_root) assert sig_key in store.latest_new_aggregated_payloads, ( f"Aggregated proof for {vid} should exist after interval 2" From 
26b90350739b42eea1f507a95186e37db5ef1ea3 Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 11 Feb 2026 20:07:26 +0500 Subject: [PATCH 06/16] fix: use convergence-based polling in finalization tests Replace fixed 70s duration loops with convergence helpers: - assert_all_finalized_to: polls until finalization target reached - assert_heads_consistent: polls until head slots converge - assert_same_finalized_checkpoint: polls until nodes agree This fixes CI flakiness where slow machines cause nodes to diverge during the fixed wait period. Tests now exit early on success and tolerate slower environments via generous timeouts. --- tests/interop/test_multi_node.py | 158 +++++-------------------------- 1 file changed, 22 insertions(+), 136 deletions(-) diff --git a/tests/interop/test_multi_node.py b/tests/interop/test_multi_node.py index 19362504..81d462a9 100644 --- a/tests/interop/test_multi_node.py +++ b/tests/interop/test_multi_node.py @@ -24,14 +24,15 @@ import asyncio import logging -import time import pytest from .helpers import ( NodeCluster, + assert_all_finalized_to, assert_heads_consistent, assert_peer_connections, + assert_same_finalized_checkpoint, full_mesh, mesh_2_2_2, ) @@ -44,7 +45,7 @@ pytestmark = pytest.mark.interop -@pytest.mark.timeout(120) +@pytest.mark.timeout(200) @pytest.mark.num_validators(3) async def test_mesh_finalization(node_cluster: NodeCluster) -> None: """ @@ -101,107 +102,29 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: # The 15s timeout handles slow handshakes. await assert_peer_connections(node_cluster, min_peers=2, timeout=15) - # Let the chain run for a fixed duration. - # - # Timing calculation: - # - # - Slot duration: 4 seconds - # - Slots in 70s: ~17 slots - # - Finalization requires: 2 consecutive justified epochs - # - With 3 validators: justification needs 2/3 = 2 attestations per slot - # - # This duration allows enough time for validators to: - # - # 1. Produce blocks (one per slot, round-robin) - # 2. Broadcast attestations (all validators each slot) - # 3. Accumulate justification (2+ matching attestations) - # 4. Finalize (justified epoch becomes finalized) - run_duration = 70 - poll_interval = 5 - - logger.info("Running chain for %d seconds...", run_duration) - - # Poll the chain state periodically. - # - # This provides visibility into consensus progress during the test. - # The logged metrics help debug failures. - start = time.monotonic() - while time.monotonic() - start < run_duration: - # Collect current state from each node. - # - # Head slot: the highest slot block each node has seen. - # Finalized slot: the most recent finalized checkpoint slot. - # Justified slot: the most recent justified checkpoint slot. - slots = [node.head_slot for node in node_cluster.nodes] - finalized = [node.finalized_slot for node in node_cluster.nodes] - justified = [node.justified_slot for node in node_cluster.nodes] - - # Track attestation counts for debugging. - # - # New attestations: received but not yet processed by fork choice. - # Known attestations: already incorporated into the store. 
- # - # These counts reveal if gossip is working: - # - # - High new_atts, low known_atts = processing bottleneck - # - Low counts everywhere = gossip not propagating - new_atts = [len(node._store.latest_new_aggregated_payloads) for node in node_cluster.nodes] - known_atts = [ - len(node._store.latest_known_aggregated_payloads) for node in node_cluster.nodes - ] - - logger.info( - "Progress: head=%s justified=%s finalized=%s new_atts=%s known_atts=%s", - slots, - justified, - finalized, - new_atts, - known_atts, - ) - await asyncio.sleep(poll_interval) - - # Capture final state for assertions. - head_slots = [node.head_slot for node in node_cluster.nodes] - finalized_slots = [node.finalized_slot for node in node_cluster.nodes] - - logger.info("FINAL: head_slots=%s finalized=%s", head_slots, finalized_slots) - - # Verify the chain advanced sufficiently. + # Wait for finalization with convergence-based polling. # - # Minimum 5 slots ensures: + # Instead of a fixed duration, we actively poll for the target state. + # This is more robust under varying CI performance. # - # - Block production is working (at least 5 blocks created) - # - Gossip is propagating (all nodes see the same progress) - # - No single node is stuck or partitioned - assert all(slot >= 5 for slot in head_slots), ( - f"Chain did not advance enough. Head slots: {head_slots}" - ) + # Finalization requires 2 consecutive justified epochs. + # With 3 validators and 4s slots, this typically takes ~30s + # but may take longer on slow CI machines. + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=100) - # Verify heads are consistent across nodes. + # Verify heads converged across nodes. # - # In a healthy network, all nodes should converge to similar head slots. - # A difference > 2 slots indicates gossip or fork choice issues. - head_diff = max(head_slots) - min(head_slots) - assert head_diff <= 2, f"Head slots diverged too much. Slots: {head_slots}, diff: {head_diff}" - - # Verify ALL nodes finalized. - # - # With 70s runtime (~17 slots) and working gossip, every node - # should have finalized at least one checkpoint. - assert all(slot > 0 for slot in finalized_slots), ( - f"Not all nodes finalized. Finalized slots: {finalized_slots}" - ) + # After finalization, all nodes should agree on head within 2 slots. + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) # Verify finalized checkpoints are consistent. # # All nodes must agree on the finalized checkpoint. # Finalization is irreversible - divergent finalization would be catastrophic. - assert len(set(finalized_slots)) == 1, ( - f"Finalized slots inconsistent across nodes: {finalized_slots}" - ) + await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=30) -@pytest.mark.timeout(120) +@pytest.mark.timeout(200) @pytest.mark.num_validators(3) async def test_mesh_2_2_2_finalization(node_cluster: NodeCluster) -> None: """ @@ -244,58 +167,21 @@ async def test_mesh_2_2_2_finalization(node_cluster: NodeCluster) -> None: # Using min_peers=1 ensures spokes pass the check. await assert_peer_connections(node_cluster, min_peers=1, timeout=15) - # Match Ream's 70 second test duration. + # Wait for finalization with convergence-based polling. 
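# The convergence helpers used here come from the test helpers package and are
# not shown in this diff; a plausible shape for assert_all_finalized_to,
# assuming nodes expose the finalized_slot property these tests already read:

import asyncio
import time

async def assert_all_finalized_to(cluster, target_slot: int, timeout: float) -> None:
    # Poll until every node reports finalized_slot >= target_slot, failing
    # with the last observed state once the deadline passes.
    deadline = time.monotonic() + timeout
    while True:
        finalized = [node.finalized_slot for node in cluster.nodes]
        if all(slot >= target_slot for slot in finalized):
            return
        if time.monotonic() > deadline:
            raise AssertionError(f"Finalization not reached: {finalized}")
        await asyncio.sleep(1.0)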
# - # Finalization requires sufficient time for: - # - Multiple slots to pass (4s each) - # - Attestations to accumulate - # - Justification and finalization to occur - run_duration = 70 - poll_interval = 5 - - logger.info("Running chain for %d seconds (mesh_2_2_2)...", run_duration) + # Hub-and-spoke adds latency (messages route through hub) + # but the protocol should still achieve finalization. + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=100) - # Poll chain progress. - start = time.monotonic() - while time.monotonic() - start < run_duration: - slots = [node.head_slot for node in node_cluster.nodes] - finalized = [node.finalized_slot for node in node_cluster.nodes] - logger.info("Progress: head_slots=%s finalized=%s", slots, finalized) - await asyncio.sleep(poll_interval) - - # Final state capture. - head_slots = [node.head_slot for node in node_cluster.nodes] - finalized_slots = [node.finalized_slot for node in node_cluster.nodes] - - logger.info("FINAL: head_slots=%s finalized=%s", head_slots, finalized_slots) - - # Same assertions as full mesh. - # - # Despite reduced connectivity (messages route through hub), - # the protocol should still achieve full consensus. - - # Chain must advance sufficiently. - assert all(slot >= 5 for slot in head_slots), ( - f"Chain did not advance enough. Head slots: {head_slots}" - ) - - # Heads must be consistent across nodes. + # Verify heads converged across nodes. # # Hub-and-spoke adds latency but should not cause divergence. - head_diff = max(head_slots) - min(head_slots) - assert head_diff <= 2, f"Head slots diverged too much. Slots: {head_slots}, diff: {head_diff}" - - # ALL nodes must finalize. - assert all(slot > 0 for slot in finalized_slots), ( - f"Not all nodes finalized. Finalized slots: {finalized_slots}" - ) + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) # Finalized checkpoints must be identical. # # Even with indirect connectivity, finalization must be consistent. - assert len(set(finalized_slots)) == 1, ( - f"Finalized slots inconsistent across nodes: {finalized_slots}" - ) + await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=30) @pytest.mark.timeout(30) From 5ace58d948f310e5d3c7aa05ae046cc0b8a608fc Mon Sep 17 00:00:00 2001 From: kamilsa Date: Wed, 11 Feb 2026 22:15:49 +0500 Subject: [PATCH 07/16] fix: increase timeouts for CI reliability in interop tests - Increase gossipsub mesh stabilization from 5s to 10s in start_all (CI machines need more time for mesh formation before block production) - Increase finalization timeout from 100s to 150s - Increase peer connection timeout from 15s to 30s - Increase pytest timeout from 200s to 300s The CI failure showed all 3 nodes stuck at finalized slot 0, indicating gossip mesh wasn't fully formed when services started. --- tests/interop/helpers/node_runner.py | 3 ++- tests/interop/test_multi_node.py | 12 ++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/interop/helpers/node_runner.py b/tests/interop/helpers/node_runner.py index 26fc3246..70c793a6 100644 --- a/tests/interop/helpers/node_runner.py +++ b/tests/interop/helpers/node_runner.py @@ -470,7 +470,8 @@ async def start_all( # 3. GRAFT messages to be sent and processed # # A longer delay ensures proper mesh formation before block production. - await asyncio.sleep(5.0) + # CI machines need more time due to lower CPU/scheduling priority. + await asyncio.sleep(10.0) # Phase 4: Start node services (validators, chain service, etc). 
# diff --git a/tests/interop/test_multi_node.py b/tests/interop/test_multi_node.py index 81d462a9..52ef3538 100644 --- a/tests/interop/test_multi_node.py +++ b/tests/interop/test_multi_node.py @@ -45,7 +45,7 @@ pytestmark = pytest.mark.interop -@pytest.mark.timeout(200) +@pytest.mark.timeout(300) @pytest.mark.num_validators(3) async def test_mesh_finalization(node_cluster: NodeCluster) -> None: """ @@ -100,7 +100,7 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: # Each node needs at least 2 peers (the other two nodes). # This ensures gossip will reach all nodes. # The 15s timeout handles slow handshakes. - await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + await assert_peer_connections(node_cluster, min_peers=2, timeout=30) # Wait for finalization with convergence-based polling. # @@ -110,7 +110,7 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: # Finalization requires 2 consecutive justified epochs. # With 3 validators and 4s slots, this typically takes ~30s # but may take longer on slow CI machines. - await assert_all_finalized_to(node_cluster, target_slot=1, timeout=100) + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=150) # Verify heads converged across nodes. # @@ -124,7 +124,7 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=30) -@pytest.mark.timeout(200) +@pytest.mark.timeout(300) @pytest.mark.num_validators(3) async def test_mesh_2_2_2_finalization(node_cluster: NodeCluster) -> None: """ @@ -165,13 +165,13 @@ async def test_mesh_2_2_2_finalization(node_cluster: NodeCluster) -> None: # # Hub (node 0) has 2 peers; spokes have 1 peer each. # Using min_peers=1 ensures spokes pass the check. - await assert_peer_connections(node_cluster, min_peers=1, timeout=15) + await assert_peer_connections(node_cluster, min_peers=1, timeout=30) # Wait for finalization with convergence-based polling. # # Hub-and-spoke adds latency (messages route through hub) # but the protocol should still achieve finalization. - await assert_all_finalized_to(node_cluster, target_slot=1, timeout=100) + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=150) # Verify heads converged across nodes. # From 109e55522db9421655c92da84929e1253ed7ecab Mon Sep 17 00:00:00 2001 From: kamilsa Date: Thu, 12 Feb 2026 09:55:41 +0500 Subject: [PATCH 08/16] chore: remove debug logging from state.py and store.py Remove all logger.debug() and logger.info() statements plus logging imports that were added during development for debugging. --- .../subspecs/containers/state/state.py | 34 ------------------- src/lean_spec/subspecs/forkchoice/store.py | 3 -- 2 files changed, 37 deletions(-) diff --git a/src/lean_spec/subspecs/containers/state/state.py b/src/lean_spec/subspecs/containers/state/state.py index 32c57959..a494f0fe 100644 --- a/src/lean_spec/subspecs/containers/state/state.py +++ b/src/lean_spec/subspecs/containers/state/state.py @@ -2,7 +2,6 @@ from __future__ import annotations -import logging from typing import AbstractSet, Collection, Iterable from lean_spec.subspecs.ssz.hash import hash_tree_root @@ -34,8 +33,6 @@ Validators, ) -logger = logging.getLogger(__name__) - class State(Container): """The main consensus state object.""" @@ -452,13 +449,6 @@ def process_attestations( # # The rules below filter out invalid or irrelevant votes. 
for attestation in attestations: - logger.debug( - "Processing attestation: target=slot%d source=slot%d participants=%s", - attestation.data.target.slot, - attestation.data.source.slot, - attestation.aggregation_bits.to_validator_indices(), - ) - source = attestation.data.source target = attestation.data.target @@ -467,7 +457,6 @@ def process_attestations( # A vote may only originate from a point in history that is already justified. # A source that lacks existing justification cannot be used to anchor a new vote. if not justified_slots.is_slot_justified(finalized_slot, source.slot): - logger.debug("Skipping attestation: source slot %d not justified", source.slot) continue # Ignore votes for targets that have already reached consensus. @@ -479,7 +468,6 @@ def process_attestations( # Ignore votes that reference zero-hash slots. if source.root == ZERO_HASH or target.root == ZERO_HASH: - logger.debug("Skipping attestation: zero root in source/target") continue # Ensure the vote refers to blocks that actually exist on our chain. @@ -503,11 +491,6 @@ def process_attestations( ) if not source_matches or not target_matches: - logger.debug( - "Skipping attestation: root mismatch (source_match=%s target_match=%s)", - source_matches, - target_matches, - ) continue # Ensure time flows forward. @@ -515,11 +498,6 @@ def process_attestations( # A target must always lie strictly after its source slot. # Otherwise the vote makes no chronological sense. if target.slot <= source.slot: - logger.debug( - "Skipping attestation: target slot %d <= source slot %d", - target.slot, - source.slot, - ) continue # Ensure the target falls on a slot that can be justified after the finalized one. @@ -536,11 +514,6 @@ def process_attestations( # Any target outside this pattern is not eligible for justification, # so votes for it are simply ignored. if not target.slot.is_justifiable_after(self.latest_finalized.slot): - logger.debug( - "Skipping attestation: target slot %d not justifiable after finalized slot %d", - target.slot, - self.latest_finalized.slot, - ) continue # Record the vote. @@ -569,12 +542,6 @@ def process_attestations( count = sum(bool(justified) for justified in justifications[target.root]) if 3 * count >= (2 * len(self.validators)): - logger.info( - "Supermajority reached for target slot %d: %d votes (threshold: %d)", - target.slot, - count, - (2 * len(self.validators) + 2) // 3, - ) # The block becomes justified # # The chain now considers this block part of its safe head. @@ -606,7 +573,6 @@ def process_attestations( old_finalized_slot = finalized_slot latest_finalized = source finalized_slot = latest_finalized.slot - logger.info("Finalization advanced to slot %d", finalized_slot) # Rebase/prune justification tracking across the new finalized boundary. 
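# The integer comparison `3 * count >= 2 * len(validators)` above implements
# the 2/3 supermajority check without floating-point division. A standalone
# spot check of the threshold for small validator sets:

def is_supermajority(count: int, num_validators: int) -> bool:
    # Same arithmetic as the justification check in process_attestations.
    return 3 * count >= 2 * num_validators

# With 3 validators, 2 matching votes justify a target (6 >= 6), 1 does not.
assert is_supermajority(2, 3) and not is_supermajority(1, 3)
# With 4 validators, 3 votes are needed (9 >= 8); 2 fall short (6 < 8).
assert is_supermajority(3, 4) and not is_supermajority(2, 4)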
# diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index 88bc7301..97556fc6 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -7,7 +7,6 @@ __all__ = ["Store"] import copy -import logging from collections import defaultdict from lean_spec.subspecs.chain.config import ( @@ -46,8 +45,6 @@ ) from lean_spec.types.container import Container -logger = logging.getLogger(__name__) - class Store(Container): """ From 5fbaae25a2af076909c9607b0f45fb37d7cc78b3 Mon Sep 17 00:00:00 2001 From: kamilsa Date: Thu, 12 Feb 2026 18:45:41 +0500 Subject: [PATCH 09/16] some review fixes --- src/lean_spec/subspecs/forkchoice/store.py | 8 ++++---- tests/interop/helpers/node_runner.py | 2 -- tests/interop/test_multi_node.py | 2 +- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index 97556fc6..b92b7c46 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -1023,7 +1023,7 @@ def tick_interval( is_aggregator: Whether the node is an aggregator. Returns: - Tuple of (new Store with advanced time, list of new SignedAggregatedAttestation). + Tuple of (new store with advanced time, list of new signed aggregated attestation). """ # Advance time by one interval store = self.model_copy(update={"time": self.time + Uint64(1)}) @@ -1063,8 +1063,8 @@ def on_tick( is_aggregator: Whether the node is an aggregator. Returns: - Tuple of (new Store with time advanced, - list of all produced SignedAggregatedAttestation). + Tuple of (new store with time advanced, + list of all produced signed aggregated attestation). """ # Calculate target time in intervals time_delta_ms = (time - self.config.genesis_time) * Uint64(1000) @@ -1156,7 +1156,7 @@ def get_attestation_target(self) -> Checkpoint: # This ensures the target doesn't advance too far ahead of safe target, # providing a balance between liveness and safety. # - # MODIFIED: We allow the target to be up to 1 slot ahead of safe_target + # We allow the target to be up to 1 slot ahead of the safe target # to ensure the chain can actually start advancing from genesis. for _ in range(JUSTIFICATION_LOOKBACK_SLOTS): if self.blocks[target_block_root].slot > self.blocks[self.safe_target].slot + Slot(1): diff --git a/tests/interop/helpers/node_runner.py b/tests/interop/helpers/node_runner.py index 70c793a6..25d309d8 100644 --- a/tests/interop/helpers/node_runner.py +++ b/tests/interop/helpers/node_runner.py @@ -366,8 +366,6 @@ async def start_node( # Validators only subscribe to the subnets they are assigned to. # This matches the Ethereum gossip specification. if validator_indices: - from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT - for idx in validator_indices: subnet_id = idx % int(ATTESTATION_COMMITTEE_COUNT) topic = f"/leanconsensus/{self.fork_digest}/attestation_{subnet_id}/ssz_snappy" diff --git a/tests/interop/test_multi_node.py b/tests/interop/test_multi_node.py index 52ef3538..f4a3fbec 100644 --- a/tests/interop/test_multi_node.py +++ b/tests/interop/test_multi_node.py @@ -99,7 +99,7 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: # # Each node needs at least 2 peers (the other two nodes). # This ensures gossip will reach all nodes. - # The 15s timeout handles slow handshakes. + # The 30s timeout handles slow handshakes. 
await assert_peer_connections(node_cluster, min_peers=2, timeout=30) # Wait for finalization with convergence-based polling. From 3a89f8b63a374b6c0b3d37d2013605bb2ecde9dc Mon Sep 17 00:00:00 2001 From: kamilsa Date: Fri, 13 Feb 2026 09:47:02 +0500 Subject: [PATCH 10/16] fix: use strict walkback to safe_target in attestation target selection The previous +Slot(1) offset allowed target to equal head when head was only 1 slot ahead of safe_target, conflating fork choice (head vote) with BFT finality (target vote). This violates the separation between the two mechanisms. With strict > safe_target.slot, the target never exceeds safe_target. Genesis bootstraps naturally: update_safe_target at interval 3 advances safe_target via 2/3+ head votes, so target progresses by slot 2. --- src/lean_spec/subspecs/forkchoice/store.py | 11 +++-- .../fc/test_attestation_target_selection.py | 47 ++++++++++--------- 2 files changed, 30 insertions(+), 28 deletions(-) diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index ba036278..a231cab4 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -1159,13 +1159,14 @@ def get_attestation_target(self) -> Checkpoint: # Walk back toward safe target (up to `JUSTIFICATION_LOOKBACK_SLOTS` steps) # - # This ensures the target doesn't advance too far ahead of safe target, - # providing a balance between liveness and safety. + # Target must not exceed safe_target's slot. This preserves the + # separation between head votes (fork choice) and target votes + # (BFT finality). Allowing target == head would conflate the two. # - # We allow the target to be up to 1 slot ahead of the safe target - # to ensure the chain can actually start advancing from genesis. + # At genesis, target stays at slot 0 until safe_target advances + # via update_safe_target at interval 3. for _ in range(JUSTIFICATION_LOOKBACK_SLOTS): - if self.blocks[target_block_root].slot > self.blocks[self.safe_target].slot + Slot(1): + if self.blocks[target_block_root].slot > self.blocks[self.safe_target].slot: target_block_root = self.blocks[target_block_root].parent_root else: break diff --git a/tests/consensus/devnet/fc/test_attestation_target_selection.py b/tests/consensus/devnet/fc/test_attestation_target_selection.py index c4f6c994..8703a576 100644 --- a/tests/consensus/devnet/fc/test_attestation_target_selection.py +++ b/tests/consensus/devnet/fc/test_attestation_target_selection.py @@ -17,21 +17,22 @@ def test_attestation_target_at_genesis_initially( fork_choice_test: ForkChoiceTestFiller, ) -> None: """ - Attestation target starts near genesis before safe target updates. + Attestation target stays at genesis before safe target updates. Scenario -------- Process two blocks at slots 1 and 2. Expected: - - After slot 1: target = slot 1 (1 slot ahead of safe target allowed) - - After slot 2: target = slot 1 (walkback stops at safe_target + 1) + - After slot 1: target = slot 0 (walkback to safe_target) + - After slot 2: target = slot 0 (walkback to safe_target) Why This Matters ---------------- Initially, the safe target is at genesis (slot 0). The attestation target - is allowed up to 1 slot ahead of safe target to ensure the chain can - start advancing from genesis. + walks back to safe_target to maintain separation between head votes + (fork choice) and target votes (BFT finality). The chain bootstraps + via update_safe_target at interval 3. 
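# For reference, the strict walkback above plus the justifiability rules
# exercised by these tests can be sketched standalone. The rules (delta <= 5,
# perfect squares, pronic numbers) and the lookback of 3 are taken from the
# expectation tables in this file; the sketch assumes a contiguous chain with
# a block at every slot, so stepping to a parent is a single-slot step.

import math

JUSTIFICATION_LOOKBACK_SLOTS = 3

def is_justifiable_delta(delta: int) -> bool:
    # Rule 1: small deltas (<= 5); Rule 2: perfect squares; Rule 3: pronic
    # numbers x * (x + 1).
    root = math.isqrt(delta)
    return delta <= 5 or root * root == delta or root * (root + 1) == delta

def expected_target_slot(head_slot: int, safe_target_slot: int, finalized_slot: int) -> int:
    # Walk back from the head toward the safe target, at most
    # JUSTIFICATION_LOOKBACK_SLOTS steps (strict comparison, per this commit).
    target = head_slot
    for _ in range(JUSTIFICATION_LOOKBACK_SLOTS):
        if target > safe_target_slot:
            target -= 1
        else:
            break
    # Then keep stepping back until the slot is justifiable after the
    # finalized slot, as the expectation tables below assume.
    while not is_justifiable_delta(target - finalized_slot):
        target -= 1
    return target

# Matches the tables: heads 4, 10, and 30 map to targets 1, 6, and 25.
assert expected_target_slot(4, 0, 0) == 1
assert expected_target_slot(10, 0, 0) == 6
assert expected_target_slot(30, 0, 0) == 25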
""" fork_choice_test( steps=[ @@ -39,14 +40,14 @@ def test_attestation_target_at_genesis_initially( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(1), + attestation_target_slot=Slot(0), ), ), BlockStep( block=BlockSpec(slot=Slot(2)), checks=StoreChecks( head_slot=Slot(2), - attestation_target_slot=Slot(1), + attestation_target_slot=Slot(0), ), ), ], @@ -65,7 +66,7 @@ def test_attestation_target_advances_with_attestations( Expected: - Initial blocks: target stays at genesis (slot 0) - - Later blocks: target advances as attestations accumulate + - Later blocks: target advances as walkback from head reaches further slots - Target remains behind head for safety Why This Matters @@ -84,21 +85,21 @@ def test_attestation_target_advances_with_attestations( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(1), # 1 slot ahead of safe target allowed + attestation_target_slot=Slot(0), # Walks back to safe_target ), ), BlockStep( block=BlockSpec(slot=Slot(2)), checks=StoreChecks( head_slot=Slot(2), - attestation_target_slot=Slot(1), # Walks back to safe_target + 1 + attestation_target_slot=Slot(0), # Walks back to safe_target ), ), BlockStep( block=BlockSpec(slot=Slot(3)), checks=StoreChecks( head_slot=Slot(3), - attestation_target_slot=Slot(1), # Walks back to safe_target + 1 + attestation_target_slot=Slot(0), # Walks back to safe_target ), ), BlockStep( @@ -147,21 +148,21 @@ def test_attestation_target_with_slot_gaps( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(1), + attestation_target_slot=Slot(0), ), ), BlockStep( block=BlockSpec(slot=Slot(3)), checks=StoreChecks( head_slot=Slot(3), - attestation_target_slot=Slot(1), + attestation_target_slot=Slot(0), ), ), BlockStep( block=BlockSpec(slot=Slot(5)), checks=StoreChecks( head_slot=Slot(5), - attestation_target_slot=Slot(1), # Walks back 5→3→1, stops at safe_target+1 + attestation_target_slot=Slot(0), # Walks back 5→3→1→0, at safe_target ), ), ], @@ -198,21 +199,21 @@ def test_attestation_target_with_extended_chain( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(1), + attestation_target_slot=Slot(0), ), ), BlockStep( block=BlockSpec(slot=Slot(2)), checks=StoreChecks( head_slot=Slot(2), - attestation_target_slot=Slot(1), + attestation_target_slot=Slot(0), ), ), BlockStep( block=BlockSpec(slot=Slot(3)), checks=StoreChecks( head_slot=Slot(3), - attestation_target_slot=Slot(1), + attestation_target_slot=Slot(0), ), ), BlockStep( @@ -293,13 +294,13 @@ def test_attestation_target_justifiable_constraint( head_slot=Slot(i), attestation_target_slot=Slot( # Mapping of current slot -> expected target slot - # With +1 allowance: walkback stops at safe_target + 1 - # delta = target_slot - finalized_slot + # Walkback stops at safe_target (slot 0) then + # justifiability is checked: delta = target - finalized { - 1: 1, # At safe_target + 1, no walkback needed - 2: 1, # Walks back to safe_target + 1 - 3: 1, # Walks back to safe_target + 1 - 4: 1, # 3-step walkback from 4 → 1 + 1: 0, # Walks back to safe_target + 2: 0, # Walks back to safe_target + 3: 0, # Walks back to safe_target + 4: 1, # 3-step walkback from 4 → 1, delta 1 ≤ 5 5: 2, # 3-step walkback from 5 → 2, delta 2 ≤ 5 6: 3, # 3-step walkback from 6 → 3, delta 3 ≤ 5 7: 4, # 3-step walkback from 7 → 4, delta 4 ≤ 5 From caaf5745f9e5d886c5e969b7b17996264b3fd74b Mon Sep 17 00:00:00 2001 From: kamilsa 
Date: Fri, 13 Feb 2026 09:53:29 +0500 Subject: [PATCH 11/16] chore: restore docstrings in store.py unchanged code Restore full docstrings for accept_new_attestations and aggregate_committee_signatures methods to match origin/main. The implementations changed (tuple return types, aggregates collection), but these docstrings document the overall behavior which remains the same. No functional change. --- src/lean_spec/subspecs/forkchoice/store.py | 34 ++++++++++++++++++---- 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index a231cab4..54f3945a 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -861,7 +861,28 @@ def update_head(self) -> "Store": ) def accept_new_attestations(self) -> "Store": - """Process pending aggregated payloads and update forkchoice head.""" + """ + Process pending aggregated payloads and update forkchoice head. + + Moves aggregated payloads from latest_new_aggregated_payloads to + latest_known_aggregated_payloads, making them eligible to contribute to + fork choice weights. This migration happens at specific interval ticks. + + The Interval Tick System + ------------------------- + Aggregated payloads progress through intervals: + - Interval 0: Block proposal + - Interval 1: Validators cast attestations (enter "new") + - Interval 2: Aggregators create proofs & broadcast + - Interval 3: Safe target update + - Interval 4: Process accumulated attestations + + This staged progression ensures proper timing and prevents premature + influence on fork choice decisions. + + Returns: + New Store with migrated aggregated payloads and updated head. + """ # Merge new aggregated payloads into known aggregated payloads merged_aggregated_payloads = dict(self.latest_known_aggregated_payloads) for sig_key, proofs in self.latest_new_aggregated_payloads.items(): @@ -954,14 +975,15 @@ def aggregate_committee_signatures(self) -> tuple["Store", list[SignedAggregated committee_signatures, ) - # Create list for broadcasting + # Create list of aggregated attestations for broadcasting new_aggregates: list[SignedAggregatedAttestation] = [] for aggregated_attestation, aggregated_signature in aggregated_results: - agg = SignedAggregatedAttestation( - data=aggregated_attestation.data, - proof=aggregated_signature, + new_aggregates.append( + SignedAggregatedAttestation( + data=aggregated_attestation.data, + proof=aggregated_signature, + ) ) - new_aggregates.append(agg) # Compute new aggregated payloads new_gossip_sigs = dict(self.gossip_signatures) From e6e88036a72d829231e6039fa1817cb00b4db6a6 Mon Sep 17 00:00:00 2001 From: kamilsa Date: Fri, 13 Feb 2026 09:55:31 +0500 Subject: [PATCH 12/16] reduce diff --- src/lean_spec/subspecs/forkchoice/store.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/lean_spec/subspecs/forkchoice/store.py b/src/lean_spec/subspecs/forkchoice/store.py index 54f3945a..0bf5593a 100644 --- a/src/lean_spec/subspecs/forkchoice/store.py +++ b/src/lean_spec/subspecs/forkchoice/store.py @@ -1181,12 +1181,8 @@ def get_attestation_target(self) -> Checkpoint: # Walk back toward safe target (up to `JUSTIFICATION_LOOKBACK_SLOTS` steps) # - # Target must not exceed safe_target's slot. This preserves the - # separation between head votes (fork choice) and target votes - # (BFT finality). Allowing target == head would conflate the two. 
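# The staged schedule in the restored docstring above can be summarised in a
# small table. INTERVALS_PER_SLOT is 5, matching the modulo-5 arithmetic used
# in the store tests, and the store's `time` field counts intervals since genesis.

INTERVALS_PER_SLOT = 5

INTERVAL_ACTIONS = {
    0: "block proposal",
    1: "validators cast attestations (enter 'new')",
    2: "aggregators create proofs and broadcast",
    3: "safe target update",
    4: "process accumulated attestations",
}

def interval_in_slot(time_in_intervals: int) -> int:
    # Position within the current slot for an interval-based clock.
    return time_in_intervals % INTERVALS_PER_SLOT

assert INTERVAL_ACTIONS[interval_in_slot(7)] == "aggregators create proofs and broadcast"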
- # - # At genesis, target stays at slot 0 until safe_target advances - # via update_safe_target at interval 3. + # This ensures the target doesn't advance too far ahead of safe target, + # providing a balance between liveness and safety. for _ in range(JUSTIFICATION_LOOKBACK_SLOTS): if self.blocks[target_block_root].slot > self.blocks[self.safe_target].slot: target_block_root = self.blocks[target_block_root].parent_root From 6f5a86fed5ed23a6afa6b593e8726edc9c4da313 Mon Sep 17 00:00:00 2001 From: kamilsa Date: Fri, 13 Feb 2026 10:46:35 +0500 Subject: [PATCH 13/16] fix: increase finalization timeout for strict attestation target walkback The strict > safe_target.slot walkback (without +Slot(1) offset) introduces a 1-slot delay in attestation target advancement. Targets lag by 1 slot compared to the old behavior, requiring more time for justification consensus. Increased finalization timeout from 150s to 180s in both mesh finalization tests. This gives the protocol enough time (~50-60s vs 30s) to reach finalization on typical CI machines. --- tests/interop/test_multi_node.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/interop/test_multi_node.py b/tests/interop/test_multi_node.py index f4a3fbec..ecead2a4 100644 --- a/tests/interop/test_multi_node.py +++ b/tests/interop/test_multi_node.py @@ -108,9 +108,11 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: # This is more robust under varying CI performance. # # Finalization requires 2 consecutive justified epochs. - # With 3 validators and 4s slots, this typically takes ~30s - # but may take longer on slow CI machines. - await assert_all_finalized_to(node_cluster, target_slot=1, timeout=150) + # With 3 validators and 4s slots, this typically takes ~40-50s + # due to the 1-slot delay in attestation target advancement. + # The strict walkback (target <= safe_target.slot) means targets + # lag by 1 slot compared to heads, requiring extra time for consensus. + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=180) # Verify heads converged across nodes. # @@ -171,7 +173,8 @@ async def test_mesh_2_2_2_finalization(node_cluster: NodeCluster) -> None: # # Hub-and-spoke adds latency (messages route through hub) # but the protocol should still achieve finalization. - await assert_all_finalized_to(node_cluster, target_slot=1, timeout=150) + # The strict walkback (target <= safe_target.slot) adds ~1 slot delay. + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=180) # Verify heads converged across nodes. # From 05e18ab07a5618dff14e6322428b36df8a9e62d8 Mon Sep 17 00:00:00 2001 From: kamilsa Date: Fri, 13 Feb 2026 11:10:54 +0500 Subject: [PATCH 14/16] fix: prevent isolated node failures with gossip mesh verification The [6, 6, 0] finalization failure (2 nodes finalized, 1 stuck at genesis) indicates a gossip mesh formation race condition on slow CI machines. Changes: 1. Increase mesh stabilization delay from 10s to 15s for slower CI 2. Add assert_heads_consistent check after peer connections to verify gossip subscriptions are active (not just peer connections) This catches nodes that have peer connections but haven't completed gossip topic subscription handshakes, preventing isolated node scenarios. 
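assert_heads_consistent itself is not part of this diff; its convergence
criterion presumably wraps the head-spread check that the earlier fixed-duration
version of these tests asserted directly, inside the same poll-until-deadline
loop used for finalization. A sketch of that criterion:

    def heads_consistent(head_slots: list[int], max_slot_diff: int = 2) -> bool:
        # Converged when the most and least advanced nodes are within
        # max_slot_diff slots of each other.
        return max(head_slots) - min(head_slots) <= max_slot_diff

    assert heads_consistent([7, 7, 6])
    assert not heads_consistent([7, 4, 7])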
--- tests/interop/helpers/node_runner.py | 3 ++- tests/interop/test_multi_node.py | 13 +++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/tests/interop/helpers/node_runner.py b/tests/interop/helpers/node_runner.py index 4b761c0e..9c708fe7 100644 --- a/tests/interop/helpers/node_runner.py +++ b/tests/interop/helpers/node_runner.py @@ -466,7 +466,8 @@ async def start_all( # # A longer delay ensures proper mesh formation before block production. # CI machines need more time due to lower CPU/scheduling priority. - await asyncio.sleep(10.0) + # Increased to 15s to handle slow CI and strict attestation target walkback. + await asyncio.sleep(15.0) # Phase 4: Start node services (validators, chain service, etc). # diff --git a/tests/interop/test_multi_node.py b/tests/interop/test_multi_node.py index ecead2a4..3cc70af1 100644 --- a/tests/interop/test_multi_node.py +++ b/tests/interop/test_multi_node.py @@ -102,6 +102,13 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: # The 30s timeout handles slow handshakes. await assert_peer_connections(node_cluster, min_peers=2, timeout=30) + # Verify gossip is working by checking head consensus. + # + # After mesh formation, nodes should converge on the same head within a few slots. + # This confirms gossip subscriptions are active (not just peer connections). + # Catches race conditions where peers connect but gossip mesh isn't fully formed. + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) + # Wait for finalization with convergence-based polling. # # Instead of a fixed duration, we actively poll for the target state. @@ -169,6 +176,12 @@ async def test_mesh_2_2_2_finalization(node_cluster: NodeCluster) -> None: # Using min_peers=1 ensures spokes pass the check. await assert_peer_connections(node_cluster, min_peers=1, timeout=30) + # Verify gossip mesh is active before waiting for finalization. + # + # Nodes should converge on head via gossip propagation through hub. + # This ensures the hub is properly relaying messages to spokes. + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) + # Wait for finalization with convergence-based polling. # # Hub-and-spoke adds latency (messages route through hub) From f9c6d09fd8faba1a5af978540d2658b3662e2e52 Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Sat, 14 Feb 2026 16:08:01 +0100 Subject: [PATCH 15/16] Address PR review comments from @unnawut - Remove stale aggregated attestation publishing during catch-up - Remove unused subnet_id and peer_id parameters from sync service - Add warning log on aggregation validation failure - Revert non-critical test file changes Co-Authored-By: Claude Opus 4.6 --- src/lean_spec/subspecs/chain/service.py | 9 +- .../subspecs/networking/service/service.py | 12 +- src/lean_spec/subspecs/node/node.py | 2 +- src/lean_spec/subspecs/sync/service.py | 11 +- .../fc/test_attestation_target_selection.py | 112 +++++++++--------- 5 files changed, 68 insertions(+), 78 deletions(-) diff --git a/src/lean_spec/subspecs/chain/service.py b/src/lean_spec/subspecs/chain/service.py index 16ab4d3f..263db37d 100644 --- a/src/lean_spec/subspecs/chain/service.py +++ b/src/lean_spec/subspecs/chain/service.py @@ -168,17 +168,16 @@ async def _initial_tick(self) -> Interval | None: # Only tick if we're past genesis. 
if current_time >= self.clock.genesis_time: - new_store, new_aggregated_attestations = self.sync_service.store.on_tick( + new_store, _ = self.sync_service.store.on_tick( time=current_time, has_proposal=False, is_aggregator=self.sync_service.is_aggregator, ) self.sync_service.store = new_store - # Publish any aggregated attestations produced during catch-up. - if new_aggregated_attestations: - for agg in new_aggregated_attestations: - await self.sync_service.publish_aggregated_attestation(agg) + # Discard aggregated attestations from catch-up. + # During initial sync we may be many slots behind. + # Publishing stale aggregations would spam the network. return self.clock.total_intervals() diff --git a/src/lean_spec/subspecs/networking/service/service.py b/src/lean_spec/subspecs/networking/service/service.py index 80eb8f7a..2a0f020d 100644 --- a/src/lean_spec/subspecs/networking/service/service.py +++ b/src/lean_spec/subspecs/networking/service/service.py @@ -147,21 +147,17 @@ async def _handle_event(self, event: NetworkEvent) -> None: ) await self.sync_service.on_gossip_block(block, peer_id) - case GossipAttestationEvent(attestation=attestation, peer_id=peer_id, topic=topic): + case GossipAttestationEvent(attestation=attestation): # # SyncService will validate signature and update forkchoice. - await self.sync_service.on_gossip_attestation( - attestation=attestation, - subnet_id=topic.subnet_id or 0, - peer_id=peer_id, - ) + await self.sync_service.on_gossip_attestation(attestation) - case GossipAggregatedAttestationEvent(signed_attestation=att, peer_id=peer_id): + case GossipAggregatedAttestationEvent(signed_attestation=att): # Route aggregated attestations to sync service. # # Aggregates contain multiple validator votes and are used # to advance justification and finalization. - await self.sync_service.on_gossip_aggregated_attestation(att, peer_id) + await self.sync_service.on_gossip_aggregated_attestation(att) case PeerStatusEvent(peer_id=peer_id, status=status): # Route peer status updates to sync service. diff --git a/src/lean_spec/subspecs/node/node.py b/src/lean_spec/subspecs/node/node.py index ceb979ab..82178da2 100644 --- a/src/lean_spec/subspecs/node/node.py +++ b/src/lean_spec/subspecs/node/node.py @@ -267,7 +267,7 @@ async def publish_attestation_wrapper(attestation: SignedAttestation) -> None: subnet_id = attestation.validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) await network_service.publish_attestation(attestation, subnet_id) # Also route locally so we can aggregate our own attestation - await sync_service.on_gossip_attestation(attestation, subnet_id, peer_id=None) + await sync_service.on_gossip_attestation(attestation) async def publish_block_wrapper(block: SignedBlockWithAttestation) -> None: await network_service.publish_block(block) diff --git a/src/lean_spec/subspecs/sync/service.py b/src/lean_spec/subspecs/sync/service.py index 3b74d42f..ae9498a7 100644 --- a/src/lean_spec/subspecs/sync/service.py +++ b/src/lean_spec/subspecs/sync/service.py @@ -412,8 +412,6 @@ async def on_gossip_block( async def on_gossip_attestation( self, attestation: SignedAttestation, - subnet_id: int, - peer_id: PeerId | None = None, ) -> None: """ Handle attestation received via gossip. @@ -427,8 +425,6 @@ async def on_gossip_attestation( Args: attestation: The signed attestation received. - subnet_id: Subnet ID the attestation was received on. - peer_id: The peer that propagated the attestation (optional). """ # Guard: Only process gossip in states that accept it. 
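# For context on the subnet routing used by publish_attestation_wrapper above:
# compute_subnet_id and the gossip topic name follow the modulo scheme that
# node_runner.py uses when subscribing test nodes. Standalone sketch; the
# committee count and fork digest below are placeholders, not real config values.

ATTESTATION_COMMITTEE_COUNT = 4  # placeholder

def compute_subnet_id(validator_index: int, committee_count: int) -> int:
    # Same assignment as `idx % int(ATTESTATION_COMMITTEE_COUNT)` in the
    # explicit subscription loop of the interop node runner.
    return validator_index % committee_count

def attestation_topic(fork_digest: str, subnet_id: int) -> str:
    return f"/leanconsensus/{fork_digest}/attestation_{subnet_id}/ssz_snappy"

assert compute_subnet_id(5, ATTESTATION_COMMITTEE_COUNT) == 1
assert attestation_topic("0x00000000", 1) == "/leanconsensus/0x00000000/attestation_1/ssz_snappy"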
# @@ -467,7 +463,6 @@ async def on_gossip_attestation( async def on_gossip_aggregated_attestation( self, signed_attestation: SignedAggregatedAttestation, - peer_id: PeerId, # noqa: ARG002 ) -> None: """ Handle aggregated attestation received via gossip. @@ -478,16 +473,14 @@ async def on_gossip_aggregated_attestation( Args: signed_attestation: The signed aggregated attestation received. - peer_id: The peer that propagated the aggregate (unused for now). """ if not self._state.accepts_gossip: return try: self.store = self.store.on_gossip_aggregated_attestation(signed_attestation) - except (AssertionError, KeyError): - # Aggregation validation failed. - pass + except (AssertionError, KeyError) as e: + logger.warning("Aggregated attestation validation failed: %s", e) async def publish_aggregated_attestation( self, diff --git a/tests/consensus/devnet/fc/test_attestation_target_selection.py b/tests/consensus/devnet/fc/test_attestation_target_selection.py index 8703a576..8044a7fa 100644 --- a/tests/consensus/devnet/fc/test_attestation_target_selection.py +++ b/tests/consensus/devnet/fc/test_attestation_target_selection.py @@ -17,22 +17,24 @@ def test_attestation_target_at_genesis_initially( fork_choice_test: ForkChoiceTestFiller, ) -> None: """ - Attestation target stays at genesis before safe target updates. + Attestation target starts at genesis before safe target updates. Scenario -------- Process two blocks at slots 1 and 2. Expected: - - After slot 1: target = slot 0 (walkback to safe_target) - - After slot 2: target = slot 0 (walkback to safe_target) + - After slot 1: target = slot 0 (genesis/finalized) + - After slot 2: target = slot 0 (genesis/finalized) + - Target root automatically validated against block at slot 0 Why This Matters ---------------- - Initially, the safe target is at genesis (slot 0). The attestation target - walks back to safe_target to maintain separation between head votes - (fork choice) and target votes (BFT finality). The chain bootstraps - via update_safe_target at interval 3. + Initially, the safe target is at genesis (slot 0), so the attestation + target walks back from head to genesis. + + This conservative behavior ensures validators don't attest too far ahead + before there's sufficient attestation weight to advance the safe target. 
""" fork_choice_test( steps=[ @@ -47,7 +49,7 @@ def test_attestation_target_at_genesis_initially( block=BlockSpec(slot=Slot(2)), checks=StoreChecks( head_slot=Slot(2), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(0), # Still genesis ), ), ], @@ -66,7 +68,7 @@ def test_attestation_target_advances_with_attestations( Expected: - Initial blocks: target stays at genesis (slot 0) - - Later blocks: target advances as walkback from head reaches further slots + - Later blocks: target advances as attestations accumulate - Target remains behind head for safety Why This Matters @@ -85,35 +87,35 @@ def test_attestation_target_advances_with_attestations( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(0), # Walks back to safe_target + attestation_target_slot=Slot(0), # Still at genesis ), ), BlockStep( block=BlockSpec(slot=Slot(2)), checks=StoreChecks( head_slot=Slot(2), - attestation_target_slot=Slot(0), # Walks back to safe_target + attestation_target_slot=Slot(0), # Still at genesis ), ), BlockStep( block=BlockSpec(slot=Slot(3)), checks=StoreChecks( head_slot=Slot(3), - attestation_target_slot=Slot(0), # Walks back to safe_target + attestation_target_slot=Slot(0), # Still at genesis ), ), BlockStep( block=BlockSpec(slot=Slot(4)), checks=StoreChecks( head_slot=Slot(4), - attestation_target_slot=Slot(1), # 3-step walkback from 4 → 1 + attestation_target_slot=Slot(1), # Advances to slot 1 ), ), BlockStep( block=BlockSpec(slot=Slot(5)), checks=StoreChecks( head_slot=Slot(5), - attestation_target_slot=Slot(2), # 3-step walkback from 5 → 2 + attestation_target_slot=Slot(2), # Continues advancing ), ), ], @@ -162,7 +164,7 @@ def test_attestation_target_with_slot_gaps( block=BlockSpec(slot=Slot(5)), checks=StoreChecks( head_slot=Slot(5), - attestation_target_slot=Slot(0), # Walks back 5→3→1→0, at safe_target + attestation_target_slot=Slot(0), ), ), ], @@ -199,56 +201,56 @@ def test_attestation_target_with_extended_chain( block=BlockSpec(slot=Slot(1)), checks=StoreChecks( head_slot=Slot(1), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(0), # Genesis ), ), BlockStep( block=BlockSpec(slot=Slot(2)), checks=StoreChecks( head_slot=Slot(2), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(0), # Still genesis ), ), BlockStep( block=BlockSpec(slot=Slot(3)), checks=StoreChecks( head_slot=Slot(3), - attestation_target_slot=Slot(0), + attestation_target_slot=Slot(0), # Still genesis ), ), BlockStep( block=BlockSpec(slot=Slot(4)), checks=StoreChecks( head_slot=Slot(4), - attestation_target_slot=Slot(1), + attestation_target_slot=Slot(1), # Advances to slot 1 ), ), BlockStep( block=BlockSpec(slot=Slot(5)), checks=StoreChecks( head_slot=Slot(5), - attestation_target_slot=Slot(2), + attestation_target_slot=Slot(2), # Stable at 2 ), ), BlockStep( block=BlockSpec(slot=Slot(6)), checks=StoreChecks( head_slot=Slot(6), - attestation_target_slot=Slot(3), + attestation_target_slot=Slot(3), # Continues to advance ), ), BlockStep( block=BlockSpec(slot=Slot(7)), checks=StoreChecks( head_slot=Slot(7), - attestation_target_slot=Slot(4), + attestation_target_slot=Slot(4), # Continues advancing ), ), BlockStep( block=BlockSpec(slot=Slot(8)), checks=StoreChecks( head_slot=Slot(8), - attestation_target_slot=Slot(5), + attestation_target_slot=Slot(5), # Continues advancing ), ), ], @@ -294,39 +296,39 @@ def test_attestation_target_justifiable_constraint( head_slot=Slot(i), attestation_target_slot=Slot( # Mapping of current slot -> 
expected target slot - # Walkback stops at safe_target (slot 0) then - # justifiability is checked: delta = target - finalized + # delta = current_slot - JUSTIFICATION_LOOKBACK_SLOTS - finalized_slot + # delta = current_slot - 3 - 0 { - 1: 0, # Walks back to safe_target - 2: 0, # Walks back to safe_target - 3: 0, # Walks back to safe_target - 4: 1, # 3-step walkback from 4 → 1, delta 1 ≤ 5 - 5: 2, # 3-step walkback from 5 → 2, delta 2 ≤ 5 - 6: 3, # 3-step walkback from 6 → 3, delta 3 ≤ 5 - 7: 4, # 3-step walkback from 7 → 4, delta 4 ≤ 5 - 8: 5, # 3-step walkback from 8 → 5, delta 5 ≤ 5 - 9: 6, # delta = 6, pronic number (2*3) - 10: 6, # delta = 7, not justifiable → walks to 6 - 11: 6, # delta = 8, not justifiable → walks to 6 - 12: 9, # delta = 9, perfect square (3^2) - 13: 9, # delta = 10, not justifiable → walks to 9 - 14: 9, # delta = 11, not justifiable → walks to 9 - 15: 12, # delta = 12, pronic number (3*4) - 16: 12, # delta = 13, not justifiable → walks to 12 - 17: 12, # delta = 14, not justifiable → walks to 12 - 18: 12, # delta = 15, not justifiable → walks to 12 - 19: 16, # delta = 16, perfect square (4^2) - 20: 16, # delta = 17, not justifiable → walks to 16 - 21: 16, # delta = 18, not justifiable → walks to 16 - 22: 16, # delta = 19, not justifiable → walks to 16 - 23: 20, # delta = 20, pronic number (4*5) - 24: 20, # delta = 21, not justifiable → walks to 20 - 25: 20, # delta = 22, not justifiable → walks to 20 - 26: 20, # delta = 23, not justifiable → walks to 20 - 27: 20, # delta = 24, not justifiable → walks to 20 - 28: 25, # delta = 25, perfect square (5^2) - 29: 25, # delta = 26, not justifiable → walks to 25 - 30: 25, # delta = 27, not justifiable → walks to 25 + 1: 0, # 3-slot walkback reaches safe target at slot 0 + 2: 0, # 3-slot walkback reaches safe target at slot 0 + 3: 0, # 3-slot walkback reaches safe target at slot 0 + 4: 1, # delta = 4 - 3 - 0 = 1, Rule 1: delta 1 ≤ 5 + 5: 2, # delta = 5 - 3 - 0 = 2, Rule 1: delta 2 ≤ 5 + 6: 3, # delta = 6 - 3 - 0 = 3, Rule 1: delta 3 ≤ 5 + 7: 4, # delta = 7 - 3 - 0 = 4, Rule 1: delta 4 ≤ 5 + 8: 5, # delta = 8 - 3 - 0 = 5, Rule 1: delta 5 ≤ 5 + 9: 6, # delta = 6 - 0 = 6, Rule 3: pronic number (2*3) + 10: 6, # delta = 10 - 3 - 0 = 7 + 11: 6, # delta = 11 - 3 - 0 = 8 + 12: 9, # delta = 9 - 0 = 9, Rule 2: perfect square (3^2) + 13: 9, # delta = 13 - 3 - 0 = 10 + 14: 9, # delta = 14 - 3 - 0 = 11 + 15: 12, # delta = 15 - 3 - 0 = 12, Rule 3: pronic number (3*4) + 16: 12, # delta = 16 - 3 - 0 = 13 + 17: 12, # delta = 17 - 3 - 0 = 14 + 18: 12, # delta = 18 - 3 - 0 = 15 + 19: 16, # delta = 19 - 3 - 0 = 16, Rule 2: perfect square (4^2) + 20: 16, # delta = 20 - 3 - 0 = 17 + 21: 16, # delta = 21 - 3 - 0 = 18 + 22: 16, # delta = 22 - 3 - 0 = 19 + 23: 20, # delta = 23 - 3 - 0 = 20, Rule 3: pronic number (4*5) + 24: 20, # delta = 24 - 3 - 0 = 21 + 25: 20, # delta = 25 - 3 - 0 = 22 + 26: 20, # delta = 26 - 3 - 0 = 23 + 27: 20, # delta = 27 - 3 - 0 = 24 + 28: 25, # delta = 28 - 3 - 0 = 25, Rule 2: perfect square (5^2) + 29: 25, # delta = 29 - 3 - 0 = 26 + 30: 25, # delta = 30 - 3 - 0 = 27 }[i] ), ), From 214403fd5dd5dc9f6ab8e98359be191bf838ea1b Mon Sep 17 00:00:00 2001 From: Thomas Coratger Date: Sat, 14 Feb 2026 22:00:47 +0100 Subject: [PATCH 16/16] fix: interop finalization + redesign test suite --- src/lean_spec/subspecs/validator/service.py | 79 +++++++++- tests/interop/helpers/__init__.py | 3 + tests/interop/helpers/diagnostics.py | 58 ++++++++ tests/interop/helpers/node_runner.py | 15 +- tests/interop/test_attestation_pipeline.py | 154 
++++++++++++++++++++ tests/interop/test_block_pipeline.py | 87 +++++++++++ tests/interop/test_justification.py | 134 +++++++++++++++++ tests/interop/test_multi_node.py | 130 ++--------------- 8 files changed, 536 insertions(+), 124 deletions(-) create mode 100644 tests/interop/helpers/diagnostics.py create mode 100644 tests/interop/test_attestation_pipeline.py create mode 100644 tests/interop/test_block_pipeline.py create mode 100644 tests/interop/test_justification.py diff --git a/src/lean_spec/subspecs/validator/service.py b/src/lean_spec/subspecs/validator/service.py index e01f3c01..3968fbc4 100644 --- a/src/lean_spec/subspecs/validator/service.py +++ b/src/lean_spec/subspecs/validator/service.py @@ -39,6 +39,7 @@ from lean_spec.subspecs import metrics from lean_spec.subspecs.chain.clock import Interval, SlotClock +from lean_spec.subspecs.chain.config import ATTESTATION_COMMITTEE_COUNT from lean_spec.subspecs.containers import ( Attestation, AttestationData, @@ -54,7 +55,7 @@ ) from lean_spec.subspecs.containers.slot import Slot from lean_spec.subspecs.xmss import TARGET_SIGNATURE_SCHEME, GeneralizedXmssScheme -from lean_spec.subspecs.xmss.aggregation import AggregatedSignatureProof +from lean_spec.subspecs.xmss.aggregation import AggregatedSignatureProof, SignatureKey from lean_spec.types import Uint64 from .registry import ValidatorEntry, ValidatorRegistry @@ -286,10 +287,14 @@ async def _maybe_produce_block(self, slot: Slot) -> None: # This adds our attestation and signatures to the block. signed_block = self._sign_block(block, validator_index, signatures) - # The proposer's attestation is already stored in the block. - # When the block is broadcast, the proposer signature is tracked - # in gossip_signatures for future aggregation. - # No need to separately process the proposer attestation. + # Store proposer's attestation signature locally for aggregation. + # + # The proposer's block is already in the store from produce_block_with_signatures. + # When on_gossip_block is called locally, it returns early (duplicate check). + # So the proposer's attestation signature never reaches gossip_signatures + # via on_block. We must store it explicitly here so the aggregator + # (which may be this same node) can include it in aggregation. + self._store_proposer_attestation_signature(signed_block, validator_index) self._blocks_produced += 1 metrics.blocks_proposed.inc() @@ -323,6 +328,21 @@ async def _produce_attestations(self, slot: Slot) -> None: Args: slot: Current slot number. """ + # Wait briefly for the current slot's block to arrive via gossip. + # + # At interval 1 (800ms after slot start), the slot's block may not + # have arrived yet from the proposer node (production + gossip + verification + # can exceed 800ms on slow machines). Without the block, attestations + # would reference an old head, causing safe_target to stall. 
+ store = self.sync_service.store + current_slot_has_block = any(block.slot == slot for block in store.blocks.values()) + if not current_slot_has_block: + for _ in range(8): + await asyncio.sleep(0.05) + store = self.sync_service.store + if any(block.slot == slot for block in store.blocks.values()): + break + # Ensure we are attesting to the latest known head self.sync_service.store = self.sync_service.store.update_head() store = self.sync_service.store @@ -462,6 +482,55 @@ def _sign_attestation( signature=signature, ) + def _store_proposer_attestation_signature( + self, + signed_block: SignedBlockWithAttestation, + validator_index: ValidatorIndex, + ) -> None: + """ + Store the proposer's attestation signature in gossip_signatures. + + When the proposer produces a block, the block is added to the store + immediately. The subsequent local on_gossip_block call returns early + because the block is already in the store (duplicate check). This means + the proposer's attestation signature never reaches gossip_signatures + via the normal on_block path. + + This method explicitly stores the signature so aggregation can include it. + + Args: + signed_block: The signed block containing the proposer attestation. + validator_index: The proposer's validator index. + """ + store = self.sync_service.store + if store.validator_id is None: + return + + # Only store if the proposer is in the same subnet as the aggregator. + proposer_subnet = validator_index.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) + current_subnet = store.validator_id.compute_subnet_id(ATTESTATION_COMMITTEE_COUNT) + if proposer_subnet != current_subnet: + return + + proposer_attestation = signed_block.message.proposer_attestation + proposer_signature = signed_block.signature.proposer_signature + data_root = proposer_attestation.data.data_root_bytes() + + sig_key = SignatureKey(validator_index, data_root) + new_gossip_sigs = dict(store.gossip_signatures) + new_gossip_sigs[sig_key] = proposer_signature + + # Also store the attestation data for later extraction during aggregation. + new_attestation_data_by_root = dict(store.attestation_data_by_root) + new_attestation_data_by_root[data_root] = proposer_attestation.data + + self.sync_service.store = store.model_copy( + update={ + "gossip_signatures": new_gossip_sigs, + "attestation_data_by_root": new_attestation_data_by_root, + } + ) + def _ensure_prepared_for_epoch( self, entry: ValidatorEntry, diff --git a/tests/interop/helpers/__init__.py b/tests/interop/helpers/__init__.py index ec534d61..30a27b16 100644 --- a/tests/interop/helpers/__init__.py +++ b/tests/interop/helpers/__init__.py @@ -8,6 +8,7 @@ assert_peer_connections, assert_same_finalized_checkpoint, ) +from .diagnostics import PipelineDiagnostics from .node_runner import NodeCluster, TestNode from .port_allocator import PortAllocator from .topology import chain, full_mesh, mesh_2_2_2, star @@ -20,6 +21,8 @@ "assert_block_propagated", "assert_chain_progressing", "assert_same_finalized_checkpoint", + # Diagnostics + "PipelineDiagnostics", # Node management "TestNode", "NodeCluster", diff --git a/tests/interop/helpers/diagnostics.py b/tests/interop/helpers/diagnostics.py new file mode 100644 index 00000000..957e1e1f --- /dev/null +++ b/tests/interop/helpers/diagnostics.py @@ -0,0 +1,58 @@ +""" +Pipeline diagnostics for interop tests. + +Captures attestation pipeline state from test nodes for debugging +and assertions. 
Provides a snapshot of the full pipeline: +block production -> attestation -> aggregation -> safe target -> justification. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from .node_runner import TestNode + + +@dataclass(frozen=True, slots=True) +class PipelineDiagnostics: + """Snapshot of a node's attestation pipeline state.""" + + head_slot: int + """Current head slot.""" + + safe_target_slot: int + """Current safe target slot.""" + + finalized_slot: int + """Latest finalized slot.""" + + justified_slot: int + """Latest justified slot.""" + + gossip_signatures_count: int + """Number of pending gossip signatures (pre-aggregation).""" + + new_aggregated_count: int + """Number of entries in latest_new_aggregated_payloads.""" + + known_aggregated_count: int + """Number of entries in latest_known_aggregated_payloads.""" + + block_count: int + """Total blocks in the store.""" + + @classmethod + def from_node(cls, node: TestNode) -> PipelineDiagnostics: + """Capture diagnostics from a test node.""" + store = node._store + safe_block = store.blocks.get(store.safe_target) + return cls( + head_slot=node.head_slot, + safe_target_slot=int(safe_block.slot) if safe_block else 0, + finalized_slot=node.finalized_slot, + justified_slot=node.justified_slot, + gossip_signatures_count=len(store.gossip_signatures), + new_aggregated_count=len(store.latest_new_aggregated_payloads), + known_aggregated_count=len(store.latest_known_aggregated_payloads), + block_count=len(store.blocks), + ) diff --git a/tests/interop/helpers/node_runner.py b/tests/interop/helpers/node_runner.py index 9c708fe7..e38b4e60 100644 --- a/tests/interop/helpers/node_runner.py +++ b/tests/interop/helpers/node_runner.py @@ -202,6 +202,8 @@ class NodeCluster: def __post_init__(self) -> None: """Initialize validators and keys.""" self._generate_validators() + # Default genesis time for single-node starts. + # start_all() overrides this to align with service start. self._genesis_time = int(time.time()) def _generate_validators(self) -> None: @@ -417,6 +419,13 @@ async def start_all( if validators_per_node is None: validators_per_node = self._distribute_validators(num_nodes) + # Set genesis time to coincide with service start. + # + # Phases 1-3 (node creation, connection, mesh stabilization) take ~10s. + # Setting genesis in the future prevents wasting slots during setup. + # The first block will be produced at slot 1, shortly after services start. + self._genesis_time = int(time.time()) + 10 + # Phase 1: Create nodes with networking ready but services not running. # # This allows the gossipsub mesh to form before validators start @@ -464,10 +473,8 @@ async def start_all( # 2. Subscription RPCs to be exchanged # 3. GRAFT messages to be sent and processed # - # A longer delay ensures proper mesh formation before block production. - # CI machines need more time due to lower CPU/scheduling priority. - # Increased to 15s to handle slow CI and strict attestation target walkback. - await asyncio.sleep(15.0) + # 5s allows ~7 heartbeats which is sufficient for mesh formation. + await asyncio.sleep(5.0) # Phase 4: Start node services (validators, chain service, etc). # diff --git a/tests/interop/test_attestation_pipeline.py b/tests/interop/test_attestation_pipeline.py new file mode 100644 index 00000000..e97073a5 --- /dev/null +++ b/tests/interop/test_attestation_pipeline.py @@ -0,0 +1,154 @@ +""" +Attestation production and delivery pipeline tests. 
+ +Verifies that validators produce attestations referencing the correct +head and that attestations are delivered to the aggregator. +""" + +from __future__ import annotations + +import asyncio +import logging +import time + +import pytest + +from .helpers import ( + NodeCluster, + PipelineDiagnostics, + assert_peer_connections, + full_mesh, +) + +logger = logging.getLogger(__name__) + +pytestmark = pytest.mark.interop + + +@pytest.mark.timeout(60) +@pytest.mark.num_validators(3) +async def test_attestation_head_references(node_cluster: NodeCluster) -> None: + """ + Verify attestations reference the current slot's block, not genesis. + + After the first block is produced and propagated, attestations from + non-proposer validators should point to that block as their head. + """ + topology = full_mesh(3) + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + + # Wait for ~3 slots so attestations have been produced. + await asyncio.sleep(16) + + # Check that gossip_signatures or aggregated payloads exist. + # If attestations reference genesis with target==source, they'd be skipped. + # So the presence of valid aggregated payloads indicates correct head references. + for node in node_cluster.nodes: + diag = PipelineDiagnostics.from_node(node) + logger.info( + "Node %d: head=%d safe_target=%d gossip_sigs=%d new_agg=%d known_agg=%d", + node.index, + diag.head_slot, + diag.safe_target_slot, + diag.gossip_signatures_count, + diag.new_aggregated_count, + diag.known_aggregated_count, + ) + + # At least one node should have aggregated payloads (the aggregator). + total_agg = sum( + PipelineDiagnostics.from_node(n).new_aggregated_count + + PipelineDiagnostics.from_node(n).known_aggregated_count + for n in node_cluster.nodes + ) + assert total_agg > 0, "No aggregated attestation payloads found on any node" + + +@pytest.mark.timeout(60) +@pytest.mark.num_validators(3) +async def test_attestation_gossip_delivery(node_cluster: NodeCluster) -> None: + """ + Verify attestations reach the aggregator node via gossip. + + The aggregator collects gossip signatures from subnet attestation topics. + After a few slots, the aggregator should have collected signatures from + multiple validators. + """ + topology = full_mesh(3) + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + + # Wait for ~2 slots for attestations to be produced and gossiped. + await asyncio.sleep(12) + + # Find aggregator nodes (those with gossip_signatures). + for node in node_cluster.nodes: + diag = PipelineDiagnostics.from_node(node) + if diag.gossip_signatures_count > 0 or diag.new_aggregated_count > 0: + logger.info( + "Node %d has pipeline activity: gossip_sigs=%d new_agg=%d", + node.index, + diag.gossip_signatures_count, + diag.new_aggregated_count, + ) + + # At least one aggregator should have received signatures. + max_sigs = max( + PipelineDiagnostics.from_node(n).gossip_signatures_count + + PipelineDiagnostics.from_node(n).new_aggregated_count + for n in node_cluster.nodes + ) + assert max_sigs > 0, "No gossip signatures or aggregated payloads found on any node" + + +@pytest.mark.timeout(90) +@pytest.mark.num_validators(3) +async def test_safe_target_advancement(node_cluster: NodeCluster) -> None: + """ + Verify safe_target advances beyond genesis after aggregation. 
+ + After aggregation at interval 2 and safe target update at interval 3, + the safe_target should point to a non-genesis block. This is a + prerequisite for meaningful attestation targets. + """ + topology = full_mesh(3) + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + + # Wait for enough slots for safe_target to advance. + # Needs: block production -> attestation -> aggregation -> safe target update. + start = time.monotonic() + timeout = 60.0 + + while time.monotonic() - start < timeout: + diags = [PipelineDiagnostics.from_node(n) for n in node_cluster.nodes] + safe_targets = [d.safe_target_slot for d in diags] + + if any(st > 0 for st in safe_targets): + logger.info("Safe target advanced: %s", safe_targets) + return + + logger.debug("Safe targets still at genesis: %s", safe_targets) + await asyncio.sleep(2.0) + + diags = [PipelineDiagnostics.from_node(n) for n in node_cluster.nodes] + for i, d in enumerate(diags): + logger.error( + "Node %d: head=%d safe=%d fin=%d just=%d gsigs=%d nagg=%d kagg=%d", + i, + d.head_slot, + d.safe_target_slot, + d.finalized_slot, + d.justified_slot, + d.gossip_signatures_count, + d.new_aggregated_count, + d.known_aggregated_count, + ) + raise AssertionError(f"Safe target never advanced beyond genesis: {safe_targets}") diff --git a/tests/interop/test_block_pipeline.py b/tests/interop/test_block_pipeline.py new file mode 100644 index 00000000..b76006a4 --- /dev/null +++ b/tests/interop/test_block_pipeline.py @@ -0,0 +1,87 @@ +""" +Block production and propagation pipeline tests. + +Verifies that blocks are produced, propagated via gossip, +and integrated into all nodes' stores. +""" + +from __future__ import annotations + +import asyncio +import logging + +import pytest + +from .helpers import ( + NodeCluster, + PipelineDiagnostics, + assert_peer_connections, + full_mesh, +) + +logger = logging.getLogger(__name__) + +pytestmark = pytest.mark.interop + + +@pytest.mark.timeout(60) +@pytest.mark.num_validators(3) +async def test_block_production_single_slot(node_cluster: NodeCluster) -> None: + """ + Verify that a block is produced and reaches all nodes within one slot. + + After mesh stabilization and service start, the proposer for slot 1 + should produce a block that propagates to all 3 nodes. + """ + topology = full_mesh(3) + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + + # Wait for one slot (4s) plus propagation margin. + await asyncio.sleep(8) + + for node in node_cluster.nodes: + diag = PipelineDiagnostics.from_node(node) + logger.info("Node %d: head_slot=%d blocks=%d", node.index, diag.head_slot, diag.block_count) + assert diag.head_slot >= 1, ( + f"Node {node.index} stuck at slot {diag.head_slot}, expected >= 1" + ) + + +@pytest.mark.timeout(60) +@pytest.mark.num_validators(3) +async def test_consecutive_blocks(node_cluster: NodeCluster) -> None: + """ + Verify blocks at consecutive slots reference correct parents. + + After several slots, each non-genesis block should have a parent_root + that points to the previous slot's block. + """ + topology = full_mesh(3) + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + + # Wait for ~3 slots. 
+ await asyncio.sleep(16) + + # Check parent chain on node 0. + store = node_cluster.nodes[0]._store + head_block = store.blocks[store.head] + + # Walk back from head to genesis, verifying parent chain. + visited = 0 + current = head_block + while current.parent_root in store.blocks: + parent = store.blocks[current.parent_root] + assert current.slot > parent.slot, ( + f"Block at slot {current.slot} has parent at slot {parent.slot} (not decreasing)" + ) + current = parent + visited += 1 + + logger.info("Walked %d blocks in parent chain from head slot %d", visited, head_block.slot) + assert visited >= 2, f"Expected at least 2 blocks in chain, found {visited}" diff --git a/tests/interop/test_justification.py b/tests/interop/test_justification.py new file mode 100644 index 00000000..941d5f9e --- /dev/null +++ b/tests/interop/test_justification.py @@ -0,0 +1,134 @@ +""" +Justification and finalization pipeline tests. + +Verifies the full consensus lifecycle from block production through +checkpoint justification and finalization. +""" + +from __future__ import annotations + +import asyncio +import logging +import time + +import pytest + +from .helpers import ( + NodeCluster, + PipelineDiagnostics, + assert_all_finalized_to, + assert_heads_consistent, + assert_peer_connections, + assert_same_finalized_checkpoint, + full_mesh, + mesh_2_2_2, +) + +logger = logging.getLogger(__name__) + +pytestmark = pytest.mark.interop + + +@pytest.mark.timeout(120) +@pytest.mark.num_validators(3) +async def test_first_justification(node_cluster: NodeCluster) -> None: + """ + Verify that the first justification event occurs. + + Justification requires 2/3+ attestation weight on a target checkpoint. + With 3 validators, 2 must attest to the same target. This test waits + for the justified_slot to advance beyond genesis on any node. + """ + topology = full_mesh(3) + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + + start = time.monotonic() + timeout = 90.0 + + while time.monotonic() - start < timeout: + justified_slots = [n.justified_slot for n in node_cluster.nodes] + + if any(js > 0 for js in justified_slots): + logger.info("First justification achieved: %s", justified_slots) + return + + # Log pipeline state periodically for diagnostics. + if int(time.monotonic() - start) % 10 == 0: + for node in node_cluster.nodes: + diag = PipelineDiagnostics.from_node(node) + logger.info( + "Node %d: head=%d safe=%d just=%d fin=%d", + node.index, + diag.head_slot, + diag.safe_target_slot, + diag.justified_slot, + diag.finalized_slot, + ) + + await asyncio.sleep(2.0) + + diags = [PipelineDiagnostics.from_node(n) for n in node_cluster.nodes] + for i, d in enumerate(diags): + logger.error( + "Node %d: head=%d safe=%d fin=%d just=%d gsigs=%d nagg=%d kagg=%d", + i, + d.head_slot, + d.safe_target_slot, + d.finalized_slot, + d.justified_slot, + d.gossip_signatures_count, + d.new_aggregated_count, + d.known_aggregated_count, + ) + raise AssertionError(f"No justification after {timeout}s: {[d.justified_slot for d in diags]}") + + +@pytest.mark.timeout(150) +@pytest.mark.num_validators(3) +async def test_finalization_full_mesh(node_cluster: NodeCluster) -> None: + """ + Verify chain finalization in a fully connected network. 
+ + Tests the complete consensus lifecycle: + + - Block production and gossip propagation + - Attestation aggregation across validators + - Checkpoint justification (2/3+ votes) + - Checkpoint finalization (justified child of justified parent) + + Network topology: Full mesh (every node connected to every other). + """ + topology = full_mesh(3) + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=2, timeout=15) + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) + + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=90) + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=15) + await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=15) + + +@pytest.mark.timeout(150) +@pytest.mark.num_validators(3) +async def test_finalization_hub_spoke(node_cluster: NodeCluster) -> None: + """ + Verify finalization with hub-and-spoke topology. + + Node 0 is the hub; nodes 1 and 2 are spokes that only connect to the hub. + Messages between spokes must route through the hub. + """ + topology = mesh_2_2_2() + validators_per_node = [[0], [1], [2]] + + await node_cluster.start_all(topology, validators_per_node) + await assert_peer_connections(node_cluster, min_peers=1, timeout=15) + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) + + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=90) + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=15) + await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=15) diff --git a/tests/interop/test_multi_node.py b/tests/interop/test_multi_node.py index 3cc70af1..64b22cff 100644 --- a/tests/interop/test_multi_node.py +++ b/tests/interop/test_multi_node.py @@ -45,7 +45,7 @@ pytestmark = pytest.mark.interop -@pytest.mark.timeout(300) +@pytest.mark.timeout(150) @pytest.mark.num_validators(3) async def test_mesh_finalization(node_cluster: NodeCluster) -> None: """ @@ -61,143 +61,43 @@ async def test_mesh_finalization(node_cluster: NodeCluster) -> None: - Checkpoint finalization (justified child of justified parent) Network topology: Full mesh (every node connected to every other). - This maximizes connectivity and minimizes propagation latency. - - Timing rationale: - - - 60s timeout: allows ~15 slots at 4s each, plenty for finalization - - 30s run duration: ~7-8 slots, enough for 2 epochs of justification - - 15s peer timeout: sufficient for QUIC handshake - - The Ream project uses similar parameters for compatibility testing. """ - # Build the network topology. - # - # Full mesh with 3 nodes creates 3 bidirectional connections: - # - Node 0 <-> Node 1 - # - Node 0 <-> Node 2 - # - Node 1 <-> Node 2 topology = full_mesh(3) - - # Assign exactly one validator to each node. - # - # Validator indices match node indices for clarity. - # With 3 validators total, each controls 1/3 of voting power. validators_per_node = [[0], [1], [2]] - # Start all nodes with the configured topology. - # - # Each node begins: - # - # - Listening on a unique port - # - Connecting to peers per topology - # - Running the block production loop - # - Subscribing to gossip topics await node_cluster.start_all(topology, validators_per_node) - # Wait for peer connections before proceeding. - # - # Each node needs at least 2 peers (the other two nodes). - # This ensures gossip will reach all nodes. - # The 30s timeout handles slow handshakes. 
- await assert_peer_connections(node_cluster, min_peers=2, timeout=30) - - # Verify gossip is working by checking head consensus. - # - # After mesh formation, nodes should converge on the same head within a few slots. - # This confirms gossip subscriptions are active (not just peer connections). - # Catches race conditions where peers connect but gossip mesh isn't fully formed. + await assert_peer_connections(node_cluster, min_peers=2, timeout=15) await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) - # Wait for finalization with convergence-based polling. - # - # Instead of a fixed duration, we actively poll for the target state. - # This is more robust under varying CI performance. - # - # Finalization requires 2 consecutive justified epochs. - # With 3 validators and 4s slots, this typically takes ~40-50s - # due to the 1-slot delay in attestation target advancement. - # The strict walkback (target <= safe_target.slot) means targets - # lag by 1 slot compared to heads, requiring extra time for consensus. - await assert_all_finalized_to(node_cluster, target_slot=1, timeout=180) - - # Verify heads converged across nodes. - # - # After finalization, all nodes should agree on head within 2 slots. - await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) + # With aligned genesis time, finalization typically occurs ~40s after service start. + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=90) - # Verify finalized checkpoints are consistent. - # - # All nodes must agree on the finalized checkpoint. - # Finalization is irreversible - divergent finalization would be catastrophic. - await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=30) + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=15) + await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=15) -@pytest.mark.timeout(300) +@pytest.mark.timeout(150) @pytest.mark.num_validators(3) async def test_mesh_2_2_2_finalization(node_cluster: NodeCluster) -> None: """ Verify finalization with hub-and-spoke topology. - This tests consensus under restricted connectivity: - - - Node 0 is the hub (receives all connections) - - Nodes 1 and 2 are spokes (only connect to hub) - - Spokes cannot communicate directly - - Topology diagram:: - - Node 1 ---> Node 0 <--- Node 2 - - This is harder than full mesh because: - - - Messages between spokes must route through the hub - - Hub failure would partition the network - - Gossip takes two hops instead of one - - The test verifies that even with indirect connectivity, - the protocol achieves finalization. This matches the - Ream project's `test_lean_node_finalizes_mesh_2_2_2` test. + Node 0 is the hub; nodes 1 and 2 are spokes that only connect to the hub. + Messages between spokes must route through the hub. """ - # Build hub-and-spoke topology. - # - # Returns [(1, 0), (2, 0)]: nodes 1 and 2 dial node 0. - # Node 0 acts as the central hub. topology = mesh_2_2_2() - - # Same validator assignment as full mesh test. validators_per_node = [[0], [1], [2]] await node_cluster.start_all(topology, validators_per_node) - # Lower peer requirement than full mesh. - # - # Hub (node 0) has 2 peers; spokes have 1 peer each. - # Using min_peers=1 ensures spokes pass the check. - await assert_peer_connections(node_cluster, min_peers=1, timeout=30) - - # Verify gossip mesh is active before waiting for finalization. - # - # Nodes should converge on head via gossip propagation through hub. 
- # This ensures the hub is properly relaying messages to spokes. + await assert_peer_connections(node_cluster, min_peers=1, timeout=15) await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) - # Wait for finalization with convergence-based polling. - # - # Hub-and-spoke adds latency (messages route through hub) - # but the protocol should still achieve finalization. - # The strict walkback (target <= safe_target.slot) adds ~1 slot delay. - await assert_all_finalized_to(node_cluster, target_slot=1, timeout=180) + await assert_all_finalized_to(node_cluster, target_slot=1, timeout=90) - # Verify heads converged across nodes. - # - # Hub-and-spoke adds latency but should not cause divergence. - await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=30) - - # Finalized checkpoints must be identical. - # - # Even with indirect connectivity, finalization must be consistent. - await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=30) + await assert_heads_consistent(node_cluster, max_slot_diff=2, timeout=15) + await assert_same_finalized_checkpoint(node_cluster.nodes, timeout=15) @pytest.mark.timeout(30) @@ -253,7 +153,7 @@ async def test_two_node_connection(node_cluster: NodeCluster) -> None: await assert_heads_consistent(node_cluster, max_slot_diff=2) -@pytest.mark.timeout(45) +@pytest.mark.timeout(60) @pytest.mark.num_validators(3) async def test_block_gossip_propagation(node_cluster: NodeCluster) -> None: """ @@ -319,7 +219,7 @@ async def test_block_gossip_propagation(node_cluster: NodeCluster) -> None: @pytest.mark.xfail(reason="Sync service doesn't pull missing blocks for isolated nodes") -@pytest.mark.timeout(180) +@pytest.mark.timeout(120) @pytest.mark.num_validators(3) async def test_partition_recovery(node_cluster: NodeCluster) -> None: """
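
Note on the expected-target table in test_attestation_target_justifiable_constraint: the values follow the three justifiability rules named in its comments (delta <= 5, perfect square, pronic number), with the candidate target starting at current_slot - 3 and walking back toward the finalized slot until the delta is justifiable. The following is a minimal, illustrative sketch of that arithmetic, assuming finalized slot 0 and a 3-slot lookback as stated in the test comments; the helper names here are not the spec API.

import math

def is_justifiable_delta(delta: int) -> bool:
    # Rule 1: small deltas (<= 5) are always justifiable.
    if delta <= 5:
        return True
    root = math.isqrt(delta)
    # Rule 2: perfect squares (9, 16, 25, ...).
    if root * root == delta:
        return True
    # Rule 3: pronic numbers x * (x + 1) (6, 12, 20, ...).
    return root * (root + 1) == delta

def expected_target(current_slot: int, finalized_slot: int = 0, lookback: int = 3) -> int:
    # Walk back from current_slot - lookback until the distance from the
    # finalized slot is justifiable, matching the table's comments.
    candidate = max(current_slot - lookback, finalized_slot)
    while not is_justifiable_delta(candidate - finalized_slot):
        candidate -= 1
    return candidate

assert expected_target(10) == 6   # delta 7 is not justifiable, walks back to 6
assert expected_target(12) == 9   # delta 9 is a perfect square
assert expected_target(23) == 20  # delta 20 is pronic (4 * 5)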