diff --git a/docs/examples.rst b/docs/examples.rst index 78cd7f43e..3c2b518a0 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -24,6 +24,7 @@ Examples examples.multiple_connections tls-support gossipsub-1.2 + gossipsub-1.3 examples.websocket examples.tls examples.tcp diff --git a/docs/gossipsub-1.3.rst b/docs/gossipsub-1.3.rst new file mode 100644 index 000000000..547f71ece --- /dev/null +++ b/docs/gossipsub-1.3.rst @@ -0,0 +1,84 @@ +GossipSub 1.3 Extensions and Topic Observation +============================================== + +Overview +-------- + +Py-libp2p supports the GossipSub v1.3 Extensions Control Message mechanism and the +Topic Observation extension. These features require negotiating the +``/meshsub/1.3.0`` protocol (or later) with peers. + +Topic Observation +----------------- + +The Topic Observation extension allows a peer to receive IHAVE notifications for +a topic without being a full subscriber. This is useful for presence awareness: +knowing when messages are published on a topic without actually receiving the +message payloads. + +Lifecycle +~~~~~~~~~ + +1. **Start observing**: Call ``start_observing_topic(topic)`` to send OBSERVE + control messages to in-topic peers that support the extension. The router + will then send IHAVE notifications to you when new messages arrive on that + topic. + +2. **Receive IHAVE**: As an observer, you receive IHAVE control messages + containing message IDs. These are presence notifications only; observers do + not typically reply with IWANT to fetch the actual messages. + +3. **Stop observing**: Call ``stop_observing_topic(topic)`` to send UNOBSERVE + control messages and stop receiving IHAVE notifications for that topic. + +API Usage Snippet +~~~~~~~~~~~~~~~~~ + +The snippet below demonstrates the Topic Observation API calls. It is not a +complete runnable program (host setup, service lifecycle, and peer wiring are +omitted for brevity). For a runnable end-to-end example, see +:doc:`examples.pubsub`. + +.. code-block:: python + + from libp2p import new_host + from libp2p.pubsub.gossipsub import PROTOCOL_ID_V13, GossipSub + from libp2p.pubsub.pubsub import Pubsub + + # Create host and Pubsub with a v1.3-capable GossipSub router. + host = new_host() + gossipsub = GossipSub( + protocols=[PROTOCOL_ID_V13], + degree=6, + degree_low=4, + degree_high=12, + ) + pubsub = Pubsub(host, gossipsub) + + # Start observing a topic (IHAVE-only presence notifications). + # In practice, call this once Pubsub/GossipSub services are running. + await gossipsub.start_observing_topic("my-topic") + + # ... later, when done ... + await gossipsub.stop_observing_topic("my-topic") + +Protocol Requirements +~~~~~~~~~~~~~~~~~~~~~ + +* Topic Observation requires both peers to negotiate ``/meshsub/1.3.0`` (or + later) and to advertise support via the Extensions Control Message. +* Extensions are only sent when the negotiated protocol is v1.3+; peers on + v1.1/v1.2 do not receive extension fields. + +Specification References +------------------------ + +* `GossipSub v1.3 Extensions `_ +* `Topic Observation proposal `_ + +Related Documentation +--------------------- + +* :doc:`gossipsub-1.2` - GossipSub 1.2 features (IDONTWANT, etc.) +* :doc:`examples.pubsub` - PubSub chat example +* :doc:`libp2p.pubsub` - Complete PubSub API documentation diff --git a/docs/libp2p.pubsub.rst b/docs/libp2p.pubsub.rst index 62eda2fca..a6f53b992 100644 --- a/docs/libp2p.pubsub.rst +++ b/docs/libp2p.pubsub.rst @@ -3,7 +3,9 @@ libp2p.pubsub package Py-libp2p provides a comprehensive PubSub implementation with support for both FloodSub and GossipSub protocols, including the latest GossipSub 1.2 specification with IDONTWANT control messages for improved bandwidth efficiency. -For detailed information about GossipSub 1.2 features and configuration, see :doc:`gossipsub-1.2`. +For detailed information about GossipSub features and configuration, see +:doc:`gossipsub-1.2` (IDONTWANT, etc.) and :doc:`gossipsub-1.3` (v1.3 extensions, +Topic Observation). Subpackages ----------- diff --git a/libp2p/pubsub/extensions.py b/libp2p/pubsub/extensions.py new file mode 100644 index 000000000..d41f591b7 --- /dev/null +++ b/libp2p/pubsub/extensions.py @@ -0,0 +1,499 @@ +""" +GossipSub v1.3 Extensions Control Message support. + +Spec: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.3.md +extensions.proto: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/extensions/extensions.proto + +Design mirrors the go-libp2p reference implementation (pubsub/extensions.go in +libp2p/go-libp2p-pubsub). + +Key spec rules implemented here: + 1. Extensions control message MUST be in the FIRST message on the stream. + 2. Extensions control message MUST NOT be sent more than once per peer. + 3. A second Extensions control message from the same peer is misbehaviour. + 4. Peers MUST ignore unknown extensions (forward-compatible). +""" + +from __future__ import annotations + +from collections.abc import ( + Callable, +) +from dataclasses import ( + dataclass, + field, +) +import logging + +from libp2p.peer.id import ( + ID, +) + +from .pb import ( + rpc_pb2, +) + +logger = logging.getLogger("libp2p.pubsub.extensions") + +ReportMisbehaviour = Callable[[ID], None] + + +@dataclass +class PeerExtensions: + """ + Describes the set of GossipSub v1.3 extensions that a peer supports. + + Each field corresponds to one optional extension. When we receive a peer's + ``ControlExtensions`` protobuf we decode it into a ``PeerExtensions`` + instance. When we build our own hello packet we encode our + ``PeerExtensions`` into the outgoing ``ControlExtensions`` protobuf. + + Adding a new extension: + 1. Add a ``bool`` field here (default ``False``). + 2. Set the field in :meth:`from_control_extensions`. + 3. Populate the field in :meth:`to_control_extensions`. + 4. Add any per-peer activation logic in :class:`ExtensionsState`. + """ + + # Topic Observation extension (GossipSub v1.3 Topic Observation proposal). + # https://ethresear.ch/t/gossipsub-topic-observation-proposed-gossipsub-1-3/20907 + topic_observation: bool = False + + # testExtension – field 6492434 – used exclusively for cross-implementation + # interoperability testing (go-libp2p / rust-libp2p / py-libp2p). + test_extension: bool = False + + @classmethod + def from_control_extensions(cls, ext: rpc_pb2.ControlExtensions) -> PeerExtensions: + """ + Decode a wire ``ControlExtensions`` protobuf into a ``PeerExtensions``. + + Unknown fields in ``ext`` are silently ignored per spec rule 3 + ("Peers MUST ignore unknown extensions"). + """ + return cls( + topic_observation=ext.topicObservation, + test_extension=ext.testExtension, + ) + + def to_control_extensions(self) -> rpc_pb2.ControlExtensions: + """ + Encode this ``PeerExtensions`` into a wire ``ControlExtensions`` protobuf. + + Only fields that are ``True`` are set; unset optional proto fields are + omitted from the serialised bytes (proto2 semantics). + """ + kwargs: dict[str, bool] = {} + if self.topic_observation: + kwargs["topicObservation"] = True + if self.test_extension: + kwargs["testExtension"] = True + return rpc_pb2.ControlExtensions(**kwargs) + + def has_any(self) -> bool: + """Return True if the local peer supports at least one extension.""" + return self.topic_observation or self.test_extension + + def supports_topic_observation(self) -> bool: + return self.topic_observation + + def supports_test_extension(self) -> bool: + return self.test_extension + + +@dataclass +class ExtensionsState: + """ + Per-router state for the GossipSub v1.3 extension exchange protocol. + + Mirrors ``extensionsState`` in go-libp2p's ``extensions.go``. + + Lifecycle (per peer): + 1. ``build_hello_extensions(peer_id)`` is called when we open a stream + and are about to send the first message. It mutates the hello RPC + in-place, adding ``control.extensions`` when appropriate, and records + that we have sent extensions to this peer. + 2. ``handle_rpc(rpc, peer_id)`` is called on every incoming RPC. + - For the *first* RPC from a peer it records their extensions. + - For subsequent RPCs it checks for a duplicate extensions field and + calls ``report_misbehaviour`` if one is found. + + The ``report_misbehaviour`` callback is expected to apply a peer-score + penalty (analogous to go-libp2p's ``reportMisbehavior``). + """ + + # Extensions we advertise to other peers. + my_extensions: PeerExtensions = field(default_factory=PeerExtensions) + + # Extensions we have received from each peer (populated on first RPC). + _peer_extensions: dict[ID, PeerExtensions] = field( + default_factory=dict, init=False, repr=False + ) + + # Set of peer IDs to whom we have already sent the extensions control message. + # Used to enforce the "at most once" rule on the sending side. + _sent_extensions: set[ID] = field(default_factory=set, init=False, repr=False) + + # Optional callback invoked when a peer sends a duplicate extensions message. + _report_misbehaviour: ReportMisbehaviour | None = field( + default=None, init=False, repr=False + ) + + def set_report_misbehaviour(self, callback: ReportMisbehaviour | None) -> None: + """ + Register the callback that penalises misbehaving peers. + + :param callback: callable(peer_id: ID) -> None + """ + self._report_misbehaviour = callback + + # ------------------------------------------------------------------ + # Sending side + # ------------------------------------------------------------------ + + def build_hello_extensions(self, peer_id: ID, hello: rpc_pb2.RPC) -> rpc_pb2.RPC: + """ + Attach our ``ControlExtensions`` to *hello* if this is a v1.3 peer and + we support at least one extension. + + Per spec rule 1: "If a peer supports any extension, the Extensions + control message MUST be included in the first message on the stream." + + Per spec rule 2: "It MUST NOT be sent more than once." + + This method MUST be called exactly once per peer, before the hello + packet is written to the stream. + + :param peer_id: the remote peer we are greeting. + :param hello: the RPC packet being constructed (mutated in-place). + :return: the (possibly mutated) RPC packet. + """ + if not self.my_extensions.has_any(): + # Nothing to advertise – still record that we did our part so that + # the "sent" tracking is consistent. + self._sent_extensions.add(peer_id) + return hello + + # Ensure control sub-message exists. + if not hello.HasField("control"): + hello.control.CopyFrom(rpc_pb2.ControlMessage()) + + hello.control.extensions.CopyFrom(self.my_extensions.to_control_extensions()) + + self._sent_extensions.add(peer_id) + logger.debug( + "Sent extensions to peer %s: topic_observation=%s test_extension=%s", + peer_id, + self.my_extensions.topic_observation, + self.my_extensions.test_extension, + ) + + # If we already received their extensions (unlikely race on the first + # message, but handled for correctness), activate the shared features. + if peer_id in self._peer_extensions: + self._activate_peer(peer_id) + + return hello + + # ------------------------------------------------------------------ + # Receiving side + # ------------------------------------------------------------------ + + def handle_rpc(self, rpc: rpc_pb2.RPC, peer_id: ID) -> None: + """ + Process the extensions portion of an incoming RPC. + + Called for every incoming RPC. On the very first call for a given + peer this records the peer's extensions; on subsequent calls it checks + for a duplicate ``control.extensions`` field. + + :param rpc: the full incoming RPC message. + :param peer_id: the peer who sent the RPC. + """ + if peer_id not in self._peer_extensions: + # This is the first RPC from this peer. + peer_ext = self._extract_peer_extensions(rpc) + self._peer_extensions[peer_id] = peer_ext + + logger.debug( + "Received extensions from peer %s: topic_observation=%s " + "test_extension=%s", + peer_id, + peer_ext.topic_observation, + peer_ext.test_extension, + ) + + # If we have already sent our extensions, the exchange is complete. + if peer_id in self._sent_extensions: + self._activate_peer(peer_id) + else: + # We already have this peer's extensions. A second + # ``control.extensions`` field is a protocol violation. + if self._rpc_has_extensions(rpc): + logger.warning( + "Peer %s sent a duplicate Extensions control message – " + "this is a protocol violation (GossipSub v1.3 spec rule 2).", + peer_id, + ) + if self._report_misbehaviour is not None: + self._report_misbehaviour(peer_id) + + # ------------------------------------------------------------------ + # Peer lifecycle + # ------------------------------------------------------------------ + + def remove_peer(self, peer_id: ID) -> None: + """ + Clean up all extension state for a disconnected peer. + + :param peer_id: the peer that disconnected. + """ + self._peer_extensions.pop(peer_id, None) + self._sent_extensions.discard(peer_id) + + # ------------------------------------------------------------------ + # Queries + # ------------------------------------------------------------------ + + def peer_supports_topic_observation(self, peer_id: ID) -> bool: + """ + Return True if *peer_id* has advertised the Topic Observation extension. + + :param peer_id: the remote peer to query. + """ + ext = self._peer_extensions.get(peer_id) + return ext is not None and ext.topic_observation + + def peer_supports_test_extension(self, peer_id: ID) -> bool: + """ + Return True if *peer_id* has advertised the test extension. + + :param peer_id: the remote peer to query. + """ + ext = self._peer_extensions.get(peer_id) + return ext is not None and ext.test_extension + + def both_support_topic_observation(self, peer_id: ID) -> bool: + """ + Return True if both this node and *peer_id* support Topic Observation. + + Feature activation is only valid when both sides have advertised + support (per GossipSub v1.3 spec section on extension behaviour). + + :param peer_id: the remote peer to query. + """ + return ( + self.my_extensions.topic_observation + and self.peer_supports_topic_observation(peer_id) + ) + + def both_support_test_extension(self, peer_id: ID) -> bool: + """ + Return True if both this node and *peer_id* support the test extension. + + :param peer_id: the remote peer to query. + """ + return self.my_extensions.test_extension and self.peer_supports_test_extension( + peer_id + ) + + def get_peer_extensions(self, peer_id: ID) -> PeerExtensions | None: + """ + Return the extensions advertised by *peer_id*, or ``None`` if we have + not yet received the peer's first message. + + :param peer_id: the remote peer to query. + """ + return self._peer_extensions.get(peer_id) + + def sent_extensions_to(self, peer_id: ID) -> bool: + """ + Return True if we have already sent extensions to *peer_id*. + + :param peer_id: the remote peer to query. + """ + return peer_id in self._sent_extensions + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + @staticmethod + def _rpc_has_extensions(rpc: rpc_pb2.RPC) -> bool: + """Return True if *rpc* carries a ``control.extensions`` field.""" + return rpc.HasField("control") and rpc.control.HasField("extensions") + + @staticmethod + def _extract_peer_extensions(rpc: rpc_pb2.RPC) -> PeerExtensions: + """ + Decode the peer's extensions from an RPC, returning an empty + ``PeerExtensions`` if none are present. + """ + if ExtensionsState._rpc_has_extensions(rpc): + return PeerExtensions.from_control_extensions(rpc.control.extensions) + return PeerExtensions() + + def _activate_peer(self, peer_id: ID) -> None: + """ + Called once both sides have exchanged extensions. Logs the active + feature set; subclasses / callers can extend this for bookkeeping. + + :param peer_id: the peer whose extension handshake just completed. + """ + peer_ext = self._peer_extensions[peer_id] + if self.my_extensions.topic_observation and peer_ext.topic_observation: + logger.debug("Topic Observation extension active with peer %s.", peer_id) + if self.my_extensions.test_extension and peer_ext.test_extension: + logger.debug("Test extension active with peer %s.", peer_id) + + +# --------------------------------------------------------------------------- +# Topic Observation state (per router) +# --------------------------------------------------------------------------- + + +class TopicObservationState: + """ + Manages the Topic Observation extension state for a single GossipSub router. + + Spec: https://ethresear.ch/t/gossipsub-topic-observation-proposed-gossipsub-1-3/20907 + + Two directions: + + * **Outbound (we are the observer):** We send ``OBSERVE`` to subscribing + peers and receive ``IHAVE`` notifications. We do NOT receive full + message payloads unless we explicitly request them. + + * **Inbound (we are the subscriber):** We receive ``OBSERVE`` / ``UNOBSERVE`` + from observing peers and send ``IHAVE`` to them when new messages arrive. + + The actual IHAVE emission is handled in ``GossipSub.publish()`` so that + notification is immediate (not deferred to the heartbeat) per the spec. + """ + + def __init__(self) -> None: + # Topics we are currently observing (outbound). + # topic -> set of subscriber peer IDs we sent OBSERVE to. + self._observing: dict[str, set[ID]] = {} + + # Peers that are observing us (inbound). + # topic -> set of observer peer IDs. + self._observers: dict[str, set[ID]] = {} + + # ------------------------------------------------------------------ + # Outbound: this node is an observer + # ------------------------------------------------------------------ + + def add_observing(self, topic: str, subscriber_peer: ID) -> None: + """ + Record that we are observing *topic* via *subscriber_peer*. + + Called after we emit an OBSERVE control message. + + :param topic: the topic we sent OBSERVE for. + :param subscriber_peer: the subscribing peer we sent OBSERVE to. + """ + self._observing.setdefault(topic, set()).add(subscriber_peer) + + def remove_observing(self, topic: str, subscriber_peer: ID) -> None: + """ + Record that we stopped observing *topic* via *subscriber_peer*. + + Called after we emit an UNOBSERVE control message. + + :param topic: the topic we sent UNOBSERVE for. + :param subscriber_peer: the peer we sent UNOBSERVE to. + """ + peers = self._observing.get(topic) + if peers is not None: + peers.discard(subscriber_peer) + if not peers: + del self._observing[topic] + + def is_observing(self, topic: str) -> bool: + """ + Return True if we are currently observing *topic*. + + :param topic: the topic to query. + """ + return bool(self._observing.get(topic)) + + # ------------------------------------------------------------------ + # Inbound: remote peers are observing us + # ------------------------------------------------------------------ + + def add_observer(self, topic: str, observer_peer: ID) -> None: + """ + Record that *observer_peer* wants to observe *topic* from us. + + Called when we handle an incoming OBSERVE control message. + + :param topic: the topic the peer wants to observe. + :param observer_peer: the peer that sent us the OBSERVE. + """ + self._observers.setdefault(topic, set()).add(observer_peer) + logger.debug( + "Peer %s is now observing topic '%s' via us.", observer_peer, topic + ) + + def remove_observer(self, topic: str, observer_peer: ID) -> None: + """ + Remove *observer_peer* from the observer list for *topic*. + + Called when we handle an incoming UNOBSERVE control message. + + :param topic: the topic the peer wants to stop observing. + :param observer_peer: the peer that sent us the UNOBSERVE. + """ + peers = self._observers.get(topic) + if peers is not None: + peers.discard(observer_peer) + if not peers: + del self._observers[topic] + logger.debug( + "Peer %s stopped observing topic '%s' via us.", + observer_peer, + topic, + ) + + def get_observers(self, topic: str) -> set[ID]: + """ + Return the set of peers that are currently observing *topic* from us. + + :param topic: the topic to query. + :return: a copy of the observer set (empty set if none). + """ + return set(self._observers.get(topic, set())) + + def remove_peer(self, peer_id: ID) -> None: + """ + Clean up all Topic Observation state for a disconnected peer. + + :param peer_id: the peer that disconnected. + """ + for topic in list(self._observers): + self._observers[topic].discard(peer_id) + if not self._observers[topic]: + del self._observers[topic] + + for topic in list(self._observing): + self._observing[topic].discard(peer_id) + if not self._observing[topic]: + del self._observing[topic] + + def get_observing_topics(self) -> set[str]: + """ + Return the set of topics this node is currently observing (outbound). + + :return: set of topic strings we sent OBSERVE for. + """ + return set(self._observing.keys()) + + def get_subscriber_peers_for_topic(self, topic: str) -> set[ID]: + """ + Return the set of subscriber peers we sent OBSERVE to for *topic*. + + :param topic: the topic to query. + :return: set of subscriber peer IDs we are observing through. + """ + return set(self._observing.get(topic, set())) diff --git a/libp2p/pubsub/gossipsub.py b/libp2p/pubsub/gossipsub.py index 2f3bfeabc..4a24afede 100644 --- a/libp2p/pubsub/gossipsub.py +++ b/libp2p/pubsub/gossipsub.py @@ -48,6 +48,11 @@ from .exceptions import ( NoPubsubAttached, ) +from .extensions import ( + ExtensionsState, + PeerExtensions, + TopicObservationState, +) from .mcache import ( MessageCache, ) @@ -70,6 +75,8 @@ PROTOCOL_ID = TProtocol("/meshsub/1.0.0") PROTOCOL_ID_V11 = TProtocol("/meshsub/1.1.0") PROTOCOL_ID_V12 = TProtocol("/meshsub/1.2.0") +# GossipSub v1.3: Extensions Control Message +# Spec: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.3.md PROTOCOL_ID_V13 = TProtocol("/meshsub/1.3.0") PROTOCOL_ID_V14 = TProtocol("/meshsub/1.4.0") PROTOCOL_ID_V20 = TProtocol("/meshsub/2.0.0") @@ -121,6 +128,11 @@ class GossipSub(IPubsubRouter, Service): int # Maximum number of message IDs to track per peer in IDONTWANT lists ) + # Gossipsub v1.3 – Extensions Control Message + # Spec: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.3.md + extensions_state: ExtensionsState + topic_observation: TopicObservationState + # Gossipsub v2.0 adaptive features adaptive_gossip_enabled: bool network_health_score: float # 0.0 (poor) to 1.0 (excellent) @@ -176,6 +188,10 @@ def __init__( max_messages_per_topic_per_second: float = 10.0, eclipse_protection_enabled: bool = True, min_mesh_diversity_ips: int = 3, + # GossipSub v1.3 – Extensions Control Message + # Pass a PeerExtensions instance to advertise your supported extensions + # to remote peers in the first message on every new stream. + my_extensions: PeerExtensions | None = None, max_pending_messages_per_peer: int = 100, pending_messages_ttl: float = 30.0, ) -> None: @@ -225,6 +241,23 @@ def __init__( self.dont_send_message_ids = dict() self.max_idontwant_messages = max_idontwant_messages + # Gossipsub v1.3 – Extensions Control Message + # ExtensionsState tracks: + # - which extensions we advertise (my_extensions) + # - which extensions each peer has advertised (_peer_extensions) + # - whether we have already sent our extensions to a peer (_sent_extensions) + self.extensions_state = ExtensionsState( + my_extensions=my_extensions or PeerExtensions() + ) + # Wire up the misbehaviour reporter after scorer is initialised. + self.extensions_state.set_report_misbehaviour( + self._report_extensions_misbehaviour + ) + + # Topic Observation extension state (per router). + # Tracks observers (inbound) and topics we are observing (outbound). + self.topic_observation = TopicObservationState() + # Gossipsub v2.0 adaptive features self.adaptive_gossip_enabled = adaptive_gossip_enabled self.network_health_score = 1.0 # Start optimistic @@ -299,6 +332,24 @@ def supports_scoring(self, peer_id: ID) -> bool: PROTOCOL_ID_V20, ) + def supports_v13_features(self, peer_id: ID) -> bool: + """ + Check if *peer_id* negotiated the GossipSub v1.3 protocol. + + v1.3 is required for the Extensions Control Message mechanism and the + Topic Observation extension. A peer that negotiated v1.3 (or later) + MUST have received (and sent) the Extensions control message in the + first stream message. + + :param peer_id: The peer to check. + :return: True if peer negotiated ``/meshsub/1.3.0`` or later. + """ + return self.peer_protocol.get(peer_id) in ( + PROTOCOL_ID_V13, + PROTOCOL_ID_V14, + PROTOCOL_ID_V20, + ) + def supports_v20_features(self, peer_id: ID) -> bool: """ Check if peer supports Gossipsub v2.0 features. @@ -391,25 +442,20 @@ async def emit_extension( """ Emit an extension message to a peer. + In GossipSub v1.3 wire format, the Extensions control message is sent + only once in the first stream message (hello). Arbitrary extension + name/data messages after that are not part of the v1.3 spec. This method + is a no-op for compatibility with the extension_handlers API. + :param extension_name: Name of the extension :param data: Extension data :param to_peer: Target peer ID """ - if not self.supports_protocol_feature(to_peer, "extensions"): - logger.warning( - "Cannot send extension to peer %s: peer doesn't support extensions", - to_peer, - ) - return - - extension_msg = rpc_pb2.ControlExtension() - extension_msg.name = extension_name - extension_msg.data = data - - control_msg = rpc_pb2.ControlMessage() - control_msg.extensions.extend([extension_msg]) - - await self.emit_control_message(control_msg, to_peer) + logger.debug( + "emit_extension(%s, ...) called: v1.3 wire format only sends Extensions " + "in the first hello; skipping.", + extension_name, + ) def _check_iwant_rate_limit(self, peer_id: ID) -> bool: """ @@ -767,6 +813,11 @@ def remove_peer(self, peer_id: ID) -> None: # Clean up security state self._cleanup_security_state(peer_id) + # GossipSub v1.3: clean up extension exchange state for this peer + self.extensions_state.remove_peer(peer_id) + + # Topic Observation: clean up observer / observing state for this peer + self.topic_observation.remove_peer(peer_id) # Discard any pending messages for this peer self._pending_messages.pop(peer_id, None) self._pending_control.pop(peer_id, None) @@ -791,6 +842,12 @@ async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None: control_message = rpc.control + # GossipSub v1.3: process Extensions control message BEFORE dispatching + # other control messages. This must happen on every incoming RPC so + # that the "at most once" duplicate-detection logic runs correctly. + if self.supports_v13_features(sender_peer_id): + self.extensions_state.handle_rpc(rpc, sender_peer_id) + # Relay each rpc control message to the appropriate handler if control_message.ihave: for ihave in control_message.ihave: @@ -807,9 +864,17 @@ async def handle_rpc(self, rpc: rpc_pb2.RPC, sender_peer_id: ID) -> None: if control_message.idontwant: for idontwant in control_message.idontwant: await self.handle_idontwant(idontwant, sender_peer_id) - if control_message.extensions: - for extension in control_message.extensions: - await self.handle_extension(extension, sender_peer_id) + # v1.3 Extensions control message is processed above via + # extensions_state.handle_rpc() + + # GossipSub v1.3 – Topic Observation extension + if self.supports_v13_features(sender_peer_id): + if control_message.observe: + for observe in control_message.observe: + await self.handle_observe(observe, sender_peer_id) + if control_message.unobserve: + for unobserve in control_message.unobserve: + await self.handle_unobserve(unobserve, sender_peer_id) async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: """Invoked to forward a new message that has been validated.""" @@ -854,6 +919,11 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None: # Send IDONTWANT to mesh peers about this message await self._emit_idontwant_for_message(msg_id, pubsub_msg.topicIDs) + # GossipSub v1.3 – Topic Observation: immediately notify observers with IHAVE. + # Unlike the heartbeat gossip, notification is sent right after receiving + # a message so observers get near-real-time awareness of new messages. + await self._notify_observers(pubsub_msg.topicIDs, msg_id) + for peer_id in peers_gen: if self.pubsub is None: raise NoPubsubAttached @@ -1764,8 +1834,8 @@ def pack_control_msgs( graft_msgs: list[rpc_pb2.ControlGraft] | None, prune_msgs: list[rpc_pb2.ControlPrune] | None, idontwant_msgs: list[rpc_pb2.ControlIDontWant] | None = None, - extension_msgs: list[rpc_pb2.ControlExtension] | None = None, ) -> rpc_pb2.ControlMessage: + """Pack control messages. v1.3 Extensions are set only in the first hello.""" control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage() if ihave_msgs: control_msg.ihave.extend(ihave_msgs) @@ -1775,8 +1845,6 @@ def pack_control_msgs( control_msg.prune.extend(prune_msgs) if idontwant_msgs: control_msg.idontwant.extend(idontwant_msgs) - if extension_msgs: - control_msg.extensions.extend(extension_msgs) return control_msg async def emit_ihave(self, topic: str, msg_ids: Any, to_peer: ID) -> None: @@ -2086,55 +2154,221 @@ async def handle_idontwant( self.max_idontwant_messages, ) - async def handle_extension( - self, extension_msg: rpc_pb2.ControlExtension, sender_peer_id: ID + # ------------------------------------------------------------------ # + # GossipSub v1.3 – Topic Observation extension handlers # + # ------------------------------------------------------------------ # + + async def handle_observe( + self, observe_msg: rpc_pb2.ControlObserve, sender_peer_id: ID ) -> None: """ - Handle incoming Extension control message. + Handle an incoming OBSERVE control message. - Extensions allow for protocol extensibility in GossipSub v1.3+. - This method dispatches to registered extension handlers. + An OBSERVE message is sent by an *observer* peer that wants to receive + IHAVE notifications for ``topicID`` without being a full subscriber. + After this call, every time a new message for ``topicID`` arrives we + will send an IHAVE to *sender_peer_id* immediately (not at the next + heartbeat). - :param extension_msg: The Extension control message - :param sender_peer_id: ID of the peer who sent the message + Per the Topic Observation spec, only peers that meet both of the + following conditions should be permitted to send OBSERVE: + + - Negotiated ``/meshsub/1.3.0`` (checked by the caller), and + - Advertised the ``topicObservation`` extension in their first message. + + :param observe_msg: The OBSERVE control message. + :param sender_peer_id: ID of the peer that sent the OBSERVE. """ - extension_name = extension_msg.name - extension_data = extension_msg.data + topic: str = observe_msg.topicID + if not topic: + logger.debug( + "Received OBSERVE with empty topicID from peer %s, ignoring.", + sender_peer_id, + ) + return - # Check if peer supports extensions - if not self.supports_protocol_feature(sender_peer_id, "extensions"): - logger.warning( - "Received extension from peer %s that doesn't support extensions", + # Only honour OBSERVE if the peer advertised topic_observation support. + if not self.extensions_state.peer_supports_topic_observation(sender_peer_id): + logger.debug( + "Peer %s sent OBSERVE but did not advertise topic_observation " + "extension – ignoring.", sender_peer_id, ) return - # Dispatch to registered extension handler - if ( - hasattr(self, "extension_handlers") - and extension_name in self.extension_handlers - ): - try: - await self.extension_handlers[extension_name]( - extension_data, sender_peer_id - ) + self.topic_observation.add_observer(topic, sender_peer_id) + logger.debug( + "OBSERVE: peer %s is now observing topic '%s'.", sender_peer_id, topic + ) + + async def handle_unobserve( + self, unobserve_msg: rpc_pb2.ControlUnobserve, sender_peer_id: ID + ) -> None: + """ + Handle an incoming UNOBSERVE control message. + + Stops sending IHAVE notifications to *sender_peer_id* for ``topicID``. + + :param unobserve_msg: The UNOBSERVE control message. + :param sender_peer_id: ID of the peer that sent the UNOBSERVE. + """ + topic: str = unobserve_msg.topicID + if not topic: + logger.debug( + "Received UNOBSERVE with empty topicID from peer %s, ignoring.", + sender_peer_id, + ) + return + + self.topic_observation.remove_observer(topic, sender_peer_id) + logger.debug( + "UNOBSERVE: peer %s stopped observing topic '%s'.", + sender_peer_id, + topic, + ) + + # ------------------------------------------------------------------ # + # GossipSub v1.3 – Topic Observation extension emitters # + # ------------------------------------------------------------------ # + + async def emit_observe(self, topic: str, to_peer: ID) -> None: + """ + Emit an OBSERVE control message to *to_peer* for *topic*. + + Call this when this node wants to observe *topic* via a subscribing + peer. After sending OBSERVE, *to_peer* should begin sending IHAVE + to us when new messages arrive in *topic*. + + :param topic: The topic to start observing. + :param to_peer: The subscribing peer to send OBSERVE to. + """ + observe_msg = rpc_pb2.ControlObserve(topicID=topic) + control_msg = rpc_pb2.ControlMessage() + control_msg.observe.extend([observe_msg]) + + await self.emit_control_message(control_msg, to_peer) + self.topic_observation.add_observing(topic, to_peer) + logger.debug("OBSERVE sent: topic='%s' to peer %s.", topic, to_peer) + + async def emit_unobserve(self, topic: str, to_peer: ID) -> None: + """ + Emit an UNOBSERVE control message to *to_peer* for *topic*. + + Call this to stop observing *topic* via *to_peer*. + + :param topic: The topic to stop observing. + :param to_peer: The subscribing peer to send UNOBSERVE to. + """ + unobserve_msg = rpc_pb2.ControlUnobserve(topicID=topic) + control_msg = rpc_pb2.ControlMessage() + control_msg.unobserve.extend([unobserve_msg]) + + await self.emit_control_message(control_msg, to_peer) + self.topic_observation.remove_observing(topic, to_peer) + logger.debug("UNOBSERVE sent: topic='%s' to peer %s.", topic, to_peer) + + async def _notify_observers(self, topic_ids: Iterable[str], msg_id: bytes) -> None: + """ + Immediately send an IHAVE to every observer of each topic in + *topic_ids* when a new message arrives. + + Unlike the gossip heartbeat, this notification is *immediate* so that + observers get near-real-time awareness (per the Topic Observation spec). + Observers are not expected to reply with IWANT in this flow; they use + the IHAVE purely as a presence notification. + + :param topic_ids: Topics the new message belongs to. + :param msg_id: The message ID to include in the IHAVE notifications. + """ + if self.pubsub is None: + return + pubsub = self.pubsub # narrow type for pyrefly / mypy + + # Use hex() to match heartbeat path; str(bytes) produces "b'...'" which + # fails safe_bytes_from_hex() in handle_ihave(). + msg_id_str = msg_id.hex() + + for topic in topic_ids: + observers = self.topic_observation.get_observers(topic) + if not observers: + continue + + for observer_peer in observers: + if observer_peer not in pubsub.peers: + continue + await self.emit_ihave(topic, [msg_id_str], observer_peer) logger.debug( - "Processed extension '%s' from peer %s", - extension_name, - sender_peer_id, + "Topic Observation: sent IHAVE(topic='%s', msg_id=%s) " + "to observer %s.", + topic, + msg_id_str, + observer_peer, ) - except Exception as e: - logger.warning( - "Failed to process extension '%s' from peer %s: %s", - extension_name, - sender_peer_id, - e, + + # ------------------------------------------------------------------ # + # GossipSub v1.3 – Extensions misbehaviour reporting # + # ------------------------------------------------------------------ # + + def _report_extensions_misbehaviour(self, peer_id: ID) -> None: + """ + Apply a peer-score penalty when a peer sends a duplicate Extensions + control message (violates GossipSub v1.3 spec rule 2). + + Mirrors go-libp2p's ``reportMisbehavior`` callback. + + :param peer_id: The misbehaving peer. + """ + if self.scorer is not None: + self.scorer.penalize_behavior(peer_id, 1.0) + logger.warning( + "Applied score penalty to peer %s for sending duplicate " + "Extensions control message (GossipSub v1.3 violation).", + peer_id, + ) + + async def start_observing_topic(self, topic: str) -> None: + """ + Start observing *topic* by sending OBSERVE to all in-topic v1.3 peers + that support the Topic Observation extension. + + This is the high-level API for callers that want to become an observer. + Internally it picks suitable subscriber peers and calls + :meth:`emit_observe` for each of them. + + :param topic: The topic to start observing. + """ + if self.pubsub is None: + raise NoPubsubAttached + + peers_subscribed = self.pubsub.peer_topics.get(topic, set()) + for peer in peers_subscribed: + if self.supports_v13_features( + peer + ) and self.extensions_state.both_support_topic_observation(peer): + await self.emit_observe(topic, peer) + logger.debug( + "Started observing topic '%s' via peer %s.", + topic, + peer, ) - else: + + async def stop_observing_topic(self, topic: str) -> None: + """ + Stop observing *topic* by sending UNOBSERVE to all peers we previously + sent OBSERVE to for *topic*. + + :param topic: The topic to stop observing. + """ + if self.pubsub is None: + raise NoPubsubAttached + + subscriber_peers = self.topic_observation.get_subscriber_peers_for_topic(topic) + for peer in subscriber_peers: + await self.emit_unobserve(topic, peer) logger.debug( - "No handler registered for extension '%s' from peer %s", - extension_name, - sender_peer_id, + "Stopped observing topic '%s' via peer %s.", + topic, + peer, ) def _track_peer_ip(self, peer_id: ID) -> None: diff --git a/libp2p/pubsub/pb/rpc.proto b/libp2p/pubsub/pb/rpc.proto index de4cbdb97..b0691e93f 100644 --- a/libp2p/pubsub/pb/rpc.proto +++ b/libp2p/pubsub/pb/rpc.proto @@ -1,4 +1,12 @@ // Modified from https://github.com/libp2p/go-libp2p-pubsub/blob/master/pb/rpc.proto +// Updated with GossipSub v1.3 Extensions Control Message support. +// Spec: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.3.md +// extensions.proto: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/extensions/extensions.proto +// +// Interop note: The Topic Observation extension (observe/unobserve, topicObservation) +// is not yet in the upstream libp2p/specs extensions.proto. Field numbers +// follow the go-libp2p reference implementation for cross-client interop. +// See: https://ethresear.ch/t/gossipsub-topic-observation-proposed-gossipsub-1-3/20907 syntax = "proto2"; @@ -15,6 +23,12 @@ message RPC { optional ControlMessage control = 3; optional bytes senderRecord = 4; + + // Canonical Extensions register their top-level RPC messages here. + + // Experimental Extensions MUST use field numbers larger than 0x200000 + // so they are encoded with at least 4 bytes (per GossipSub v1.3 spec). + optional TestExtension testExtension = 6492434; } message Message { @@ -32,12 +46,27 @@ message ControlMessage { repeated ControlGraft graft = 3; repeated ControlPrune prune = 4; repeated ControlIDontWant idontwant = 5; - repeated ControlExtension extensions = 6; + + // GossipSub v1.3: Extensions control message (MUST be in first message, + // MUST NOT be sent more than once per peer). + // Spec: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.3.md + optional ControlExtensions extensions = 6; + + // Topic Observation extension control messages. + repeated ControlObserve observe = 7; + repeated ControlUnobserve unobserve = 8; } -message ControlExtension { - optional string name = 1; - optional bytes data = 2; +// ControlExtensions advertises which v1.3 extensions the sending peer supports. +// Peers MUST ignore unknown fields (forward-compatible per spec). +// Field numbers for experimental extensions MUST be > 0x200000. +message ControlExtensions { + // Set to true if the peer supports the Topic Observation extension. + optional bool topicObservation = 1; + + // Experimental extensions use field numbers > 0x200000. + // testExtension: used for cross-implementation interop testing (go-libp2p compat). + optional bool testExtension = 6492434; } message ControlIHave { @@ -63,6 +92,24 @@ message ControlIDontWant { repeated bytes messageIDs = 1; } +// ControlObserve: Topic Observation extension. +// Sent by an observer to start receiving IHAVE notifications for a topic +// without being a full subscriber. (GossipSub v1.3 Topic Observation extension) +message ControlObserve { + optional string topicID = 1; +} + +// ControlUnobserve: Topic Observation extension. +// Sent by an observer to stop receiving IHAVE notifications for a topic. +message ControlUnobserve { + optional string topicID = 1; +} + +// TestExtension: used for interoperability testing of the v1.3 extension +// mechanism between implementations (go-libp2p, rust-libp2p, py-libp2p). +// An empty message — its presence on the wire is the signal. +message TestExtension {} + message PeerInfo { optional bytes peerID = 1; optional bytes signedPeerRecord = 2; diff --git a/libp2p/pubsub/pb/rpc_pb2.py b/libp2p/pubsub/pb/rpc_pb2.py index 994580df2..4edee2a4e 100644 --- a/libp2p/pubsub/pb/rpc_pb2.py +++ b/libp2p/pubsub/pb/rpc_pb2.py @@ -24,7 +24,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xca\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x12\x14\n\x0csenderRecord\x18\x04 \x01(\x0c\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\x91\x02\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\x12.\n\tidontwant\x18\x05 \x03(\x0b\x32\x1b.pubsub.pb.ControlIDontWant\x12/\n\nextensions\x18\x06 \x03(\x0b\x32\x1b.pubsub.pb.ControlExtension\".\n\x10\x43ontrolExtension\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"&\n\x10\x43ontrolIDontWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\x0c\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1alibp2p/pubsub/pb/rpc.proto\x12\tpubsub.pb\"\xfe\x01\n\x03RPC\x12-\n\rsubscriptions\x18\x01 \x03(\x0b\x32\x16.pubsub.pb.RPC.SubOpts\x12#\n\x07publish\x18\x02 \x03(\x0b\x32\x12.pubsub.pb.Message\x12*\n\x07\x63ontrol\x18\x03 \x01(\x0b\x32\x19.pubsub.pb.ControlMessage\x12\x14\n\x0csenderRecord\x18\x04 \x01(\x0c\x12\x32\n\rtestExtension\x18\x92\xa2\x8c\x03 \x01(\x0b\x32\x18.pubsub.pb.TestExtension\x1a-\n\x07SubOpts\x12\x11\n\tsubscribe\x18\x01 \x01(\x08\x12\x0f\n\x07topicid\x18\x02 \x01(\t\"i\n\x07Message\x12\x0f\n\x07\x66rom_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\x12\r\n\x05seqno\x18\x03 \x01(\x0c\x12\x10\n\x08topicIDs\x18\x04 \x03(\t\x12\x11\n\tsignature\x18\x05 \x01(\x0c\x12\x0b\n\x03key\x18\x06 \x01(\x0c\"\xee\x02\n\x0e\x43ontrolMessage\x12&\n\x05ihave\x18\x01 \x03(\x0b\x32\x17.pubsub.pb.ControlIHave\x12&\n\x05iwant\x18\x02 \x03(\x0b\x32\x17.pubsub.pb.ControlIWant\x12&\n\x05graft\x18\x03 \x03(\x0b\x32\x17.pubsub.pb.ControlGraft\x12&\n\x05prune\x18\x04 \x03(\x0b\x32\x17.pubsub.pb.ControlPrune\x12.\n\tidontwant\x18\x05 \x03(\x0b\x32\x1b.pubsub.pb.ControlIDontWant\x12\x30\n\nextensions\x18\x06 \x01(\x0b\x32\x1c.pubsub.pb.ControlExtensions\x12*\n\x07observe\x18\x07 \x03(\x0b\x32\x19.pubsub.pb.ControlObserve\x12.\n\tunobserve\x18\x08 \x03(\x0b\x32\x1b.pubsub.pb.ControlUnobserve\"G\n\x11\x43ontrolExtensions\x12\x18\n\x10topicObservation\x18\x01 \x01(\x08\x12\x18\n\rtestExtension\x18\x92\xa2\x8c\x03 \x01(\x08\"3\n\x0c\x43ontrolIHave\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\x12\n\nmessageIDs\x18\x02 \x03(\t\"\"\n\x0c\x43ontrolIWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\t\"\x1f\n\x0c\x43ontrolGraft\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"T\n\x0c\x43ontrolPrune\x12\x0f\n\x07topicID\x18\x01 \x01(\t\x12\"\n\x05peers\x18\x02 \x03(\x0b\x32\x13.pubsub.pb.PeerInfo\x12\x0f\n\x07\x62\x61\x63koff\x18\x03 \x01(\x04\"&\n\x10\x43ontrolIDontWant\x12\x12\n\nmessageIDs\x18\x01 \x03(\x0c\"!\n\x0e\x43ontrolObserve\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"#\n\x10\x43ontrolUnobserve\x12\x0f\n\x07topicID\x18\x01 \x01(\t\"\x0f\n\rTestExtension\"4\n\x08PeerInfo\x12\x0e\n\x06peerID\x18\x01 \x01(\x0c\x12\x18\n\x10signedPeerRecord\x18\x02 \x01(\x0c\"\x87\x03\n\x0fTopicDescriptor\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\x31\n\x04\x61uth\x18\x02 \x01(\x0b\x32#.pubsub.pb.TopicDescriptor.AuthOpts\x12/\n\x03\x65nc\x18\x03 \x01(\x0b\x32\".pubsub.pb.TopicDescriptor.EncOpts\x1a|\n\x08\x41uthOpts\x12:\n\x04mode\x18\x01 \x01(\x0e\x32,.pubsub.pb.TopicDescriptor.AuthOpts.AuthMode\x12\x0c\n\x04keys\x18\x02 \x03(\x0c\"&\n\x08\x41uthMode\x12\x08\n\x04NONE\x10\x00\x12\x07\n\x03KEY\x10\x01\x12\x07\n\x03WOT\x10\x02\x1a\x83\x01\n\x07\x45ncOpts\x12\x38\n\x04mode\x18\x01 \x01(\x0e\x32*.pubsub.pb.TopicDescriptor.EncOpts.EncMode\x12\x11\n\tkeyHashes\x18\x02 \x03(\x0c\"+\n\x07\x45ncMode\x12\x08\n\x04NONE\x10\x00\x12\r\n\tSHAREDKEY\x10\x01\x12\x07\n\x03WOT\x10\x02') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -32,35 +32,41 @@ if not _descriptor._USE_C_DESCRIPTORS: DESCRIPTOR._loaded_options = None _globals['_RPC']._serialized_start=42 - _globals['_RPC']._serialized_end=244 - _globals['_RPC_SUBOPTS']._serialized_start=199 - _globals['_RPC_SUBOPTS']._serialized_end=244 - _globals['_MESSAGE']._serialized_start=246 - _globals['_MESSAGE']._serialized_end=351 - _globals['_CONTROLMESSAGE']._serialized_start=354 - _globals['_CONTROLMESSAGE']._serialized_end=627 - _globals['_CONTROLEXTENSION']._serialized_start=629 - _globals['_CONTROLEXTENSION']._serialized_end=675 - _globals['_CONTROLIHAVE']._serialized_start=677 - _globals['_CONTROLIHAVE']._serialized_end=728 - _globals['_CONTROLIWANT']._serialized_start=730 - _globals['_CONTROLIWANT']._serialized_end=764 - _globals['_CONTROLGRAFT']._serialized_start=766 - _globals['_CONTROLGRAFT']._serialized_end=797 - _globals['_CONTROLPRUNE']._serialized_start=799 - _globals['_CONTROLPRUNE']._serialized_end=883 - _globals['_CONTROLIDONTWANT']._serialized_start=885 - _globals['_CONTROLIDONTWANT']._serialized_end=923 - _globals['_PEERINFO']._serialized_start=925 - _globals['_PEERINFO']._serialized_end=977 - _globals['_TOPICDESCRIPTOR']._serialized_start=980 - _globals['_TOPICDESCRIPTOR']._serialized_end=1371 - _globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_start=1113 - _globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_end=1237 - _globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_start=1199 - _globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_end=1237 - _globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_start=1240 - _globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_end=1371 - _globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_start=1328 - _globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_end=1371 + _globals['_RPC']._serialized_end=296 + _globals['_RPC_SUBOPTS']._serialized_start=251 + _globals['_RPC_SUBOPTS']._serialized_end=296 + _globals['_MESSAGE']._serialized_start=298 + _globals['_MESSAGE']._serialized_end=403 + _globals['_CONTROLMESSAGE']._serialized_start=406 + _globals['_CONTROLMESSAGE']._serialized_end=772 + _globals['_CONTROLEXTENSIONS']._serialized_start=774 + _globals['_CONTROLEXTENSIONS']._serialized_end=845 + _globals['_CONTROLIHAVE']._serialized_start=847 + _globals['_CONTROLIHAVE']._serialized_end=898 + _globals['_CONTROLIWANT']._serialized_start=900 + _globals['_CONTROLIWANT']._serialized_end=934 + _globals['_CONTROLGRAFT']._serialized_start=936 + _globals['_CONTROLGRAFT']._serialized_end=967 + _globals['_CONTROLPRUNE']._serialized_start=969 + _globals['_CONTROLPRUNE']._serialized_end=1053 + _globals['_CONTROLIDONTWANT']._serialized_start=1055 + _globals['_CONTROLIDONTWANT']._serialized_end=1093 + _globals['_CONTROLOBSERVE']._serialized_start=1095 + _globals['_CONTROLOBSERVE']._serialized_end=1128 + _globals['_CONTROLUNOBSERVE']._serialized_start=1130 + _globals['_CONTROLUNOBSERVE']._serialized_end=1165 + _globals['_TESTEXTENSION']._serialized_start=1167 + _globals['_TESTEXTENSION']._serialized_end=1182 + _globals['_PEERINFO']._serialized_start=1184 + _globals['_PEERINFO']._serialized_end=1236 + _globals['_TOPICDESCRIPTOR']._serialized_start=1239 + _globals['_TOPICDESCRIPTOR']._serialized_end=1630 + _globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_start=1372 + _globals['_TOPICDESCRIPTOR_AUTHOPTS']._serialized_end=1496 + _globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_start=1458 + _globals['_TOPICDESCRIPTOR_AUTHOPTS_AUTHMODE']._serialized_end=1496 + _globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_start=1499 + _globals['_TOPICDESCRIPTOR_ENCOPTS']._serialized_end=1630 + _globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_start=1587 + _globals['_TOPICDESCRIPTOR_ENCOPTS_ENCMODE']._serialized_end=1630 # @@protoc_insertion_point(module_scope) diff --git a/libp2p/pubsub/pb/rpc_pb2.pyi b/libp2p/pubsub/pb/rpc_pb2.pyi index 05fb281f2..956df5041 100644 --- a/libp2p/pubsub/pb/rpc_pb2.pyi +++ b/libp2p/pubsub/pb/rpc_pb2.pyi @@ -1,7 +1,11 @@ """ @generated by mypy-protobuf. Do not edit manually! isort:skip_file -Modified from https://github.com/libp2p/go-libp2p-pubsub/blob/master/pb/rpc.proto""" +Modified from https://github.com/libp2p/go-libp2p-pubsub/blob/master/pb/rpc.proto +Updated with GossipSub v1.3 Extensions Control Message support. +Spec: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.3.md +extensions.proto: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/extensions/extensions.proto +""" import builtins import collections.abc @@ -45,6 +49,7 @@ class RPC(google.protobuf.message.Message): PUBLISH_FIELD_NUMBER: builtins.int CONTROL_FIELD_NUMBER: builtins.int SENDERRECORD_FIELD_NUMBER: builtins.int + TESTEXTENSION_FIELD_NUMBER: builtins.int senderRecord: builtins.bytes @property def subscriptions(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___RPC.SubOpts]: ... @@ -52,6 +57,14 @@ class RPC(google.protobuf.message.Message): def publish(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___Message]: ... @property def control(self) -> global___ControlMessage: ... + @property + def testExtension(self) -> global___TestExtension: + """Canonical Extensions register their top-level RPC messages here. + + Experimental Extensions MUST use field numbers larger than 0x200000 + so they are encoded with at least 4 bytes (per GossipSub v1.3 spec). + """ + def __init__( self, *, @@ -59,9 +72,10 @@ class RPC(google.protobuf.message.Message): publish: collections.abc.Iterable[global___Message] | None = ..., control: global___ControlMessage | None = ..., senderRecord: builtins.bytes | None = ..., + testExtension: global___TestExtension | None = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["control", b"control", "senderRecord", b"senderRecord"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["control", b"control", "publish", b"publish", "senderRecord", b"senderRecord", "subscriptions", b"subscriptions"]) -> None: ... + def HasField(self, field_name: typing.Literal["control", b"control", "senderRecord", b"senderRecord", "testExtension", b"testExtension"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["control", b"control", "publish", b"publish", "senderRecord", b"senderRecord", "subscriptions", b"subscriptions", "testExtension", b"testExtension"]) -> None: ... global___RPC = RPC @@ -107,6 +121,8 @@ class ControlMessage(google.protobuf.message.Message): PRUNE_FIELD_NUMBER: builtins.int IDONTWANT_FIELD_NUMBER: builtins.int EXTENSIONS_FIELD_NUMBER: builtins.int + OBSERVE_FIELD_NUMBER: builtins.int + UNOBSERVE_FIELD_NUMBER: builtins.int @property def ihave(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlIHave]: ... @property @@ -118,7 +134,18 @@ class ControlMessage(google.protobuf.message.Message): @property def idontwant(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlIDontWant]: ... @property - def extensions(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlExtension]: ... + def extensions(self) -> global___ControlExtensions: + """GossipSub v1.3: Extensions control message (MUST be in first message, + MUST NOT be sent more than once per peer). + Spec: https://github.com/libp2p/specs/blob/master/pubsub/gossipsub/gossipsub-v1.3.md + """ + + @property + def observe(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlObserve]: + """Topic Observation extension control messages.""" + + @property + def unobserve(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[global___ControlUnobserve]: ... def __init__( self, *, @@ -127,30 +154,42 @@ class ControlMessage(google.protobuf.message.Message): graft: collections.abc.Iterable[global___ControlGraft] | None = ..., prune: collections.abc.Iterable[global___ControlPrune] | None = ..., idontwant: collections.abc.Iterable[global___ControlIDontWant] | None = ..., - extensions: collections.abc.Iterable[global___ControlExtension] | None = ..., + extensions: global___ControlExtensions | None = ..., + observe: collections.abc.Iterable[global___ControlObserve] | None = ..., + unobserve: collections.abc.Iterable[global___ControlUnobserve] | None = ..., ) -> None: ... - def ClearField(self, field_name: typing.Literal["extensions", b"extensions", "graft", b"graft", "idontwant", b"idontwant", "ihave", b"ihave", "iwant", b"iwant", "prune", b"prune"]) -> None: ... + def HasField(self, field_name: typing.Literal["extensions", b"extensions"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["extensions", b"extensions", "graft", b"graft", "idontwant", b"idontwant", "ihave", b"ihave", "iwant", b"iwant", "observe", b"observe", "prune", b"prune", "unobserve", b"unobserve"]) -> None: ... global___ControlMessage = ControlMessage @typing.final -class ControlExtension(google.protobuf.message.Message): +class ControlExtensions(google.protobuf.message.Message): + """ControlExtensions advertises which v1.3 extensions the sending peer supports. + Peers MUST ignore unknown fields (forward-compatible per spec). + Field numbers for experimental extensions MUST be > 0x200000. + """ + DESCRIPTOR: google.protobuf.descriptor.Descriptor - NAME_FIELD_NUMBER: builtins.int - DATA_FIELD_NUMBER: builtins.int - name: builtins.str - data: builtins.bytes + TOPICOBSERVATION_FIELD_NUMBER: builtins.int + TESTEXTENSION_FIELD_NUMBER: builtins.int + topicObservation: builtins.bool + """Set to true if the peer supports the Topic Observation extension.""" + testExtension: builtins.bool + """Experimental extensions use field numbers > 0x200000. + testExtension: used for cross-implementation interop testing (go-libp2p compat). + """ def __init__( self, *, - name: builtins.str | None = ..., - data: builtins.bytes | None = ..., + topicObservation: builtins.bool | None = ..., + testExtension: builtins.bool | None = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["data", b"data", "name", b"name"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["data", b"data", "name", b"name"]) -> None: ... + def HasField(self, field_name: typing.Literal["testExtension", b"testExtension", "topicObservation", b"topicObservation"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["testExtension", b"testExtension", "topicObservation", b"topicObservation"]) -> None: ... -global___ControlExtension = ControlExtension +global___ControlExtensions = ControlExtensions @typing.final class ControlIHave(google.protobuf.message.Message): @@ -243,6 +282,62 @@ class ControlIDontWant(google.protobuf.message.Message): global___ControlIDontWant = ControlIDontWant +@typing.final +class ControlObserve(google.protobuf.message.Message): + """ControlObserve: Topic Observation extension. + Sent by an observer to start receiving IHAVE notifications for a topic + without being a full subscriber. (GossipSub v1.3 Topic Observation extension) + """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + TOPICID_FIELD_NUMBER: builtins.int + topicID: builtins.str + def __init__( + self, + *, + topicID: builtins.str | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["topicID", b"topicID"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["topicID", b"topicID"]) -> None: ... + +global___ControlObserve = ControlObserve + +@typing.final +class ControlUnobserve(google.protobuf.message.Message): + """ControlUnobserve: Topic Observation extension. + Sent by an observer to stop receiving IHAVE notifications for a topic. + """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + TOPICID_FIELD_NUMBER: builtins.int + topicID: builtins.str + def __init__( + self, + *, + topicID: builtins.str | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["topicID", b"topicID"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["topicID", b"topicID"]) -> None: ... + +global___ControlUnobserve = ControlUnobserve + +@typing.final +class TestExtension(google.protobuf.message.Message): + """TestExtension: used for interoperability testing of the v1.3 extension + mechanism between implementations (go-libp2p, rust-libp2p, py-libp2p). + An empty message — its presence on the wire is the signal. + """ + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + def __init__( + self, + ) -> None: ... + +global___TestExtension = TestExtension + @typing.final class PeerInfo(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor diff --git a/libp2p/pubsub/pubsub.py b/libp2p/pubsub/pubsub.py index 57197d486..c4fd230e9 100644 --- a/libp2p/pubsub/pubsub.py +++ b/libp2p/pubsub/pubsub.py @@ -14,6 +14,7 @@ import time from typing import ( NamedTuple, + Protocol, cast, ) @@ -36,6 +37,7 @@ TProtocol, ValidatorFn, ) +from libp2p.encoding_config import get_default_encoding from libp2p.exceptions import ( ParseError, ValidationError, @@ -59,6 +61,9 @@ PeerDataError, ) from libp2p.peer.peerstore import env_to_send_in_RPC +from libp2p.pubsub.extensions import ( + ExtensionsState, +) from libp2p.pubsub.utils import maybe_consume_signed_record from libp2p.tools.anyio_service import ( Service, @@ -90,6 +95,23 @@ signature_validator, ) +# GossipSub v1.3+ protocol IDs. Extensions Control Message is only sent when +# negotiating one of these protocols (per spec: extensions in first message). +_MESHSUB_V13_PLUS = frozenset( + ( + TProtocol("/meshsub/1.3.0"), + TProtocol("/meshsub/1.4.0"), + TProtocol("/meshsub/2.0.0"), + ) +) + + +class _RouterWithExtensions(Protocol): + """Protocol for a router that supports GossipSub v1.3 extensions.""" + + extensions_state: ExtensionsState + + # Ref: https://github.com/libp2p/go-libp2p-pubsub/blob/40e1c94708658b155f30cf99e4574f384756d83c/topic.go#L97 # noqa: E501 SUBSCRIPTION_CHANNEL_SIZE = 32 _ANNOUNCE_RETRY_MIN_DELAY_MS = 1 @@ -115,8 +137,6 @@ def get_content_addressed_msg_id( from :mod:`libp2p.encoding_config` is used. :return: Multibase-encoded message ID """ - from libp2p.encoding_config import get_default_encoding - if encoding is None: encoding = get_default_encoding() digest = hashlib.sha256(msg.data).digest() @@ -733,15 +753,35 @@ async def _handle_new_peer(self, peer_id: ID) -> None: logger.debug("fail to add new peer %s, error %s", peer_id, error) return - # Send hello packet + # Build hello packet. hello = self.get_hello_packet() + + # GossipSub v1.3 – Extensions Control Message injection. + # Per spec: "If a peer supports any extension, the Extensions control + # message MUST be included in the first message on the stream." + # Only inject when we negotiated v1.3+; peers on v1.1/v1.2 must not + # receive extension fields. + negotiated_protocol = stream.get_protocol() + router = self.router + if ( + negotiated_protocol in _MESHSUB_V13_PLUS + and hasattr(router, "extensions_state") + and hasattr(router, "supports_v13_features") + ): + # We pass the peer_id because extensions_state needs to track + # "sent_extensions" per peer for the at-most-once rule. + # cast() tells static type-checkers the narrowed type without + # creating a runtime dependency on gossipsub.py from pubsub.py. + v13_router = cast(_RouterWithExtensions, router) + hello = v13_router.extensions_state.build_hello_extensions(peer_id, hello) + try: await stream.write(encode_varint_prefixed(hello.SerializeToString())) except StreamClosed: logger.debug("Fail to add new peer %s: stream closed", peer_id) return try: - self.router.add_peer(peer_id, stream.get_protocol()) + self.router.add_peer(peer_id, negotiated_protocol) except Exception as error: logger.debug("fail to add new peer %s, error %s", peer_id, error) return diff --git a/libp2p/pubsub/rpc_queue.py b/libp2p/pubsub/rpc_queue.py index 32b5ef092..29a386573 100644 --- a/libp2p/pubsub/rpc_queue.py +++ b/libp2p/pubsub/rpc_queue.py @@ -367,15 +367,15 @@ def split_rpc(self, rpc: rpc_pb2.RPC) -> list[rpc_pb2.RPC]: ) ) - # EXTENSIONS - for ext in ctrl.extensions: - current.control.extensions.append(ext) + # EXTENSIONS (optional singular message) + if ctrl.HasField("extensions"): + current.control.extensions.CopyFrom(ctrl.extensions) if current.ByteSize() > limit: - del current.control.extensions[-1] + current.control.ClearField("extensions") out.append(current) current = rpc_pb2.RPC() current.control.SetInParent() - current.control.extensions.append(ext) + current.control.extensions.CopyFrom(ctrl.extensions) # ── Flush remaining ── if current.ByteSize() > 0: diff --git a/libp2p/pubsub/utils.py b/libp2p/pubsub/utils.py index 169d1cccf..78e5f2d1f 100644 --- a/libp2p/pubsub/utils.py +++ b/libp2p/pubsub/utils.py @@ -53,19 +53,20 @@ def maybe_consume_signed_record(msg: RPC, host: IHost, peer_id: ID) -> bool: return True -def parse_message_id_safe(msg_id_str: str) -> MessageID: - """Safely handle message ID as string.""" - return MessageID(msg_id_str) - - -def safe_bytes_from_hex(hex_str: str) -> bytes | None: +def safe_bytes_from_hex(s: str | bytes) -> bytes | None: """ - Decode a hex-encoded string to bytes, returning None on failure. - - Used for defensively parsing wire message IDs in IHAVE/IWANT handlers - so that malformed hex from peers does not crash the gossip handler task. + Safely decode a hex string to bytes. Returns None if invalid. + If input is already bytes, returns it as-is. Used for parsing wire + message IDs in IHAVE/IWANT so malformed hex from peers does not crash. """ + if isinstance(s, bytes): + return s try: - return bytes.fromhex(hex_str) - except ValueError: + return bytes.fromhex(s) + except (ValueError, TypeError): return None + + +def parse_message_id_safe(msg_id_str: str) -> MessageID: + """Safely handle message ID as string.""" + return MessageID(msg_id_str) diff --git a/newsfragments/1231.feature.rst b/newsfragments/1231.feature.rst new file mode 100644 index 000000000..14eea29ac --- /dev/null +++ b/newsfragments/1231.feature.rst @@ -0,0 +1,5 @@ +GossipSub v1.3 Extensions and Topic Observation support. + +- Added Extensions Control Message mechanism per GossipSub v1.3 spec; extensions are advertised in the first message on the stream and gated to v1.3+ protocol negotiation. +- Added Topic Observation extension: peers can observe topics without full subscription via ``start_observing_topic()`` and ``stop_observing_topic()``, receiving IHAVE notifications for presence awareness. +- Added test extension for cross-implementation interop (go-libp2p, rust-libp2p, py-libp2p). diff --git a/tests/core/pubsub/test_gossipsub_v14_extensions.py b/tests/core/pubsub/test_gossipsub_v14_extensions.py index 4c6360f6b..86bcc13b6 100644 --- a/tests/core/pubsub/test_gossipsub_v14_extensions.py +++ b/tests/core/pubsub/test_gossipsub_v14_extensions.py @@ -49,7 +49,15 @@ async def test_handler(data: bytes, sender_peer_id: ID): @pytest.mark.trio async def test_extension_message_handling(): - """Test extension message handling between peers.""" + """ + Test extension handler registration and that emit_extension completes. + + In GossipSub v1.3 wire format, extension data is only sent in the first + hello (control.extensions); there is no wire format for arbitrary + extension name/data. So emit_extension is a no-op and handlers are not + invoked for custom data. This test verifies registration and that + emit_extension completes without raising. + """ received_extensions = [] async def extension_handler(data: bytes, sender_peer_id: ID): @@ -73,17 +81,17 @@ async def extension_handler(data: bytes, sender_peer_id: ID): # Get peer IDs peer1_id = pubsubs[1].host.get_id() - # Send extension message from router0 to router1 + # emit_extension is a no-op (v1.3 does not send arbitrary extension + # name/data); it should complete without raising. test_data = b"test extension data" await router0.emit_extension("test-ext", test_data, peer1_id) # Wait for message processing await trio.sleep(0.5) - # Verify extension was received and handled - assert len(received_extensions) == 1 - assert received_extensions[0][0] == test_data - assert received_extensions[0][1] == pubsubs[0].host.get_id() + # No extension data is delivered over the wire (only hello carries + # control.extensions), so the handler is never called. + assert len(received_extensions) == 0 @pytest.mark.trio @@ -110,7 +118,13 @@ async def test_extension_message_to_unsupported_peer(): @pytest.mark.trio async def test_extension_handler_error_handling(): - """Test error handling in extension handlers.""" + """ + Test that emit_extension completes when a failing handler is registered. + + emit_extension is a no-op (v1.3 does not send arbitrary extension data), + so the handler is never invoked. We verify emit_extension does not raise + and the handler is not called. + """ error_count = [0] # Use list to make it mutable async def failing_handler(data: bytes, sender_peer_id: ID): @@ -132,15 +146,15 @@ async def failing_handler(data: bytes, sender_peer_id: ID): await connect(pubsubs[0].host, pubsubs[1].host) await trio.sleep(0.5) - # Send extension message + # emit_extension is a no-op; should complete without raising peer1_id = pubsubs[1].host.get_id() await router0.emit_extension("failing-ext", b"data", peer1_id) # Wait for processing await trio.sleep(0.5) - # Verify handler was called but error was caught - assert error_count[0] == 1 + # Handler is never called (no extension data is sent over the wire) + assert error_count[0] == 0 @pytest.mark.trio @@ -168,14 +182,9 @@ async def test_unregistered_extension_handling(): @pytest.mark.trio async def test_extension_message_from_unsupported_peer(): - """Test receiving extension message from peer that doesn't support extensions.""" + """Extension data from a v1.1 peer is not processed; only v1.3+ are handled.""" from libp2p.pubsub.gossipsub import PROTOCOL_ID_V11 - received_count = [0] # Use list to make it mutable - - async def handler(data: bytes, sender_peer_id: ID): - received_count[0] += 1 - # Create one v1.4 peer and one v1.1 peer async with PubsubFactory.create_batch_with_gossipsub( 1, protocols=[PROTOCOL_ID_V14] @@ -184,40 +193,41 @@ async def handler(data: bytes, sender_peer_id: ID): 1, protocols=[PROTOCOL_ID_V11] ) as v11_pubsubs: v14_router = v14_pubsubs[0].router - v11_router = v11_pubsubs[0].router assert isinstance(v14_router, GossipSub) - assert isinstance(v11_router, GossipSub) - - # Register handler on v1.4 peer - v14_router.register_extension_handler("test-ext", handler) - # Connect peers await connect(v14_pubsubs[0].host, v11_pubsubs[0].host) await trio.sleep(0.5) - # Manually set the peer protocol mapping to simulate v1.1 peer + # Simulate v1.1 peer: router thinks this peer speaks v1.1 only v11_peer_id = v11_pubsubs[0].host.get_id() v14_router.peer_protocol[v11_peer_id] = PROTOCOL_ID_V11 - # Verify the peer is recognized as not supporting extensions assert not v14_router.supports_protocol_feature(v11_peer_id, "extensions") - # Create extension message from v1.1 peer - extension_msg = rpc_pb2.ControlExtension() - extension_msg.name = "test-ext" - extension_msg.data = b"test data" + # Build an RPC that would carry control.extensions (as a v1.3 peer would) + rpc = rpc_pb2.RPC() + rpc.control.CopyFrom(rpc_pb2.ControlMessage()) + rpc.control.extensions.CopyFrom( + rpc_pb2.ControlExtensions(topicObservation=True) + ) - # This should be ignored due to protocol version check - await v14_router.handle_extension(extension_msg, v11_peer_id) - await trio.sleep(0.5) + # handle_rpc must not process extensions for v1.1 peers + await v14_router.handle_rpc(rpc, v11_peer_id) - # Handler should not have been called - assert received_count[0] == 0 + # v1.1 peer's extensions must not be recorded (we skip extensions_state + # when sender does not support v1.3+) + assert v14_router.extensions_state.get_peer_extensions(v11_peer_id) is None @pytest.mark.trio async def test_multiple_extension_handlers(): - """Test multiple extension handlers on the same router.""" + """ + Test multiple extension handlers registered and emit_extension completes. + + emit_extension is a no-op (v1.3 does not send arbitrary extension data), + so handlers are never invoked. We verify both handlers can be registered + and emit_extension for each completes without raising. + """ received_messages = [] async def handler1(data: bytes, sender_peer_id: ID): @@ -242,7 +252,7 @@ async def handler2(data: bytes, sender_peer_id: ID): await connect(pubsubs[0].host, pubsubs[1].host) await trio.sleep(0.5) - # Send messages to different extensions + # emit_extension is a no-op for both; should complete without raising peer1_id = pubsubs[1].host.get_id() await router0.emit_extension("ext1", b"data1", peer1_id) await router0.emit_extension("ext2", b"data2", peer1_id) @@ -250,15 +260,19 @@ async def handler2(data: bytes, sender_peer_id: ID): # Wait for processing await trio.sleep(0.5) - # Verify both handlers were called - assert len(received_messages) == 2 - assert ("handler1", b"data1") in received_messages - assert ("handler2", b"data2") in received_messages + # No extension data is delivered, so neither handler is called + assert len(received_messages) == 0 @pytest.mark.trio async def test_extension_v13_compatibility(): - """Test extensions work with v1.3 protocol.""" + """ + Test extension registration and emit_extension with v1.3 protocol. + + Same as v1.4: emit_extension is a no-op (v1.3 wire format only sends + extensions in the first hello). We verify registration and that + emit_extension completes without raising. + """ received_extensions = [] async def extension_handler(data: bytes, sender_peer_id: ID): @@ -279,7 +293,7 @@ async def extension_handler(data: bytes, sender_peer_id: ID): await connect(pubsubs[0].host, pubsubs[1].host) await trio.sleep(0.5) - # Send extension message + # emit_extension is a no-op; should complete without raising peer1_id = pubsubs[1].host.get_id() test_data = b"v1.3 extension data" await router0.emit_extension("v13-ext", test_data, peer1_id) @@ -287,6 +301,5 @@ async def extension_handler(data: bytes, sender_peer_id: ID): # Wait for processing await trio.sleep(0.5) - # Verify extension was handled - assert len(received_extensions) == 1 - assert received_extensions[0][0] == test_data + # No extension data is delivered over the wire + assert len(received_extensions) == 0 diff --git a/tests/core/pubsub/test_gossipsub_v1_3_extensions.py b/tests/core/pubsub/test_gossipsub_v1_3_extensions.py new file mode 100644 index 000000000..79ff2ad2b --- /dev/null +++ b/tests/core/pubsub/test_gossipsub_v1_3_extensions.py @@ -0,0 +1,428 @@ +from unittest.mock import AsyncMock + +import pytest + +from libp2p.peer.id import ID +from libp2p.pubsub.extensions import ( + ExtensionsState, + PeerExtensions, + TopicObservationState, +) +from libp2p.pubsub.gossipsub import ( + PROTOCOL_ID_V13, + GossipSub, +) +from libp2p.pubsub.pb import rpc_pb2 +from libp2p.pubsub.score import ScoreParams +from libp2p.tools.constants import GOSSIPSUB_PARAMS +from tests.utils.factories import ( + GossipsubFactory, + IDFactory, +) + +# --------------------------------------------------------------------------- +# PeerExtensions tests +# --------------------------------------------------------------------------- + + +def test_peer_extensions_encode_decode_roundtrip() -> None: + """PeerExtensions should roundtrip via ControlExtensions.""" + ext = PeerExtensions(topic_observation=True, test_extension=True) + wire = ext.to_control_extensions() + + decoded = PeerExtensions.from_control_extensions(wire) + + assert decoded.topic_observation is True + assert decoded.test_extension is True + + +def test_peer_extensions_has_any() -> None: + """has_any() reflects whether at least one feature is enabled.""" + ext = PeerExtensions() + assert not ext.has_any() + + ext.topic_observation = True + assert ext.has_any() + + ext.topic_observation = False + ext.test_extension = True + assert ext.has_any() + + +# --------------------------------------------------------------------------- +# ExtensionsState tests +# --------------------------------------------------------------------------- + + +def _make_rpc_with_extensions( + *, topic_observation: bool = False, test_extension: bool = False +) -> rpc_pb2.RPC: + rpc = rpc_pb2.RPC() + control = rpc_pb2.ControlMessage() + wire = PeerExtensions( + topic_observation=topic_observation, + test_extension=test_extension, + ).to_control_extensions() + control.extensions.CopyFrom(wire) + rpc.control.CopyFrom(control) + return rpc + + +def test_build_hello_extensions_attaches_control_extensions() -> None: + """build_hello_extensions should attach ControlExtensions and mark peer as sent.""" + peer_id = IDFactory() + hello = rpc_pb2.RPC() + + state = ExtensionsState( + my_extensions=PeerExtensions(topic_observation=True, test_extension=True) + ) + + mutated = state.build_hello_extensions(peer_id, hello) + + assert mutated is hello + assert mutated.HasField("control") + assert mutated.control.HasField("extensions") + + ext = mutated.control.extensions + assert ext.topicObservation is True + assert ext.testExtension is True + assert state.sent_extensions_to(peer_id) is True + + +def test_build_hello_extensions_marks_sent_even_without_features() -> None: + """Even with no local extensions, sent_extensions should be tracked.""" + peer_id = IDFactory() + hello = rpc_pb2.RPC() + + state = ExtensionsState(my_extensions=PeerExtensions()) + + mutated = state.build_hello_extensions(peer_id, hello) + + assert mutated is hello + # No control.extensions should be present when we advertise nothing. + assert not mutated.HasField("control") + assert state.sent_extensions_to(peer_id) is True + + +def test_handle_rpc_records_peer_extensions_on_first_message() -> None: + """First RPC with extensions should record peer's advertised extensions.""" + peer_id = IDFactory() + state = ExtensionsState(my_extensions=PeerExtensions()) + + rpc = _make_rpc_with_extensions(topic_observation=True, test_extension=False) + + state.handle_rpc(rpc, peer_id) + + peer_ext = state.get_peer_extensions(peer_id) + assert isinstance(peer_ext, PeerExtensions) + assert peer_ext.topic_observation is True + assert peer_ext.test_extension is False + assert state.peer_supports_topic_observation(peer_id) is True + assert state.peer_supports_test_extension(peer_id) is False + + +def test_handle_rpc_duplicate_extensions_calls_misbehaviour_callback() -> None: + """Second RPC carrying extensions should trigger misbehaviour callback.""" + peer_id = IDFactory() + state = ExtensionsState(my_extensions=PeerExtensions()) + + calls: list[ID] = [] + + def report_misbehaviour(p: ID) -> None: + calls.append(p) + + state.set_report_misbehaviour(report_misbehaviour) + + rpc = _make_rpc_with_extensions(topic_observation=True) + + # First RPC: records extensions. + state.handle_rpc(rpc, peer_id) + assert calls == [] + + # Second RPC: duplicate extensions -> misbehaviour. + state.handle_rpc(rpc, peer_id) + assert calls == [peer_id] + + +def test_extensions_state_remove_peer_clears_state() -> None: + """remove_peer should clear both sent and received extension state.""" + peer_id = IDFactory() + state = ExtensionsState(my_extensions=PeerExtensions(topic_observation=True)) + rpc = _make_rpc_with_extensions(topic_observation=True) + + state.build_hello_extensions(peer_id, rpc_pb2.RPC()) + state.handle_rpc(rpc, peer_id) + + assert state.sent_extensions_to(peer_id) + assert state.get_peer_extensions(peer_id) is not None + + state.remove_peer(peer_id) + + assert not state.sent_extensions_to(peer_id) + assert state.get_peer_extensions(peer_id) is None + + +def test_both_support_topic_observation_query() -> None: + """both_support_topic_observation returns True only when both sides advertise it.""" + state = ExtensionsState(my_extensions=PeerExtensions(topic_observation=True)) + + # Peer that did not advertise topicObservation (we only accept first + # Extensions per peer). + peer_no = IDFactory() + rpc_no = _make_rpc_with_extensions(topic_observation=False) + state.handle_rpc(rpc_no, peer_no) + assert not state.both_support_topic_observation(peer_no) + + # Different peer that does advertise topicObservation. + peer_yes = IDFactory() + rpc_yes = _make_rpc_with_extensions(topic_observation=True) + state.handle_rpc(rpc_yes, peer_yes) + assert state.both_support_topic_observation(peer_yes) + + +def test_gossipsub_report_extensions_misbehaviour_penalizes_behavior() -> None: + """GossipSub._report_extensions_misbehaviour must call scorer.penalize_behavior.""" + score_params = ScoreParams( + p5_behavior_penalty_weight=2.0, + p5_behavior_penalty_threshold=0.0, + p5_behavior_penalty_decay=1.0, + ) + router = GossipSub( + protocols=[PROTOCOL_ID_V13], + degree=GOSSIPSUB_PARAMS.degree, + degree_low=GOSSIPSUB_PARAMS.degree_low, + degree_high=GOSSIPSUB_PARAMS.degree_high, + score_params=score_params, + ) + assert isinstance(router, GossipSub) + assert router.scorer is not None + + peer_id = IDFactory() + assert peer_id not in router.scorer.behavior_penalty + + router._report_extensions_misbehaviour(peer_id) + + # _report_extensions_misbehaviour must call scorer.penalize_behavior(peer_id, 1.0) + assert router.scorer.behavior_penalty[peer_id] == 1.0 + + +# --------------------------------------------------------------------------- +# TopicObservationState tests +# --------------------------------------------------------------------------- + + +def test_topic_observation_state_observing_and_observers() -> None: + """TopicObservationState should track observing topics and observers correctly.""" + state = TopicObservationState() + topic = "test-topic" + observer = IDFactory() + subscriber = IDFactory() + + # Outbound observing. + assert not state.is_observing(topic) + state.add_observing(topic, subscriber) + assert state.is_observing(topic) + assert state.get_subscriber_peers_for_topic(topic) == {subscriber} + + state.remove_observing(topic, subscriber) + assert not state.is_observing(topic) + assert state.get_subscriber_peers_for_topic(topic) == set() + + # Inbound observers. + assert state.get_observers(topic) == set() + state.add_observer(topic, observer) + assert state.get_observers(topic) == {observer} + + state.remove_observer(topic, observer) + assert state.get_observers(topic) == set() + + +def test_topic_observation_state_remove_peer_clears_all_state() -> None: + """remove_peer should drop a peer from both observing and observer maps.""" + state = TopicObservationState() + topic1 = "topic-1" + topic2 = "topic-2" + peer = IDFactory() + + state.add_observing(topic1, peer) + state.add_observer(topic2, peer) + + state.remove_peer(peer) + + assert state.get_observing_topics() == set() + assert state.get_observers(topic1) == set() + assert state.get_observers(topic2) == set() + + +# --------------------------------------------------------------------------- +# GossipSub v1.3 wiring tests (topic observation + notify observers) +# --------------------------------------------------------------------------- + + +def test_supports_v13_features_based_on_protocol() -> None: + """supports_v13_features should be true only for v1.3+ peers.""" + router = GossipsubFactory() + assert isinstance(router, GossipSub) + + v13_peer = IDFactory() + v12_peer = IDFactory() + + router.add_peer(v13_peer, PROTOCOL_ID_V13) + # Reuse PROTOCOL_ID_V13 constant to ensure we don't regress the set of + # supported protocols in _get_in_topic_gossipsub_peers_from_minus. + from libp2p.pubsub.gossipsub import PROTOCOL_ID_V12 + + router.add_peer(v12_peer, PROTOCOL_ID_V12) + + assert router.supports_v13_features(v13_peer) is True + assert router.supports_v13_features(v12_peer) is False + + +@pytest.mark.trio +async def test_handle_observe_and_unobserve_manage_observers() -> None: + """handle_observe / handle_unobserve should add and remove observers.""" + router = GossipsubFactory() + assert isinstance(router, GossipSub) + + topic = "obs-topic" + observer_peer = IDFactory() + + # Simulate that the peer advertised topicObservation support via extensions. + router.extensions_state._peer_extensions[observer_peer] = PeerExtensions( + topic_observation=True + ) + + observe_msg = rpc_pb2.ControlObserve(topicID=topic) + await router.handle_observe(observe_msg, observer_peer) + + assert observer_peer in router.topic_observation.get_observers(topic) + + unobserve_msg = rpc_pb2.ControlUnobserve(topicID=topic) + await router.handle_unobserve(unobserve_msg, observer_peer) + + assert observer_peer not in router.topic_observation.get_observers(topic) + + +@pytest.mark.trio +async def test_handle_observe_ignored_when_peer_did_not_advertise_extension() -> None: + """Peers that did not advertise topicObservation must not become observers.""" + router = GossipsubFactory() + assert isinstance(router, GossipSub) + + topic = "obs-topic" + observer_peer = IDFactory() + + # Peer exists, but its advertised extensions do NOT include topicObservation. + router.extensions_state._peer_extensions[observer_peer] = PeerExtensions( + topic_observation=False + ) + + observe_msg = rpc_pb2.ControlObserve(topicID=topic) + await router.handle_observe(observe_msg, observer_peer) + + assert router.topic_observation.get_observers(topic) == set() + + +@pytest.mark.trio +async def test_emit_observe_and_unobserve_update_observing_state() -> None: + """emit_observe / emit_unobserve should update TopicObservationState (outbound).""" + router = GossipsubFactory() + assert isinstance(router, GossipSub) + + topic = "obs-topic" + subscriber_peer = IDFactory() + + # Stub pubsub.peers so emit_control_message sees the peer as connected. + class DummyPubsub: + def __init__(self) -> None: + self.peers: dict[ID, object] = {subscriber_peer: object()} + + router.pubsub = DummyPubsub() # type: ignore[assignment] + + # Avoid writing to a real stream; we only care about state updates. + router.emit_control_message = AsyncMock() # type: ignore[assignment] + + assert not router.topic_observation.is_observing(topic) + + await router.emit_observe(topic, subscriber_peer) + assert router.topic_observation.is_observing(topic) + assert router.topic_observation.get_subscriber_peers_for_topic(topic) == { + subscriber_peer + } + + await router.emit_unobserve(topic, subscriber_peer) + assert not router.topic_observation.is_observing(topic) + assert router.topic_observation.get_subscriber_peers_for_topic(topic) == set() + + +@pytest.mark.trio +async def test_notify_observers_sends_ihave_to_each_observer() -> None: + """_notify_observers should call emit_ihave for each observer with the msg_id.""" + router = GossipsubFactory() + assert isinstance(router, GossipSub) + + topic = "obs-topic" + observer_peer = IDFactory() + msg_id = b"message-id" + + # Configure TopicObservationState with a single observer. + router.topic_observation.add_observer(topic, observer_peer) + + # Stub pubsub with peers map so observers are considered connected. + class DummyPubsub: + def __init__(self) -> None: + self.peers: dict[ID, object] = {observer_peer: object()} + + router.pubsub = DummyPubsub() # type: ignore[assignment] + + # Capture IHAVE emissions. + router.emit_ihave = AsyncMock() # type: ignore[assignment] + + await router._notify_observers([topic], msg_id) + + # emit_ihave(topic, [msg_id.hex()], observer_peer) is expected. + router.emit_ihave.assert_awaited_once() + called_topic, called_msg_ids, called_peer = router.emit_ihave.call_args.args + assert called_topic == topic + assert called_peer == observer_peer + assert called_msg_ids == [msg_id.hex()] + + +@pytest.mark.trio +async def test_start_and_stop_observing_topic_high_level_api() -> None: + """start_observing_topic / stop_observing_topic delegate to OBSERVE/UNOBSERVE.""" + router = GossipsubFactory() + assert isinstance(router, GossipSub) + + topic = "obs-topic" + subscriber_peer = IDFactory() + + # Simulate pubsub state: subscriber_peer is subscribed to topic. + class DummyPubsub: + def __init__(self) -> None: + self.peer_topics = {topic: {subscriber_peer}} + + router.pubsub = DummyPubsub() # type: ignore[assignment] + + # Ensure the peer negotiated v1.3 and both sides support topicObservation. + router.peer_protocol[subscriber_peer] = PROTOCOL_ID_V13 + router.extensions_state.my_extensions.topic_observation = True + router.extensions_state._peer_extensions[subscriber_peer] = PeerExtensions( + topic_observation=True + ) + + # Avoid touching real network; just record control messages. + router.emit_control_message = AsyncMock() # type: ignore[assignment] + + assert not router.topic_observation.is_observing(topic) + + await router.start_observing_topic(topic) + # After OBSERVE, we should be tracking the topic as "observing". + assert router.topic_observation.is_observing(topic) + assert router.topic_observation.get_subscriber_peers_for_topic(topic) == { + subscriber_peer + } + + await router.stop_observing_topic(topic) + assert not router.topic_observation.is_observing(topic) + assert router.topic_observation.get_subscriber_peers_for_topic(topic) == set() diff --git a/tests/core/pubsub/test_rpc_queue.py b/tests/core/pubsub/test_rpc_queue.py index 45123eb84..3e502b44e 100644 --- a/tests/core/pubsub/test_rpc_queue.py +++ b/tests/core/pubsub/test_rpc_queue.py @@ -510,32 +510,36 @@ def test_propagate_on_empty_out(self) -> None: class TestSplitRpcExtensions: def test_fast_path(self) -> None: rpc = rpc_pb2.RPC() - ext = rpc.control.extensions.add() - ext.name = "test-ext" - ext.data = b"ext-data" + rpc.control.extensions.topicObservation = True parts = RpcQueue(max_message_size=10000).split_rpc(rpc) assert len(parts) == 1 - assert parts[0].control.extensions[0].name == "test-ext" + assert parts[0].control.HasField("extensions") + assert parts[0].control.extensions.topicObservation is True def test_slow_path_split(self) -> None: rpc = rpc_pb2.RPC() for i in range(20): rpc.control.graft.add().topicID = f"topic-{i}" * 10 - for i in range(10): - ext = rpc.control.extensions.add() - ext.name = f"ext-{i}" - ext.data = b"x" * 50 + rpc.control.extensions.topicObservation = True parts = RpcQueue(max_message_size=100).split_rpc(rpc) - all_exts = [ - e for p in parts if p.HasField("control") for e in p.control.extensions - ] - assert len(all_exts) == 10 + assert len(parts) > 1 + ext_count = sum( + 1 + for p in parts + if p.HasField("control") and p.control.HasField("extensions") + ) + assert ext_count == 1 + assert any( + p.control.extensions.topicObservation + for p in parts + if p.HasField("control") and p.control.HasField("extensions") + ) def test_extension_only_not_filtered(self) -> None: rpc = rpc_pb2.RPC() - rpc.control.extensions.add().name = "only" + rpc.control.extensions.topicObservation = True assert _rpc_has_data(rpc) is True parts = RpcQueue(max_message_size=10000).split_rpc(rpc)