diff --git a/.gitignore b/.gitignore index 5cb9061..da7ebd1 100644 --- a/.gitignore +++ b/.gitignore @@ -184,5 +184,4 @@ output_*.json *.jsonl .scienv synapse_data* - .synapse_deploy_cache.json diff --git a/synapse/cli/__main__.py b/synapse/cli/__main__.py index 2fa050c..3ac5d2f 100755 --- a/synapse/cli/__main__.py +++ b/synapse/cli/__main__.py @@ -5,7 +5,7 @@ import sys from importlib import metadata -from synapse.cli import discover, rpc, streaming, offline_plot, files, deploy +from synapse.cli import discover, rpc, streaming, offline_plot, files, deploy, taps from rich.logging import RichHandler from rich.console import Console from synapse.utils.discover import find_device_by_name @@ -64,6 +64,7 @@ def main(): streaming.add_commands(subparsers) offline_plot.add_commands(subparsers) files.add_commands(subparsers) + taps.add_commands(subparsers) deploy.add_commands(subparsers) args = parser.parse_args() diff --git a/synapse/cli/taps.py b/synapse/cli/taps.py new file mode 100644 index 0000000..34d2f46 --- /dev/null +++ b/synapse/cli/taps.py @@ -0,0 +1,56 @@ +from synapse.client.taps import Tap + +from rich.console import Console +from rich.pretty import pprint +from rich.table import Table + + +def add_commands(subparsers): + tap_parser = subparsers.add_parser("taps", help="Interact with taps on the network") + + tap_subparsers = tap_parser.add_subparsers(title="Tap Commands") + + # Now add the list parser to the tap_subparsers + list_parser = tap_subparsers.add_parser("list", help="list the taps for a device") + list_parser.set_defaults(func=list_taps) + + stream_parser = tap_subparsers.add_parser("stream", help="Stream a tap") + stream_parser.add_argument("tap_name", type=str) + stream_parser.set_defaults(func=stream_taps) + + +def list_taps(args): + tap = Tap(args.uri, args.verbose) + + console = Console() + + taps = tap.list_taps() + table = Table(title="Available Taps", show_lines=True) + table.add_column("Name", style="cyan") + table.add_column("Message Type", style="green") + table.add_column("Endpoint", style="green") + + for tap in taps: + table.add_row(tap.name, tap.message_type, tap.endpoint) + + console.print(table) + + +def stream_taps(args): + tap = Tap(args.uri, args.verbose) + taps = tap.list_taps() + + if args.tap_name not in [tap.name for tap in taps]: + print(f"Tap {args.tap_name} not found") + return + + tap.connect(args.tap_name) + + console = Console() + console.print(f"[bold cyan]Streaming tap:[/] [green]{args.tap_name}[/]") + + for message in tap.stream(): + message_size = len(str(message)) + console.print(f"[bold]Message Size:[/] [cyan]{message_size} bytes[/]") + pprint(message, expand_all=False) + console.print("---") diff --git a/synapse/client/taps.py b/synapse/client/taps.py new file mode 100644 index 0000000..875a1ab --- /dev/null +++ b/synapse/client/taps.py @@ -0,0 +1,172 @@ +import logging +import zmq +from typing import Optional, Generator + +from synapse.api.query_pb2 import QueryRequest +from synapse.api.status_pb2 import StatusCode + + +class Tap(object): + def __init__(self, uri, verbose=False): + """Initialize a Tap client to connect to the Synapse device. + + Args: + uri (str): The URI of the Synapse device. + verbose (bool, optional): Whether to enable verbose logging. Defaults to False. + """ + self.uri = uri + self.verbose = verbose + self.logger = logging.getLogger(__name__) + self.logger.setLevel(logging.DEBUG if verbose else logging.INFO) + + # ZMQ context (will be initialized upon connection) + self.zmq_context = None + self.zmq_socket = None + self.connected_tap = None + + def list_taps(self): + """List all available taps on the device. + + Returns: + list: List of TapConnection objects. + """ + from synapse.client.device import Device + + device = Device(self.uri, self.verbose) + + request = QueryRequest() + request.query_type = QueryRequest.kListTaps + request.list_taps_query.SetInParent() + + response = device.query(request) + + if not response or response.status.code != StatusCode.kOk: + self.logger.error( + f"Failed to list taps: {response.status.message if response else 'No response'}" + ) + return [] + + return response.list_taps_response.taps + + def connect(self, name: str) -> bool: + """Connect to a specific tap by name. + + Args: + name (str): The name of the tap to connect to. + + Returns: + bool: True if connected successfully, False otherwise. + """ + taps = self.list_taps() + + # Find the tap with the specified name + selected_tap = None + for tap in taps: + if tap.name == name: + selected_tap = tap + break + + if not selected_tap: + self.logger.error(f"Tap '{name}' not found") + return False + + # Store the connected tap + self.connected_tap = selected_tap + + # Initialize ZMQ context and socket + self.zmq_context = zmq.Context() + self.zmq_socket = self.zmq_context.socket(zmq.SUB) + + # Replace the endpoint with our device URI if needed + endpoint = selected_tap.endpoint + if "://" in endpoint: + # Extract the protocol and port + protocol, address = endpoint.split("://") + _, port = address.split(":") + + # Use the device URI with the same port + endpoint = f"{protocol}://{self.uri.split(':')[0]}:{port}" + + try: + print(f"Connecting to tap '{name}' at {endpoint}") + self.zmq_socket.connect(endpoint) + self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all messages + return True + except zmq.ZMQError as e: + self.logger.error(f"Failed to connect to tap: {e}") + self._cleanup() + return False + + def read(self, timeout_ms: int = 1000) -> Optional[bytes]: + """Read raw data from the tap with timeout. + + Args: + timeout_ms (int, optional): Timeout in milliseconds. Defaults to 1000. + + Returns: + Optional[bytes]: Raw message data or None if timeout/error. + """ + if not self.zmq_socket: + self.logger.error("Not connected to any tap") + return None + + try: + # Set socket timeout + self.zmq_socket.setsockopt(zmq.RCVTIMEO, timeout_ms) + + # Receive data (will timeout if no data available) + return self.zmq_socket.recv() + except zmq.Again: + # Timeout occurred + return None + except zmq.ZMQError as e: + self.logger.error(f"Error receiving message: {e}") + return None + + def stream(self, timeout_ms: int = 1000) -> Generator[bytes, None, None]: + """Stream raw data from the tap. + + Args: + timeout_ms (int, optional): Timeout between messages in milliseconds. Defaults to 1000. + + Yields: + Generator[bytes, None, None]: Stream of raw message data. + """ + if not self.zmq_socket: + self.logger.error("Not connected to any tap") + return + + # Set socket timeout + self.zmq_socket.setsockopt(zmq.RCVTIMEO, timeout_ms) + + try: + while True: + try: + data = self.zmq_socket.recv() + yield data + except zmq.Again: + # Timeout occurred, continue to next iteration + continue + except KeyboardInterrupt: + self.logger.info("Stream interrupted") + except zmq.ZMQError as e: + self.logger.error(f"Error streaming messages: {e}") + finally: + # Don't close the socket here, let the user call disconnect() + pass + + def disconnect(self): + """Disconnect from the tap.""" + self._cleanup() + + def _cleanup(self): + """Clean up ZMQ resources.""" + if self.zmq_socket: + self.zmq_socket.close() + self.zmq_socket = None + + if self.zmq_context: + self.zmq_context.term() + self.zmq_context = None + + self.connected_tap = None