diff --git a/setup.py b/setup.py index d364942..39453e4 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ setup( name="science-synapse", - version="2.2.5", + version="2.2.6", description="Client library and CLI for the Synapse API", author="Science Team", author_email="team@science.xyz", diff --git a/synapse-api b/synapse-api index 344ac86..604c4c3 160000 --- a/synapse-api +++ b/synapse-api @@ -1 +1 @@ -Subproject commit 344ac867c60d1db95b9a6933f5c0819df5d26980 +Subproject commit 604c4c34e2dd3256df710f339f45a618665d7c25 diff --git a/synapse/cli/device_info_display.py b/synapse/cli/device_info_display.py index 93e9cb3..ff848c5 100644 --- a/synapse/cli/device_info_display.py +++ b/synapse/cli/device_info_display.py @@ -1,37 +1,32 @@ -import time from rich.console import Console -from rich.panel import Panel -from rich.table import Table from rich.tree import Tree from google.protobuf.json_format import MessageToDict from synapse.client.device import Device -def visualize_configuration(info_dict): +def visualize_configuration(info_dict, status): + nodes_status = status.get("signal_chain", {}).get("nodes", {}) config = info_dict.get("configuration", {}) if config: tree = Tree("Configuration") - for node in config.get("nodes", []): + for index, node in enumerate(config.get("nodes", [])): node_type = node.get("type", "").replace("k", "") - node_name = node.get("name", "Unknown") - node_tree = tree.add(f"{node_name}") + node_tree = tree.add(f"{node_type}") node_tree.add(f"ID: {node.get('id', 'Unknown')}") - node_tree.add(f"Type: {node_type}") - if node_type == "Application": app = node.get("application", {}) name = app.get("name", "Unknown") - running = app.get("running", False) - status = "[green]Running[/green]" if running else "[red]Stopped[/red]" + + application_status = nodes_status[index].get("application", None) + running = application_status.get("running", False) + error_logs = application_status.get( + "error_logs", "Could not get error logs" + ) node_tree.add(f"Name: {name}") - node_tree.add(f"Status: {status}") + node_tree.add(f"Running: {running}") + node_tree.add(f"Error Logs:\n{error_logs}") elif node_type == "BroadbandSource": source = node.get("broadband_source", {}) - name = source.get("name", "Unknown") - running = source.get("running", False) - status = "[green]Running[/green]" if running else "[red]Stopped[/red]" - node_tree.add(f"Name: {name}") - node_tree.add(f"Status: {status}") if "signal" in source and "electrode" in source["signal"]: channels = source["signal"]["electrode"].get("channels", []) electrode_ids = [ @@ -113,4 +108,4 @@ def summary(self, device: Device): ) self.console.print(visualize_peripherals(info_dict)) - self.console.print(visualize_configuration(info_dict)) + self.console.print(visualize_configuration(info_dict, status)) diff --git a/synapse/cli/rpc.py b/synapse/cli/rpc.py index a5223d7..ec751e9 100644 --- a/synapse/cli/rpc.py +++ b/synapse/cli/rpc.py @@ -4,7 +4,6 @@ from typing import Optional import synapse as syn -from synapse.api.synapse_pb2 import DeviceConfiguration from synapse.api.query_pb2 import QueryRequest, QueryResponse, StreamQueryRequest from synapse.api.status_pb2 import StatusCode @@ -12,11 +11,11 @@ from google.protobuf.json_format import Parse from rich.console import Console -from rich.pretty import pprint from synapse.cli.query import StreamingQueryClient from synapse.utils.log import log_entry_to_str from synapse.cli.device_info_display import DeviceInfoDisplay +from synapse.utils.proto import load_device_config def add_commands(subparsers): @@ -182,7 +181,7 @@ def start(args): console = Console() - config_obj = None # syn.Config if we are provided a *.json* file + config_obj = None cfg_path = getattr(args, "config_file", None) if cfg_path: @@ -196,10 +195,7 @@ def start(args): # Load the configuration proto and build Config object try: - with open(cfg_path, "r") as f: - json_text = f.read() - cfg_proto = Parse(json_text, DeviceConfiguration()) - config_obj = syn.Config.from_proto(cfg_proto) + config_obj = load_device_config(cfg_path, console) except Exception as e: console.print( f"[bold red]Failed to parse configuration file[/bold red]: {e}" @@ -263,25 +259,23 @@ def stop(args): def configure(args): + console = Console() if Path(args.config_file).suffix != ".json": - print("Configuration file must be a JSON file") + console.print("[bold red]Configuration file must be a JSON file") return False - with open(args.config_file) as config_json: - console = Console() - config_proto = Parse(config_json.read(), DeviceConfiguration()) - console.print("Configuring device with the following configuration:") - config = syn.Config.from_proto(config_proto) - console.print(config.to_proto()) - - config_ret = syn.Device(args.uri, args.verbose).configure_with_status(config) - if not config_ret: - console.print("[bold red]Internal error configuring device") - return - if config_ret.code != StatusCode.kOk: - console.print(f"[bold red]Error configuring\n{config_ret.message}") - return - console.print("[green]Device configured") + config_obj = load_device_config(args.config_file, console) + console.print("Configuring device with the following configuration:") + console.print(config_obj.to_proto()) + + config_ret = syn.Device(args.uri, args.verbose).configure_with_status(config_obj) + if not config_ret: + console.print("[bold red]Internal error configuring device") + return + if config_ret.code != StatusCode.kOk: + console.print(f"[bold red]Error configuring\n{config_ret.message}") + return + console.print("[green]Device configured") def get_logs(args): diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index 9b5c0ff..ae5c225 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -13,7 +13,7 @@ from synapse.api.node_pb2 import NodeType from synapse.api.status_pb2 import DeviceState, StatusCode -from synapse.api.synapse_pb2 import DeviceConfiguration +from synapse.api.device_pb2 import DeviceConfiguration import synapse as syn import synapse.client.channel as channel import synapse.utils.ndtp_types as ndtp_types diff --git a/synapse/cli/taps.py b/synapse/cli/taps.py index 34d2f46..2b47cbb 100644 --- a/synapse/cli/taps.py +++ b/synapse/cli/taps.py @@ -1,8 +1,72 @@ from synapse.client.taps import Tap from rich.console import Console -from rich.pretty import pprint from rich.table import Table +from rich.live import Live +from rich.text import Text + +import time + + +class TapHealthMonitor: + """Health monitor for streaming tap data with real-time statistics display.""" + + def __init__(self, console: Console): + self.console = console + self.message_count = 0 + self.total_bytes = 0 + self.start_time = None + + def start(self): + """Start the monitoring session.""" + self.start_time = time.time() + self.message_count = 0 + self.total_bytes = 0 + + def update(self, message_size: int) -> Text: + """Update statistics with a new message and return formatted display text.""" + current_time = time.time() + self.message_count += 1 + self.total_bytes += message_size + + # Calculate stats + elapsed_time = current_time - self.start_time + msgs_per_sec = self.message_count / elapsed_time if elapsed_time > 0 else 0 + bandwidth_bps = self.total_bytes / elapsed_time if elapsed_time > 0 else 0 + + # Format bandwidth + bandwidth_str = self._format_bandwidth(bandwidth_bps) + + # Create formatted display text + return self._create_display_text( + self.message_count, msgs_per_sec, bandwidth_str, message_size + ) + + def _format_bandwidth(self, bandwidth_bps: float) -> str: + """Format bandwidth with appropriate units.""" + if bandwidth_bps >= 1024 * 1024: + return f"{bandwidth_bps / (1024 * 1024):.2f} MB/s" + elif bandwidth_bps >= 1024: + return f"{bandwidth_bps / 1024:.2f} KB/s" + else: + return f"{bandwidth_bps:.1f} B/s" + + def _create_display_text( + self, msg_count: int, rate: float, bandwidth: str, latest_size: int + ) -> Text: + """Create styled text for the live display.""" + stats_text = Text() + stats_text.append("Messages: ", style="bold") + stats_text.append(f"{msg_count:,}", style="cyan") + stats_text.append(" | msgs/sec: ", style="bold") + stats_text.append(f"{rate:.1f}/s", style="green") + stats_text.append(" | Bandwidth: ", style="bold") + stats_text.append(bandwidth, style="yellow") + stats_text.append(" | Latest: ", style="bold") + stats_text.append(f"{latest_size:,} bytes", style="magenta") + stats_text.append(" | Runtime: ", style="bold") + stats_text.append(f"{time.time() - self.start_time:.1f}s", style="blue") + return stats_text def add_commands(subparsers): @@ -48,9 +112,26 @@ def stream_taps(args): console = Console() console.print(f"[bold cyan]Streaming tap:[/] [green]{args.tap_name}[/]") + console.print("[dim]Press Ctrl+C to stop[/]\n") - for message in tap.stream(): - message_size = len(str(message)) - console.print(f"[bold]Message Size:[/] [cyan]{message_size} bytes[/]") - pprint(message, expand_all=False) - console.print("---") + # Initialize health monitor + monitor = TapHealthMonitor(console) + monitor.start() + + # Create initial display + initial_text = Text("Waiting for messages...", style="dim") + + try: + with Live(initial_text, console=console, refresh_per_second=10) as live: + for message in tap.stream(): + message_size = len(message) + + # Update statistics and get formatted display + stats_text = monitor.update(message_size) + + # Update the live display + live.update(stats_text) + except KeyboardInterrupt: + pass + finally: + tap.disconnect() diff --git a/synapse/client/config.py b/synapse/client/config.py index 26a769c..afb68dc 100644 --- a/synapse/client/config.py +++ b/synapse/client/config.py @@ -1,4 +1,4 @@ -from synapse.api.synapse_pb2 import DeviceConfiguration +from synapse.api.device_pb2 import DeviceConfiguration from synapse.api.node_pb2 import NodeConnection from synapse.client.nodes import NODE_TYPE_OBJECT_MAP diff --git a/synapse/client/nodes/application_node.py b/synapse/client/nodes/application_node.py index 61c1dc0..08d37f1 100644 --- a/synapse/client/nodes/application_node.py +++ b/synapse/client/nodes/application_node.py @@ -6,12 +6,13 @@ class ApplicationNode(Node): type = NodeType.kApplication - def __init__(self, name: str): + def __init__(self, name: str, parameters): self.name = name + self.parameters = parameters def _to_proto(self): n = NodeConfig() - p = ApplicationNodeConfig(name=self.name) + p = ApplicationNodeConfig(name=self.name, parameters=self.parameters) n.application.CopyFrom(p) return n @@ -21,5 +22,4 @@ def _from_proto(proto: ApplicationNodeConfig): raise ValueError("parameter 'proto' is missing") if not isinstance(proto, ApplicationNodeConfig): raise ValueError("proto is not of type ApplicationNodeConfig") - - return ApplicationNode(name=proto.name) + return ApplicationNode(name=proto.name, parameters=proto.parameters) diff --git a/synapse/client/taps.py b/synapse/client/taps.py index 875a1ab..8143c87 100644 --- a/synapse/client/taps.py +++ b/synapse/client/taps.py @@ -1,9 +1,11 @@ import logging +import time import zmq from typing import Optional, Generator from synapse.api.query_pb2 import QueryRequest from synapse.api.status_pb2 import StatusCode +from synapse.api.tap_pb2 import TapType class Tap(object): @@ -75,7 +77,14 @@ def connect(self, name: str) -> bool: # Initialize ZMQ context and socket self.zmq_context = zmq.Context() - self.zmq_socket = self.zmq_context.socket(zmq.SUB) + + # Create appropriate socket type based on tap type from the selected tap + if selected_tap.tap_type == TapType.TAP_TYPE_CONSUMER: + # For consumer taps, we need to publish data TO the tap + self.zmq_socket = self.zmq_context.socket(zmq.PUB) + else: + # For producer taps (or unspecified), we need to subscribe and listen FROM the tap + self.zmq_socket = self.zmq_context.socket(zmq.SUB) # Replace the endpoint with our device URI if needed endpoint = selected_tap.endpoint @@ -88,9 +97,17 @@ def connect(self, name: str) -> bool: endpoint = f"{protocol}://{self.uri.split(':')[0]}:{port}" try: - print(f"Connecting to tap '{name}' at {endpoint}") self.zmq_socket.connect(endpoint) - self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all messages + + # Give the socket a chance to connect + self.logger.info("Waiting for socket to connect...") + time.sleep(1) + + # Only set subscription options for subscriber sockets + if selected_tap.tap_type != TapType.TAP_TYPE_CONSUMER: + self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b"") + print("Subscribed to all messages") + return True except zmq.ZMQError as e: self.logger.error(f"Failed to connect to tap: {e}") @@ -123,6 +140,33 @@ def read(self, timeout_ms: int = 1000) -> Optional[bytes]: self.logger.error(f"Error receiving message: {e}") return None + def send(self, data: bytes) -> bool: + """Send raw data to the tap (only works for consumer taps with PUB socket). + + Args: + data (bytes): Raw message data to send. + + Returns: + bool: True if sent successfully, False otherwise. + """ + if not self.zmq_socket: + self.logger.error("Not connected to any tap") + return False + + if ( + not self.connected_tap + or self.connected_tap.tap_type != TapType.TAP_TYPE_CONSUMER + ): + self.logger.error("Send is only available for consumer taps") + return False + + try: + self.zmq_socket.send(data) + return True + except zmq.ZMQError as e: + self.logger.error(f"Error sending message: {e}") + return False + def stream(self, timeout_ms: int = 1000) -> Generator[bytes, None, None]: """Stream raw data from the tap. diff --git a/synapse/server/rpc.py b/synapse/server/rpc.py index ff72ec2..e124a23 100644 --- a/synapse/server/rpc.py +++ b/synapse/server/rpc.py @@ -10,7 +10,7 @@ from synapse.api.logging_pb2 import LogLevel, LogQueryResponse from synapse.api.query_pb2 import QueryResponse from synapse.api.status_pb2 import DeviceState, Status, StatusCode -from synapse.api.synapse_pb2 import DeviceConfiguration, DeviceInfo +from synapse.api.device_pb2 import DeviceConfiguration, DeviceInfo from synapse.api.synapse_pb2_grpc import ( SynapseDeviceServicer, add_SynapseDeviceServicer_to_server, diff --git a/synapse/tests/blink_ostim.py b/synapse/tests/blink_ostim.py index 48c7db9..d7a8b6d 100644 --- a/synapse/tests/blink_ostim.py +++ b/synapse/tests/blink_ostim.py @@ -1,12 +1,10 @@ import time -import cv2 -import numpy as np from synapse.device import Device from synapse.config import Config from synapse.nodes.stream_in import StreamIn -from synapse.api.synapse_pb2 import DeviceConfiguration +from synapse.api.device_pb2 import DeviceConfiguration from google.protobuf.json_format import Parse addr = "localhost:647" diff --git a/synapse/tests/doom_synapse.py b/synapse/tests/doom_synapse.py index 28b1253..4027ddb 100755 --- a/synapse/tests/doom_synapse.py +++ b/synapse/tests/doom_synapse.py @@ -1,12 +1,11 @@ import time import cv2 -import numpy as np from synapse.device import Device from synapse.config import Config from synapse.nodes.stream_in import StreamIn -from synapse.api.synapse_pb2 import DeviceConfiguration +from synapse.api.device_pb2 import DeviceConfiguration from google.protobuf.json_format import Parse addr = "localhost:647" @@ -53,7 +52,6 @@ time.sleep(0.05) except KeyboardInterrupt: - cap.release() print("Stopping") diff --git a/synapse/utils/proto.py b/synapse/utils/proto.py new file mode 100644 index 0000000..93f1558 --- /dev/null +++ b/synapse/utils/proto.py @@ -0,0 +1,27 @@ +from google.protobuf.json_format import Parse +from synapse.api.device_pb2 import DeviceConfiguration +from synapse.api.app_pb2 import AppManifest +import synapse as syn + + +def load_device_config(path_to_json, console): + # We support either a manifest or a device configuration. + # First, try to load a device configuration + try: + json_text = open(path_to_json, "r").read() + cfg_proto = Parse(json_text, DeviceConfiguration()) + return syn.Config.from_proto(cfg_proto) + except Exception: + pass + + # We couldn't load a device configuration, so try to load a manifest + try: + json_text = open(path_to_json, "r").read() + manifest_proto = Parse(json_text, AppManifest()) + return syn.Config.from_proto(manifest_proto.device_config) + except Exception: + raise ValueError( + f"Could not parse {path_to_json} as either device configuration or manifest" + ) + + return None