-
Notifications
You must be signed in to change notification settings - Fork 211
eg/1130-add-gossipsub-comparison-and-standalone-examples #1139
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
9c2ca50
8b8a524
0f49934
e56f9b3
9c3a00f
dacc878
c7a5b2b
cf5da6e
b8cf91a
292c82f
373afd7
293a568
5f679d5
2cb55dc
eec7272
c2f0375
058f846
41e5fea
9a152d4
0a5f066
615f6c8
dcc1da6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| """ | ||
| Gossipsub Examples Package | ||
|
|
||
| This package contains comprehensive examples showcasing the differences between | ||
| Gossipsub protocol versions and demonstrating advanced features. | ||
| """ |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,303 @@ | ||
| #!/usr/bin/env python3 | ||
| """ | ||
| Gossipsub 1.0 Example | ||
|
|
||
| This example demonstrates the basic Gossipsub 1.0 protocol (/meshsub/1.0.0). | ||
| Gossipsub 1.0 provides basic mesh-based pubsub with simple flooding for message | ||
| dissemination. It has no peer scoring or advanced security features, making it | ||
| suitable for trusted networks with low adversarial activity. | ||
|
|
||
| Features demonstrated: | ||
| - Basic mesh-based pubsub | ||
| - Simple message flooding | ||
| - Mesh topology maintenance | ||
| - Message publishing and subscription | ||
| - Fanout behaviour: a publisher that is not in the mesh (e.g. does not subscribe) | ||
| sends to a random set of topic subscribers (fanout peers) instead of mesh peers | ||
|
|
||
| Usage (from repository root): | ||
| python examples/pubsub/gossipsub/gossipsub_v1.0.py --nodes 5 --duration 30 | ||
| """ | ||
|
|
||
| import argparse | ||
| import logging | ||
| import random | ||
| import time | ||
|
|
||
| import trio | ||
|
|
||
| from libp2p import new_host | ||
| from libp2p.abc import IHost, ISubscriptionAPI | ||
| from libp2p.crypto.rsa import create_new_key_pair | ||
| from libp2p.custom_types import TProtocol | ||
| from libp2p.pubsub.gossipsub import GossipSub | ||
| from libp2p.pubsub.pubsub import Pubsub | ||
| from libp2p.stream_muxer.mplex.mplex import MPLEX_PROTOCOL_ID, Mplex | ||
| from libp2p.tools.anyio_service import background_trio_service | ||
| from libp2p.utils.address_validation import find_free_port | ||
|
|
||
| # Configure logging | ||
| logging.basicConfig( | ||
| level=logging.INFO, | ||
| format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | ||
| ) | ||
| logger = logging.getLogger("gossipsub-v1.0") | ||
|
|
||
| # Protocol version | ||
| GOSSIPSUB_V10 = TProtocol("/meshsub/1.0.0") | ||
| TOPIC = "gossipsub-v1.0-demo" | ||
|
|
||
|
|
||
| class GossipsubV10Node: | ||
| """A node running Gossipsub 1.0""" | ||
|
|
||
| def __init__(self, node_id: str, port: int, fanout_only: bool = False): | ||
| self.node_id = node_id | ||
| self.port = port | ||
| self.fanout_only = fanout_only # If True, node only publishes (no subscribe) | ||
| self.host: IHost | None = None | ||
| self.pubsub: Pubsub | None = None | ||
| self.gossipsub: GossipSub | None = None | ||
| self.subscription: ISubscriptionAPI | None = None | ||
| self.messages_sent = 0 | ||
| self.messages_received = 0 | ||
|
|
||
| async def start(self): | ||
| """Start the node with Gossipsub 1.0 configuration""" | ||
| key_pair = create_new_key_pair() | ||
|
|
||
| self.host = new_host( | ||
| key_pair=key_pair, | ||
| muxer_opt={MPLEX_PROTOCOL_ID: Mplex}, | ||
| ) | ||
|
|
||
| # Configure Gossipsub 1.0 - basic configuration only | ||
| self.gossipsub = GossipSub( | ||
| protocols=[GOSSIPSUB_V10], | ||
| degree=3, | ||
| degree_low=2, | ||
| degree_high=4, | ||
| heartbeat_interval=5, | ||
| heartbeat_initial_delay=1.0, | ||
| # No score_params - v1.0 doesn't have peer scoring | ||
| # No max_idontwant_messages - v1.0 doesn't support IDONTWANT | ||
| # No adaptive features - v1.0 doesn't have adaptive gossip | ||
| # No security features - v1.0 has basic security only | ||
| ) | ||
|
|
||
| self.pubsub = Pubsub(self.host, self.gossipsub) | ||
|
|
||
| # Start services | ||
| import multiaddr | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor: |
||
|
|
||
| listen_addrs = [multiaddr.Multiaddr(f"/ip4/127.0.0.1/tcp/{self.port}")] | ||
|
|
||
| async with self.host.run(listen_addrs=listen_addrs): | ||
| async with background_trio_service(self.pubsub): | ||
| async with background_trio_service(self.gossipsub): | ||
| await self.pubsub.wait_until_ready() | ||
| if not self.fanout_only: | ||
| self.subscription = await self.pubsub.subscribe(TOPIC) | ||
| logger.info( | ||
| f"Node {self.node_id} (Gossipsub 1.0" | ||
| + (", fanout-only publisher" if self.fanout_only else "") | ||
| + f") started on port {self.port}" | ||
| ) | ||
|
|
||
| # Keep running | ||
| await trio.sleep_forever() | ||
|
|
||
| async def publish_message(self, message: str): | ||
| """Publish a message to the topic""" | ||
| if self.pubsub: | ||
| await self.pubsub.publish(TOPIC, message.encode()) | ||
| self.messages_sent += 1 | ||
| logger.info(f"Node {self.node_id} published: {message}") | ||
|
|
||
| async def receive_messages(self): | ||
| """Receive and process messages""" | ||
| if not self.subscription: | ||
| return | ||
|
|
||
| try: | ||
| while True: | ||
| if self.subscription is None: | ||
| break | ||
| message = await self.subscription.get() | ||
| decoded = message.data.decode("utf-8") | ||
| self.messages_received += 1 | ||
| logger.info(f"Node {self.node_id} received: {decoded}") | ||
| except Exception as e: | ||
| logger.debug(f"Node {self.node_id} receive loop ended: {e}") | ||
|
|
||
| async def connect_to_peer(self, peer_addr: str): | ||
| """Connect to another peer""" | ||
| if self.host: | ||
| try: | ||
| import multiaddr | ||
|
|
||
| from libp2p.peer.peerinfo import info_from_p2p_addr | ||
|
|
||
| maddr = multiaddr.Multiaddr(peer_addr) | ||
| info = info_from_p2p_addr(maddr) | ||
| await self.host.connect(info) | ||
| logger.debug(f"Node {self.node_id} connected to {peer_addr}") | ||
| except Exception as e: | ||
| logger.debug( | ||
| f"Node {self.node_id} failed to connect to {peer_addr}: {e}" | ||
| ) | ||
|
|
||
|
|
||
| class GossipsubV10Demo: | ||
| """Demo controller for Gossipsub 1.0""" | ||
|
|
||
| def __init__(self): | ||
| self.nodes: list[GossipsubV10Node] = [] | ||
|
|
||
| async def setup_network(self, node_count: int = 5): | ||
| """ | ||
| Set up a network of nodes. Node 0 is a | ||
| fanout-only publisher (no subscribe). | ||
| """ | ||
| for i in range(node_count): | ||
| port = find_free_port() | ||
| fanout_only = i == 0 | ||
| node = GossipsubV10Node(f"node_{i}", port, fanout_only=fanout_only) | ||
| self.nodes.append(node) | ||
|
|
||
| logger.info( | ||
| f"Created network with {node_count} nodes running Gossipsub 1.0 " | ||
| f"(node_0 is fanout-only: publishes without subscribing, " | ||
| f"using fanout peers)" | ||
| ) | ||
|
|
||
| async def start_network(self, duration: int = 30): | ||
| """Start all nodes and run the demo""" | ||
| try: | ||
| async with trio.open_nursery() as nursery: | ||
| # Start all nodes | ||
| for node in self.nodes: | ||
| nursery.start_soon(node.start) | ||
|
|
||
| # Wait for initialization | ||
| await trio.sleep(3) | ||
|
|
||
| # Connect nodes in a mesh topology | ||
| await self._connect_nodes() | ||
| await trio.sleep(2) | ||
|
|
||
| # Start message receiving for all nodes | ||
| for node in self.nodes: | ||
| nursery.start_soon(node.receive_messages) | ||
|
|
||
| # Run publishing loop | ||
| end_time = time.time() + duration | ||
| message_counter = 0 | ||
|
|
||
| print(f"\n{'=' * 60}") | ||
| print("GOSSIPSUB 1.0 DEMO") | ||
| print(f"{'=' * 60}") | ||
| print(f"Running for {duration} seconds...") | ||
| print("Protocol: /meshsub/1.0.0") | ||
| print("Features: Basic mesh-based pubsub, simple flooding, fanout demo") | ||
| print(" (node_0 is fanout-only: publishes via fanout, not in mesh)") | ||
| print(f"{'=' * 60}\n") | ||
|
|
||
| while time.time() < end_time: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| # Random node publishes (node_0 uses fanout when it publishes) | ||
| node = random.choice(self.nodes) | ||
| message = f"msg_{message_counter}_{int(time.time())}" | ||
| await node.publish_message(message) | ||
| message_counter += 1 | ||
| await trio.sleep(2) # Publish every 2 seconds | ||
|
|
||
| # Print statistics | ||
| await trio.sleep(1) # Wait for final messages | ||
| self._print_statistics() | ||
|
|
||
| # Cancel all tasks to exit nursery | ||
| nursery.cancel_scope.cancel() | ||
|
|
||
| except Exception as e: | ||
| logger.warning(f"Demo execution interrupted: {e}") | ||
|
|
||
| async def _connect_nodes(self): | ||
| """Connect nodes in a mesh topology""" | ||
| for i, node in enumerate(self.nodes): | ||
| # Connect to the next node in a ring topology | ||
| if len(self.nodes) > 1: | ||
| target_idx = (i + 1) % len(self.nodes) | ||
| target = self.nodes[target_idx] | ||
|
|
||
| if target.host and node.host: | ||
| peer_addr = ( | ||
| f"/ip4/127.0.0.1/tcp/{target.port}/p2p/{target.host.get_id()}" | ||
| ) | ||
| await node.connect_to_peer(peer_addr) | ||
|
|
||
| # Also connect to one more node for better connectivity | ||
| if len(self.nodes) > 2: | ||
| target_idx2 = (i + 2) % len(self.nodes) | ||
| target2 = self.nodes[target_idx2] | ||
|
|
||
| if target2.host and node.host: | ||
| peer_addr2 = ( | ||
| f"/ip4/127.0.0.1/tcp/{target2.port}/p2p/" | ||
| f"{target2.host.get_id()}" | ||
| ) | ||
| await node.connect_to_peer(peer_addr2) | ||
|
|
||
| def _print_statistics(self): | ||
| """Print demo statistics""" | ||
| print(f"\n{'=' * 60}") | ||
| print("DEMO STATISTICS") | ||
| print(f"{'=' * 60}") | ||
|
|
||
| total_sent = sum(node.messages_sent for node in self.nodes) | ||
| total_received = sum(node.messages_received for node in self.nodes) | ||
|
|
||
| print(f"Total messages sent: {total_sent}") | ||
| print(f"Total messages received: {total_received}") | ||
| print("\nPer-node statistics:") | ||
| for node in self.nodes: | ||
| print( | ||
| f" {node.node_id}: sent={node.messages_sent}, " | ||
| f"received={node.messages_received}" | ||
| ) | ||
|
|
||
| print(f"\n{'=' * 60}") | ||
| print("Gossipsub 1.0 Features:") | ||
| print(" ✓ Basic mesh-based pubsub") | ||
| print(" ✓ Simple message flooding") | ||
| print(" ✓ Mesh topology maintenance") | ||
| print(" ✓ Fanout behaviour: node_0 publishes without subscribing;") | ||
| print(" messages are sent to a random set of topic peers (fanout peers)") | ||
| print(" ✗ No peer scoring") | ||
| print(" ✗ No IDONTWANT support") | ||
| print(" ✗ No adaptive gossip") | ||
| print(" ✗ No advanced security features") | ||
| print(f"{'=' * 60}\n") | ||
|
|
||
|
|
||
| async def main(): | ||
| parser = argparse.ArgumentParser(description="Gossipsub 1.0 Example") | ||
| parser.add_argument( | ||
| "--nodes", type=int, default=5, help="Number of nodes in the network" | ||
| ) | ||
| parser.add_argument( | ||
| "--duration", type=int, default=30, help="Demo duration in seconds" | ||
| ) | ||
| parser.add_argument("--verbose", action="store_true", help="Enable verbose logging") | ||
|
|
||
| args = parser.parse_args() | ||
|
|
||
| if args.verbose: | ||
| logging.getLogger().setLevel(logging.DEBUG) | ||
|
|
||
| demo = GossipsubV10Demo() | ||
| await demo.setup_network(args.nodes) | ||
| await demo.start_network(args.duration) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| trio.run(main) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This entire method body (
start,publish_message,receive_messages,connect_to_peer) is copy-pasted nearly identically across all 6 version files. Same for the Demo class (setup_network,start_network,_connect_nodes,_print_statistics,main). I diffed v1.0 against v1.1 and the method signatures are identical — only the GossipSub constructor args change.That's roughly 200+ lines of boilerplate repeated 6 times. If anything in the demo infra needs fixing (say, the connection logic or nursery management), you'd have to patch all 6 files.
Would you consider extracting a shared
BaseGossipsubNode+BaseGossipsubDemothat each version file subclasses? Each file would then only need to override the GossipSub config and the feature checklist printout — maybe 40-60 lines per version instead of 300-550. Happy to help sketch this out if useful.