Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

setup(
name="science-synapse",
version="2.2.5",
version="2.2.6",
description="Client library and CLI for the Synapse API",
author="Science Team",
author_email="team@science.xyz",
Expand Down
2 changes: 1 addition & 1 deletion synapse-api
31 changes: 13 additions & 18 deletions synapse/cli/device_info_display.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,32 @@
import time
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.tree import Tree
from google.protobuf.json_format import MessageToDict
from synapse.client.device import Device


def visualize_configuration(info_dict):
def visualize_configuration(info_dict, status):
nodes_status = status.get("signal_chain", {}).get("nodes", {})
config = info_dict.get("configuration", {})
if config:
tree = Tree("Configuration")
for node in config.get("nodes", []):
for index, node in enumerate(config.get("nodes", [])):
node_type = node.get("type", "").replace("k", "")
node_name = node.get("name", "Unknown")
node_tree = tree.add(f"{node_name}")
node_tree = tree.add(f"{node_type}")
node_tree.add(f"ID: {node.get('id', 'Unknown')}")
node_tree.add(f"Type: {node_type}")

if node_type == "Application":
app = node.get("application", {})
name = app.get("name", "Unknown")
running = app.get("running", False)
status = "[green]Running[/green]" if running else "[red]Stopped[/red]"

application_status = nodes_status[index].get("application", None)
running = application_status.get("running", False)
error_logs = application_status.get(
"error_logs", "Could not get error logs"
)
node_tree.add(f"Name: {name}")
node_tree.add(f"Status: {status}")
node_tree.add(f"Running: {running}")
node_tree.add(f"Error Logs:\n{error_logs}")
elif node_type == "BroadbandSource":
source = node.get("broadband_source", {})
name = source.get("name", "Unknown")
running = source.get("running", False)
status = "[green]Running[/green]" if running else "[red]Stopped[/red]"
node_tree.add(f"Name: {name}")
node_tree.add(f"Status: {status}")
if "signal" in source and "electrode" in source["signal"]:
channels = source["signal"]["electrode"].get("channels", [])
electrode_ids = [
Expand Down Expand Up @@ -113,4 +108,4 @@ def summary(self, device: Device):
)

self.console.print(visualize_peripherals(info_dict))
self.console.print(visualize_configuration(info_dict))
self.console.print(visualize_configuration(info_dict, status))
40 changes: 17 additions & 23 deletions synapse/cli/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,18 @@
from typing import Optional

import synapse as syn
from synapse.api.synapse_pb2 import DeviceConfiguration
from synapse.api.query_pb2 import QueryRequest, QueryResponse, StreamQueryRequest
from synapse.api.status_pb2 import StatusCode

from google.protobuf import text_format
from google.protobuf.json_format import Parse

from rich.console import Console
from rich.pretty import pprint

from synapse.cli.query import StreamingQueryClient
from synapse.utils.log import log_entry_to_str
from synapse.cli.device_info_display import DeviceInfoDisplay
from synapse.utils.proto import load_device_config


def add_commands(subparsers):
Expand Down Expand Up @@ -182,7 +181,7 @@ def start(args):

console = Console()

config_obj = None # syn.Config if we are provided a *.json* file
config_obj = None
cfg_path = getattr(args, "config_file", None)

if cfg_path:
Expand All @@ -196,10 +195,7 @@ def start(args):

# Load the configuration proto and build Config object
try:
with open(cfg_path, "r") as f:
json_text = f.read()
cfg_proto = Parse(json_text, DeviceConfiguration())
config_obj = syn.Config.from_proto(cfg_proto)
config_obj = load_device_config(cfg_path, console)
except Exception as e:
console.print(
f"[bold red]Failed to parse configuration file[/bold red]: {e}"
Expand Down Expand Up @@ -263,25 +259,23 @@ def stop(args):


def configure(args):
console = Console()
if Path(args.config_file).suffix != ".json":
print("Configuration file must be a JSON file")
console.print("[bold red]Configuration file must be a JSON file")
return False

with open(args.config_file) as config_json:
console = Console()
config_proto = Parse(config_json.read(), DeviceConfiguration())
console.print("Configuring device with the following configuration:")
config = syn.Config.from_proto(config_proto)
console.print(config.to_proto())

config_ret = syn.Device(args.uri, args.verbose).configure_with_status(config)
if not config_ret:
console.print("[bold red]Internal error configuring device")
return
if config_ret.code != StatusCode.kOk:
console.print(f"[bold red]Error configuring\n{config_ret.message}")
return
console.print("[green]Device configured")
config_obj = load_device_config(args.config_file, console)
console.print("Configuring device with the following configuration:")
console.print(config_obj.to_proto())

config_ret = syn.Device(args.uri, args.verbose).configure_with_status(config_obj)
if not config_ret:
console.print("[bold red]Internal error configuring device")
return
if config_ret.code != StatusCode.kOk:
console.print(f"[bold red]Error configuring\n{config_ret.message}")
return
console.print("[green]Device configured")


def get_logs(args):
Expand Down
2 changes: 1 addition & 1 deletion synapse/cli/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from synapse.api.node_pb2 import NodeType
from synapse.api.status_pb2 import DeviceState, StatusCode
from synapse.api.synapse_pb2 import DeviceConfiguration
from synapse.api.device_pb2 import DeviceConfiguration
import synapse as syn
import synapse.client.channel as channel
import synapse.utils.ndtp_types as ndtp_types
Expand Down
93 changes: 87 additions & 6 deletions synapse/cli/taps.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,72 @@
from synapse.client.taps import Tap

from rich.console import Console
from rich.pretty import pprint
from rich.table import Table
from rich.live import Live
from rich.text import Text

import time


class TapHealthMonitor:
"""Health monitor for streaming tap data with real-time statistics display."""

def __init__(self, console: Console):
self.console = console
self.message_count = 0
self.total_bytes = 0
self.start_time = None

def start(self):
"""Start the monitoring session."""
self.start_time = time.time()
self.message_count = 0
self.total_bytes = 0

def update(self, message_size: int) -> Text:
"""Update statistics with a new message and return formatted display text."""
current_time = time.time()
self.message_count += 1
self.total_bytes += message_size

# Calculate stats
elapsed_time = current_time - self.start_time
msgs_per_sec = self.message_count / elapsed_time if elapsed_time > 0 else 0
bandwidth_bps = self.total_bytes / elapsed_time if elapsed_time > 0 else 0

# Format bandwidth
bandwidth_str = self._format_bandwidth(bandwidth_bps)

# Create formatted display text
return self._create_display_text(
self.message_count, msgs_per_sec, bandwidth_str, message_size
)

def _format_bandwidth(self, bandwidth_bps: float) -> str:
"""Format bandwidth with appropriate units."""
if bandwidth_bps >= 1024 * 1024:
return f"{bandwidth_bps / (1024 * 1024):.2f} MB/s"
elif bandwidth_bps >= 1024:
return f"{bandwidth_bps / 1024:.2f} KB/s"
else:
return f"{bandwidth_bps:.1f} B/s"

def _create_display_text(
self, msg_count: int, rate: float, bandwidth: str, latest_size: int
) -> Text:
"""Create styled text for the live display."""
stats_text = Text()
stats_text.append("Messages: ", style="bold")
stats_text.append(f"{msg_count:,}", style="cyan")
stats_text.append(" | msgs/sec: ", style="bold")
stats_text.append(f"{rate:.1f}/s", style="green")
stats_text.append(" | Bandwidth: ", style="bold")
stats_text.append(bandwidth, style="yellow")
stats_text.append(" | Latest: ", style="bold")
stats_text.append(f"{latest_size:,} bytes", style="magenta")
stats_text.append(" | Runtime: ", style="bold")
stats_text.append(f"{time.time() - self.start_time:.1f}s", style="blue")
return stats_text


def add_commands(subparsers):
Expand Down Expand Up @@ -48,9 +112,26 @@ def stream_taps(args):

console = Console()
console.print(f"[bold cyan]Streaming tap:[/] [green]{args.tap_name}[/]")
console.print("[dim]Press Ctrl+C to stop[/]\n")

for message in tap.stream():
message_size = len(str(message))
console.print(f"[bold]Message Size:[/] [cyan]{message_size} bytes[/]")
pprint(message, expand_all=False)
console.print("---")
# Initialize health monitor
monitor = TapHealthMonitor(console)
monitor.start()

# Create initial display
initial_text = Text("Waiting for messages...", style="dim")

try:
with Live(initial_text, console=console, refresh_per_second=10) as live:
for message in tap.stream():
message_size = len(message)

# Update statistics and get formatted display
stats_text = monitor.update(message_size)

# Update the live display
live.update(stats_text)
except KeyboardInterrupt:
pass
finally:
tap.disconnect()
2 changes: 1 addition & 1 deletion synapse/client/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from synapse.api.synapse_pb2 import DeviceConfiguration
from synapse.api.device_pb2 import DeviceConfiguration
from synapse.api.node_pb2 import NodeConnection
from synapse.client.nodes import NODE_TYPE_OBJECT_MAP

Expand Down
8 changes: 4 additions & 4 deletions synapse/client/nodes/application_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
class ApplicationNode(Node):
type = NodeType.kApplication

def __init__(self, name: str):
def __init__(self, name: str, parameters):
self.name = name
self.parameters = parameters

def _to_proto(self):
n = NodeConfig()
p = ApplicationNodeConfig(name=self.name)
p = ApplicationNodeConfig(name=self.name, parameters=self.parameters)
n.application.CopyFrom(p)
return n

Expand All @@ -21,5 +22,4 @@ def _from_proto(proto: ApplicationNodeConfig):
raise ValueError("parameter 'proto' is missing")
if not isinstance(proto, ApplicationNodeConfig):
raise ValueError("proto is not of type ApplicationNodeConfig")

return ApplicationNode(name=proto.name)
return ApplicationNode(name=proto.name, parameters=proto.parameters)
50 changes: 47 additions & 3 deletions synapse/client/taps.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import logging
import time
import zmq
from typing import Optional, Generator

from synapse.api.query_pb2 import QueryRequest
from synapse.api.status_pb2 import StatusCode
from synapse.api.tap_pb2 import TapType


class Tap(object):
Expand Down Expand Up @@ -75,7 +77,14 @@ def connect(self, name: str) -> bool:

# Initialize ZMQ context and socket
self.zmq_context = zmq.Context()
self.zmq_socket = self.zmq_context.socket(zmq.SUB)

# Create appropriate socket type based on tap type from the selected tap
if selected_tap.tap_type == TapType.TAP_TYPE_CONSUMER:
# For consumer taps, we need to publish data TO the tap
self.zmq_socket = self.zmq_context.socket(zmq.PUB)
else:
# For producer taps (or unspecified), we need to subscribe and listen FROM the tap
self.zmq_socket = self.zmq_context.socket(zmq.SUB)

# Replace the endpoint with our device URI if needed
endpoint = selected_tap.endpoint
Expand All @@ -88,9 +97,17 @@ def connect(self, name: str) -> bool:
endpoint = f"{protocol}://{self.uri.split(':')[0]}:{port}"

try:
print(f"Connecting to tap '{name}' at {endpoint}")
self.zmq_socket.connect(endpoint)
self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b"") # Subscribe to all messages

# Give the socket a chance to connect
self.logger.info("Waiting for socket to connect...")
time.sleep(1)

# Only set subscription options for subscriber sockets
if selected_tap.tap_type != TapType.TAP_TYPE_CONSUMER:
self.zmq_socket.setsockopt(zmq.SUBSCRIBE, b"")
print("Subscribed to all messages")

return True
except zmq.ZMQError as e:
self.logger.error(f"Failed to connect to tap: {e}")
Expand Down Expand Up @@ -123,6 +140,33 @@ def read(self, timeout_ms: int = 1000) -> Optional[bytes]:
self.logger.error(f"Error receiving message: {e}")
return None

def send(self, data: bytes) -> bool:
"""Send raw data to the tap (only works for consumer taps with PUB socket).

Args:
data (bytes): Raw message data to send.

Returns:
bool: True if sent successfully, False otherwise.
"""
if not self.zmq_socket:
self.logger.error("Not connected to any tap")
return False

if (
not self.connected_tap
or self.connected_tap.tap_type != TapType.TAP_TYPE_CONSUMER
):
self.logger.error("Send is only available for consumer taps")
return False

try:
self.zmq_socket.send(data)
return True
except zmq.ZMQError as e:
self.logger.error(f"Error sending message: {e}")
return False

def stream(self, timeout_ms: int = 1000) -> Generator[bytes, None, None]:
"""Stream raw data from the tap.

Expand Down
2 changes: 1 addition & 1 deletion synapse/server/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from synapse.api.logging_pb2 import LogLevel, LogQueryResponse
from synapse.api.query_pb2 import QueryResponse
from synapse.api.status_pb2 import DeviceState, Status, StatusCode
from synapse.api.synapse_pb2 import DeviceConfiguration, DeviceInfo
from synapse.api.device_pb2 import DeviceConfiguration, DeviceInfo
from synapse.api.synapse_pb2_grpc import (
SynapseDeviceServicer,
add_SynapseDeviceServicer_to_server,
Expand Down
4 changes: 1 addition & 3 deletions synapse/tests/blink_ostim.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import time

import cv2
import numpy as np

from synapse.device import Device
from synapse.config import Config
from synapse.nodes.stream_in import StreamIn
from synapse.api.synapse_pb2 import DeviceConfiguration
from synapse.api.device_pb2 import DeviceConfiguration
from google.protobuf.json_format import Parse

addr = "localhost:647"
Expand Down
Loading