From b8d15b738bea794d0a9c64d90b2a963ab3a49e5a Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Wed, 7 May 2025 21:28:57 -0700 Subject: [PATCH 1/6] Update to using taps --- synapse-api | 2 +- synapse/cli/rpc.py | 19 +++++++++ synapse/cli/tap_listen.py | 89 +++++++++++++++++++++++++++++++++++++++ synapse/client/device.py | 9 ++++ 4 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 synapse/cli/tap_listen.py diff --git a/synapse-api b/synapse-api index 8dbfdb1..25c382e 160000 --- a/synapse-api +++ b/synapse-api @@ -1 +1 @@ -Subproject commit 8dbfdb126a833161926d0bc47a6d888859901539 +Subproject commit 25c382e40fdec56f15bbac783b31c2bd189b2983 diff --git a/synapse/cli/rpc.py b/synapse/cli/rpc.py index f955c5d..0d2fc20 100644 --- a/synapse/cli/rpc.py +++ b/synapse/cli/rpc.py @@ -13,6 +13,7 @@ from rich.console import Console from rich.pretty import pprint +from rich.table import Table from synapse.cli.query import StreamingQueryClient from synapse.utils.log import log_entry_to_str @@ -39,6 +40,9 @@ def add_commands(subparsers): e.add_argument("config_file", type=str) e.set_defaults(func=configure) + e = subparsers.add_parser("taps", help="List available taps") + e.set_defaults(func=list_taps) + f = subparsers.add_parser("logs", help="Get logs from the device") f.add_argument("--output", "-o", type=str, help="Optional file to write logs to") f.add_argument( @@ -81,6 +85,21 @@ def add_commands(subparsers): f.set_defaults(func=get_logs) +def list_taps(args): + console = Console() + taps = syn.Device(args.uri, args.verbose).list_taps() + + table = Table(title="Available Taps") + table.add_column("Name", style="cyan") + table.add_column("Message Type", style="green") + table.add_column("Endpoint", style="green") + + for tap in taps.taps: + table.add_row(tap.name, tap.message_type, tap.endpoint) + + console.print(table) + + def info(args): console = Console() with console.status("Getting device information...", spinner="bouncingBall"): diff --git a/synapse/cli/tap_listen.py b/synapse/cli/tap_listen.py new file mode 100644 index 0000000..25f3ef8 --- /dev/null +++ b/synapse/cli/tap_listen.py @@ -0,0 +1,89 @@ +import zmq +import socket +import threading +import time +import argparse +import binascii + + +def listen_zmq(endpoint, topic=""): + """Listen for ZMQ messages""" + context = zmq.Context() + subscriber = context.socket(zmq.SUB) + subscriber.connect(endpoint) + + # Set subscription filter (empty = all messages) + subscriber.setsockopt_string(zmq.SUBSCRIBE, topic) + + print(f"ZMQ Subscriber connected to {endpoint} with topic '{topic or 'ALL'}'") + + while True: + try: + # If multi-part message with topic + if topic: + # Get the topic first + topic_msg = subscriber.recv() + # Then get the data + message = subscriber.recv() + else: + # Single message + message = subscriber.recv() + + print(f"Got ZMQ data: {len(message)} bytes") + # Print first 20 bytes as hex for debugging + print(f" Data preview: {binascii.hexlify(message[:20]).decode()}") + except Exception as e: + print(f"ZMQ Error: {e}") + time.sleep(1) + + +def listen_udp(ip, port): + """Listen for UDP messages""" + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.bind((ip, port)) + + print(f"UDP Listener bound to {ip}:{port}") + + while True: + try: + data, addr = sock.recvfrom(8192) # Buffer size is 8KB + print(f"Got UDP data from {addr}: {len(data)} bytes") + # Print first 20 bytes as hex for debugging + print(f" Data preview: {binascii.hexlify(data[:20]).decode()}") + except Exception as e: + print(f"UDP Error: {e}") + time.sleep(1) + + +def main(): + parser = argparse.ArgumentParser(description="Listen to ZMQ and UDP taps") + + # ZMQ options + parser.add_argument( + "--port", + type=str, + default="tcp://10.40.62.57", + help="ZMQ endpoint to connect to", + ) + + args = parser.parse_args() + + # Start ZMQ listener thread + connection = f"tcp://10.40.62.57:{args.port}" + zmq_topic = "" + zmq_thread = threading.Thread( + target=listen_zmq, args=(connection, zmq_topic), daemon=True + ) + zmq_thread.start() + + # Keep main thread alive + try: + print("Listening for messages (press Ctrl+C to exit)...") + while True: + time.sleep(1) + except KeyboardInterrupt: + print("Shutting down...") + + +if __name__ == "__main__": + main() diff --git a/synapse/client/device.py b/synapse/client/device.py index 3b292e2..20911b8 100644 --- a/synapse/client/device.py +++ b/synapse/client/device.py @@ -16,6 +16,7 @@ from synapse.api.synapse_pb2_grpc import SynapseDeviceStub from synapse.client.config import Config from synapse.utils.log import log_level_to_pb +from synapse.api.tap_pb2 import ListTapsRequest DEFAULT_SYNAPSE_PORT = 647 @@ -185,6 +186,14 @@ def stream_query( self.logger.error(f"Error during StreamQuery: {str(e)}") yield StreamQueryResponse(code=StatusCode.kQueryFailed) + def list_taps(self) -> ListTapResponse: + try: + request = ListTapsRequest() + return self.rpc.ListTaps(request) + except grpc.RpcError as e: + self.logger.error("Error: %s", e.details) + return None + def _handle_status_response(self, status): if status.code != StatusCode.kOk: self.logger.error("Error %d: %s", status.code, status.message) From 112aba43e8b4b51a2d251197d347373ba4b1bd11 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Thu, 8 May 2025 16:15:36 -0700 Subject: [PATCH 2/6] Updated to use the new taps api --- .gitignore | 1 + synapse-api | 2 +- synapse/cli/rpc.py | 12 ++++++++---- synapse/client/device.py | 9 --------- 4 files changed, 10 insertions(+), 14 deletions(-) diff --git a/.gitignore b/.gitignore index dbe68da..da7ebd1 100644 --- a/.gitignore +++ b/.gitignore @@ -184,3 +184,4 @@ output_*.json *.jsonl .scienv synapse_data* +.synapse_deploy_cache.json diff --git a/synapse-api b/synapse-api index 25c382e..c3d730e 160000 --- a/synapse-api +++ b/synapse-api @@ -1 +1 @@ -Subproject commit 25c382e40fdec56f15bbac783b31c2bd189b2983 +Subproject commit c3d730e6fac727b1ce2d3eef66202f185d736a37 diff --git a/synapse/cli/rpc.py b/synapse/cli/rpc.py index 0d2fc20..9b297e1 100644 --- a/synapse/cli/rpc.py +++ b/synapse/cli/rpc.py @@ -87,14 +87,18 @@ def add_commands(subparsers): def list_taps(args): console = Console() - taps = syn.Device(args.uri, args.verbose).list_taps() - table = Table(title="Available Taps") + request = QueryRequest() + request.query_type = QueryRequest.kListTaps + request.list_taps_query.SetInParent() + response = syn.Device(args.uri, args.verbose).query(request) + + table = Table(title="Available Taps", show_lines=True) table.add_column("Name", style="cyan") table.add_column("Message Type", style="green") - table.add_column("Endpoint", style="green") + table.add_column("Endpoint (will be abstracted)", style="green") - for tap in taps.taps: + for tap in response.list_taps_response.taps: table.add_row(tap.name, tap.message_type, tap.endpoint) console.print(table) diff --git a/synapse/client/device.py b/synapse/client/device.py index 20911b8..3b292e2 100644 --- a/synapse/client/device.py +++ b/synapse/client/device.py @@ -16,7 +16,6 @@ from synapse.api.synapse_pb2_grpc import SynapseDeviceStub from synapse.client.config import Config from synapse.utils.log import log_level_to_pb -from synapse.api.tap_pb2 import ListTapsRequest DEFAULT_SYNAPSE_PORT = 647 @@ -186,14 +185,6 @@ def stream_query( self.logger.error(f"Error during StreamQuery: {str(e)}") yield StreamQueryResponse(code=StatusCode.kQueryFailed) - def list_taps(self) -> ListTapResponse: - try: - request = ListTapsRequest() - return self.rpc.ListTaps(request) - except grpc.RpcError as e: - self.logger.error("Error: %s", e.details) - return None - def _handle_status_response(self, status): if status.code != StatusCode.kOk: self.logger.error("Error %d: %s", status.code, status.message) From 5ec89c1de7d82120644acf855c61b7a3225a83ce Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Thu, 8 May 2025 16:59:22 -0700 Subject: [PATCH 3/6] refactor --- synapse/cli/rpc.py | 38 ++++++++--------- synapse/cli/tap_listen.py | 89 --------------------------------------- 2 files changed, 19 insertions(+), 108 deletions(-) delete mode 100644 synapse/cli/tap_listen.py diff --git a/synapse/cli/rpc.py b/synapse/cli/rpc.py index 9b297e1..b298b8a 100644 --- a/synapse/cli/rpc.py +++ b/synapse/cli/rpc.py @@ -85,25 +85,6 @@ def add_commands(subparsers): f.set_defaults(func=get_logs) -def list_taps(args): - console = Console() - - request = QueryRequest() - request.query_type = QueryRequest.kListTaps - request.list_taps_query.SetInParent() - response = syn.Device(args.uri, args.verbose).query(request) - - table = Table(title="Available Taps", show_lines=True) - table.add_column("Name", style="cyan") - table.add_column("Message Type", style="green") - table.add_column("Endpoint (will be abstracted)", style="green") - - for tap in response.list_taps_response.taps: - table.add_row(tap.name, tap.message_type, tap.endpoint) - - console.print(table) - - def info(args): console = Console() with console.status("Getting device information...", spinner="bouncingBall"): @@ -267,3 +248,22 @@ def parse_datetime(time_str: Optional[str]) -> Optional[datetime]: finally: if output_file: output_file.close() + + +def list_taps(args): + console = Console() + + request = QueryRequest() + request.query_type = QueryRequest.kListTaps + request.list_taps_query.SetInParent() + response = syn.Device(args.uri, args.verbose).query(request) + + table = Table(title="Available Taps", show_lines=True) + table.add_column("Name", style="cyan") + table.add_column("Message Type", style="green") + table.add_column("Endpoint", style="green") + + for tap in response.list_taps_response.taps: + table.add_row(tap.name, tap.message_type, tap.endpoint) + + console.print(table) diff --git a/synapse/cli/tap_listen.py b/synapse/cli/tap_listen.py deleted file mode 100644 index 25f3ef8..0000000 --- a/synapse/cli/tap_listen.py +++ /dev/null @@ -1,89 +0,0 @@ -import zmq -import socket -import threading -import time -import argparse -import binascii - - -def listen_zmq(endpoint, topic=""): - """Listen for ZMQ messages""" - context = zmq.Context() - subscriber = context.socket(zmq.SUB) - subscriber.connect(endpoint) - - # Set subscription filter (empty = all messages) - subscriber.setsockopt_string(zmq.SUBSCRIBE, topic) - - print(f"ZMQ Subscriber connected to {endpoint} with topic '{topic or 'ALL'}'") - - while True: - try: - # If multi-part message with topic - if topic: - # Get the topic first - topic_msg = subscriber.recv() - # Then get the data - message = subscriber.recv() - else: - # Single message - message = subscriber.recv() - - print(f"Got ZMQ data: {len(message)} bytes") - # Print first 20 bytes as hex for debugging - print(f" Data preview: {binascii.hexlify(message[:20]).decode()}") - except Exception as e: - print(f"ZMQ Error: {e}") - time.sleep(1) - - -def listen_udp(ip, port): - """Listen for UDP messages""" - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - sock.bind((ip, port)) - - print(f"UDP Listener bound to {ip}:{port}") - - while True: - try: - data, addr = sock.recvfrom(8192) # Buffer size is 8KB - print(f"Got UDP data from {addr}: {len(data)} bytes") - # Print first 20 bytes as hex for debugging - print(f" Data preview: {binascii.hexlify(data[:20]).decode()}") - except Exception as e: - print(f"UDP Error: {e}") - time.sleep(1) - - -def main(): - parser = argparse.ArgumentParser(description="Listen to ZMQ and UDP taps") - - # ZMQ options - parser.add_argument( - "--port", - type=str, - default="tcp://10.40.62.57", - help="ZMQ endpoint to connect to", - ) - - args = parser.parse_args() - - # Start ZMQ listener thread - connection = f"tcp://10.40.62.57:{args.port}" - zmq_topic = "" - zmq_thread = threading.Thread( - target=listen_zmq, args=(connection, zmq_topic), daemon=True - ) - zmq_thread.start() - - # Keep main thread alive - try: - print("Listening for messages (press Ctrl+C to exit)...") - while True: - time.sleep(1) - except KeyboardInterrupt: - print("Shutting down...") - - -if __name__ == "__main__": - main() From 51031b578db7999d05c68c8bf61c6f84cb4f90a5 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Thu, 8 May 2025 19:03:40 -0700 Subject: [PATCH 4/6] feature: client api for taps --- synapse/client/protobuf_helpers.py | 26 ++++ synapse/client/taps.py | 172 ++++++++++++++++++++++++++ synapse/examples/tap_example.py | 187 +++++++++++++++++++++++++++++ 3 files changed, 385 insertions(+) create mode 100644 synapse/client/protobuf_helpers.py create mode 100644 synapse/client/taps.py create mode 100644 synapse/examples/tap_example.py diff --git a/synapse/client/protobuf_helpers.py b/synapse/client/protobuf_helpers.py new file mode 100644 index 0000000..a0eec1c --- /dev/null +++ b/synapse/client/protobuf_helpers.py @@ -0,0 +1,26 @@ +from typing import Optional, Type, TypeVar +from google.protobuf.message import Message + +# Generic type for protobuf messages +T = TypeVar("T", bound=Message) + + +def parse_protobuf(data: bytes, message_type: Type[T]) -> Optional[T]: + """Parse raw bytes into a protobuf message of the specified type. + + Args: + data (bytes): Raw binary data. + message_type (Type[T]): The protobuf message class to use for parsing. + + Returns: + Optional[T]: The parsed protobuf message, or None if parsing failed. + """ + if not data: + return None + + try: + message = message_type() + message.ParseFromString(data) + return message + except Exception: + return None diff --git a/synapse/client/taps.py b/synapse/client/taps.py new file mode 100644 index 0000000..875a1ab --- /dev/null +++ b/synapse/client/taps.py @@ -0,0 +1,172 @@ +import logging +import zmq +from typing import Optional, Generator + +from synapse.api.query_pb2 import QueryRequest +from synapse.api.status_pb2 import StatusCode + + +class Tap(object): + def __init__(self, uri, verbose=False): + """Initialize a Tap client to connect to the Synapse device. + + Args: + uri (str): The URI of the Synapse device. + verbose (bool, optional): Whether to enable verbose logging. Defaults to False. + """ + self.uri = uri + self.verbose = verbose + self.logger = logging.getLogger(__name__) + self.logger.setLevel(logging.DEBUG if verbose else logging.INFO) + + # ZMQ context (will be initialized upon connection) + self.zmq_context = None + self.zmq_socket = None + self.connected_tap = None + + def list_taps(self): + """List all available taps on the device. + + Returns: + list: List of TapConnection objects. + """ + from synapse.client.device import Device + + device = Device(self.uri, self.verbose) + + request = QueryRequest() + request.query_type = QueryRequest.kListTaps + request.list_taps_query.SetInParent() + + response = device.query(request) + + if not response or response.status.code != StatusCode.kOk: + self.logger.error( + f"Failed to list taps: {response.status.message if response else 'No response'}" + ) + return [] + + return response.list_taps_response.taps + + def connect(self, name: str) -> bool: + """Connect to a specific tap by name. + + Args: + name (str): The name of the tap to connect to. + + Returns: + bool: True if connected successfully, False otherwise. + """ + taps = self.list_taps() + + # Find the tap with the specified name + selected_tap = None + for tap in taps: + if tap.name == name: + selected_tap = tap + break + + if not selected_tap: + self.logger.error(f"Tap '{name}' not found") + return False + + # Store the connected tap + self.connected_tap = selected_tap + + # Initialize ZMQ context and socket + self.zmq_context = zmq.Context() + self.zmq_socket = self.zmq_context.socket(zmq.SUB) + + # Replace the endpoint with our device URI if needed + endpoint = selected_tap.endpoint + if "://" in endpoint: + # Extract the protocol and port + protocol, address = endpoint.split("://") + _, port = address.split(":") + + # Use the device URI with the same port + endpoint = f"{protocol}://{self.uri.split(':')[0]}:{port}" + + try: + print(f"Connecting to tap '{name}' at {endpoint}") + self.zmq_socket.connect(endpoint) + self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all messages + return True + except zmq.ZMQError as e: + self.logger.error(f"Failed to connect to tap: {e}") + self._cleanup() + return False + + def read(self, timeout_ms: int = 1000) -> Optional[bytes]: + """Read raw data from the tap with timeout. + + Args: + timeout_ms (int, optional): Timeout in milliseconds. Defaults to 1000. + + Returns: + Optional[bytes]: Raw message data or None if timeout/error. + """ + if not self.zmq_socket: + self.logger.error("Not connected to any tap") + return None + + try: + # Set socket timeout + self.zmq_socket.setsockopt(zmq.RCVTIMEO, timeout_ms) + + # Receive data (will timeout if no data available) + return self.zmq_socket.recv() + except zmq.Again: + # Timeout occurred + return None + except zmq.ZMQError as e: + self.logger.error(f"Error receiving message: {e}") + return None + + def stream(self, timeout_ms: int = 1000) -> Generator[bytes, None, None]: + """Stream raw data from the tap. + + Args: + timeout_ms (int, optional): Timeout between messages in milliseconds. Defaults to 1000. + + Yields: + Generator[bytes, None, None]: Stream of raw message data. + """ + if not self.zmq_socket: + self.logger.error("Not connected to any tap") + return + + # Set socket timeout + self.zmq_socket.setsockopt(zmq.RCVTIMEO, timeout_ms) + + try: + while True: + try: + data = self.zmq_socket.recv() + yield data + except zmq.Again: + # Timeout occurred, continue to next iteration + continue + except KeyboardInterrupt: + self.logger.info("Stream interrupted") + except zmq.ZMQError as e: + self.logger.error(f"Error streaming messages: {e}") + finally: + # Don't close the socket here, let the user call disconnect() + pass + + def disconnect(self): + """Disconnect from the tap.""" + self._cleanup() + + def _cleanup(self): + """Clean up ZMQ resources.""" + if self.zmq_socket: + self.zmq_socket.close() + self.zmq_socket = None + + if self.zmq_context: + self.zmq_context.term() + self.zmq_context = None + + self.connected_tap = None diff --git a/synapse/examples/tap_example.py b/synapse/examples/tap_example.py new file mode 100644 index 0000000..7492180 --- /dev/null +++ b/synapse/examples/tap_example.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +import sys +import argparse +import binascii +import importlib +from typing import Type + +from synapse.client.taps import Tap +from synapse.client.protobuf_helpers import parse_protobuf +from google.protobuf.message import Message + + +def load_message_type(message_type_path: str) -> Type[Message]: + """Load a protobuf message type from its fully qualified path. + + Args: + message_type_path (str): The fully qualified path to the message type (e.g. 'synapse.api.datatype_pb2.BroadbandFrame') + + Returns: + Type[Message]: The message type class + """ + parts = message_type_path.split(".") + class_name = parts[-1] + module_name = ".".join(parts[:-1]) + + try: + module = importlib.import_module(module_name) + return getattr(module, class_name) + except (ImportError, AttributeError) as e: + raise ImportError(f"Could not load message type {message_type_path}: {e}") + + +def format_data(data): + """Format raw bytes for display.""" + if isinstance(data, bytes) or isinstance(data, bytearray): + # For binary data, show a hex summary + if len(data) > 100: + hex_data = binascii.hexlify(data[:50]).decode("ascii") + return f"Binary data ({len(data)} bytes): {hex_data}... [truncated]" + else: + hex_data = binascii.hexlify(data).decode("ascii") + return f"Binary data ({len(data)} bytes): {hex_data}" + else: + # For other types, use default string representation + return str(data) + + +def main(): + parser = argparse.ArgumentParser( + description="Example for using the Synapse Tap API" + ) + parser.add_argument( + "--uri", "-u", type=str, required=True, help="URI of the Synapse device" + ) + parser.add_argument("--name", "-n", type=str, help="Name of the tap to connect to") + parser.add_argument( + "--verbose", "-v", action="store_true", help="Enable verbose logging" + ) + parser.add_argument( + "--timeout", + "-t", + type=int, + default=5000, + help="Timeout in milliseconds (default: 5000)", + ) + parser.add_argument( + "--count", + "-c", + type=int, + default=None, + help="Number of messages to receive (default: infinite)", + ) + parser.add_argument( + "--message-type", + "-m", + type=str, + help="Fully qualified message type path (e.g. 'synapse.api.datatype_pb2.BroadbandFrame')", + ) + args = parser.parse_args() + + # Create a tap client + tap = Tap(args.uri, args.verbose) + + # List available taps + print("Available taps:") + taps = tap.list_taps() + + if not taps: + print("No taps found") + return 1 + + for i, tap_info in enumerate(taps): + print( + f" {i + 1}. {tap_info.name} - Type: {tap_info.message_type}, Endpoint: {tap_info.endpoint}" + ) + + # Determine which tap to connect to + tap_name = args.name + if not tap_name: + # If name not provided, ask the user to select one + try: + selection = int(input("\nSelect a tap by number: ")) + if 1 <= selection <= len(taps): + tap_name = taps[selection - 1].name + else: + print(f"Invalid selection: {selection}") + return 1 + except ValueError: + print("Invalid input. Please enter a number.") + return 1 + + # Load message type if provided + message_type_class = None + if args.message_type: + try: + message_type_class = load_message_type(args.message_type) + print(f"Using message type: {message_type_class.__name__}") + except ImportError as e: + print(f"Warning: {e}") + print("Will use raw data instead") + + # Connect to the selected tap + print(f"\nConnecting to tap: {tap_name}") + if not tap.connect(tap_name): + print(f"Failed to connect to tap: {tap_name}") + return 1 + + print("Connected successfully!") + + try: + if args.count: + # Receive a specific number of messages + print(f"\nReceiving {args.count} messages (timeout: {args.timeout}ms):") + for i in range(args.count): + raw_data = tap.read(args.timeout) + if raw_data: + print(f"Message {i + 1}:") + + # Parse data if message type was provided + if message_type_class: + parsed = parse_protobuf(raw_data, message_type_class) + if parsed: + print(f"Parsed message: {parsed}") + else: + print("Failed to parse as protobuf message") + print(f"Raw data: {format_data(raw_data)}") + else: + print(f"Raw data: {format_data(raw_data)}") + + print() + else: + print(f"Timeout waiting for message {i + 1}") + else: + # Stream messages until interrupted + print("\nStreaming messages (Ctrl+C to stop):") + count = 0 + for raw_data in tap.stream(args.timeout): + count += 1 + print(f"Message {count}:") + + # Parse data if message type was provided + if message_type_class: + parsed = parse_protobuf(raw_data, message_type_class) + if parsed: + print(f"Parsed message: {parsed}") + else: + print("Failed to parse as protobuf message") + print(f"Raw data: {format_data(raw_data)}") + else: + print(f"Raw data: {format_data(raw_data)}") + + print() + + except KeyboardInterrupt: + print("\nStream interrupted by user") + + finally: + # Clean up + print("\nDisconnecting...") + tap.disconnect() + print("Done") + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) From 967195fda998e1caec03a50a7dbdf583bdb0c192 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Fri, 9 May 2025 18:21:12 -0700 Subject: [PATCH 5/6] feature: add ability to list and stream taps --- synapse-api | 2 +- synapse/cli/__main__.py | 4 ++- synapse/cli/rpc.py | 23 ------------ synapse/cli/taps.py | 56 ++++++++++++++++++++++++++++++ synapse/client/protobuf_helpers.py | 26 -------------- 5 files changed, 60 insertions(+), 51 deletions(-) create mode 100644 synapse/cli/taps.py delete mode 100644 synapse/client/protobuf_helpers.py diff --git a/synapse-api b/synapse-api index c3d730e..8dbfdb1 160000 --- a/synapse-api +++ b/synapse-api @@ -1 +1 @@ -Subproject commit c3d730e6fac727b1ce2d3eef66202f185d736a37 +Subproject commit 8dbfdb126a833161926d0bc47a6d888859901539 diff --git a/synapse/cli/__main__.py b/synapse/cli/__main__.py index ad9012a..e618b9f 100755 --- a/synapse/cli/__main__.py +++ b/synapse/cli/__main__.py @@ -5,7 +5,7 @@ import sys from importlib import metadata -from synapse.cli import discover, rpc, streaming, offline_plot, files +from synapse.cli import discover, rpc, streaming, offline_plot, files, taps from rich.logging import RichHandler from rich.console import Console from synapse.utils.discover import find_device_by_name @@ -64,6 +64,8 @@ def main(): streaming.add_commands(subparsers) offline_plot.add_commands(subparsers) files.add_commands(subparsers) + taps.add_commands(subparsers) + args = parser.parse_args() # If we need to setup the device URI, do that now diff --git a/synapse/cli/rpc.py b/synapse/cli/rpc.py index b298b8a..f955c5d 100644 --- a/synapse/cli/rpc.py +++ b/synapse/cli/rpc.py @@ -13,7 +13,6 @@ from rich.console import Console from rich.pretty import pprint -from rich.table import Table from synapse.cli.query import StreamingQueryClient from synapse.utils.log import log_entry_to_str @@ -40,9 +39,6 @@ def add_commands(subparsers): e.add_argument("config_file", type=str) e.set_defaults(func=configure) - e = subparsers.add_parser("taps", help="List available taps") - e.set_defaults(func=list_taps) - f = subparsers.add_parser("logs", help="Get logs from the device") f.add_argument("--output", "-o", type=str, help="Optional file to write logs to") f.add_argument( @@ -248,22 +244,3 @@ def parse_datetime(time_str: Optional[str]) -> Optional[datetime]: finally: if output_file: output_file.close() - - -def list_taps(args): - console = Console() - - request = QueryRequest() - request.query_type = QueryRequest.kListTaps - request.list_taps_query.SetInParent() - response = syn.Device(args.uri, args.verbose).query(request) - - table = Table(title="Available Taps", show_lines=True) - table.add_column("Name", style="cyan") - table.add_column("Message Type", style="green") - table.add_column("Endpoint", style="green") - - for tap in response.list_taps_response.taps: - table.add_row(tap.name, tap.message_type, tap.endpoint) - - console.print(table) diff --git a/synapse/cli/taps.py b/synapse/cli/taps.py new file mode 100644 index 0000000..34d2f46 --- /dev/null +++ b/synapse/cli/taps.py @@ -0,0 +1,56 @@ +from synapse.client.taps import Tap + +from rich.console import Console +from rich.pretty import pprint +from rich.table import Table + + +def add_commands(subparsers): + tap_parser = subparsers.add_parser("taps", help="Interact with taps on the network") + + tap_subparsers = tap_parser.add_subparsers(title="Tap Commands") + + # Now add the list parser to the tap_subparsers + list_parser = tap_subparsers.add_parser("list", help="list the taps for a device") + list_parser.set_defaults(func=list_taps) + + stream_parser = tap_subparsers.add_parser("stream", help="Stream a tap") + stream_parser.add_argument("tap_name", type=str) + stream_parser.set_defaults(func=stream_taps) + + +def list_taps(args): + tap = Tap(args.uri, args.verbose) + + console = Console() + + taps = tap.list_taps() + table = Table(title="Available Taps", show_lines=True) + table.add_column("Name", style="cyan") + table.add_column("Message Type", style="green") + table.add_column("Endpoint", style="green") + + for tap in taps: + table.add_row(tap.name, tap.message_type, tap.endpoint) + + console.print(table) + + +def stream_taps(args): + tap = Tap(args.uri, args.verbose) + taps = tap.list_taps() + + if args.tap_name not in [tap.name for tap in taps]: + print(f"Tap {args.tap_name} not found") + return + + tap.connect(args.tap_name) + + console = Console() + console.print(f"[bold cyan]Streaming tap:[/] [green]{args.tap_name}[/]") + + for message in tap.stream(): + message_size = len(str(message)) + console.print(f"[bold]Message Size:[/] [cyan]{message_size} bytes[/]") + pprint(message, expand_all=False) + console.print("---") diff --git a/synapse/client/protobuf_helpers.py b/synapse/client/protobuf_helpers.py deleted file mode 100644 index a0eec1c..0000000 --- a/synapse/client/protobuf_helpers.py +++ /dev/null @@ -1,26 +0,0 @@ -from typing import Optional, Type, TypeVar -from google.protobuf.message import Message - -# Generic type for protobuf messages -T = TypeVar("T", bound=Message) - - -def parse_protobuf(data: bytes, message_type: Type[T]) -> Optional[T]: - """Parse raw bytes into a protobuf message of the specified type. - - Args: - data (bytes): Raw binary data. - message_type (Type[T]): The protobuf message class to use for parsing. - - Returns: - Optional[T]: The parsed protobuf message, or None if parsing failed. - """ - if not data: - return None - - try: - message = message_type() - message.ParseFromString(data) - return message - except Exception: - return None From 4d112d9bc8019c5c2af90480685758debec7711e Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Fri, 9 May 2025 18:24:09 -0700 Subject: [PATCH 6/6] Remove tap example --- synapse/examples/tap_example.py | 187 -------------------------------- 1 file changed, 187 deletions(-) delete mode 100644 synapse/examples/tap_example.py diff --git a/synapse/examples/tap_example.py b/synapse/examples/tap_example.py deleted file mode 100644 index 7492180..0000000 --- a/synapse/examples/tap_example.py +++ /dev/null @@ -1,187 +0,0 @@ -#!/usr/bin/env python3 -import sys -import argparse -import binascii -import importlib -from typing import Type - -from synapse.client.taps import Tap -from synapse.client.protobuf_helpers import parse_protobuf -from google.protobuf.message import Message - - -def load_message_type(message_type_path: str) -> Type[Message]: - """Load a protobuf message type from its fully qualified path. - - Args: - message_type_path (str): The fully qualified path to the message type (e.g. 'synapse.api.datatype_pb2.BroadbandFrame') - - Returns: - Type[Message]: The message type class - """ - parts = message_type_path.split(".") - class_name = parts[-1] - module_name = ".".join(parts[:-1]) - - try: - module = importlib.import_module(module_name) - return getattr(module, class_name) - except (ImportError, AttributeError) as e: - raise ImportError(f"Could not load message type {message_type_path}: {e}") - - -def format_data(data): - """Format raw bytes for display.""" - if isinstance(data, bytes) or isinstance(data, bytearray): - # For binary data, show a hex summary - if len(data) > 100: - hex_data = binascii.hexlify(data[:50]).decode("ascii") - return f"Binary data ({len(data)} bytes): {hex_data}... [truncated]" - else: - hex_data = binascii.hexlify(data).decode("ascii") - return f"Binary data ({len(data)} bytes): {hex_data}" - else: - # For other types, use default string representation - return str(data) - - -def main(): - parser = argparse.ArgumentParser( - description="Example for using the Synapse Tap API" - ) - parser.add_argument( - "--uri", "-u", type=str, required=True, help="URI of the Synapse device" - ) - parser.add_argument("--name", "-n", type=str, help="Name of the tap to connect to") - parser.add_argument( - "--verbose", "-v", action="store_true", help="Enable verbose logging" - ) - parser.add_argument( - "--timeout", - "-t", - type=int, - default=5000, - help="Timeout in milliseconds (default: 5000)", - ) - parser.add_argument( - "--count", - "-c", - type=int, - default=None, - help="Number of messages to receive (default: infinite)", - ) - parser.add_argument( - "--message-type", - "-m", - type=str, - help="Fully qualified message type path (e.g. 'synapse.api.datatype_pb2.BroadbandFrame')", - ) - args = parser.parse_args() - - # Create a tap client - tap = Tap(args.uri, args.verbose) - - # List available taps - print("Available taps:") - taps = tap.list_taps() - - if not taps: - print("No taps found") - return 1 - - for i, tap_info in enumerate(taps): - print( - f" {i + 1}. {tap_info.name} - Type: {tap_info.message_type}, Endpoint: {tap_info.endpoint}" - ) - - # Determine which tap to connect to - tap_name = args.name - if not tap_name: - # If name not provided, ask the user to select one - try: - selection = int(input("\nSelect a tap by number: ")) - if 1 <= selection <= len(taps): - tap_name = taps[selection - 1].name - else: - print(f"Invalid selection: {selection}") - return 1 - except ValueError: - print("Invalid input. Please enter a number.") - return 1 - - # Load message type if provided - message_type_class = None - if args.message_type: - try: - message_type_class = load_message_type(args.message_type) - print(f"Using message type: {message_type_class.__name__}") - except ImportError as e: - print(f"Warning: {e}") - print("Will use raw data instead") - - # Connect to the selected tap - print(f"\nConnecting to tap: {tap_name}") - if not tap.connect(tap_name): - print(f"Failed to connect to tap: {tap_name}") - return 1 - - print("Connected successfully!") - - try: - if args.count: - # Receive a specific number of messages - print(f"\nReceiving {args.count} messages (timeout: {args.timeout}ms):") - for i in range(args.count): - raw_data = tap.read(args.timeout) - if raw_data: - print(f"Message {i + 1}:") - - # Parse data if message type was provided - if message_type_class: - parsed = parse_protobuf(raw_data, message_type_class) - if parsed: - print(f"Parsed message: {parsed}") - else: - print("Failed to parse as protobuf message") - print(f"Raw data: {format_data(raw_data)}") - else: - print(f"Raw data: {format_data(raw_data)}") - - print() - else: - print(f"Timeout waiting for message {i + 1}") - else: - # Stream messages until interrupted - print("\nStreaming messages (Ctrl+C to stop):") - count = 0 - for raw_data in tap.stream(args.timeout): - count += 1 - print(f"Message {count}:") - - # Parse data if message type was provided - if message_type_class: - parsed = parse_protobuf(raw_data, message_type_class) - if parsed: - print(f"Parsed message: {parsed}") - else: - print("Failed to parse as protobuf message") - print(f"Raw data: {format_data(raw_data)}") - else: - print(f"Raw data: {format_data(raw_data)}") - - print() - - except KeyboardInterrupt: - print("\nStream interrupted by user") - - finally: - # Clean up - print("\nDisconnecting...") - tap.disconnect() - print("Done") - - return 0 - - -if __name__ == "__main__": - sys.exit(main())