From 47ffcac52075c49dadfbe8096aa42bbdd81a9189 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Wed, 23 Apr 2025 11:43:52 -0700 Subject: [PATCH 1/8] feat: Controller node example --- controller_config.json | 31 +++++++++++++ synapse-api | 2 +- synapse/client/controller_recv_demo.py | 63 ++++++++++++++++++++++++++ synapse/client/nodes/__init__.py | 2 + synapse/client/nodes/controller.py | 26 +++++++++++ 5 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 controller_config.json create mode 100644 synapse/client/controller_recv_demo.py create mode 100644 synapse/client/nodes/controller.py diff --git a/controller_config.json b/controller_config.json new file mode 100644 index 0000000..a15176a --- /dev/null +++ b/controller_config.json @@ -0,0 +1,31 @@ +{ + "nodes": [ + { + "type": "kBroadbandSource", + "id": 1, + "broadbandSource": { + "peripheral_id": 100, + "sample_rate_hz": 32000, + "bit_width": 12, + "signal": { + "electrode": { + "channels": [{"id": 0, "electrode_id": 122, "reference_id": 513}, {"id": 1, "electrode_id": 126, "reference_id": 513}, {"id": 2, "electrode_id": 116, "reference_id": 513}, {"id": 3, "electrode_id": 120, "reference_id": 513}, {"id": 4, "electrode_id": 110, "reference_id": 513}, {"id": 5, "electrode_id": 114, "reference_id": 513}, {"id": 6, "electrode_id": 104, "reference_id": 513}, {"id": 7, "electrode_id": 108, "reference_id": 513}, {"id": 8, "electrode_id": 98, "reference_id": 513}, {"id": 9, "electrode_id": 66, "reference_id": 513}, {"id": 10, "electrode_id": 92, "reference_id": 513}, {"id": 11, "electrode_id": 60, "reference_id": 513}, {"id": 12, "electrode_id": 86, "reference_id": 513}, {"id": 13, "electrode_id": 54, "reference_id": 513}, {"id": 14, "electrode_id": 80, "reference_id": 513}, {"id": 15, "electrode_id": 48, "reference_id": 513}, {"id": 16, "electrode_id": 74, "reference_id": 513}, {"id": 17, "electrode_id": 38, "reference_id": 513}, {"id": 18, "electrode_id": 68, "reference_id": 513}, {"id": 19, "electrode_id": 36, "reference_id": 513}, {"id": 20, "electrode_id": 62, "reference_id": 513}, {"id": 21, "electrode_id": 0, "reference_id": 512}, {"id": 22, "electrode_id": 56, "reference_id": 513}, {"id": 23, "electrode_id": 4, "reference_id": 512}, {"id": 24, "electrode_id": 50, "reference_id": 513}, {"id": 25, "electrode_id": 12, "reference_id": 513}, {"id": 26, "electrode_id": 44, "reference_id": 513}, {"id": 27, "electrode_id": 14, "reference_id": 513}, {"id": 28, "electrode_id": 42, "reference_id": 513}, {"id": 29, "electrode_id": 20, "reference_id": 513}, {"id": 30, "electrode_id": 32, "reference_id": 513}, {"id": 31, "electrode_id": 26, "reference_id": 513}], + "low_cutoff_hz": 57, + "high_cutoff_hz": 13489 + } + } + } + }, + { + "type": "kController", + "id": 2, + "controller": {} + } + ], + "connections": [ + { + "src_node_id": 1, + "dst_node_id": 2 + } + ] + } diff --git a/synapse-api b/synapse-api index c37bed6..fcaed3a 160000 --- a/synapse-api +++ b/synapse-api @@ -1 +1 @@ -Subproject commit c37bed603e95dad82d20879f5f137c3828042220 +Subproject commit fcaed3ab4a2f3795198fa87c868e1a62689f740d diff --git a/synapse/client/controller_recv_demo.py b/synapse/client/controller_recv_demo.py new file mode 100644 index 0000000..61748ba --- /dev/null +++ b/synapse/client/controller_recv_demo.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +import zmq +from google.protobuf.json_format import MessageToJson +import time + +from synapse.api.datatype_pb2 import Tensor + + +def main(): + # Create ZMQ context and subscriber socket + context = 
zmq.Context()
+    subscriber = context.socket(zmq.SUB)
+
+    # Connect to the publisher
+    server_ip = "10.40.62.101"
+    endpoint = f"tcp://{server_ip}:54878"
+    subscriber.connect(endpoint)
+
+    # Subscribe to the topic
+    topic = "controller/output"
+    subscriber.setsockopt_string(zmq.SUBSCRIBE, topic)
+
+    print(f"Connected to {endpoint}")
+    print(f"Subscribed to topic: {topic}")
+
+    try:
+        while True:
+            # Receive topic
+            print("Waiting")
+            topic_message = subscriber.recv()
+            print(topic_message)
+            # Receive tensor data (protobuf serialized)
+            tensor_data = subscriber.recv()
+
+            # Parse the tensor data
+            tensor = Tensor()
+            tensor.ParseFromString(tensor_data)
+
+            # Extract joystick values
+            if len(tensor.values) >= 2:
+                joystick_x = tensor.values[0]
+                joystick_y = tensor.values[1]
+
+                timestamp_ns = tensor.timestamp_ns
+                timestamp_s = timestamp_ns / 1e9  # Convert to seconds
+
+                print(f"Received at {time.time():.6f}, message time: {timestamp_s:.6f}")
+                print(f"Joystick X: {joystick_x:.4f}, Y: {joystick_y:.4f}")
+            else:
+                print("Received tensor with unexpected format")
+                print(MessageToJson(tensor))
+
+    except KeyboardInterrupt:
+        print("Subscriber stopped by user")
+    finally:
+        # Clean up
+        subscriber.close()
+        context.term()
+        print("Subscriber closed")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/synapse/client/nodes/__init__.py b/synapse/client/nodes/__init__.py
index dfafeb2..01075e6 100644
--- a/synapse/client/nodes/__init__.py
+++ b/synapse/client/nodes/__init__.py
@@ -8,6 +8,7 @@
 from synapse.client.nodes.stream_in import StreamIn
 from synapse.client.nodes.stream_out import StreamOut
 from synapse.client.nodes.disk_writer import DiskWriter
+from synapse.client.nodes.controller import Controller
 
 from synapse.api.node_pb2 import NodeType
 
@@ -22,4 +23,5 @@
     NodeType.kSpikeSource: SpikeSource,
     NodeType.kStreamIn: StreamIn,
     NodeType.kStreamOut: StreamOut,
+    NodeType.kController: Controller,
 }
diff --git a/synapse/client/nodes/controller.py b/synapse/client/nodes/controller.py
new file mode 100644
index 0000000..93b60c7
--- /dev/null
+++ b/synapse/client/nodes/controller.py
@@ -0,0 +1,26 @@
+from typing import Optional
+from synapse.api.node_pb2 import NodeConfig, NodeType
+from synapse.client.node import Node
+from synapse.api.nodes.controller_pb2 import ControllerNodeConfig
+
+
+class Controller(Node):
+    type = NodeType.kController
+
+    def __init__(self):
+        pass
+
+    def _to_proto(self):
+        n = NodeConfig()
+        p = ControllerNodeConfig()
+        n.controller.CopyFrom(p)
+        return n
+
+    @staticmethod
+    def _from_proto(proto: Optional[ControllerNodeConfig]):
+        if not proto:
+            raise ValueError("parameter 'proto' is missing")
+        if not isinstance(proto, ControllerNodeConfig):
+            raise ValueError("proto is not of type ControllerNodeConfig")
+
+        return Controller()

From bd2649ffcd043c7b5b6ef8881e780c0736ecaea9 Mon Sep 17 00:00:00 2001
From: gilbert
Date: Wed, 23 Apr 2025 15:37:39 -0700
Subject: [PATCH 2/8] feat: Support streaming device queries (#95)

* Better displays and plots
* added logs while self test is running in verbose mode
* added logs while self test is running in verbose mode
* Added cli
* Added better options for device log
* updated the submodule
* Updated synapse api
* Updated api to main
* Feedback from review
* increase sleep
* Updated from feedback
* Feedback from review
* Fixes during streaming
---
 .github/workflows/main.yml | 2 +-
 synapse/cli/__main__.py | 7 +-
 synapse/cli/query.py | 321 +++++++++++++++++++++++++++++++++++++
 synapse/cli/rpc.py | 23 ++-
 synapse/client/device.py | 30 +++++
test_query.json | 2 +- 6 files changed, 375 insertions(+), 10 deletions(-) create mode 100644 synapse/cli/query.py diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6866505..e4d2524 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -58,7 +58,7 @@ jobs: synapse-sim --iface-ip 127.0.0.1 --rpc-port 50051 & - sleep 2 + sleep 5 python synapse/examples/stream_out.py 127.0.0.1:50051 diff --git a/synapse/cli/__main__.py b/synapse/cli/__main__.py index 61e4870..5a7ba7d 100755 --- a/synapse/cli/__main__.py +++ b/synapse/cli/__main__.py @@ -2,6 +2,7 @@ import argparse import logging import ipaddress +import sys from importlib import metadata from synapse.cli import discover, rpc, streaming, offline_plot, files @@ -77,4 +78,8 @@ def main(): if __name__ == "__main__": - main() + try: + main() + except Exception as e: + print(f"Uncaught error in CLI. Why: {e}") + sys.exit(1) diff --git a/synapse/cli/query.py b/synapse/cli/query.py new file mode 100644 index 0000000..b26a11a --- /dev/null +++ b/synapse/cli/query.py @@ -0,0 +1,321 @@ +#!/usr/bin/env python3 +import asyncio +import csv +from threading import Thread +import time +import sys +import synapse as syn +from synapse.api.query_pb2 import QueryRequest, StreamQueryRequest +from google.protobuf.json_format import Parse + +from rich.progress import ( + Progress, + SpinnerColumn, + TextColumn, + BarColumn, + TimeElapsedColumn, +) +from rich.table import Table +from rich.console import Console, Group +from rich.live import Live +from rich.panel import Panel + + +class StreamingQueryClient: + def __init__(self, uri, verbose=False): + self.uri = uri + self.verbose = verbose + self.console = Console() + + self.device = syn.Device(self.uri, self.verbose) + if self.verbose: + info = self.device.info() + self.console.log(info) + + # We tail the logs in the background with verbose set + self.log_stop_event = asyncio.Event() + self.new_log_event = asyncio.Event() + self.log_thread = Thread(target=self.tail_logs_background, daemon=True) + self.log_thread.start() + + def close(self): + if self.log_thread.is_alive(): + self.log_stop_event.set() + self.log_thread.join() + + def tail_logs_background(self): + self.last_log_line = "" + for log in self.device.tail_logs(): + if self.last_log_line != log.message: + self.last_log_line = log.message + self.new_log_event.set() + if self.log_stop_event.is_set(): + break + + def stream_query(self, request): + query_type = request.request.query_type + try: + if query_type == QueryRequest.kImpedance: + return self.handle_impedance_stream(request) + elif query_type == QueryRequest.kSelfTest: + return self.handle_self_test_stream(request) + else: + self.console.log(f"[bold red]Unknown stream request: {query_type}") + return False + except Exception as e: + self.console.log(f"[bold red] Uncaught exception during stream: {e}") + return False + except KeyboardInterrupt: + self.console.log("[yellow] Operation cancelled by user") + return False + + def handle_self_test_stream(self, request): + query = request.request.self_test_query + if not query: + self.console.log("[bold red] Invalid query for self test stream") + return False + + self.console.log("[cyan] Starting self test stream") + + all_responses = [] + + with self.console.status( + "Running Self Test", spinner="bouncingBall", spinner_style="green" + ) as status: + # If we are verbose, we want to show the latest log + stop_tailing_logs = asyncio.Event() + + def update_status(): + while not stop_tailing_logs.is_set(): + if 
self.new_log_event.is_set():
+                        status.update(self.last_log_line)
+                        self.new_log_event.clear()
+
+            status_thread = None
+            if self.verbose:
+                status_thread = Thread(target=update_status, daemon=True)
+                status_thread.start()
+
+            for response in self.device.stream_query(request):
+                if not response:
+                    self.console.log("Stream is complete")
+                    break
+
+                if response.code != 0 or not response.self_test:
+                    self.console.log(
+                        f"[bold red] Failed self test, why: {response.message}"
+                    )
+                    return False
+
+                all_responses.append(response.self_test)
+
+            if status_thread:
+                stop_tailing_logs.set()
+                status_thread.join()
+
+        if not all_responses:
+            return False
+
+        table = Table(title="Self Test Results")
+        table.add_column("Test", justify="right")
+        table.add_column("Passed?", justify="right")
+        table.add_column("Report", justify="right")
+
+        for response in all_responses:
+            for test in response.tests:
+                if test.passed:
+                    table.add_row(
+                        test.test_name, "[green]Passed[/green]", test.test_report
+                    )
+                else:
+                    table.add_row(test.test_name, "[red]Failed[/red]", test.test_report)
+
+        self.console.print(table)
+        return True
+
+    def handle_impedance_stream(self, request):
+        query = request.request.impedance_query
+        if not query:
+            self.console.log("[bold red] Invalid query for impedance stream")
+            return False
+
+        electrode_count = len(query.electrode_ids)
+        if electrode_count <= 0:
+            self.console.log("[bold red] No electrodes to query")
+            return False
+
+        self.console.log(
+            f"[cyan] Starting impedance stream with {electrode_count} electrodes"
+        )
+
+        measurements_received = 0
+        all_measurements = []
+        failed_measurements = []
+
+        # Create the CSV file up front; measurement batches are appended as they arrive
+        filename = f"impedance_measurements_{time.strftime('%Y%m%d-%H%M%S')}.csv"
+        with open(filename, "w", newline="") as f:
+            writer = csv.writer(f)
+            writer.writerow(["Electrode ID", "Magnitude", "Phase"])
+        self.console.print(f"[green] Started saving measurements to {filename}")
+
+        progress = Progress(
+            SpinnerColumn(),
+            TextColumn("[bold cyan] Processing impedance measurements [/bold cyan]"),
+            BarColumn(),
+            TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
+            TextColumn("[cyan]({task.completed}/{task.total})[/cyan]"),
+            TimeElapsedColumn(),
+        )
+
+        def get_renderable():
+            if self.verbose:
+                return Group(
+                    Panel(
+                        self.last_log_line,
+                        title="Latest Device Log",
+                        border_style="cyan",
+                    ),
+                    progress,
+                )
+            else:
+                return progress
+
+        with Live(get_renderable(), refresh_per_second=10) as live:
+            task = progress.add_task("Measuring impedance", total=electrode_count)
+
+            # If we are verbose, we want to show the latest log
+            stop_tailing_logs = asyncio.Event()
+
+            def update_progress():
+                while not stop_tailing_logs.is_set():
+                    if self.new_log_event.is_set():
+                        live.update(get_renderable())
+                        self.new_log_event.clear()
+
+            progress_thread = None
+            if self.verbose:
+                progress_thread = Thread(target=update_progress, daemon=True)
+                progress_thread.start()
+
+            for response in self.device.stream_query(request):
+                if not response:
+                    self.console.log("Stream is complete")
+                    break
+
+                # Check if this failed
+                if response.code != 0 or not response.impedance:
+                    failed_batch = response.impedance.measurements
+                    failed_measurements.extend(failed_batch)
+
+                    failed_ids = [m.electrode_id for m in failed_batch]
+                    progress.console.log(
+                        f"Failed to measure impedance for {failed_ids}, why: {response.message}"
+                    )
+                    for sample in failed_batch:
+                        progress.console.log(
+                            f"electrode id (mag, phase): {sample.electrode_id}\t {sample.magnitude},{sample.phase}"
+                        )
+                    measurements_received += len(failed_batch)
+                    progress.update(
+                        task, completed=min(measurements_received, electrode_count)
+                    )
+                    continue
+
+                measurement_batch = response.impedance.measurements
+
+                # Figure out how many we processed in this batch
+                measurements_received += len(measurement_batch)
+                progress.update(
+                    task, completed=min(measurements_received, electrode_count)
+                )
+
+                # Add these to our batch
+                all_measurements.extend(measurement_batch)
+                self.save_measurement_batch(filename, measurement_batch)
+
+                if self.verbose:
+                    for measurement in measurement_batch:
+                        progress.console.log(
+                            f"Electrode {measurement.electrode_id}: {measurement.magnitude}Ω"
+                        )
+
+            if progress_thread:
+                stop_tailing_logs.set()
+                progress_thread.join()
+
+        if all_measurements:
+            self.display_impedance_results(all_measurements)
+        else:
+            self.console.log("[bold red] All impedance measurements failed")
+
+        if failed_measurements:
+            failed_ids = [m.electrode_id for m in failed_measurements]
+            self.console.log(f"[bold red]Failed impedance electrodes\n{failed_ids}")
+        return True
+
+    def display_impedance_results(self, measurements):
+        table = Table(title="Impedance Measurements")
+        table.add_column("Electrode ID", justify="right")
+        table.add_column("Magnitude (kΩ)", justify="right")
+        table.add_column("Phase (°)", justify="right")
+
+        for measurement in measurements:
+            table.add_row(
+                str(measurement.electrode_id),
+                f"{measurement.magnitude:.2f}",
+                f"{measurement.phase:.2f}",
+            )
+        self.console.print(table)
+
+    def save_measurement_batch(self, filename, measurements):
+        # Save a batch of measurements as they come in
+        with open(filename, "a", newline="") as f:
+            writer = csv.writer(f)
+            for measurement in measurements:
+                writer.writerow(
+                    [measurement.electrode_id, measurement.magnitude, measurement.phase]
+                )
+
+
+def load_config_from_file(path_to_config):
+    try:
+        with open(path_to_config, "r") as f:
+            data = f.read()
+            proto = Parse(data, QueryRequest())
+            return proto
+    except Exception:
+        print(f"Failed to open {path_to_config}")
+        return None
+
+
+if __name__ == "__main__":
+    import argparse
+
+    parser = argparse.ArgumentParser(description="Stream Query Client Test")
+    parser.add_argument("--uri", default="localhost:50051", help="Synapse server URI")
+    parser.add_argument("--verbose", action="store_true", help="Use verbose output")
+    parser.add_argument(
+        "--config",
+        type=str,
+        help="Path to the QueryRequest configuration, in JSON format",
+        required=True,
+    )
+
+    args = parser.parse_args()
+
+    config_path = args.config
+    request_config = load_config_from_file(config_path)
+    if not request_config:
+        sys.exit(1)
+
+    client = StreamingQueryClient(args.uri, args.verbose)
+    request = StreamQueryRequest(request=request_config)
+
+    try:
+        if not client.stream_query(request):
+            print("Failed to stream query for device")
+            sys.exit(1)
+    except Exception as e:
+        print(f"Failed to stream query. 
Why: {e}") + sys.exit(1) diff --git a/synapse/cli/rpc.py b/synapse/cli/rpc.py index 04731c0..f955c5d 100644 --- a/synapse/cli/rpc.py +++ b/synapse/cli/rpc.py @@ -5,7 +5,7 @@ import synapse as syn from synapse.api.synapse_pb2 import DeviceConfiguration -from synapse.api.query_pb2 import QueryRequest, QueryResponse +from synapse.api.query_pb2 import QueryRequest, QueryResponse, StreamQueryRequest from synapse.api.status_pb2 import StatusCode from google.protobuf import text_format @@ -14,6 +14,7 @@ from rich.console import Console from rich.pretty import pprint +from synapse.cli.query import StreamingQueryClient from synapse.utils.log import log_entry_to_str @@ -23,6 +24,9 @@ def add_commands(subparsers): b = subparsers.add_parser("query", help="Execute a query on the device") b.add_argument("query_file", type=str) + b.add_argument("--verbose", "-v", action="store_true", help="Verbose output") + b.add_argument("--stream", "-s", action="store_true", help="Stream the output") + b.set_defaults(func=query) c = subparsers.add_parser("start", help="Start the device") @@ -89,6 +93,23 @@ def info(args): def query(args): + def load_query_request(path_to_config): + try: + with open(path_to_config, "r") as f: + data = f.read() + proto = Parse(data, QueryRequest()) + return proto + except Exception: + print(f"Failed to open {path_to_config}") + return None + + if args.stream: + client = StreamingQueryClient(args.uri, args.verbose) + query_proto = load_query_request(args.query_file) + if not query_proto: + return False + return client.stream_query(StreamQueryRequest(request=query_proto)) + if Path(args.query_file).suffix != ".json": print("Query file must be a JSON file") return False diff --git a/synapse/client/device.py b/synapse/client/device.py index 0760f37..56fdaf9 100644 --- a/synapse/client/device.py +++ b/synapse/client/device.py @@ -4,7 +4,13 @@ import logging from datetime import datetime -from synapse.api.logging_pb2 import LogQueryResponse, LogQueryRequest, LogLevel, TailLogsRequest +from synapse.api.logging_pb2 import ( + LogQueryResponse, + LogQueryRequest, + LogLevel, + TailLogsRequest, +) +from synapse.api.query_pb2 import StreamQueryRequest, StreamQueryResponse from synapse.api.status_pb2 import StatusCode, Status from synapse.api.synapse_pb2_grpc import SynapseDeviceStub from synapse.client.config import Config @@ -95,13 +101,13 @@ def configure_with_status(self, config: Config) -> Status: assert isinstance(config, Config), "config must be an instance of Config" config.set_device(self) - try: + try: response = self.rpc.Configure(config.to_proto()) return response except grpc.RpcError as e: self.logger.error("Error: %s", e.details()) return None - + def get_name(self) -> Optional[str]: info = self.info() return info.name if info else None @@ -111,7 +117,7 @@ def get_logs( log_level: Union[str, LogLevel] = "INFO", since_ms: Optional[int] = None, start_time: Optional[datetime] = None, - end_time: Optional[datetime] = None + end_time: Optional[datetime] = None, ) -> Optional[LogQueryResponse]: try: request = LogQueryRequest() @@ -136,7 +142,7 @@ def get_logs_with_status( log_level: Union[str, LogLevel] = LogLevel.LOG_LEVEL_INFO, since_ms: Optional[int] = None, start_time: Optional[datetime] = None, - end_time: Optional[datetime] = None + end_time: Optional[datetime] = None, ) -> Optional[Status]: try: request = LogQueryRequest() @@ -155,7 +161,9 @@ def get_logs_with_status( self.logger.error("Error: %s", e.details()) return None - def tail_logs(self, log_level: Union[str, LogLevel] = 
LogLevel.LOG_LEVEL_INFO) -> AsyncGenerator[LogQueryResponse, None]: + def tail_logs( + self, log_level: Union[str, LogLevel] = LogLevel.LOG_LEVEL_INFO + ) -> AsyncGenerator[LogQueryResponse, None]: try: request = TailLogsRequest() request.min_level = log_level_to_pb(log_level) @@ -164,6 +172,16 @@ def tail_logs(self, log_level: Union[str, LogLevel] = LogLevel.LOG_LEVEL_INFO) - self.logger.error("Error: %s", e.details()) return None + def stream_query( + self, stream_request: StreamQueryRequest + ) -> AsyncGenerator[StreamQueryResponse, None]: + try: + for response in self.rpc.StreamQuery(stream_request): + yield response + except Exception as e: + self.logger.error(f"Error during StreamQuery: {str(e)}") + yield StreamQueryResponse(code=StatusCode.kQueryFailed) + def _handle_status_response(self, status): if status.code != StatusCode.kOk: self.logger.error("Error %d: %s", status.code, status.message) diff --git a/test_query.json b/test_query.json index 2dcad13..8b2ea1d 100644 --- a/test_query.json +++ b/test_query.json @@ -1,6 +1,6 @@ { "query_type": "kImpedance", "impedance_query": { - "electrode_ids": [1] + "electrode_ids": [116, 118] } } From 6e9c9dffd3ff0b745244331b6deabd3f46abc0df Mon Sep 17 00:00:00 2001 From: antonia Date: Thu, 24 Apr 2025 11:54:06 -0700 Subject: [PATCH 3/8] feat: add `get_last_sync_time_ns` to TimeSyncClient (#102) * fix: fix issue where extra sync packets would be sent * chore: more robust estimate calculation, add dispersion calculation --- synapse/utils/time_sync.py | 176 +++++++++++++++++++++++++++---------- 1 file changed, 130 insertions(+), 46 deletions(-) diff --git a/synapse/utils/time_sync.py b/synapse/utils/time_sync.py index c060304..ae997f3 100644 --- a/synapse/utils/time_sync.py +++ b/synapse/utils/time_sync.py @@ -5,7 +5,7 @@ import socket import threading import time -from typing import Union +from typing import Tuple, Union, List from synapse.api.time_pb2 import TimeSyncPacket @@ -23,6 +23,26 @@ class TimeSyncEstimate: offset_ns = 0 +def calculate_root_dispersion(samples: List[TimeSyncEstimate], best_offset_ns: int) -> Union[int, None]: + if len(samples) == 0: + return 0 + + min_rtt_sample = None + for sample in samples: + if sample.rtt_ns <= 0: + continue + if min_rtt_sample is None or sample.rtt_ns < min_rtt_sample.rtt_ns: + min_rtt_sample = sample + + if min_rtt_sample is None: + return None + + squared_deviations = [(sample.offset_ns - best_offset_ns) ** 2 for sample in samples] + std_dev_ns = int((sum(squared_deviations) / len(samples)) ** 0.5) + + root_dispersion_ns = (min_rtt_sample.rtt_ns // 2) + (2 * std_dev_ns) + return root_dispersion_ns + def get_time_sync_estimate(packet: TimeSyncPacket) -> TimeSyncEstimate: server_calculation_time_ns = packet.server_send_time_ns - packet.server_receive_time_ns @@ -39,25 +59,51 @@ def get_time_sync_estimate(packet: TimeSyncPacket) -> TimeSyncEstimate: estimate.offset_ns = calculated_offset_ns return estimate +class OffsetEstimator: + def __init__(self, window_size: int = 120): + self._window_size = window_size + self._best_offset_ns = 0 + self._samples: List[TimeSyncEstimate] = [] + + def add_sample(self, estimate: TimeSyncEstimate): + self._samples.append(estimate) + if len(self._samples) > self._window_size: + self._samples.pop(0) + + self._update() + + def get_offset_ns(self) -> int: + return self._best_offset_ns + + def root_dispersion_ns(self) -> Union[int, None]: + return calculate_root_dispersion(self._samples, self._best_offset_ns) + + def _update(self): + if len(self._samples) == 0: + return + + 
sorted_samples = sorted(self._samples, key=lambda x: x.rtt_ns) + self._best_offset_ns = sorted_samples[0].offset_ns + class TimeSyncClient: def __init__(self, host: str, port: int, config: TimeSyncConfig = TimeSyncConfig(), logger: Union[logging.Logger, None] = None): - self.client_id = self.generate_client_id() + self.sequence_number = 0 + self.client_id = self._generate_client_id() self.host = host self.port = port self.running = False - self.sequence_number = 0 self.config = config or TimeSyncConfig() + self.offset_estimator = OffsetEstimator() self.current_rtts = [TimeSyncEstimate() for _ in range(self.config.max_sync_packets)] self.latest_offset_ns = 0 + self.last_sync_time_ns = (0, 0) self.socket = None - self.sync_thread = None - self.receive_thread = None - + self.worker_thread = None self.logger = logging.getLogger("time-sync") if logger is None else logger - - self.logger.debug(f"TimeSyncClient initialized with client_id: {self.client_id}") - def generate_client_id(self) -> int: + self.logger.debug(f"TimeSyncClient initialized with client_id: {self.client_id} and host: {self.host} and port: {self.port}") + + def _generate_client_id(self) -> int: return random.randint(0, 2**32 - 1) def start(self) -> bool: @@ -68,17 +114,13 @@ def start(self) -> bool: self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.socket.bind(('0.0.0.0', 0)) self.socket.connect((self.host, self.port)) + self.socket.settimeout(self.config.timeout_s) - self.receive_thread = threading.Thread(target=self._receive_loop) - self.receive_thread.daemon = True - self.receive_thread.start() - - self.sync_thread = threading.Thread(target=self._sync_loop) - self.sync_thread.daemon = True - self.sync_thread.start() + self.worker_thread = threading.Thread(target=self._worker_thread) + self.worker_thread.daemon = True + self.worker_thread.start() - self.logger.info(f"TimeSyncClient started with client_id: {self.client_id} and host: {self.host} and port: {self.port}") - + self.logger.info("TimeSyncClient started") return True def stop(self): @@ -89,33 +131,47 @@ def stop(self): if self.socket: self.socket.close() - if self.sync_thread: - self.sync_thread.join(timeout=1.0) - if self.receive_thread: - self.receive_thread.join(timeout=1.0) + if self.worker_thread: + self.worker_thread.join(timeout=1.0) - def _sync_loop(self): - while self.running: + def _worker_thread(self): + try: self._send_next_sync_packet() + while self.running: + try: + data, _ = self.socket.recvfrom(self.config.max_packet_size) + self._handle_response(data) + except socket.timeout: + self.logger.debug("Timeout waiting for response") + self._schedule_next_sync() + except socket.error as e: + if self.running: + self.logger.error(f"Socket error: {e}") + self._schedule_next_sync() + except Exception as e: + self.logger.error(f"Worker thread error: {e}") + self.running = False - def _receive_loop(self): - while self.running: - try: - data, _ = self.socket.recvfrom(self.config.max_packet_size) - self.handle_response(data) - except (socket.error, Exception) as e: - if self.running: - self.logger.error(f"Error receiving data: {e}") + def _schedule_next_sync(self): + if self.sequence_number >= self.config.max_sync_packets - 1: + self._update_estimate() + self.logger.debug(f"Synced with {self.config.max_sync_packets} packets, updating estimate - current offset: {self.latest_offset_ns} ns") + time.sleep(self.config.sync_interval_s) + if self.running: + self._send_next_sync_packet() + + else: + time.sleep(self.config.send_delay_ms / 1000.0) + if 
self.running: + self.sequence_number += 1 + self._send_next_sync_packet() def _send_next_sync_packet(self): - if self.sequence_number >= self.config.max_sync_packets: - self.update_estimate() - self.logger.info(f"Synced with {self.config.max_sync_packets} packets, updating estimate - current offset: {self.latest_offset_ns} ns") - time.sleep(self.config.sync_interval_s) + if not self.running: return if self.sequence_number == 0: - self.logger.info(f"Sending sync packets...") + self.logger.debug("Sending sync packets...") request = TimeSyncPacket() request.client_id = self.client_id @@ -123,44 +179,72 @@ def _send_next_sync_packet(self): request.client_send_time_ns = int(time.time_ns()) try: + self.logger.debug(f"Sending sync packet {self.sequence_number} / {self.config.max_sync_packets}") self.socket.send(request.SerializeToString()) except Exception as e: self.logger.error(f"Error sending packet: {e}") - finally: - time.sleep(self.config.send_delay_ms / 1000.0) + self._schedule_next_sync() - def handle_response(self, data: bytes): + def _handle_response(self, data: bytes): now_ns = time.time_ns() + response = None try: response = TimeSyncPacket() response.ParseFromString(data) response.client_receive_time_ns = now_ns if response.client_id != self.client_id: + self.logger.warning(f"Received sync packet from {response.client_id}, but expected {self.client_id}") return estimate = get_time_sync_estimate(response) + + if self.sequence_number >= self.config.max_sync_packets: + self.logger.warning(f"Received sync packet {self.sequence_number} / {self.config.max_sync_packets}, but max is {self.config.max_sync_packets}") + return + self.current_rtts[self.sequence_number] = estimate - self.sequence_number += 1 + + self.last_sync_time_ns = (time.time_ns(), self.time_ns()) except Exception as e: self.logger.error(f"Error processing response: {e}") + return + + finally: + self._schedule_next_sync() - def update_estimate(self): + def _update_estimate(self): if not self.current_rtts: return - best_estimate = min(self.current_rtts[:self.sequence_number], - key=lambda x: x.rtt_ns) - - self.latest_offset_ns = best_estimate.offset_ns + best_estimate = None + for estimate in self.current_rtts[:min(self.sequence_number + 1, len(self.current_rtts))]: + if estimate.rtt_ns <= 0: + continue + if best_estimate is None or estimate.rtt_ns < best_estimate.rtt_ns: + best_estimate = estimate + + if best_estimate is not None and best_estimate.rtt_ns > 0: + self.offset_estimator.add_sample(best_estimate) + self.latest_offset_ns = self.offset_estimator.get_offset_ns() + root_dispersion_ns = self.offset_estimator.root_dispersion_ns() + + self.logger.debug(f"Updated estimate - current offset: {self.latest_offset_ns / 1e6} ms, dispersion: {root_dispersion_ns / 1e6 if root_dispersion_ns is not None else 'N/A'} ms") + self.sequence_number = 0 self.current_rtts = [TimeSyncEstimate() for _ in range(self.config.max_sync_packets)] def get_offset_ns(self) -> int: return self.latest_offset_ns + def get_last_sync_time_ns(self) -> Tuple[int, int]: + """ + Returns a tuple of the last sync time in ns as [client's clock from time.time_ns(), synced clock from self.time_ns()] + """ + return self.last_sync_time_ns + def time_ns(self) -> int: return time.time_ns() + self.latest_offset_ns From dde8ab4efa525298e33c017c66b8fcac331328cf Mon Sep 17 00:00:00 2001 From: Antonia Elsen Date: Thu, 24 Apr 2025 12:13:02 -0700 Subject: [PATCH 4/8] chore(release): 2.1.0 --- CHANGELOG.md | 8 ++++++++ setup.py | 2 +- 2 files changed, 9 insertions(+), 1 
deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9cfa9bd..5d0fb16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,14 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +## [2.1.0](https://github.com/sciencecorp/synapse-python/compare/v2.0.0...v2.1.0) (2025-04-24) + + +### Features + +* add `get_last_sync_time_ns` to TimeSyncClient ([#102](https://github.com/sciencecorp/synapse-python/issues/102)) ([90a7ffc](https://github.com/sciencecorp/synapse-python/commit/90a7ffcb4e889d79a0d2fbe1127a6f2fa2186f82)) +* support streaming device queries ([#95](https://github.com/sciencecorp/synapse-python/issues/95)) ([e45e5f5](https://github.com/sciencecorp/synapse-python/commit/e45e5f5d9808445ec34acd0caee86ab5440039e6)) + ## [2.0.0](https://github.com/sciencecorp/synapse-python/compare/v1.0.2...v2.0.0) (2025-04-21) diff --git a/setup.py b/setup.py index 983fd89..d3c53da 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ setup( name="science-synapse", - version="2.0.0", + version="2.1.0", description="Client library and CLI for the Synapse API", author="Science Team", author_email="team@science.xyz", From f8f1274ef0c2e5ede6f00e77b9bd4ce8c3f070a6 Mon Sep 17 00:00:00 2001 From: antonia Date: Fri, 25 Apr 2025 10:08:26 -0700 Subject: [PATCH 5/8] feat: populate synapse_api_version DeviceInfo field in server (#94) --- setup.sh | 7 +++++++ synapse/server/rpc.py | 20 +++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/setup.sh b/setup.sh index b4c6adf..5421239 100755 --- a/setup.sh +++ b/setup.sh @@ -58,6 +58,11 @@ generate() { generate_protos "${PROTO_DIR_SYNAPSE_API}" "${PROTO_OUT_SYNAPSE_API}" "api/synapse.proto" } +generate_version() { + echo "Generating version file..." + git -C ${PROTO_DIR_SYNAPSE_API} describe --tags --abbrev=0 | sed 's/^v//' > ${PROTO_OUT_SYNAPSE_API}/api/version.txt || echo "0.0.0" > ${PROTO_OUT_SYNAPSE_API}/api/version.txt +} + run_tests() { echo "Running tests..." 
pytest -v @@ -70,6 +75,7 @@ case "$1" in ;; "generate") generate + generate_version ;; "test") run_tests @@ -77,6 +83,7 @@ case "$1" in "all") clean generate + generate_version ;; *) echo "Usage: $0 {clean|generate|test|all}" diff --git a/synapse/server/rpc.py b/synapse/server/rpc.py index 3d426db..c7a7b6d 100644 --- a/synapse/server/rpc.py +++ b/synapse/server/rpc.py @@ -24,6 +24,13 @@ LOG_FILEPATH = str(Path.home() / ".science" / "synapse" / "logs" / "server.log") +def _read_api_version(): + try: + with open(str(Path(__file__).parent.parent / "api" / "version.txt")) as f: + return f.read().strip() + except (FileNotFoundError, IOError): + return None + async def serve( server_name, @@ -70,6 +77,8 @@ def __init__(self, name, serial, iface_ip, node_object_map, peripherals): logging.getLogger().addHandler(self.stream_handler) init_file_handler(self.logger, LOG_FILEPATH) + self.synapse_api_version = _read_api_version() + async def Info(self, request, context): self.logger.info("Info()") connections = [ @@ -79,7 +88,7 @@ async def Info(self, request, context): return DeviceInfo( name=self.name, serial=self.serial, - synapse_version=10, + synapse_version=self._synapse_api_version(), firmware_version=1, status=Status( message=None, @@ -372,3 +381,12 @@ async def _stop_streaming(self): def _sockets_status_info(self): return [node.node_socket() for node in self.nodes if node.socket] + + def _synapse_api_version(self): + if self.synapse_api_version is None: + return 0 + try: + major, minor, patch = map(int, self.synapse_api_version.split('.')) + return (major & 0x3FF) << 20 | (minor & 0x3FF) << 10 | (patch & 0x3FF) + except Exception: + return 0 From 22ee64d6a94938acbd2158d21dc2263c807ba498 Mon Sep 17 00:00:00 2001 From: gilbert Date: Mon, 28 Apr 2025 10:49:30 -0700 Subject: [PATCH 6/8] feat: Fix issue where uri is forgotten (#104) * feat: Fix issue where uri is forgotten * feat: Fix issue where uri is forgotten --- synapse/cli/__main__.py | 13 ++++++++++--- synapse/client/device.py | 2 ++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/synapse/cli/__main__.py b/synapse/cli/__main__.py index 5a7ba7d..ad9012a 100755 --- a/synapse/cli/__main__.py +++ b/synapse/cli/__main__.py @@ -71,10 +71,17 @@ def main(): if not args: return - if hasattr(args, "func"): - args.func(args) - else: + try: + if hasattr(args, "func"): + args.func(args) + else: + parser.print_help() + except Exception as e: + console = Console() + console.log(f"[bold red] Uncaught error during function. 
Why: {e}") parser.print_help() + except KeyboardInterrupt: + print("User cancelled request") if __name__ == "__main__": diff --git a/synapse/client/device.py b/synapse/client/device.py index 56fdaf9..1021fbb 100644 --- a/synapse/client/device.py +++ b/synapse/client/device.py @@ -23,6 +23,8 @@ class Device(object): sockets = None def __init__(self, uri, verbose=False): + if not uri: + raise ValueError("URI cannot be empty or none") if len(uri.split(":")) != 2: self.uri = uri + f":{DEFAULT_SYNAPSE_PORT}" else: From 8bab3cc3b756425a625b12f5d9a967b1343fe73f Mon Sep 17 00:00:00 2001 From: antonia Date: Tue, 29 Apr 2025 15:19:48 -0700 Subject: [PATCH 7/8] fix: fix issue with LogLevel type annotation on certain client device methods (#105) --- synapse/client/device.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/client/device.py b/synapse/client/device.py index 1021fbb..3b292e2 100644 --- a/synapse/client/device.py +++ b/synapse/client/device.py @@ -1,3 +1,4 @@ +from __future__ import annotations from typing import AsyncGenerator, Optional, Union import grpc from google.protobuf.empty_pb2 import Empty From 58cbd4b245f32e46f14a6b085595fa3b4a753bfa Mon Sep 17 00:00:00 2001 From: Antonia Elsen Date: Tue, 29 Apr 2025 18:40:02 -0700 Subject: [PATCH 8/8] chore: update protos --- synapse-api | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse-api b/synapse-api index fcaed3a..ac54f46 160000 --- a/synapse-api +++ b/synapse-api @@ -1 +1 @@ -Subproject commit fcaed3ab4a2f3795198fa87c868e1a62689f740d +Subproject commit ac54f461ada92b6a74510fa6d01ad7d39292a6e8