Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
359e438
Update consensus logic: get_attestation_target, aggregation broadcast…
kamilsa Feb 11, 2026
a72c8de
fix: wire aggregated attestation broadcast through network pipeline
kamilsa Feb 11, 2026
9aa20e9
fix: update fill framework and tests for new on_tick tuple return type
kamilsa Feb 11, 2026
dc2c45b
fix: unskip multi-node interop tests and fix store attribute names
kamilsa Feb 11, 2026
a89d33a
fix: resolve lint, type, and test issues across tox suite
kamilsa Feb 11, 2026
26b9035
fix: use convergence-based polling in finalization tests
kamilsa Feb 11, 2026
5ace58d
fix: increase timeouts for CI reliability in interop tests
kamilsa Feb 11, 2026
109e555
chore: remove debug logging from state.py and store.py
kamilsa Feb 12, 2026
5fbaae2
some review fixes
kamilsa Feb 12, 2026
6cb53f0
Merge origin/main into fix/consensus-broadcast-updates
kamilsa Feb 13, 2026
3a89f8b
fix: use strict walkback to safe_target in attestation target selection
kamilsa Feb 13, 2026
caaf574
chore: restore docstrings in store.py unchanged code
kamilsa Feb 13, 2026
e6e8803
reduce diff
kamilsa Feb 13, 2026
6f5a86f
fix: increase finalization timeout for strict attestation target walk…
kamilsa Feb 13, 2026
05e18ab
fix: prevent isolated node failures with gossip mesh verification
kamilsa Feb 13, 2026
2f5fd75
Merge upstream/main into fix/consensus-broadcast-updates
tcoratger Feb 14, 2026
f9c6d09
Address PR review comments from @unnawut
tcoratger Feb 14, 2026
214403f
fix: interop finalization + redesign test suite
tcoratger Feb 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def make_fixture(self) -> Self:
# Time advancement may trigger slot boundaries.
# At slot boundaries, pending attestations may become active.
# Always act as aggregator to ensure gossip signatures are aggregated
store = store.on_tick(
store, _ = store.on_tick(
Uint64(step.time), has_proposal=False, is_aggregator=True
)

Expand Down Expand Up @@ -270,7 +270,7 @@ def make_fixture(self) -> Self:
# Always act as aggregator to ensure gossip signatures are aggregated
slot_duration_seconds = block.slot * SECONDS_PER_SLOT
block_time = store.config.genesis_time + slot_duration_seconds
store = store.on_tick(block_time, has_proposal=True, is_aggregator=True)
store, _ = store.on_tick(block_time, has_proposal=True, is_aggregator=True)

# Process the block through Store.
# This validates, applies state transition, and updates head.
Expand Down Expand Up @@ -399,7 +399,7 @@ def _build_block_from_spec(

# Aggregate gossip signatures and merge into known payloads.
# This makes recently gossiped attestations available for block construction.
aggregation_store = working_store.aggregate_committee_signatures()
aggregation_store, _ = working_store.aggregate_committee_signatures()
merged_store = aggregation_store.accept_new_attestations()

# Two sources of attestations:
Expand Down
16 changes: 14 additions & 2 deletions src/lean_spec/subspecs/chain/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,10 @@ async def run(self) -> None:
#
# This minimal service does not produce blocks.
# Block production requires validator keys.
new_store = self.sync_service.store.on_tick(
new_store, new_aggregated_attestations = self.sync_service.store.on_tick(
time=current_time,
has_proposal=False,
is_aggregator=self.sync_service.is_aggregator,
)

# Update sync service's store reference.
Expand All @@ -137,6 +138,11 @@ async def run(self) -> None:
# the updated time.
self.sync_service.store = new_store

# Publish any new aggregated attestations produced this tick
if new_aggregated_attestations:
for agg in new_aggregated_attestations:
await self.sync_service.publish_aggregated_attestation(agg)

logger.info(
"Tick: slot=%d interval=%d time=%d head=%s finalized=slot%d",
self.clock.current_slot(),
Expand All @@ -162,11 +168,17 @@ async def _initial_tick(self) -> Interval | None:

# Only tick if we're past genesis.
if current_time >= self.clock.genesis_time:
new_store = self.sync_service.store.on_tick(
new_store, _ = self.sync_service.store.on_tick(
time=current_time,
has_proposal=False,
is_aggregator=self.sync_service.is_aggregator,
)
self.sync_service.store = new_store

# Discard aggregated attestations from catch-up.
# During initial sync we may be many slots behind.
# Publishing stale aggregations would spam the network.

return self.clock.total_intervals()

return None
Expand Down
51 changes: 30 additions & 21 deletions src/lean_spec/subspecs/forkchoice/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,6 @@ def on_gossip_aggregated_attestation(
new_attestation_data_by_root = dict(self.attestation_data_by_root)
new_attestation_data_by_root[data_root] = data

store = self
for vid in validator_ids:
# Update Proof Map
#
Expand All @@ -503,7 +502,7 @@ def on_gossip_aggregated_attestation(
new_aggregated_payloads.setdefault(key, []).append(proof)

# Return store with updated aggregated payloads and attestation data
return store.model_copy(
return self.model_copy(
update={
"latest_new_aggregated_payloads": new_aggregated_payloads,
"attestation_data_by_root": new_attestation_data_by_root,
Expand Down Expand Up @@ -943,15 +942,15 @@ def update_safe_target(self) -> "Store":

return self.model_copy(update={"safe_target": safe_target})

def aggregate_committee_signatures(self) -> "Store":
def aggregate_committee_signatures(self) -> tuple["Store", list[SignedAggregatedAttestation]]:
"""
Aggregate committee signatures for attestations in committee_signatures.

This method aggregates signatures from the gossip_signatures map.
Attestations are reconstructed from gossip_signatures using attestation_data_by_root.

Returns:
New Store with updated latest_new_aggregated_payloads.
Tuple of (new Store with updated payloads, list of new SignedAggregatedAttestation).
"""
new_aggregated_payloads = dict(self.latest_new_aggregated_payloads)

Expand All @@ -976,13 +975,15 @@ def aggregate_committee_signatures(self) -> "Store":
committee_signatures,
)

# iterate to broadcast aggregated attestations
# Create list of aggregated attestations for broadcasting
new_aggregates: list[SignedAggregatedAttestation] = []
for aggregated_attestation, aggregated_signature in aggregated_results:
_ = SignedAggregatedAttestation(
data=aggregated_attestation.data,
proof=aggregated_signature,
new_aggregates.append(
SignedAggregatedAttestation(
data=aggregated_attestation.data,
proof=aggregated_signature,
)
)
# Note: here we should broadcast the aggregated signature to committee_aggregators topic

# Compute new aggregated payloads
new_gossip_sigs = dict(self.gossip_signatures)
Expand All @@ -1004,9 +1005,11 @@ def aggregate_committee_signatures(self) -> "Store":
"latest_new_aggregated_payloads": new_aggregated_payloads,
"gossip_signatures": new_gossip_sigs,
}
)
), new_aggregates

def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Store":
def tick_interval(
self, has_proposal: bool, is_aggregator: bool = False
) -> tuple["Store", list[SignedAggregatedAttestation]]:
"""
Advance store time by one interval and perform interval-specific actions.

Expand Down Expand Up @@ -1048,11 +1051,12 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto
is_aggregator: Whether the node is an aggregator.

Returns:
New Store with advanced time and interval-specific updates applied.
Tuple of (new store with advanced time, list of new signed aggregated attestation).
"""
# Advance time by one interval
store = self.model_copy(update={"time": self.time + Uint64(1)})
current_interval = store.time % INTERVALS_PER_SLOT
new_aggregates: list[SignedAggregatedAttestation] = []

if current_interval == Uint64(0):
# Start of slot - process attestations if proposal exists
Expand All @@ -1061,17 +1065,19 @@ def tick_interval(self, has_proposal: bool, is_aggregator: bool = False) -> "Sto
elif current_interval == Uint64(2):
# Aggregation interval - aggregators create proofs
if is_aggregator:
store = store.aggregate_committee_signatures()
store, new_aggregates = store.aggregate_committee_signatures()
elif current_interval == Uint64(3):
# Fast confirm - update safe target based on received proofs
store = store.update_safe_target()
elif current_interval == Uint64(4):
# End of slot - accept accumulated attestations
store = store.accept_new_attestations()

return store
return store, new_aggregates

def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False) -> "Store":
def on_tick(
self, time: Uint64, has_proposal: bool, is_aggregator: bool = False
) -> tuple["Store", list[SignedAggregatedAttestation]]:
"""
Advance forkchoice store time to given timestamp.

Expand All @@ -1085,22 +1091,25 @@ def on_tick(self, time: Uint64, has_proposal: bool, is_aggregator: bool = False)
is_aggregator: Whether the node is an aggregator.

Returns:
New Store with time advanced and all interval actions performed.
Tuple of (new store with time advanced,
list of all produced signed aggregated attestation).
"""
# Calculate target time in intervals
time_delta_ms = (time - self.config.genesis_time) * Uint64(1000)
tick_interval_time = time_delta_ms // MILLISECONDS_PER_INTERVAL

# Tick forward one interval at a time
store = self
all_new_aggregates: list[SignedAggregatedAttestation] = []
while store.time < tick_interval_time:
# Check if proposal should be signaled for next interval
should_signal_proposal = has_proposal and (store.time + Uint64(1)) == tick_interval_time

# Advance by one interval with appropriate signaling
store = store.tick_interval(should_signal_proposal, is_aggregator)
store, new_aggregates = store.tick_interval(should_signal_proposal, is_aggregator)
all_new_aggregates.extend(new_aggregates)

return store
return store, all_new_aggregates

def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]:
"""
Expand Down Expand Up @@ -1128,7 +1137,7 @@ def get_proposal_head(self, slot: Slot) -> tuple["Store", Bytes32]:
slot_time = self.config.genesis_time + slot_duration_seconds

# Advance time to current slot (ticking intervals)
store = self.on_tick(slot_time, True)
store, _ = self.on_tick(slot_time, True)

# Process any pending attestations before proposal
store = store.accept_new_attestations()
Expand Down Expand Up @@ -1192,7 +1201,7 @@ def get_attestation_target(self) -> Checkpoint:
# Create checkpoint from selected target block
target_block = self.blocks[target_block_root]

return Checkpoint(root=hash_tree_root(target_block), slot=target_block.slot)
return Checkpoint(root=target_block_root, slot=target_block.slot)

def produce_attestation_data(self, slot: Slot) -> AttestationData:
"""
Expand Down Expand Up @@ -1299,7 +1308,7 @@ def produce_block_with_signatures(
#
# The builder iteratively collects valid attestations.
# It returns the final block, post-state, and signature proofs.
final_block, final_post_state, _, signatures = head_state.build_block(
final_block, final_post_state, collected_attestations, signatures = head_state.build_block(
slot=slot,
proposer_index=validator_index,
parent_root=head_root,
Expand Down
38 changes: 36 additions & 2 deletions src/lean_spec/subspecs/networking/client/event_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@

from lean_spec.snappy import SnappyDecompressionError, frame_decompress
from lean_spec.subspecs.containers import SignedBlockWithAttestation
from lean_spec.subspecs.containers.attestation import SignedAttestation
from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation
from lean_spec.subspecs.networking.config import (
GOSSIPSUB_DEFAULT_PROTOCOL_ID,
GOSSIPSUB_PROTOCOL_ID_V12,
Expand All @@ -130,6 +130,7 @@
)
from lean_spec.subspecs.networking.reqresp.message import Status
from lean_spec.subspecs.networking.service.events import (
GossipAggregatedAttestationEvent,
GossipAttestationEvent,
GossipBlockEvent,
NetworkEvent,
Expand Down Expand Up @@ -235,7 +236,7 @@ def decode_message(
self,
topic_str: str,
compressed_data: bytes,
) -> SignedBlockWithAttestation | SignedAttestation | None:
) -> SignedBlockWithAttestation | SignedAttestation | SignedAggregatedAttestation | None:
"""
Decode a gossip message from topic and compressed data.

Expand Down Expand Up @@ -303,6 +304,8 @@ def decode_message(
return SignedBlockWithAttestation.decode_bytes(ssz_bytes)
case TopicKind.ATTESTATION_SUBNET:
return SignedAttestation.decode_bytes(ssz_bytes)
case TopicKind.AGGREGATED_ATTESTATION:
return SignedAggregatedAttestation.decode_bytes(ssz_bytes)
except SSZSerializationError as e:
raise GossipMessageError(f"SSZ decode failed: {e}") from e

Expand Down Expand Up @@ -726,6 +729,9 @@ async def _handle_gossipsub_message(self, event: GossipsubMessageEvent) -> None:
case TopicKind.ATTESTATION_SUBNET:
if isinstance(message, SignedAttestation):
await self._emit_gossip_attestation(message, event.peer_id)
case TopicKind.AGGREGATED_ATTESTATION:
if isinstance(message, SignedAggregatedAttestation):
await self._emit_gossip_aggregated_attestation(message, event.peer_id)

logger.debug("Processed gossipsub message %s from %s", topic.kind.value, event.peer_id)

Expand Down Expand Up @@ -1078,6 +1084,25 @@ async def _emit_gossip_attestation(
GossipAttestationEvent(attestation=attestation, peer_id=peer_id, topic=topic)
)

async def _emit_gossip_aggregated_attestation(
self,
signed_attestation: SignedAggregatedAttestation,
peer_id: PeerId,
) -> None:
"""
Emit a gossip aggregated attestation event.

Args:
signed_attestation: Aggregated attestation received from gossip.
peer_id: Peer that sent it.
"""
topic = GossipTopic(kind=TopicKind.AGGREGATED_ATTESTATION, fork_digest=self._fork_digest)
await self._events.put(
GossipAggregatedAttestationEvent(
signed_attestation=signed_attestation, peer_id=peer_id, topic=topic
)
)

async def _accept_streams(self, peer_id: PeerId, conn: QuicConnection) -> None:
"""
Accept incoming streams from a connection.
Expand Down Expand Up @@ -1370,6 +1395,15 @@ async def _handle_gossip_stream(self, peer_id: PeerId, stream: InboundStreamProt
# Type mismatch indicates a bug in decode_message.
logger.warning("Attestation topic but got %s", type(message).__name__)

case TopicKind.AGGREGATED_ATTESTATION:
if isinstance(message, SignedAggregatedAttestation):
await self._emit_gossip_aggregated_attestation(message, peer_id)
else:
logger.warning(
"Aggregated attestation topic but got %s",
type(message).__name__,
)

logger.debug("Received gossip %s from %s", topic.kind.value, peer_id)

except GossipMessageError as e:
Expand Down
22 changes: 21 additions & 1 deletion src/lean_spec/subspecs/networking/service/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from dataclasses import dataclass

from lean_spec.subspecs.containers import SignedBlockWithAttestation
from lean_spec.subspecs.containers.attestation import SignedAttestation
from lean_spec.subspecs.containers.attestation import SignedAggregatedAttestation, SignedAttestation
from lean_spec.subspecs.networking.gossipsub.topic import GossipTopic
from lean_spec.subspecs.networking.reqresp.message import Status
from lean_spec.subspecs.networking.transport import PeerId
Expand Down Expand Up @@ -67,6 +67,25 @@ class GossipAttestationEvent:
"""Topic the attestation was received on (includes fork digest)."""


@dataclass(frozen=True, slots=True)
class GossipAggregatedAttestationEvent:
"""
Aggregated attestation received via gossip subscription.

Fired when a signed aggregated attestation arrives from the gossipsub network.
Aggregates contain multiple validator votes combined into a single proof.
"""

signed_attestation: SignedAggregatedAttestation
"""The signed aggregated attestation."""

peer_id: PeerId
"""Peer that propagated this aggregated attestation to us."""

topic: GossipTopic
"""Topic the aggregated attestation was received on."""


@dataclass(frozen=True, slots=True)
class PeerStatusEvent:
"""
Expand Down Expand Up @@ -113,6 +132,7 @@ class PeerDisconnectedEvent:
NetworkEvent = (
GossipBlockEvent
| GossipAttestationEvent
| GossipAggregatedAttestationEvent
| PeerStatusEvent
| PeerConnectedEvent
| PeerDisconnectedEvent
Expand Down
Loading
Loading