diff --git a/examples/pubsub/gossipsub/__init__.py b/examples/pubsub/gossipsub/__init__.py new file mode 100755 index 000000000..357590a2c --- /dev/null +++ b/examples/pubsub/gossipsub/__init__.py @@ -0,0 +1,6 @@ +""" +Gossipsub Examples Package + +This package contains comprehensive examples showcasing the differences between +Gossipsub protocol versions and demonstrating advanced features. +""" diff --git a/examples/pubsub/gossipsub/gossipsub_v1.0.py b/examples/pubsub/gossipsub/gossipsub_v1.0.py new file mode 100755 index 000000000..b2ba2dfe7 --- /dev/null +++ b/examples/pubsub/gossipsub/gossipsub_v1.0.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python3 +""" +Gossipsub 1.0 Example + +This example demonstrates the basic Gossipsub 1.0 protocol (/meshsub/1.0.0). +Gossipsub 1.0 provides basic mesh-based pubsub with simple flooding for message +dissemination. It has no peer scoring or advanced security features, making it +suitable for trusted networks with low adversarial activity. + +Features demonstrated: +- Basic mesh-based pubsub +- Simple message flooding +- Mesh topology maintenance +- Message publishing and subscription +- Fanout behaviour: a publisher that is not in the mesh (e.g. does not subscribe) + sends to a random set of topic subscribers (fanout peers) instead of mesh peers + +Usage (from repository root): + python examples/pubsub/gossipsub/gossipsub_v1.0.py --nodes 5 --duration 30 +""" + +import argparse +import logging +import random +import time + +import trio + +from libp2p import new_host +from libp2p.abc import IHost, ISubscriptionAPI +from libp2p.crypto.rsa import create_new_key_pair +from libp2p.custom_types import TProtocol +from libp2p.pubsub.gossipsub import GossipSub +from libp2p.pubsub.pubsub import Pubsub +from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex +from libp2p.tools.anyio_service import background_trio_service +from libp2p.utils.address_validation import find_free_port + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger("gossipsub-v1.0") + +# Protocol version +GOSSIPSUB_V10 = TProtocol("/meshsub/1.0.0") +TOPIC = "gossipsub-v1.0-demo" + + +class GossipsubV10Node: + """A node running Gossipsub 1.0""" + + def __init__(self, node_id: str, port: int, fanout_only: bool = False): + self.node_id = node_id + self.port = port + self.fanout_only = fanout_only # If True, node only publishes (no subscribe) + self.host: IHost | None = None + self.pubsub: Pubsub | None = None + self.gossipsub: GossipSub | None = None + self.subscription: ISubscriptionAPI | None = None + self.messages_sent = 0 + self.messages_received = 0 + + async def start(self): + """Start the node with Gossipsub 1.0 configuration""" + key_pair = create_new_key_pair() + + self.host = new_host( + key_pair=key_pair, + muxer_opt={MPLEX_PROTOCOL_ID: Mplex}, + ) + + # Configure Gossipsub 1.0 - basic configuration only + self.gossipsub = GossipSub( + protocols=[GOSSIPSUB_V10], + degree=3, + degree_low=2, + degree_high=4, + heartbeat_interval=5, + heartbeat_initial_delay=1.0, + # No score_params - v1.0 doesn't have peer scoring + # No max_idontwant_messages - v1.0 doesn't support IDONTWANT + # No adaptive features - v1.0 doesn't have adaptive gossip + # No security features - v1.0 has basic security only + ) + + self.pubsub = Pubsub(self.host, self.gossipsub) + + # Start services + import multiaddr + + listen_addrs = [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{self.port}")] + + async with self.host.run(listen_addrs=listen_addrs): + async with background_trio_service(self.pubsub): + async with background_trio_service(self.gossipsub): + await self.pubsub.wait_until_ready() + if not self.fanout_only: + self.subscription = await self.pubsub.subscribe(TOPIC) + logger.info( + f"Node {self.node_id} (Gossipsub 1.0" + + (", fanout-only publisher" if self.fanout_only else "") + + f") started on port {self.port}" + ) + + # Keep running + await trio.sleep_forever() + + async def publish_message(self, message: str): + """Publish a message to the topic""" + if self.pubsub: + await self.pubsub.publish(TOPIC, message.encode()) + self.messages_sent += 1 + logger.info(f"Node {self.node_id} published: {message}") + + async def receive_messages(self): + """Receive and process messages""" + if not self.subscription: + return + + try: + while True: + if self.subscription is None: + break + message = await self.subscription.get() + decoded = message.data.decode("utf-8") + self.messages_received += 1 + logger.info(f"Node {self.node_id} received: {decoded}") + except Exception as e: + logger.debug(f"Node {self.node_id} receive loop ended: {e}") + + async def connect_to_peer(self, peer_addr: str): + """Connect to another peer""" + if self.host: + try: + import multiaddr + + from libp2p.peer.peerinfo import info_from_p2p_addr + + maddr = multiaddr.Multiaddr(peer_addr) + info = info_from_p2p_addr(maddr) + await self.host.connect(info) + logger.debug(f"Node {self.node_id} connected to {peer_addr}") + except Exception as e: + logger.debug( + f"Node {self.node_id} failed to connect to {peer_addr}: {e}" + ) + + +class GossipsubV10Demo: + """Demo controller for Gossipsub 1.0""" + + def __init__(self): + self.nodes: list[GossipsubV10Node] = [] + + async def setup_network(self, node_count: int = 5): + """ + Set up a network of nodes. Node 0 is a + fanout-only publisher (no subscribe). + """ + for i in range(node_count): + port = find_free_port() + fanout_only = i == 0 + node = GossipsubV10Node(f"node_{i}", port, fanout_only=fanout_only) + self.nodes.append(node) + + logger.info( + f"Created network with {node_count} nodes running Gossipsub 1.0 " + f"(node_0 is fanout-only: publishes without subscribing, " + f"using fanout peers)" + ) + + async def start_network(self, duration: int = 30): + """Start all nodes and run the demo""" + try: + async with trio.open_nursery() as nursery: + # Start all nodes + for node in self.nodes: + nursery.start_soon(node.start) + + # Wait for initialization + await trio.sleep(3) + + # Connect nodes in a mesh topology + await self._connect_nodes() + await trio.sleep(2) + + # Start message receiving for all nodes + for node in self.nodes: + nursery.start_soon(node.receive_messages) + + # Run publishing loop + end_time = time.time() + duration + message_counter = 0 + + print(f"\n{'=' * 60}") + print("GOSSIPSUB 1.0 DEMO") + print(f"{'=' * 60}") + print(f"Running for {duration} seconds...") + print("Protocol: /meshsub/1.0.0") + print("Features: Basic mesh-based pubsub, simple flooding, fanout demo") + print(" (node_0 is fanout-only: publishes via fanout, not in mesh)") + print(f"{'=' * 60}\n") + + while time.time() < end_time: + # Random node publishes (node_0 uses fanout when it publishes) + node = random.choice(self.nodes) + message = f"msg_{message_counter}_{int(time.time())}" + await node.publish_message(message) + message_counter += 1 + await trio.sleep(2) # Publish every 2 seconds + + # Print statistics + await trio.sleep(1) # Wait for final messages + self._print_statistics() + + # Cancel all tasks to exit nursery + nursery.cancel_scope.cancel() + + except Exception as e: + logger.warning(f"Demo execution interrupted: {e}") + + async def _connect_nodes(self): + """Connect nodes in a mesh topology""" + for i, node in enumerate(self.nodes): + # Connect to the next node in a ring topology + if len(self.nodes) > 1: + target_idx = (i + 1) % len(self.nodes) + target = self.nodes[target_idx] + + if target.host and node.host: + peer_addr = ( + f"/ip4/127.0.0.1/tcp/{target.port}/p2p/{target.host.get_id()}" + ) + await node.connect_to_peer(peer_addr) + + # Also connect to one more node for better connectivity + if len(self.nodes) > 2: + target_idx2 = (i + 2) % len(self.nodes) + target2 = self.nodes[target_idx2] + + if target2.host and node.host: + peer_addr2 = ( + f"/ip4/127.0.0.1/tcp/{target2.port}/p2p/" + f"{target2.host.get_id()}" + ) + await node.connect_to_peer(peer_addr2) + + def _print_statistics(self): + """Print demo statistics""" + print(f"\n{'=' * 60}") + print("DEMO STATISTICS") + print(f"{'=' * 60}") + + total_sent = sum(node.messages_sent for node in self.nodes) + total_received = sum(node.messages_received for node in self.nodes) + + print(f"Total messages sent: {total_sent}") + print(f"Total messages received: {total_received}") + print("\nPer-node statistics:") + for node in self.nodes: + print( + f" {node.node_id}: sent={node.messages_sent}, " + f"received={node.messages_received}" + ) + + print(f"\n{'=' * 60}") + print("Gossipsub 1.0 Features:") + print(" ✓ Basic mesh-based pubsub") + print(" ✓ Simple message flooding") + print(" ✓ Mesh topology maintenance") + print(" ✓ Fanout behaviour: node_0 publishes without subscribing;") + print(" messages are sent to a random set of topic peers (fanout peers)") + print(" ✗ No peer scoring") + print(" ✗ No IDONTWANT support") + print(" ✗ No adaptive gossip") + print(" ✗ No advanced security features") + print(f"{'=' * 60}\n") + + +async def main(): + parser = argparse.ArgumentParser(description="Gossipsub 1.0 Example") + parser.add_argument( + "--nodes", type=int, default=5, help="Number of nodes in the network" + ) + parser.add_argument( + "--duration", type=int, default=30, help="Demo duration in seconds" + ) + parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + demo = GossipsubV10Demo() + await demo.setup_network(args.nodes) + await demo.start_network(args.duration) + + +if __name__ == "__main__": + trio.run(main) diff --git a/examples/pubsub/gossipsub/gossipsub_v1.1.py b/examples/pubsub/gossipsub/gossipsub_v1.1.py new file mode 100755 index 000000000..31e55ad93 --- /dev/null +++ b/examples/pubsub/gossipsub/gossipsub_v1.1.py @@ -0,0 +1,361 @@ +#!/usr/bin/env python3 +""" +Gossipsub 1.1 Example + +This example demonstrates Gossipsub 1.1 protocol (/meshsub/1.1.0). +Gossipsub 1.1 adds peer scoring and behavioral penalties to the basic +mesh-based pubsub, providing better resilience against basic attacks. + +Features demonstrated: +- Basic mesh-based pubsub (from v1.0) +- Peer scoring with P1-P4 topic-scoped parameters +- Behavioral penalties (P5) +- P6 (application-specific score) and P7 (IP colocation factor) +- Prune backoff and peer exchange (PX) enabled +- Optional application score function (e.g. staking/reputation, validator role) +- Signed peer records +- Better resilience against attacks + +Usage (from repository root): + python examples/pubsub/gossipsub/gossipsub_v1.1.py --nodes 5 --duration 30 +""" + +import argparse +import logging +import random +import time + +import trio + +from libp2p import new_host +from libp2p.abc import IHost, ISubscriptionAPI +from libp2p.crypto.rsa import create_new_key_pair +from libp2p.custom_types import TProtocol +from libp2p.peer.id import ID +from libp2p.pubsub.gossipsub import GossipSub +from libp2p.pubsub.pubsub import Pubsub +from libp2p.pubsub.score import ScoreParams, TopicScoreParams +from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex +from libp2p.tools.anyio_service import background_trio_service +from libp2p.utils.address_validation import find_free_port + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger("gossipsub-v1.1") + +# Protocol version +GOSSIPSUB_V11 = TProtocol("/meshsub/1.1.0") +TOPIC = "gossipsub-v1.1-demo" + + +class GossipsubV11Node: + """A node running Gossipsub 1.1""" + + def __init__(self, node_id: str, port: int, role: str = "honest"): + self.node_id = node_id + self.port = port + self.role = role # "honest", "malicious", or "validator" + self.host: IHost | None = None + self.pubsub: Pubsub | None = None + self.gossipsub: GossipSub | None = None + self.subscription: ISubscriptionAPI | None = None + self.messages_sent = 0 + self.messages_received = 0 + + async def start(self): + """Start the node with Gossipsub 1.1 configuration""" + key_pair = create_new_key_pair() + + self.host = new_host( + key_pair=key_pair, + muxer_opt={MPLEX_PROTOCOL_ID: Mplex}, + ) + + # Configure Gossipsub 1.1 - adds peer scoring, P6/P7, prune backoff, PX + score_params = ScoreParams( + # Topic-scoped parameters (P1-P4) + p1_time_in_mesh=TopicScoreParams(weight=0.1, cap=10.0, decay=0.99), + p2_first_message_deliveries=TopicScoreParams( + weight=0.5, cap=20.0, decay=0.99 + ), + p3_mesh_message_deliveries=TopicScoreParams( + weight=0.3, cap=10.0, decay=0.99 + ), + p4_invalid_messages=TopicScoreParams(weight=-1.0, cap=50.0, decay=0.99), + # Global behavioral penalty (P5) + p5_behavior_penalty_weight=1.0, + p5_behavior_penalty_decay=0.99, + # P6: application-specific score (optional; + # in production: staking/reputation) + p6_appl_slack_weight=0.1, + p6_appl_slack_decay=0.99, + # P7: IP colocation factor - penalise many peers from same IP + p7_ip_colocation_weight=0.5, + p7_ip_colocation_threshold=3, + # Optional application score: + # e.g. validator/full node role, + # stake, reputation + app_specific_score_fn=self._application_score_function, + ) + + self.gossipsub = GossipSub( + protocols=[GOSSIPSUB_V11], + degree=3, + degree_low=2, + degree_high=4, + heartbeat_interval=5, + heartbeat_initial_delay=1.0, + score_params=score_params, + do_px=True, + px_peers_count=16, + prune_back_off=60, + unsubscribe_back_off=10, + # No max_idontwant_messages - v1.1 doesn't support IDONTWANT + # No adaptive features - v1.1 doesn't have adaptive gossip + # No advanced security features - v1.1 has basic security + ) + + self.pubsub = Pubsub(self.host, self.gossipsub) + + # Start services + import multiaddr + + listen_addrs = [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{self.port}")] + + async with self.host.run(listen_addrs=listen_addrs): + async with background_trio_service(self.pubsub): + async with background_trio_service(self.gossipsub): + await self.pubsub.wait_until_ready() + self.subscription = await self.pubsub.subscribe(TOPIC) + logger.info( + f"Node {self.node_id} (Gossipsub 1.1, {self.role}) started on " + f"port {self.port}" + ) + + # Keep running + await trio.sleep_forever() + + def _application_score_function(self, peer_id: ID) -> float: + """ + Optional application-specific score (P6). In real applications this could + be based on staking, reputation, or role in the network (validator, full node). + """ + return 0.0 + + async def publish_message(self, message: str): + """Publish a message to the topic""" + if self.pubsub: + await self.pubsub.publish(TOPIC, message.encode()) + self.messages_sent += 1 + logger.info(f"Node {self.node_id} ({self.role}) published: {message}") + + async def receive_messages(self): + """Receive and process messages""" + if not self.subscription: + return + + try: + while True: + if self.subscription is None: + break + message = await self.subscription.get() + decoded = message.data.decode("utf-8") + self.messages_received += 1 + logger.info(f"Node {self.node_id} received: {decoded}") + except Exception as e: + logger.debug(f"Node {self.node_id} receive loop ended: {e}") + + async def connect_to_peer(self, peer_addr: str): + """Connect to another peer""" + if self.host: + try: + import multiaddr + + from libp2p.peer.peerinfo import info_from_p2p_addr + + maddr = multiaddr.Multiaddr(peer_addr) + info = info_from_p2p_addr(maddr) + await self.host.connect(info) + logger.debug(f"Node {self.node_id} connected to {peer_addr}") + except Exception as e: + logger.debug( + f"Node {self.node_id} failed to connect to {peer_addr}: {e}" + ) + + +class GossipsubV11Demo: + """Demo controller for Gossipsub 1.1""" + + def __init__(self): + self.nodes: list[GossipsubV11Node] = [] + + async def setup_network(self, node_count: int = 5): + """Set up a network of nodes""" + roles = ["honest"] * (node_count - 1) + ["malicious"] * 1 + + for i in range(node_count): + port = find_free_port() + role = roles[i] if i < len(roles) else "honest" + node = GossipsubV11Node(f"node_{i}", port, role) + self.nodes.append(node) + + logger.info(f"Created network with {node_count} nodes running Gossipsub 1.1") + + async def start_network(self, duration: int = 30): + """Start all nodes and run the demo""" + try: + async with trio.open_nursery() as nursery: + # Start all nodes + for node in self.nodes: + nursery.start_soon(node.start) + + # Wait for initialization + await trio.sleep(3) + + # Connect nodes in a mesh topology + await self._connect_nodes() + await trio.sleep(2) + + # Start message receiving for all nodes + for node in self.nodes: + nursery.start_soon(node.receive_messages) + + # Run publishing loop + end_time = time.time() + duration + message_counter = 0 + + print(f"\n{'=' * 60}") + print("GOSSIPSUB 1.1 DEMO") + print(f"{'=' * 60}") + print(f"Running for {duration} seconds...") + print("Protocol: /meshsub/1.1.0") + print( + "Features: Peer scoring (P1-P7), prune backoff, peer exchange (PX)," + "optional app score" + ) + print(f"{'=' * 60}\n") + + while time.time() < end_time: + # Honest nodes publish normally + honest_nodes = [n for n in self.nodes if n.role == "honest"] + if honest_nodes: + node = random.choice(honest_nodes) + message = f"honest_msg_{message_counter}_{int(time.time())}" + await node.publish_message(message) + message_counter += 1 + + # Malicious nodes might send more messages (will be penalized) + malicious_nodes = [n for n in self.nodes if n.role == "malicious"] + if malicious_nodes and random.random() < 0.3: # 30% chance + node = malicious_nodes[0] + message = f"malicious_msg_{message_counter}_{int(time.time())}" + await node.publish_message(message) + message_counter += 1 + + await trio.sleep(2) # Publish every 2 seconds + + # Print statistics + await trio.sleep(1) # Wait for final messages + self._print_statistics() + + # Cancel all tasks to exit nursery + nursery.cancel_scope.cancel() + + except Exception as e: + logger.warning(f"Demo execution interrupted: {e}") + + async def _connect_nodes(self): + """Connect nodes in a mesh topology""" + for i, node in enumerate(self.nodes): + # Connect to the next node in a ring topology + if len(self.nodes) > 1: + target_idx = (i + 1) % len(self.nodes) + target = self.nodes[target_idx] + + if target.host and node.host: + peer_addr = ( + f"/ip4/127.0.0.1/tcp/{target.port}/p2p/{target.host.get_id()}" + ) + await node.connect_to_peer(peer_addr) + + # Also connect to one more node for better connectivity + if len(self.nodes) > 2: + target_idx2 = (i + 2) % len(self.nodes) + target2 = self.nodes[target_idx2] + + if target2.host and node.host: + peer_addr2 = ( + f"/ip4/127.0.0.1/tcp/{target2.port}/p2p/" + f"{target2.host.get_id()}" + ) + await node.connect_to_peer(peer_addr2) + + def _print_statistics(self): + """Print demo statistics""" + print(f"\n{'=' * 60}") + print("DEMO STATISTICS") + print(f"{'=' * 60}") + + total_sent = sum(node.messages_sent for node in self.nodes) + total_received = sum(node.messages_received for node in self.nodes) + + honest_sent = sum(n.messages_sent for n in self.nodes if n.role == "honest") + malicious_sent = sum( + n.messages_sent for n in self.nodes if n.role == "malicious" + ) + + print(f"Total messages sent: {total_sent}") + print(f" Honest nodes: {honest_sent}") + print(f" Malicious nodes: {malicious_sent}") + print(f"Total messages received: {total_received}") + print("\nPer-node statistics:") + for node in self.nodes: + print( + f" {node.node_id} ({node.role}): sent={node.messages_sent}, " + f"received={node.messages_received}" + ) + + print(f"\n{'=' * 60}") + print("Gossipsub 1.1 Features:") + print(" ✓ Basic mesh-based pubsub (from v1.0)") + print(" ✓ Peer scoring with P1-P4 (topic-scoped)") + print(" - P1: Time in mesh") + print(" - P2: First message deliveries") + print(" - P3: Mesh message deliveries") + print(" - P4: Invalid messages penalty") + print(" ✓ Behavioral penalties (P5)") + print(" ✓ P6: Application-specific score (optional; e.g. staking/role)") + print(" ✓ P7: IP colocation factor (Behavioural Penalty)") + print(" ✓ Prune backoff and peer exchange (PX) enabled") + print(" ✓ Signed peer records") + print(" ✗ No IDONTWANT support") + print(" ✗ No adaptive gossip") + print(" ✗ No advanced security features") + print(f"{'=' * 60}\n") + + +async def main(): + parser = argparse.ArgumentParser(description="Gossipsub 1.1 Example") + parser.add_argument( + "--nodes", type=int, default=5, help="Number of nodes in the network" + ) + parser.add_argument( + "--duration", type=int, default=30, help="Demo duration in seconds" + ) + parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + demo = GossipsubV11Demo() + await demo.setup_network(args.nodes) + await demo.start_network(args.duration) + + +if __name__ == "__main__": + trio.run(main) diff --git a/examples/pubsub/gossipsub/gossipsub_v1.2.py b/examples/pubsub/gossipsub/gossipsub_v1.2.py new file mode 100755 index 000000000..76dd928dc --- /dev/null +++ b/examples/pubsub/gossipsub/gossipsub_v1.2.py @@ -0,0 +1,324 @@ +#!/usr/bin/env python3 +""" +Gossipsub 1.2 Example + +This example demonstrates Gossipsub 1.2 protocol (/meshsub/1.2.0). +Gossipsub 1.2 adds IDONTWANT message filtering to reduce redundant message +transmission, improving efficiency in dense networks. + +Features demonstrated: +- All Gossipsub 1.1 features (peer scoring, behavioral penalties) +- IDONTWANT message filtering +- Reduced bandwidth usage +- Improved efficiency in dense networks + +Usage (from repository root): + python examples/pubsub/gossipsub/gossipsub_v1.2.py --nodes 5 --duration 30 +""" + +import argparse +import logging +import random +import time + +import trio + +from libp2p import new_host +from libp2p.abc import IHost, ISubscriptionAPI +from libp2p.crypto.rsa import create_new_key_pair +from libp2p.custom_types import TProtocol +from libp2p.pubsub.gossipsub import GossipSub +from libp2p.pubsub.pubsub import Pubsub +from libp2p.pubsub.score import ScoreParams, TopicScoreParams +from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex +from libp2p.tools.anyio_service import background_trio_service +from libp2p.utils.address_validation import find_free_port + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger("gossipsub-v1.2") + +# Protocol version +GOSSIPSUB_V12 = TProtocol("/meshsub/1.2.0") +TOPIC = "gossipsub-v1.2-demo" + + +class GossipsubV12Node: + """A node running Gossipsub 1.2""" + + def __init__(self, node_id: str, port: int, role: str = "honest"): + self.node_id = node_id + self.port = port + self.role = role # "honest" or "malicious" + self.host: IHost | None = None + self.pubsub: Pubsub | None = None + self.gossipsub: GossipSub | None = None + self.subscription: ISubscriptionAPI | None = None + self.messages_sent = 0 + self.messages_received = 0 + + async def start(self): + """Start the node with Gossipsub 1.2 configuration""" + key_pair = create_new_key_pair() + + self.host = new_host( + key_pair=key_pair, + muxer_opt={MPLEX_PROTOCOL_ID: Mplex}, + ) + + # Configure Gossipsub 1.2 - adds IDONTWANT support + score_params = ScoreParams( + # Topic-scoped parameters (P1-P4) + p1_time_in_mesh=TopicScoreParams(weight=0.1, cap=10.0, decay=0.99), + p2_first_message_deliveries=TopicScoreParams( + weight=0.5, cap=20.0, decay=0.99 + ), + p3_mesh_message_deliveries=TopicScoreParams( + weight=0.3, cap=10.0, decay=0.99 + ), + p4_invalid_messages=TopicScoreParams(weight=-1.0, cap=50.0, decay=0.99), + # Global behavioral penalty (P5) + p5_behavior_penalty_weight=1.0, + p5_behavior_penalty_decay=0.99, + ) + + self.gossipsub = GossipSub( + protocols=[GOSSIPSUB_V12], + degree=3, + degree_low=2, + degree_high=4, + heartbeat_interval=5, + heartbeat_initial_delay=1.0, + score_params=score_params, + # v1.2 feature: IDONTWANT support + max_idontwant_messages=20, + # No adaptive features - v1.2 doesn't have adaptive gossip + # No advanced security features - v1.2 has basic security + ) + + self.pubsub = Pubsub(self.host, self.gossipsub) + + # Start services + import multiaddr + + listen_addrs = [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{self.port}")] + + async with self.host.run(listen_addrs=listen_addrs): + async with background_trio_service(self.pubsub): + async with background_trio_service(self.gossipsub): + await self.pubsub.wait_until_ready() + self.subscription = await self.pubsub.subscribe(TOPIC) + logger.info( + f"Node {self.node_id} (Gossipsub 1.2, {self.role}) started on " + f"port {self.port}" + ) + + # Keep running + await trio.sleep_forever() + + async def publish_message(self, message: str): + """Publish a message to the topic""" + if self.pubsub: + await self.pubsub.publish(TOPIC, message.encode()) + self.messages_sent += 1 + logger.info(f"Node {self.node_id} ({self.role}) published: {message}") + + async def receive_messages(self): + """Receive and process messages""" + if not self.subscription: + return + + try: + while True: + if self.subscription is None: + break + message = await self.subscription.get() + decoded = message.data.decode("utf-8") + self.messages_received += 1 + logger.info(f"Node {self.node_id} received: {decoded}") + except Exception as e: + logger.debug(f"Node {self.node_id} receive loop ended: {e}") + + async def connect_to_peer(self, peer_addr: str): + """Connect to another peer""" + if self.host: + try: + import multiaddr + + from libp2p.peer.peerinfo import info_from_p2p_addr + + maddr = multiaddr.Multiaddr(peer_addr) + info = info_from_p2p_addr(maddr) + await self.host.connect(info) + logger.debug(f"Node {self.node_id} connected to {peer_addr}") + except Exception as e: + logger.debug( + f"Node {self.node_id} failed to connect to {peer_addr}: {e}" + ) + + +class GossipsubV12Demo: + """Demo controller for Gossipsub 1.2""" + + def __init__(self): + self.nodes: list[GossipsubV12Node] = [] + + async def setup_network(self, node_count: int = 5): + """Set up a network of nodes""" + roles = ["honest"] * (node_count - 1) + ["malicious"] * 1 + + for i in range(node_count): + port = find_free_port() + role = roles[i] if i < len(roles) else "honest" + node = GossipsubV12Node(f"node_{i}", port, role) + self.nodes.append(node) + + logger.info(f"Created network with {node_count} nodes running Gossipsub 1.2") + + async def start_network(self, duration: int = 30): + """Start all nodes and run the demo""" + try: + async with trio.open_nursery() as nursery: + # Start all nodes + for node in self.nodes: + nursery.start_soon(node.start) + + # Wait for initialization + await trio.sleep(3) + + # Connect nodes in a mesh topology + await self._connect_nodes() + await trio.sleep(2) + + # Start message receiving for all nodes + for node in self.nodes: + nursery.start_soon(node.receive_messages) + + # Run publishing loop + end_time = time.time() + duration + message_counter = 0 + + print(f"\n{'=' * 60}") + print("GOSSIPSUB 1.2 DEMO") + print(f"{'=' * 60}") + print(f"Running for {duration} seconds...") + print("Protocol: /meshsub/1.2.0") + print("Features: IDONTWANT filtering, reduced bandwidth usage") + print(f"{'=' * 60}\n") + + while time.time() < end_time: + # Honest nodes publish normally + honest_nodes = [n for n in self.nodes if n.role == "honest"] + if honest_nodes: + node = random.choice(honest_nodes) + message = f"honest_msg_{message_counter}_{int(time.time())}" + await node.publish_message(message) + message_counter += 1 + + # Malicious nodes might send more messages (will be penalized) + malicious_nodes = [n for n in self.nodes if n.role == "malicious"] + if malicious_nodes and random.random() < 0.3: # 30% chance + node = malicious_nodes[0] + message = f"malicious_msg_{message_counter}_{int(time.time())}" + await node.publish_message(message) + message_counter += 1 + + await trio.sleep(2) # Publish every 2 seconds + + # Print statistics + await trio.sleep(1) # Wait for final messages + self._print_statistics() + + # Cancel all tasks to exit nursery + nursery.cancel_scope.cancel() + + except Exception as e: + logger.warning(f"Demo execution interrupted: {e}") + + async def _connect_nodes(self): + """Connect nodes in a mesh topology""" + for i, node in enumerate(self.nodes): + # Connect to the next node in a ring topology + if len(self.nodes) > 1: + target_idx = (i + 1) % len(self.nodes) + target = self.nodes[target_idx] + + if target.host and node.host: + peer_addr = ( + f"/ip4/127.0.0.1/tcp/{target.port}/p2p/{target.host.get_id()}" + ) + await node.connect_to_peer(peer_addr) + + # Also connect to one more node for better connectivity + if len(self.nodes) > 2: + target_idx2 = (i + 2) % len(self.nodes) + target2 = self.nodes[target_idx2] + + if target2.host and node.host: + peer_addr2 = ( + f"/ip4/127.0.0.1/tcp/{target2.port}/p2p/" + f"{target2.host.get_id()}" + ) + await node.connect_to_peer(peer_addr2) + + def _print_statistics(self): + """Print demo statistics""" + print(f"\n{'=' * 60}") + print("DEMO STATISTICS") + print(f"{'=' * 60}") + + total_sent = sum(node.messages_sent for node in self.nodes) + total_received = sum(node.messages_received for node in self.nodes) + + honest_sent = sum(n.messages_sent for n in self.nodes if n.role == "honest") + malicious_sent = sum( + n.messages_sent for n in self.nodes if n.role == "malicious" + ) + + print(f"Total messages sent: {total_sent}") + print(f" Honest nodes: {honest_sent}") + print(f" Malicious nodes: {malicious_sent}") + print(f"Total messages received: {total_received}") + print("\nPer-node statistics:") + for node in self.nodes: + print( + f" {node.node_id} ({node.role}): sent={node.messages_sent}, " + f"received={node.messages_received}" + ) + + print(f"\n{'=' * 60}") + print("Gossipsub 1.2 Features:") + print(" ✓ All Gossipsub 1.1 features") + print(" ✓ IDONTWANT message filtering") + print(" ✓ Reduced bandwidth usage") + print(" ✓ Improved efficiency in dense networks") + print(" ✗ No adaptive gossip") + print(" ✗ No advanced security features (P6, P7)") + print(f"{'=' * 60}\n") + + +async def main(): + parser = argparse.ArgumentParser(description="Gossipsub 1.2 Example") + parser.add_argument( + "--nodes", type=int, default=5, help="Number of nodes in the network" + ) + parser.add_argument( + "--duration", type=int, default=30, help="Demo duration in seconds" + ) + parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + demo = GossipsubV12Demo() + await demo.setup_network(args.nodes) + await demo.start_network(args.duration) + + +if __name__ == "__main__": + trio.run(main) diff --git a/examples/pubsub/gossipsub/gossipsub_v1.3.py b/examples/pubsub/gossipsub/gossipsub_v1.3.py new file mode 100644 index 000000000..121a29489 --- /dev/null +++ b/examples/pubsub/gossipsub/gossipsub_v1.3.py @@ -0,0 +1,543 @@ +#!/usr/bin/env python3 +""" +GossipSub 1.3 Example + +This example demonstrates GossipSub v1.3 protocol (/meshsub/1.3.0). +GossipSub 1.3 adds the Extensions Control Message mechanism and the +Topic Observation extension. + +Features demonstrated: +- All GossipSub 1.2 features (IDONTWANT, peer scoring, etc.) +- GossipSub v1.3 Extensions Control Message (sent once, at most once per peer) +- Topic Observation: observer nodes receive IHAVE notifications without full + message payloads, enabling lightweight presence awareness +- Misbehaviour detection: duplicate Extensions messages from a peer are penalised +- Protocol gating: extension fields are only sent when /meshsub/1.3.0 is negotiated + +Roles in this demo: + - publisher : subscribes to the topic and publishes messages every few seconds + - subscriber : subscribes to the topic and reads full message payloads + - observer : starts observing (IHAVE-only) without subscribing; tracks presence + +Usage (from repository root): + python examples/pubsub/gossipsub/gossipsub_v1.3.py --nodes 6 --duration 40 + python examples/pubsub/gossipsub/gossipsub_v1.3.py --nodes 6 --duration 40 --verbose +""" + +import argparse +import logging +import random +import time + +import trio + +from libp2p import new_host +from libp2p.abc import IHost, ISubscriptionAPI +from libp2p.crypto.rsa import create_new_key_pair +from libp2p.pubsub.extensions import PeerExtensions +from libp2p.pubsub.gossipsub import PROTOCOL_ID_V13, GossipSub +from libp2p.pubsub.pubsub import Pubsub +from libp2p.pubsub.score import ScoreParams, TopicScoreParams +from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex +from libp2p.tools.anyio_service import background_trio_service +from libp2p.utils.address_validation import find_free_port + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger("gossipsub-v1.3") + +TOPIC = "gossipsub-v1.3-demo" + + +class GossipsubV13Node: + """ + A node running GossipSub v1.3. + + Each node has one of three roles: + - "publisher" – subscribes and publishes messages + - "subscriber" – subscribes and reads messages (no publishing) + - "observer" – uses Topic Observation to receive IHAVE presence + notifications without a full subscription + """ + + def __init__(self, node_id: str, port: int, role: str = "publisher"): + self.node_id = node_id + self.port = port + self.role = role + + self.host: IHost | None = None + self.pubsub: Pubsub | None = None + self.gossipsub: GossipSub | None = None + self.subscription: ISubscriptionAPI | None = None + + self.messages_sent = 0 + self.messages_received = 0 + self.ihave_notifications = 0 # reserved for future explicit IHAVE hooks + + async def start(self) -> None: + """Initialise the libp2p host and GossipSub v1.3 router.""" + import multiaddr + + key_pair = create_new_key_pair() + + self.host = new_host( + key_pair=key_pair, + muxer_opt={MPLEX_PROTOCOL_ID: Mplex}, + ) + + score_params = ScoreParams( + p1_time_in_mesh=TopicScoreParams(weight=0.1, cap=10.0, decay=0.99), + p2_first_message_deliveries=TopicScoreParams( + weight=0.5, cap=20.0, decay=0.99 + ), + p3_mesh_message_deliveries=TopicScoreParams( + weight=0.3, cap=10.0, decay=0.99 + ), + p4_invalid_messages=TopicScoreParams(weight=-1.0, cap=50.0, decay=0.99), + p5_behavior_penalty_weight=1.0, + p5_behavior_penalty_decay=0.99, + ) + + # Advertise GossipSub v1.3 extensions: both Topic Observation and the + # interop test extension (testExtension) are enabled for all nodes. + my_extensions = PeerExtensions( + topic_observation=True, + test_extension=True, + ) + + self.gossipsub = GossipSub( + protocols=[PROTOCOL_ID_V13], + degree=3, + degree_low=2, + degree_high=4, + heartbeat_interval=5, + heartbeat_initial_delay=1.0, + score_params=score_params, + max_idontwant_messages=20, + # GossipSub v1.3: advertise our supported extensions in the first + # message on every new stream (enforced at-most-once per peer). + my_extensions=my_extensions, + ) + + self.pubsub = Pubsub(self.host, self.gossipsub) + + listen_addrs = [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{self.port}")] + + async with self.host.run(listen_addrs=listen_addrs): + async with background_trio_service(self.pubsub): + async with background_trio_service(self.gossipsub): + await self.pubsub.wait_until_ready() + + if self.role in ("publisher", "subscriber"): + self.subscription = await self.pubsub.subscribe(TOPIC) + logger.info( + "[%s] subscribed to topic '%s' (full payload mode)", + self.node_id, + TOPIC, + ) + + logger.info( + "Node %s started | role=%s | port=%d | %s", + self.node_id, + self.role, + self.port, + self.extensions_summary(), + ) + await trio.sleep_forever() + + async def publish_message(self, message: str) -> None: + """Publish a message to the topic (publisher role only).""" + if self.pubsub and self.role == "publisher": + await self.pubsub.publish(TOPIC, message.encode()) + self.messages_sent += 1 + logger.info( + "[PUBLISH] node=%s topic=%s payload=%s", + self.node_id, + TOPIC, + message, + ) + + async def receive_messages(self) -> None: + """Read full message payloads (subscriber / publisher roles).""" + subscription = self.subscription + if subscription is None: + return + try: + while True: + message = await subscription.get() + decoded = message.data.decode("utf-8") + self.messages_received += 1 + logger.info( + "[RECEIVE] node=%s role=%s payload=%s", + self.node_id, + self.role, + decoded, + ) + except Exception as exc: + logger.debug("Node %s receive loop ended: %s", self.node_id, exc) + + async def start_observing(self) -> None: + """ + Start Topic Observation for TOPIC (observer role). + + This sends OBSERVE control messages to in-mesh peers that also + advertised Topic Observation support. From that point on those peers + will forward IHAVE presence notifications to us whenever a new message + arrives on TOPIC, without sending the full payload. + """ + if self.gossipsub and self.role == "observer": + await self.gossipsub.start_observing_topic(TOPIC) + logger.info( + "[OBSERVE-START] node=%s topic=%s observing_topics=%s", + self.node_id, + TOPIC, + sorted(self.gossipsub.topic_observation.get_observing_topics()), + ) + + async def stop_observing(self) -> None: + """ + Stop Topic Observation for TOPIC (observer role). + + Sends UNOBSERVE control messages to the peers we were observing through. + """ + if self.gossipsub and self.role == "observer": + await self.gossipsub.stop_observing_topic(TOPIC) + logger.info( + "[OBSERVE-STOP] node=%s topic=%s observing_topics=%s", + self.node_id, + TOPIC, + sorted(self.gossipsub.topic_observation.get_observing_topics()), + ) + + async def connect_to_peer(self, peer_addr: str) -> None: + """Connect to a remote peer by multiaddr.""" + if not self.host: + return + try: + import multiaddr + + from libp2p.peer.peerinfo import info_from_p2p_addr + + maddr = multiaddr.Multiaddr(peer_addr) + info = info_from_p2p_addr(maddr) + await self.host.connect(info) + logger.info( + "[CONNECT] %s -> %s", + self.node_id, + peer_addr, + ) + except Exception as exc: + logger.debug( + "Node %s failed to connect to %s: %s", self.node_id, peer_addr, exc + ) + + def extensions_summary(self) -> str: + """Return a human-readable summary of negotiated extensions.""" + if not self.gossipsub: + return "not started" + ext_state = self.gossipsub.extensions_state + my = ext_state.my_extensions + return ( + f"advertised=(topic_observation={my.topic_observation}, " + f"test_extension={my.test_extension})" + ) + + +class GossipsubV13Demo: + """ + Demo controller that sets up a mixed network of publishers, subscribers, + and observers and runs them for a configurable duration. + + Network layout (with default --nodes 6): + nodes 0-1 → publisher + nodes 2-3 → subscriber + nodes 4-5 → observer + """ + + def __init__(self) -> None: + self.nodes: list[GossipsubV13Node] = [] + + async def setup_network(self, node_count: int = 6) -> None: + """Allocate ports and create nodes with their roles.""" + roles = _assign_roles(node_count) + for i in range(node_count): + port = find_free_port() + node = GossipsubV13Node(f"node_{i}", port, roles[i]) + self.nodes.append(node) + + role_counts = {r: roles.count(r) for r in set(roles)} + logger.info( + "Created %d-node GossipSub v1.3 network: %s", + node_count, + role_counts, + ) + for node in self.nodes: + logger.info( + "[PLAN] node=%s role=%s listen=/ip4/127.0.0.1/tcp/%d", + node.node_id, + node.role, + node.port, + ) + + async def start_network(self, duration: int = 40) -> None: + """Start all nodes, wire them together, then run the demo loop.""" + try: + async with trio.open_nursery() as nursery: + # Boot all nodes concurrently. + for node in self.nodes: + nursery.start_soon(node.start) + + # Give nodes time to bind their listening ports. + await trio.sleep(3) + logger.info("[STAGE] all node services started") + + # Wire a ring + chord topology so every node has ≥2 peers. + logger.info("[STAGE] wiring peer connections (ring + skip links)") + await self._connect_nodes() + await trio.sleep(2) + logger.info("[STAGE] peer wiring complete") + self._log_protocol_snapshot("after-connect") + + # Start receive loops for publishers and subscribers. + for node in self.nodes: + if node.role in ("publisher", "subscriber"): + nursery.start_soon(node.receive_messages) + logger.info("[STAGE] receive loops started (publishers/subscribers)") + + # Observer nodes start Topic Observation after wiring. + for node in self.nodes: + if node.role == "observer": + await node.start_observing() + self._log_observer_snapshot("after-observe-start") + + # Print banner. + _print_banner(duration) + + end_time = time.time() + duration + message_counter = 0 + half_time = duration // 2 + heartbeat_step = 0 + midpoint_unobserve_done = False + + while time.time() < end_time: + elapsed = duration - (end_time - time.time()) + + # Publishers take turns sending a message. + publishers = [n for n in self.nodes if n.role == "publisher"] + if publishers: + node = random.choice(publishers) + msg = f"msg_{message_counter}_{int(time.time())}" + await node.publish_message(msg) + message_counter += 1 + + # Halfway through, stop one observer to show UNOBSERVE. + if elapsed >= half_time and not midpoint_unobserve_done: + for node in self.nodes: + if ( + node.role == "observer" + and node.gossipsub is not None + and node.gossipsub.topic_observation.is_observing(TOPIC) + ): + await node.stop_observing() + self._log_observer_snapshot("after-midpoint-unobserve") + midpoint_unobserve_done = True + break # stop only the first one + + # Emit periodic runtime snapshots so demos are easier to narrate. + heartbeat_step += 1 + self._log_runtime_snapshot(heartbeat_step, elapsed) + await trio.sleep(2) + + await trio.sleep(1) + self._log_protocol_snapshot("final") + self._print_statistics() + nursery.cancel_scope.cancel() + + except Exception as exc: + logger.warning("Demo interrupted: %s", exc) + + async def _connect_nodes(self) -> None: + """Connect nodes in a ring + one-hop-skip topology.""" + n = len(self.nodes) + for i, node in enumerate(self.nodes): + for offset in (1, 2): + target = self.nodes[(i + offset) % n] + if target.host and node.host: + peer_addr = ( + f"/ip4/127.0.0.1/tcp/{target.port}/p2p/{target.host.get_id()}" + ) + await node.connect_to_peer(peer_addr) + logger.info("[STAGE] requested all topology connections") + + def _log_runtime_snapshot(self, tick: int, elapsed: float) -> None: + publishers = [n for n in self.nodes if n.role == "publisher"] + subscribers = [n for n in self.nodes if n.role == "subscriber"] + observers = [n for n in self.nodes if n.role == "observer"] + published = sum(n.messages_sent for n in publishers) + delivered = sum(n.messages_received for n in publishers + subscribers) + active_observers = sum( + 1 + for n in observers + if n.gossipsub is not None + and n.gossipsub.topic_observation.is_observing(TOPIC) + ) + logger.info( + "[SNAPSHOT t+%ds #%d] published=%d delivered=%d active_observers=%d/%d", + int(elapsed), + tick, + published, + delivered, + active_observers, + len(observers), + ) + + def _log_observer_snapshot(self, label: str) -> None: + for node in self.nodes: + if node.role != "observer" or node.gossipsub is None: + continue + observing_topics = sorted( + node.gossipsub.topic_observation.get_observing_topics() + ) + logger.info( + "[OBSERVER-SNAPSHOT:%s] node=%s observing_topics=%s", + label, + node.node_id, + observing_topics, + ) + + def _log_protocol_snapshot(self, label: str) -> None: + for node in self.nodes: + if node.gossipsub is None: + continue + router = node.gossipsub + peers_total = len(router.peer_protocol) + v13_peers = sum( + 1 for pid in router.peer_protocol if router.supports_v13_features(pid) + ) + ext_known = sum( + 1 + for pid in router.peer_protocol + if router.extensions_state.get_peer_extensions(pid) is not None + ) + topic_observation_peers = sum( + 1 + for pid in router.peer_protocol + if router.extensions_state.peer_supports_topic_observation(pid) + ) + logger.info( + "[PROTO-SNAPSHOT:%s] node=%s peers=%d v13=%d ext_known=%d topic_obs=%d", + label, + node.node_id, + peers_total, + v13_peers, + ext_known, + topic_observation_peers, + ) + + def _print_statistics(self) -> None: + """Print a summary table at the end of the demo.""" + print(f"\n{'=' * 65}") + print("DEMO STATISTICS") + print(f"{'=' * 65}") + + total_sent = sum(n.messages_sent for n in self.nodes) + total_received = sum(n.messages_received for n in self.nodes) + + print(f"Total messages published : {total_sent}") + print(f"Total messages received : {total_received}") + print() + print(f"{'Node':<10} {'Role':<12} {'Sent':>6} {'Recv':>6} Extensions") + print(f"{'-' * 65}") + for node in self.nodes: + print( + f"{node.node_id:<10} {node.role:<12} " + f"{node.messages_sent:>6} {node.messages_received:>6} " + f"{node.extensions_summary()}" + ) + + print(f"\n{'=' * 65}") + print("GossipSub 1.3 Features demonstrated:") + print(" + Extensions Control Message (first message, at most once per peer)") + print(" + Topic Observation (IHAVE presence notifications without payloads)") + print(" + Misbehaviour scoring on duplicate Extensions messages") + print(" + Protocol gating (/meshsub/1.3.0 only)") + print(" + All GossipSub 1.2 features (IDONTWANT, peer scoring, etc.)") + print(f"{'=' * 65}\n") + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _assign_roles(node_count: int) -> list[str]: + """ + Distribute roles across nodes. + + With 6 nodes the split is 2 publishers / 2 subscribers / 2 observers. + Smaller counts fall back gracefully. + """ + if node_count < 3: + return ["publisher"] * node_count + publishers = max(1, node_count // 3) + observers = max(1, node_count // 3) + subscribers = node_count - publishers - observers + return ( + ["publisher"] * publishers + + ["subscriber"] * subscribers + + ["observer"] * observers + ) + + +def _print_banner(duration: int) -> None: + print(f"\n{'=' * 65}") + print("GOSSIPSUB 1.3 DEMO") + print(f"{'=' * 65}") + print("Protocol : /meshsub/1.3.0") + print(f"Duration : {duration} seconds") + print("Features : Extensions Control Message, Topic Observation,") + print(" IDONTWANT filtering, peer scoring") + print() + print("Roles:") + print(" publisher – subscribes + publishes messages") + print(" subscriber – subscribes and reads payloads") + print(" observer – Topic Observation only (IHAVE-only, no payload)") + print() + print( + f"At t={duration // 2}s one observer will send UNOBSERVE to stop " + "receiving notifications." + ) + print(f"{'=' * 65}\n") + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + + +async def main() -> None: + parser = argparse.ArgumentParser(description="GossipSub 1.3 Example") + parser.add_argument( + "--nodes", type=int, default=6, help="Total number of nodes (default: 6)" + ) + parser.add_argument( + "--duration", + type=int, + default=40, + help="Demo duration in seconds (default: 40)", + ) + parser.add_argument("--verbose", action="store_true", help="Enable DEBUG logging") + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + demo = GossipsubV13Demo() + await demo.setup_network(args.nodes) + await demo.start_network(args.duration) + + +if __name__ == "__main__": + trio.run(main) diff --git a/examples/pubsub/gossipsub/gossipsub_v1.4.py b/examples/pubsub/gossipsub/gossipsub_v1.4.py new file mode 100644 index 000000000..442bc690f --- /dev/null +++ b/examples/pubsub/gossipsub/gossipsub_v1.4.py @@ -0,0 +1,564 @@ +#!/usr/bin/env python3 +""" +GossipSub 1.4 Example + +This example demonstrates GossipSub v1.4 protocol (/meshsub/1.4.0). +GossipSub 1.4 focuses on enhanced rate limiting, GRAFT flood protection, +and adaptive gossip parameter tuning based on network health metrics. + +Features demonstrated: +- All GossipSub 1.3 features (Extensions Control Message, Topic Observation) +- IWANT request rate limiting per peer (anti-spam) +- IHAVE message rate limiting per peer per topic (anti-spam) +- GRAFT flood protection with automatic score penalty +- Adaptive gossip factor based on network health score +- Opportunistic grafting threshold adaptation +- Heartbeat interval adaptation under poor network conditions +- Extended scoring (P5-P7) with v1.4 protocol gating + +Node roles in this demo: + - honest : publishes and receives messages normally + - spammer : sends rapid IWANT / IHAVE style spam to trigger rate limits + - observer : uses Topic Observation (IHAVE-only) inherited from v1.3 + - validator : receives messages and validates them; no publishing + +Usage (from repository root): + python examples/pubsub/gossipsub/gossipsub_v1.4.py --nodes 6 --duration 40 + python examples/pubsub/gossipsub/gossipsub_v1.4.py --nodes 6 --duration 40 --verbose +""" + +import argparse +import logging +import random +import time + +import trio + +from libp2p import new_host +from libp2p.abc import IHost, ISubscriptionAPI +from libp2p.crypto.rsa import create_new_key_pair +from libp2p.pubsub.extensions import PeerExtensions +from libp2p.pubsub.gossipsub import PROTOCOL_ID_V14, GossipSub +from libp2p.pubsub.pubsub import Pubsub +from libp2p.pubsub.score import ScoreParams, TopicScoreParams +from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex +from libp2p.tools.anyio_service import background_trio_service +from libp2p.utils.address_validation import find_free_port + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger("gossipsub-v1.4") + +TOPIC = "gossipsub-v1.4-demo" + + +class GossipsubV14Node: + """ + A node running GossipSub v1.4. + + Roles: + - "honest" – subscribes and publishes messages at a normal rate + - "spammer" – sends messages rapidly to exercise rate limiting + - "observer" – uses Topic Observation (inherited from v1.3) + - "validator" – subscribes and receives messages, no publishing + """ + + def __init__(self, node_id: str, port: int, role: str = "honest"): + self.node_id = node_id + self.port = port + self.role = role + + self.host: IHost | None = None + self.pubsub: Pubsub | None = None + self.gossipsub: GossipSub | None = None + self.subscription: ISubscriptionAPI | None = None + + self.messages_sent = 0 + self.messages_received = 0 + self.rate_limit_hits = 0 + + async def start(self) -> None: + """Initialise the libp2p host and GossipSub v1.4 router.""" + import multiaddr + + key_pair = create_new_key_pair() + + host = new_host( + key_pair=key_pair, + muxer_opt={MPLEX_PROTOCOL_ID: Mplex}, + ) + self.host = host + + # Full peer scoring: P1-P4 topic-scoped + P5 behavior + P6/P7 global. + score_params = ScoreParams( + p1_time_in_mesh=TopicScoreParams(weight=0.1, cap=10.0, decay=0.99), + p2_first_message_deliveries=TopicScoreParams( + weight=0.5, cap=20.0, decay=0.99 + ), + p3_mesh_message_deliveries=TopicScoreParams( + weight=0.3, cap=10.0, decay=0.99 + ), + p4_invalid_messages=TopicScoreParams(weight=-1.0, cap=50.0, decay=0.99), + p5_behavior_penalty_weight=1.0, + p5_behavior_penalty_decay=0.99, + ) + + # Advertise v1.3-compatible extensions (Topic Observation + test extension). + my_extensions = PeerExtensions( + topic_observation=True, + test_extension=True, + ) + + # v1.4-specific constructor parameters: + # max_iwant_requests_per_second – caps IWANT storm per peer + # max_ihave_messages_per_second – caps IHAVE flood per peer/topic + # graft_flood_threshold – minimum seconds between PRUNE and GRAFT + # adaptive_gossip_enabled – turn on health-based parameter adaptation + gossipsub = GossipSub( + protocols=[PROTOCOL_ID_V14], + degree=3, + degree_low=2, + degree_high=4, + heartbeat_interval=5, + heartbeat_initial_delay=1.0, + score_params=score_params, + max_idontwant_messages=20, + my_extensions=my_extensions, + # v1.4 rate limiting + adaptive_gossip_enabled=True, + ) + self.gossipsub = gossipsub + + # Override v1.4 rate limiting thresholds directly on the router so the + # demo can observe them being triggered with a small message volume. + gossipsub.max_iwant_requests_per_second = 5.0 + gossipsub.max_ihave_messages_per_second = 5.0 + gossipsub.graft_flood_threshold = 8.0 + + self.pubsub = Pubsub(host, gossipsub) + + listen_addrs = [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{self.port}")] + + async with self.host.run(listen_addrs=listen_addrs): + async with background_trio_service(self.pubsub): + async with background_trio_service(self.gossipsub): + await self.pubsub.wait_until_ready() + + # observers use Topic Observation – no full subscription needed. + if self.role in ("honest", "spammer", "validator"): + self.subscription = await self.pubsub.subscribe(TOPIC) + logger.info( + "[%s] subscribed to topic '%s' role=%s", + self.node_id, + TOPIC, + self.role, + ) + + logger.info( + "Node %s started | role=%s | port=%d | protocol=%s", + self.node_id, + self.role, + self.port, + PROTOCOL_ID_V14, + ) + await trio.sleep_forever() + + async def publish_message(self, message: str) -> None: + """Publish a message to TOPIC.""" + if self.pubsub and self.role in ("honest", "spammer"): + await self.pubsub.publish(TOPIC, message.encode()) + self.messages_sent += 1 + logger.info( + "[PUBLISH] node=%s role=%s payload=%s", + self.node_id, + self.role, + message, + ) + + async def receive_messages(self) -> None: + """Drain the subscription queue and count deliveries.""" + subscription = self.subscription + if subscription is None: + return + try: + while True: + message = await subscription.get() + decoded = message.data.decode("utf-8") + self.messages_received += 1 + logger.info( + "[RECEIVE] node=%s role=%s payload=%s", + self.node_id, + self.role, + decoded, + ) + except Exception as exc: + logger.debug("Node %s receive loop ended: %s", self.node_id, exc) + + async def start_observing(self) -> None: + """Start Topic Observation (observer role – inherited from v1.3).""" + if self.gossipsub and self.role == "observer": + await self.gossipsub.start_observing_topic(TOPIC) + logger.info( + "[OBSERVE-START] node=%s topic=%s", + self.node_id, + TOPIC, + ) + + async def stop_observing(self) -> None: + """Stop Topic Observation (observer role).""" + if self.gossipsub and self.role == "observer": + await self.gossipsub.stop_observing_topic(TOPIC) + logger.info( + "[OBSERVE-STOP] node=%s topic=%s", + self.node_id, + TOPIC, + ) + + async def connect_to_peer(self, peer_addr: str) -> None: + """Connect to a remote peer by multiaddr.""" + if not self.host: + return + try: + import multiaddr + + from libp2p.peer.peerinfo import info_from_p2p_addr + + maddr = multiaddr.Multiaddr(peer_addr) + info = info_from_p2p_addr(maddr) + await self.host.connect(info) + logger.info("[CONNECT] %s -> %s", self.node_id, peer_addr) + except Exception as exc: + logger.debug( + "Node %s failed to connect to %s: %s", + self.node_id, + peer_addr, + exc, + ) + + def v14_status(self) -> str: + """Return a one-line v1.4 feature status string for this node.""" + if not self.gossipsub: + return "not started" + gs = self.gossipsub + return ( + f"health={gs.network_health_score:.2f} " + f"gossip_factor={gs.gossip_factor:.2f} " + f"iwant_limit={gs.max_iwant_requests_per_second:.0f}/s " + f"ihave_limit={gs.max_ihave_messages_per_second:.0f}/s " + f"graft_flood_threshold={gs.graft_flood_threshold:.0f}s " + f"opp_graft_threshold={gs.opportunistic_graft_threshold:.2f}" + ) + + +class GossipsubV14Demo: + """ + Demo controller: sets up a mixed-role network and runs a scenario that + exercises all GossipSub v1.4 features. + + Network layout (default --nodes 6): + nodes 0-1 → honest + nodes 2-3 → validator + nodes 4 → spammer + nodes 5 → observer + """ + + def __init__(self) -> None: + self.nodes: list[GossipsubV14Node] = [] + + async def setup_network(self, node_count: int = 6) -> None: + """Allocate ports and create nodes with appropriate roles.""" + roles = _assign_roles(node_count) + for i in range(node_count): + port = find_free_port() + node = GossipsubV14Node(f"node_{i}", port, roles[i]) + self.nodes.append(node) + + role_counts = {r: roles.count(r) for r in set(roles)} + logger.info( + "Created %d-node GossipSub v1.4 network: %s", + node_count, + role_counts, + ) + for node in self.nodes: + logger.info( + "[PLAN] node=%s role=%s listen=/ip4/127.0.0.1/tcp/%d", + node.node_id, + node.role, + node.port, + ) + + async def start_network(self, duration: int = 40) -> None: + """Boot all nodes, wire topology, then run the demo loop.""" + try: + async with trio.open_nursery() as nursery: + # Boot every node concurrently. + for node in self.nodes: + nursery.start_soon(node.start) + + # Wait for all listening ports to bind. + await trio.sleep(3) + logger.info("[STAGE] all node services started") + + # Ring + chord topology (≥2 peers per node). + logger.info("[STAGE] wiring peer connections (ring + skip links)") + await self._connect_nodes() + await trio.sleep(2) + logger.info("[STAGE] peer wiring complete") + self._log_v14_snapshot("after-connect") + + # Start receive loops for subscribing roles. + for node in self.nodes: + if node.role in ("honest", "validator", "spammer"): + nursery.start_soon(node.receive_messages) + logger.info("[STAGE] receive loops started") + + # Observer nodes activate Topic Observation after wiring. + for node in self.nodes: + if node.role == "observer": + await node.start_observing() + + _print_banner(duration) + + end_time = time.time() + duration + msg_counter = 0 + tick = 0 + half_time = duration // 2 + spam_burst_done = False + unobserve_done = False + + while time.time() < end_time: + elapsed = duration - (end_time - time.time()) + + # ── Honest nodes publish at a normal cadence ────────────── + honest_nodes = [n for n in self.nodes if n.role == "honest"] + if honest_nodes: + sender = random.choice(honest_nodes) + msg = f"msg_{msg_counter}_{int(time.time())}" + await sender.publish_message(msg) + msg_counter += 1 + + # ── Spammer: burst of rapid messages at the 1/4 mark ───── + # This exercises the IWANT / IHAVE rate limiting paths. + if elapsed >= duration // 4 and not spam_burst_done: + spammers = [n for n in self.nodes if n.role == "spammer"] + if spammers: + logger.info( + "[STAGE] triggering spammer burst (rate-limit demo)" + ) + for _ in range(12): + for spammer in spammers: + burst_msg = f"spam_{msg_counter}_{int(time.time())}" + await spammer.publish_message(burst_msg) + msg_counter += 1 + await trio.sleep(0.05) + spam_burst_done = True + logger.info( + "[STAGE] spammer burst complete – " + "check logs for rate-limit warnings" + ) + + # ── Midpoint: stop one observer (UNOBSERVE demo) ────────── + if elapsed >= half_time and not unobserve_done: + for node in self.nodes: + if ( + node.role == "observer" + and node.gossipsub is not None + and node.gossipsub.topic_observation.is_observing(TOPIC) + ): + await node.stop_observing() + logger.info( + "[STAGE] midpoint UNOBSERVE sent by %s", + node.node_id, + ) + unobserve_done = True + break + + tick += 1 + self._log_runtime_snapshot(tick, elapsed) + await trio.sleep(2) + + await trio.sleep(1) + self._log_v14_snapshot("final") + self._print_statistics() + nursery.cancel_scope.cancel() + + except Exception as exc: + logger.warning("Demo interrupted: %s", exc) + + async def _connect_nodes(self) -> None: + """Connect nodes in a ring + one-hop-skip topology.""" + n = len(self.nodes) + for i, node in enumerate(self.nodes): + for offset in (1, 2): + target = self.nodes[(i + offset) % n] + if target.host and node.host: + peer_addr = ( + f"/ip4/127.0.0.1/tcp/{target.port}/p2p/{target.host.get_id()}" + ) + await node.connect_to_peer(peer_addr) + logger.info("[STAGE] requested all topology connections") + + def _log_runtime_snapshot(self, tick: int, elapsed: float) -> None: + honest = [n for n in self.nodes if n.role == "honest"] + validators = [n for n in self.nodes if n.role == "validator"] + spammers = [n for n in self.nodes if n.role == "spammer"] + observers = [n for n in self.nodes if n.role == "observer"] + + published = sum(n.messages_sent for n in honest + spammers) + delivered = sum(n.messages_received for n in honest + validators) + active_observers = sum( + 1 + for n in observers + if n.gossipsub is not None + and n.gossipsub.topic_observation.is_observing(TOPIC) + ) + logger.info( + "[SNAPSHOT t+%ds #%d] published=%d delivered=%d observers=%d/%d", + int(elapsed), + tick, + published, + delivered, + active_observers, + len(observers), + ) + + def _log_v14_snapshot(self, label: str) -> None: + """Log v1.4 specific metrics for every node.""" + for node in self.nodes: + if node.gossipsub is None: + continue + gs = node.gossipsub + peers_total = len(gs.peer_protocol) + v14_peers = sum( + 1 + for pid in gs.peer_protocol + if gs.supports_protocol_feature(pid, "adaptive_gossip") + ) + logger.info( + "[V14-SNAPSHOT:%s] node=%s peers=%d v14=%d %s", + label, + node.node_id, + peers_total, + v14_peers, + node.v14_status(), + ) + + def _print_statistics(self) -> None: + """Print a summary table at the end of the demo.""" + print(f"\n{'=' * 70}") + print("DEMO STATISTICS") + print(f"{'=' * 70}") + + total_sent = sum(n.messages_sent for n in self.nodes) + total_received = sum(n.messages_received for n in self.nodes) + + print(f"Total messages published : {total_sent}") + print(f"Total messages received : {total_received}") + print() + print(f"{'Node':<10} {'Role':<12} {'Sent':>6} {'Recv':>6} v1.4 Status") + print(f"{'-' * 70}") + for node in self.nodes: + print( + f"{node.node_id:<10} {node.role:<12} " + f"{node.messages_sent:>6} {node.messages_received:>6} " + f"{node.v14_status()}" + ) + + print(f"\n{'=' * 70}") + print("GossipSub 1.4 Features demonstrated:") + print(" + IWANT request rate limiting per peer (anti-spam)") + print(" + IHAVE message rate limiting per peer per topic (anti-spam)") + print(" + GRAFT flood protection with automatic score penalty") + print(" + Adaptive gossip factor based on network health score") + print(" + Opportunistic grafting threshold adaptation") + print(" + Heartbeat interval adaptation under poor network conditions") + print(" + Extended scoring (P5-P7) gated to /meshsub/1.4.0") + print(" + Topic Observation (inherited from v1.3) with UNOBSERVE demo") + print(" + All GossipSub 1.3 features (Extensions Control Message, etc.)") + print(f"{'=' * 70}\n") + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _assign_roles(node_count: int) -> list[str]: + """ + Distribute roles across nodes. + + With 6 nodes: 2 honest / 2 validator / 1 spammer / 1 observer. + Smaller counts fall back gracefully. + """ + if node_count < 2: + return ["honest"] * node_count + if node_count == 2: + return ["honest", "validator"] + if node_count == 3: + return ["honest", "validator", "spammer"] + + honest = max(1, node_count // 3) + spammer = max(1, node_count // 6) + observer = max(1, node_count // 6) + validator = node_count - honest - spammer - observer + return ( + ["honest"] * honest + + ["validator"] * validator + + ["spammer"] * spammer + + ["observer"] * observer + ) + + +def _print_banner(duration: int) -> None: + print(f"\n{'=' * 70}") + print("GOSSIPSUB 1.4 DEMO") + print(f"{'=' * 70}") + print("Protocol : /meshsub/1.4.0") + print(f"Duration : {duration} seconds") + print("Features : IWANT/IHAVE rate limiting, GRAFT flood protection,") + print(" adaptive gossip factor, heartbeat adaptation,") + print(" opportunistic grafting threshold, extended scoring (P5-P7),") + print(" Topic Observation (inherited from v1.3)") + print() + print("Roles:") + print(" honest – subscribes + publishes at a normal cadence") + print(" validator – subscribes, reads messages, no publishing") + print(" spammer – bursts messages to trigger rate-limit paths") + print(" observer – Topic Observation only (no full subscription)") + print() + print(f" At t={duration // 4}s : spammer burst (IWANT/IHAVE rate-limit demo)") + print(f" At t={duration // 2}s : UNOBSERVE sent by one observer") + print(f"{'=' * 70}\n") + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + + +async def main() -> None: + parser = argparse.ArgumentParser(description="GossipSub 1.4 Example") + parser.add_argument( + "--nodes", type=int, default=6, help="Total number of nodes (default: 6)" + ) + parser.add_argument( + "--duration", + type=int, + default=40, + help="Demo duration in seconds (default: 40)", + ) + parser.add_argument("--verbose", action="store_true", help="Enable DEBUG logging") + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + demo = GossipsubV14Demo() + await demo.setup_network(args.nodes) + await demo.start_network(args.duration) + + +if __name__ == "__main__": + trio.run(main) diff --git a/examples/pubsub/gossipsub/gossipsub_v2.0.py b/examples/pubsub/gossipsub/gossipsub_v2.0.py new file mode 100755 index 000000000..4569fe811 --- /dev/null +++ b/examples/pubsub/gossipsub/gossipsub_v2.0.py @@ -0,0 +1,393 @@ +#!/usr/bin/env python3 +""" +Gossipsub 2.0 Example + +This example demonstrates Gossipsub 2.0 protocol (/meshsub/2.0.0). +Gossipsub 2.0 adds enhanced security, adaptive gossip, and advanced peer scoring +to provide the most robust and efficient pubsub protocol. + +Features demonstrated: +- All Gossipsub 1.2 features (peer scoring, IDONTWANT) +- Enhanced peer scoring with P6 (application-specific) and P7 (IP colocation) +- Adaptive gossip behavior based on network health +- Advanced security features: + - Spam protection with rate limiting + - Eclipse attack protection via IP diversity + - Equivocation detection + - Enhanced message validation + +Usage (from repository root): + python examples/pubsub/gossipsub/gossipsub_v2.0.py --nodes 5 --duration 30 +""" + +import argparse +import logging +import random +import time + +import trio + +from libp2p import new_host +from libp2p.abc import IHost, ISubscriptionAPI +from libp2p.crypto.rsa import create_new_key_pair +from libp2p.custom_types import TProtocol +from libp2p.peer.id import ID +from libp2p.pubsub.gossipsub import GossipSub +from libp2p.pubsub.pubsub import Pubsub +from libp2p.pubsub.score import ScoreParams, TopicScoreParams +from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex +from libp2p.tools.anyio_service import background_trio_service +from libp2p.utils.address_validation import find_free_port + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger("gossipsub-v2.0") + +# Protocol version +GOSSIPSUB_V20 = TProtocol("/meshsub/2.0.0") +TOPIC = "gossipsub-v2.0-demo" + + +class GossipsubV20Node: + """A node running Gossipsub 2.0""" + + def __init__(self, node_id: str, port: int, role: str = "honest"): + self.node_id = node_id + self.port = port + self.role = role # "honest", "spammer", "validator" + self.host: IHost | None = None + self.pubsub: Pubsub | None = None + self.gossipsub: GossipSub | None = None + self.subscription: ISubscriptionAPI | None = None + self.messages_sent = 0 + self.messages_received = 0 + self.messages_validated = 0 + self.messages_rejected = 0 + + async def start(self): + """Start the node with Gossipsub 2.0 configuration""" + key_pair = create_new_key_pair() + + self.host = new_host( + key_pair=key_pair, + muxer_opt={MPLEX_PROTOCOL_ID: Mplex}, + ) + + # Configure Gossipsub 2.0 - full feature set + score_params = ScoreParams( + # Topic-scoped parameters (P1-P4) + p1_time_in_mesh=TopicScoreParams(weight=0.1, cap=10.0, decay=0.99), + p2_first_message_deliveries=TopicScoreParams( + weight=0.5, cap=20.0, decay=0.99 + ), + p3_mesh_message_deliveries=TopicScoreParams( + weight=0.3, cap=10.0, decay=0.99 + ), + p4_invalid_messages=TopicScoreParams(weight=-1.0, cap=50.0, decay=0.99), + # Global behavioral penalty (P5) + p5_behavior_penalty_weight=1.0, + p5_behavior_penalty_decay=0.99, + # Application-specific score (P6) + p6_appl_slack_weight=0.1, + p6_appl_slack_decay=0.99, + # IP colocation penalty (P7) + p7_ip_colocation_weight=0.5, + p7_ip_colocation_threshold=3, + # Application-specific scoring function + app_specific_score_fn=self._application_score_function, + ) + + self.gossipsub = GossipSub( + protocols=[GOSSIPSUB_V20], + degree=4, + degree_low=2, + degree_high=6, + heartbeat_interval=5, + heartbeat_initial_delay=1.0, + score_params=score_params, + # v1.2 feature: IDONTWANT support + max_idontwant_messages=20, + # v2.0 adaptive features + adaptive_gossip_enabled=True, + # v2.0 security features + spam_protection_enabled=True, + max_messages_per_topic_per_second=5.0, + eclipse_protection_enabled=True, + min_mesh_diversity_ips=2, + ) + + self.pubsub = Pubsub(self.host, self.gossipsub) + + # Start services + import multiaddr + + listen_addrs = [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{self.port}")] + + async with self.host.run(listen_addrs=listen_addrs): + async with background_trio_service(self.pubsub): + async with background_trio_service(self.gossipsub): + await self.pubsub.wait_until_ready() + self.subscription = await self.pubsub.subscribe(TOPIC) + logger.info( + f"Node {self.node_id} (Gossipsub 2.0, {self.role}) started on " + f"port {self.port}" + ) + + # Keep running + await trio.sleep_forever() + + def _application_score_function(self, peer_id: ID) -> float: + """Custom application scoring function (P6)""" + # Example: Reward peers that have been connected longer + # In a real application, this could be based on stake, reputation, etc. + if self.gossipsub and peer_id in getattr(self.gossipsub, "peers", {}): + # Simple time-based scoring for demo + return min(5.0, time.time() % 10) # Varies over time for demo + return 0.0 + + async def publish_message(self, message: str): + """Publish a message to the topic""" + if self.pubsub: + await self.pubsub.publish(TOPIC, message.encode()) + self.messages_sent += 1 + logger.info(f"Node {self.node_id} ({self.role}) published: {message}") + + async def receive_messages(self): + """Receive and process messages""" + if not self.subscription: + return + + try: + while True: + if self.subscription is None: + break + message = await self.subscription.get() + decoded = message.data.decode("utf-8") + self.messages_received += 1 + + # Simulate message validation + if self._validate_message(message): + self.messages_validated += 1 + logger.info(f"Node {self.node_id} received (valid): {decoded}") + else: + self.messages_rejected += 1 + logger.warning(f"Node {self.node_id} received (invalid): {decoded}") + except Exception as e: + logger.debug(f"Node {self.node_id} receive loop ended: {e}") + + def _validate_message(self, message) -> bool: + """Simple message validation""" + try: + decoded = message.data.decode("utf-8") + # Basic validation: message should have expected format + return "_msg_" in decoded and len(decoded) < 1000 + except Exception: + return False + + async def connect_to_peer(self, peer_addr: str): + """Connect to another peer""" + if self.host: + try: + import multiaddr + + from libp2p.peer.peerinfo import info_from_p2p_addr + + maddr = multiaddr.Multiaddr(peer_addr) + info = info_from_p2p_addr(maddr) + await self.host.connect(info) + logger.debug(f"Node {self.node_id} connected to {peer_addr}") + except Exception as e: + logger.debug( + f"Node {self.node_id} failed to connect to {peer_addr}: {e}" + ) + + +class GossipsubV20Demo: + """Demo controller for Gossipsub 2.0""" + + def __init__(self): + self.nodes: list[GossipsubV20Node] = [] + + async def setup_network(self, node_count: int = 5): + """Set up a network of nodes with different roles""" + # Mix of honest, spammer, and validator nodes + roles = ["honest"] * (node_count - 2) + ["spammer"] * 1 + ["validator"] * 1 + + for i in range(node_count): + port = find_free_port() + role = roles[i] if i < len(roles) else "honest" + node = GossipsubV20Node(f"node_{i}", port, role) + self.nodes.append(node) + + logger.info(f"Created network with {node_count} nodes running Gossipsub 2.0") + + async def start_network(self, duration: int = 30): + """Start all nodes and run the demo""" + try: + async with trio.open_nursery() as nursery: + # Start all nodes + for node in self.nodes: + nursery.start_soon(node.start) + + # Wait for initialization + await trio.sleep(3) + + # Connect nodes in a mesh topology + await self._connect_nodes() + await trio.sleep(2) + + # Start message receiving for all nodes + for node in self.nodes: + nursery.start_soon(node.receive_messages) + + # Run publishing loop + end_time = time.time() + duration + message_counter = 0 + + print(f"\n{'=' * 60}") + print("GOSSIPSUB 2.0 DEMO") + print(f"{'=' * 60}") + print(f"Running for {duration} seconds...") + print("Protocol: /meshsub/2.0.0") + print("Features: Adaptive gossip, advanced security, P6/P7 scoring") + print(f"{'=' * 60}\n") + + while time.time() < end_time: + # Honest nodes publish normally + honest_nodes = [n for n in self.nodes if n.role == "honest"] + if honest_nodes: + node = random.choice(honest_nodes) + message = f"honest_msg_{message_counter}_{int(time.time())}" + await node.publish_message(message) + message_counter += 1 + + # Validator nodes publish less frequently but with high quality + validator_nodes = [n for n in self.nodes if n.role == "validator"] + if validator_nodes and random.random() < 0.3: # 30% chance + node = validator_nodes[0] + message = f"validator_msg_{message_counter}_{int(time.time())}" + await node.publish_message(message) + message_counter += 1 + + # Spammer nodes try to send many messages (will be rate-limited) + spammer_nodes = [n for n in self.nodes if n.role == "spammer"] + if spammer_nodes and random.random() < 0.5: # 50% chance + node = spammer_nodes[0] + # Try to send multiple messages quickly + for _ in range(3): + message = f"spam_msg_{message_counter}_{int(time.time())}" + await node.publish_message(message) + message_counter += 1 + await trio.sleep(0.1) # Small delay between spam messages + + await trio.sleep(2) # Publish every 2 seconds + + # Print statistics + await trio.sleep(1) # Wait for final messages + self._print_statistics() + + # Cancel all tasks to exit nursery + nursery.cancel_scope.cancel() + + except Exception as e: + logger.warning(f"Demo execution interrupted: {e}") + + async def _connect_nodes(self): + """Connect nodes in a mesh topology""" + for i, node in enumerate(self.nodes): + # Connect to the next node in a ring topology + if len(self.nodes) > 1: + target_idx = (i + 1) % len(self.nodes) + target = self.nodes[target_idx] + + if target.host and node.host: + peer_addr = ( + f"/ip4/127.0.0.1/tcp/{target.port}/p2p/{target.host.get_id()}" + ) + await node.connect_to_peer(peer_addr) + + # Also connect to one more node for better connectivity + if len(self.nodes) > 2: + target_idx2 = (i + 2) % len(self.nodes) + target2 = self.nodes[target_idx2] + + if target2.host and node.host: + peer_addr2 = ( + f"/ip4/127.0.0.1/tcp/{target2.port}/p2p/" + f"{target2.host.get_id()}" + ) + await node.connect_to_peer(peer_addr2) + + def _print_statistics(self): + """Print demo statistics""" + print(f"\n{'=' * 60}") + print("DEMO STATISTICS") + print(f"{'=' * 60}") + + total_sent = sum(node.messages_sent for node in self.nodes) + total_received = sum(node.messages_received for node in self.nodes) + total_validated = sum(node.messages_validated for node in self.nodes) + total_rejected = sum(node.messages_rejected for node in self.nodes) + + honest_sent = sum(n.messages_sent for n in self.nodes if n.role == "honest") + spammer_sent = sum(n.messages_sent for n in self.nodes if n.role == "spammer") + validator_sent = sum( + n.messages_sent for n in self.nodes if n.role == "validator" + ) + + print(f"Total messages sent: {total_sent}") + print(f" Honest nodes: {honest_sent}") + print(f" Spammer nodes: {spammer_sent}") + print(f" Validator nodes: {validator_sent}") + print(f"Total messages received: {total_received}") + print(f"Messages validated: {total_validated}") + print(f"Messages rejected: {total_rejected}") + print("\nPer-node statistics:") + for node in self.nodes: + print( + f" {node.node_id} ({node.role}): " + f"sent={node.messages_sent}, received={node.messages_received}, " + f"validated={node.messages_validated}, " + f"rejected={node.messages_rejected}" + ) + + print(f"\n{'=' * 60}") + print("Gossipsub 2.0 Features:") + print(" ✓ All Gossipsub 1.2 features") + print(" ✓ Enhanced peer scoring:") + print(" - P6: Application-specific score") + print(" - P7: IP colocation penalty") + print(" ✓ Adaptive gossip behavior") + print(" ✓ Advanced security features:") + print(" - Spam protection with rate limiting") + print(" - Eclipse attack protection") + print(" - Equivocation detection") + print(" - Enhanced message validation") + print(f"{'=' * 60}\n") + + +async def main(): + parser = argparse.ArgumentParser(description="Gossipsub 2.0 Example") + parser.add_argument( + "--nodes", type=int, default=5, help="Number of nodes in the network" + ) + parser.add_argument( + "--duration", type=int, default=30, help="Demo duration in seconds" + ) + parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + demo = GossipsubV20Demo() + await demo.setup_network(args.nodes) + await demo.start_network(args.duration) + + +if __name__ == "__main__": + trio.run(main) diff --git a/tests/core/pubsub/test_gossipsub.py b/tests/core/pubsub/test_gossipsub.py index c93e11ec6..8139395d8 100644 --- a/tests/core/pubsub/test_gossipsub.py +++ b/tests/core/pubsub/test_gossipsub.py @@ -724,7 +724,9 @@ async def test_sparse_connect_degree_zero(): degree = 0 await sparse_connect(hosts, degree) - await trio.sleep(0.1) # Allow connections to establish + # Match test_sparse_connect / test_dense_connect_fallback: pubsub streams need + # time to settle; CI under pytest-xdist can exceed a short 0.1s window. + await trio.sleep(2) # With degree=0, sparse_connect should still create neighbor connections # for connectivity (this is part of the algorithm design)