From 51031b578db7999d05c68c8bf61c6f84cb4f90a5 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Thu, 8 May 2025 19:03:40 -0700 Subject: [PATCH] feature: client api for taps --- synapse/client/protobuf_helpers.py | 26 ++++ synapse/client/taps.py | 172 ++++++++++++++++++++++++++ synapse/examples/tap_example.py | 187 +++++++++++++++++++++++++++++ 3 files changed, 385 insertions(+) create mode 100644 synapse/client/protobuf_helpers.py create mode 100644 synapse/client/taps.py create mode 100644 synapse/examples/tap_example.py diff --git a/synapse/client/protobuf_helpers.py b/synapse/client/protobuf_helpers.py new file mode 100644 index 00000000..a0eec1c7 --- /dev/null +++ b/synapse/client/protobuf_helpers.py @@ -0,0 +1,26 @@ +from typing import Optional, Type, TypeVar +from google.protobuf.message import Message + +# Generic type for protobuf messages +T = TypeVar("T", bound=Message) + + +def parse_protobuf(data: bytes, message_type: Type[T]) -> Optional[T]: + """Parse raw bytes into a protobuf message of the specified type. + + Args: + data (bytes): Raw binary data. + message_type (Type[T]): The protobuf message class to use for parsing. + + Returns: + Optional[T]: The parsed protobuf message, or None if parsing failed. + """ + if not data: + return None + + try: + message = message_type() + message.ParseFromString(data) + return message + except Exception: + return None diff --git a/synapse/client/taps.py b/synapse/client/taps.py new file mode 100644 index 00000000..875a1aba --- /dev/null +++ b/synapse/client/taps.py @@ -0,0 +1,172 @@ +import logging +import zmq +from typing import Optional, Generator + +from synapse.api.query_pb2 import QueryRequest +from synapse.api.status_pb2 import StatusCode + + +class Tap(object): + def __init__(self, uri, verbose=False): + """Initialize a Tap client to connect to the Synapse device. + + Args: + uri (str): The URI of the Synapse device. + verbose (bool, optional): Whether to enable verbose logging. Defaults to False. + """ + self.uri = uri + self.verbose = verbose + self.logger = logging.getLogger(__name__) + self.logger.setLevel(logging.DEBUG if verbose else logging.INFO) + + # ZMQ context (will be initialized upon connection) + self.zmq_context = None + self.zmq_socket = None + self.connected_tap = None + + def list_taps(self): + """List all available taps on the device. + + Returns: + list: List of TapConnection objects. + """ + from synapse.client.device import Device + + device = Device(self.uri, self.verbose) + + request = QueryRequest() + request.query_type = QueryRequest.kListTaps + request.list_taps_query.SetInParent() + + response = device.query(request) + + if not response or response.status.code != StatusCode.kOk: + self.logger.error( + f"Failed to list taps: {response.status.message if response else 'No response'}" + ) + return [] + + return response.list_taps_response.taps + + def connect(self, name: str) -> bool: + """Connect to a specific tap by name. + + Args: + name (str): The name of the tap to connect to. + + Returns: + bool: True if connected successfully, False otherwise. + """ + taps = self.list_taps() + + # Find the tap with the specified name + selected_tap = None + for tap in taps: + if tap.name == name: + selected_tap = tap + break + + if not selected_tap: + self.logger.error(f"Tap '{name}' not found") + return False + + # Store the connected tap + self.connected_tap = selected_tap + + # Initialize ZMQ context and socket + self.zmq_context = zmq.Context() + self.zmq_socket = self.zmq_context.socket(zmq.SUB) + + # Replace the endpoint with our device URI if needed + endpoint = selected_tap.endpoint + if "://" in endpoint: + # Extract the protocol and port + protocol, address = endpoint.split("://") + _, port = address.split(":") + + # Use the device URI with the same port + endpoint = f"{protocol}://{self.uri.split(':')[0]}:{port}" + + try: + print(f"Connecting to tap '{name}' at {endpoint}") + self.zmq_socket.connect(endpoint) + self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all messages + return True + except zmq.ZMQError as e: + self.logger.error(f"Failed to connect to tap: {e}") + self._cleanup() + return False + + def read(self, timeout_ms: int = 1000) -> Optional[bytes]: + """Read raw data from the tap with timeout. + + Args: + timeout_ms (int, optional): Timeout in milliseconds. Defaults to 1000. + + Returns: + Optional[bytes]: Raw message data or None if timeout/error. + """ + if not self.zmq_socket: + self.logger.error("Not connected to any tap") + return None + + try: + # Set socket timeout + self.zmq_socket.setsockopt(zmq.RCVTIMEO, timeout_ms) + + # Receive data (will timeout if no data available) + return self.zmq_socket.recv() + except zmq.Again: + # Timeout occurred + return None + except zmq.ZMQError as e: + self.logger.error(f"Error receiving message: {e}") + return None + + def stream(self, timeout_ms: int = 1000) -> Generator[bytes, None, None]: + """Stream raw data from the tap. + + Args: + timeout_ms (int, optional): Timeout between messages in milliseconds. Defaults to 1000. + + Yields: + Generator[bytes, None, None]: Stream of raw message data. + """ + if not self.zmq_socket: + self.logger.error("Not connected to any tap") + return + + # Set socket timeout + self.zmq_socket.setsockopt(zmq.RCVTIMEO, timeout_ms) + + try: + while True: + try: + data = self.zmq_socket.recv() + yield data + except zmq.Again: + # Timeout occurred, continue to next iteration + continue + except KeyboardInterrupt: + self.logger.info("Stream interrupted") + except zmq.ZMQError as e: + self.logger.error(f"Error streaming messages: {e}") + finally: + # Don't close the socket here, let the user call disconnect() + pass + + def disconnect(self): + """Disconnect from the tap.""" + self._cleanup() + + def _cleanup(self): + """Clean up ZMQ resources.""" + if self.zmq_socket: + self.zmq_socket.close() + self.zmq_socket = None + + if self.zmq_context: + self.zmq_context.term() + self.zmq_context = None + + self.connected_tap = None diff --git a/synapse/examples/tap_example.py b/synapse/examples/tap_example.py new file mode 100644 index 00000000..74921809 --- /dev/null +++ b/synapse/examples/tap_example.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +import sys +import argparse +import binascii +import importlib +from typing import Type + +from synapse.client.taps import Tap +from synapse.client.protobuf_helpers import parse_protobuf +from google.protobuf.message import Message + + +def load_message_type(message_type_path: str) -> Type[Message]: + """Load a protobuf message type from its fully qualified path. + + Args: + message_type_path (str): The fully qualified path to the message type (e.g. 'synapse.api.datatype_pb2.BroadbandFrame') + + Returns: + Type[Message]: The message type class + """ + parts = message_type_path.split(".") + class_name = parts[-1] + module_name = ".".join(parts[:-1]) + + try: + module = importlib.import_module(module_name) + return getattr(module, class_name) + except (ImportError, AttributeError) as e: + raise ImportError(f"Could not load message type {message_type_path}: {e}") + + +def format_data(data): + """Format raw bytes for display.""" + if isinstance(data, bytes) or isinstance(data, bytearray): + # For binary data, show a hex summary + if len(data) > 100: + hex_data = binascii.hexlify(data[:50]).decode("ascii") + return f"Binary data ({len(data)} bytes): {hex_data}... [truncated]" + else: + hex_data = binascii.hexlify(data).decode("ascii") + return f"Binary data ({len(data)} bytes): {hex_data}" + else: + # For other types, use default string representation + return str(data) + + +def main(): + parser = argparse.ArgumentParser( + description="Example for using the Synapse Tap API" + ) + parser.add_argument( + "--uri", "-u", type=str, required=True, help="URI of the Synapse device" + ) + parser.add_argument("--name", "-n", type=str, help="Name of the tap to connect to") + parser.add_argument( + "--verbose", "-v", action="store_true", help="Enable verbose logging" + ) + parser.add_argument( + "--timeout", + "-t", + type=int, + default=5000, + help="Timeout in milliseconds (default: 5000)", + ) + parser.add_argument( + "--count", + "-c", + type=int, + default=None, + help="Number of messages to receive (default: infinite)", + ) + parser.add_argument( + "--message-type", + "-m", + type=str, + help="Fully qualified message type path (e.g. 'synapse.api.datatype_pb2.BroadbandFrame')", + ) + args = parser.parse_args() + + # Create a tap client + tap = Tap(args.uri, args.verbose) + + # List available taps + print("Available taps:") + taps = tap.list_taps() + + if not taps: + print("No taps found") + return 1 + + for i, tap_info in enumerate(taps): + print( + f" {i + 1}. {tap_info.name} - Type: {tap_info.message_type}, Endpoint: {tap_info.endpoint}" + ) + + # Determine which tap to connect to + tap_name = args.name + if not tap_name: + # If name not provided, ask the user to select one + try: + selection = int(input("\nSelect a tap by number: ")) + if 1 <= selection <= len(taps): + tap_name = taps[selection - 1].name + else: + print(f"Invalid selection: {selection}") + return 1 + except ValueError: + print("Invalid input. Please enter a number.") + return 1 + + # Load message type if provided + message_type_class = None + if args.message_type: + try: + message_type_class = load_message_type(args.message_type) + print(f"Using message type: {message_type_class.__name__}") + except ImportError as e: + print(f"Warning: {e}") + print("Will use raw data instead") + + # Connect to the selected tap + print(f"\nConnecting to tap: {tap_name}") + if not tap.connect(tap_name): + print(f"Failed to connect to tap: {tap_name}") + return 1 + + print("Connected successfully!") + + try: + if args.count: + # Receive a specific number of messages + print(f"\nReceiving {args.count} messages (timeout: {args.timeout}ms):") + for i in range(args.count): + raw_data = tap.read(args.timeout) + if raw_data: + print(f"Message {i + 1}:") + + # Parse data if message type was provided + if message_type_class: + parsed = parse_protobuf(raw_data, message_type_class) + if parsed: + print(f"Parsed message: {parsed}") + else: + print("Failed to parse as protobuf message") + print(f"Raw data: {format_data(raw_data)}") + else: + print(f"Raw data: {format_data(raw_data)}") + + print() + else: + print(f"Timeout waiting for message {i + 1}") + else: + # Stream messages until interrupted + print("\nStreaming messages (Ctrl+C to stop):") + count = 0 + for raw_data in tap.stream(args.timeout): + count += 1 + print(f"Message {count}:") + + # Parse data if message type was provided + if message_type_class: + parsed = parse_protobuf(raw_data, message_type_class) + if parsed: + print(f"Parsed message: {parsed}") + else: + print("Failed to parse as protobuf message") + print(f"Raw data: {format_data(raw_data)}") + else: + print(f"Raw data: {format_data(raw_data)}") + + print() + + except KeyboardInterrupt: + print("\nStream interrupted by user") + + finally: + # Clean up + print("\nDisconnecting...") + tap.disconnect() + print("Done") + + return 0 + + +if __name__ == "__main__": + sys.exit(main())