From 8064e6d05e6925b4810af8ec1e56e76ca87f8cb3 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Fri, 28 Mar 2025 10:17:45 -0700 Subject: [PATCH 01/13] Better displays and plots --- synapse-api | 2 +- synapse/client/device.py | 30 +++- synapse/client/query.py | 290 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 315 insertions(+), 7 deletions(-) create mode 100644 synapse/client/query.py diff --git a/synapse-api b/synapse-api index 22f14a20..84cc8415 160000 --- a/synapse-api +++ b/synapse-api @@ -1 +1 @@ -Subproject commit 22f14a204e007fbbb7695aea84c344b41f5b47dc +Subproject commit 84cc84153354110960a4cb0cf4d3a1bc063c5ed6 diff --git a/synapse/client/device.py b/synapse/client/device.py index 53a26c55..52e05b2e 100644 --- a/synapse/client/device.py +++ b/synapse/client/device.py @@ -4,7 +4,13 @@ import logging from datetime import datetime -from synapse.api.logging_pb2 import LogQueryResponse, LogQueryRequest, LogLevel, TailLogsRequest +from synapse.api.logging_pb2 import ( + LogQueryResponse, + LogQueryRequest, + LogLevel, + TailLogsRequest, +) +from synapse.api.query_pb2 import StreamQueryRequest, StreamQueryResponse from synapse.api.status_pb2 import StatusCode, Status from synapse.api.synapse_pb2_grpc import SynapseDeviceStub from synapse.client.config import Config @@ -95,13 +101,13 @@ def configure_with_status(self, config: Config) -> Status: assert isinstance(config, Config), "config must be an instance of Config" config.set_device(self) - try: + try: response = self.rpc.Configure(config.to_proto()) return response except grpc.RpcError as e: self.logger.error("Error: %s", e.details()) return None - + def get_name(self) -> Optional[str]: info = self.info() return info.name if info else None @@ -111,7 +117,7 @@ def get_logs( log_level: Union[str, LogLevel] = "INFO", since_ms: Optional[int] = None, start_time: Optional[datetime] = None, - end_time: Optional[datetime] = None + end_time: Optional[datetime] = None, ) -> Optional[LogQueryResponse]: try: request = LogQueryRequest() @@ -136,7 +142,7 @@ def get_logs_with_status( log_level: Union[str, LogLevel] = LogLevel.LOG_LEVEL_INFO, since_ms: Optional[int] = None, start_time: Optional[datetime] = None, - end_time: Optional[datetime] = None + end_time: Optional[datetime] = None, ) -> Optional[Status]: try: request = LogQueryRequest() @@ -155,7 +161,9 @@ def get_logs_with_status( self.logger.error("Error: %s", e.details()) return None - def tail_logs(self, log_level: Union[str, LogLevel] = LogLevel.LOG_LEVEL_INFO) -> AsyncGenerator[LogQueryResponse, None]: + def tail_logs( + self, log_level: Union[str, LogLevel] = LogLevel.LOG_LEVEL_INFO + ) -> AsyncGenerator[LogQueryResponse, None]: try: request = TailLogsRequest() request.min_level = log_level_to_pb(log_level) @@ -164,6 +172,16 @@ def tail_logs(self, log_level: Union[str, LogLevel] = LogLevel.LOG_LEVEL_INFO) - self.logger.error("Error: %s", e.details()) return None + def stream_query( + self, stream_request: StreamQueryRequest + ) -> AsyncGenerator[StreamQueryResponse, None]: + try: + for response in self.rpc.StreamQuery(stream_request): + yield response + except Exception as e: + self.logger.error(f"Error during StreamQuery: {str(e)}") + yield StreamQueryResponse(code=StatusCode.kQueryFailed) + def _handle_status_response(self, status): if status.code != StatusCode.kOk: self.logger.error("Error %d: %s", status.code, status.message) diff --git a/synapse/client/query.py b/synapse/client/query.py new file mode 100644 index 00000000..397d22e2 --- /dev/null +++ b/synapse/client/query.py @@ -0,0 +1,290 @@ +#!/usr/bin/env python3 +import csv +import numpy as np +import time +import sys +import synapse as syn +from synapse.api.node_pb2 import NodeType +from synapse.api.query_pb2 import QueryRequest, StreamQueryRequest, SelfTestQuery +from synapse.api.synapse_pb2 import DeviceConfiguration +from google.protobuf.json_format import Parse + +from rich.progress import ( + Progress, + SpinnerColumn, + TextColumn, + BarColumn, + TimeElapsedColumn, +) +from rich.table import Table +from rich.console import Console + +import matplotlib.pyplot as plt + + +class StreamingQueryClient: + def __init__(self, uri, verbose=False, plot=False): + self.uri = uri + self.verbose = verbose + self.plot = plot + self.console = Console() + + self.device = syn.Device(self.uri, self.verbose) + if self.verbose: + info = self.device.info() + self.console.log(info) + + def stream_query(self, request): + query_type = request.request.query_type + if query_type == QueryRequest.kImpedance: + return self.handle_impedance_stream(request) + elif query_type == QueryRequest.kSelfTest: + return self.handle_self_test_stream(request) + else: + self.console.log(f"[bold red]Unknown stream request: {query_type}") + return False + + def handle_self_test_stream(self, request): + query = request.request.self_test_query + if not query: + self.console.log("[bold red] Invalid query for self test stream") + return False + + self.console.log("[cyan] Starting self test stream") + + all_responses = [] + with self.console.status( + "Running Self Test", spinner="bouncingBall", spinner_style="green" + ): + for response in self.device.stream_query(request): + if not response: + self.console.log("Stream is complete") + break + + if response.code != 0 or not response.self_test: + self.console.log( + f"[bold red] Failed self test, why: {response.message}" + ) + return False + + all_responses.append(response.self_test) + + if not all_responses: + return False + + table = Table(title="Self Test Results") + table.add_column("Test", justify="right") + table.add_column("Passed?", justify="right") + table.add_column("Report", justify="right") + + for response in all_responses: + for test in response.tests: + print(test) + + def handle_impedance_stream(self, request): + query = request.request.impedance_query + if not query: + self.console.log("[bold red] Invalid query for impedance stream") + return False + + electrode_count = len(query.electrode_ids) + self.console.log( + f"[cyan] Starting impedance_stream with {electrode_count} electordes" + ) + + measurements_received = 0 + all_measurements = [] + failed_measurements = [] + + with Progress( + SpinnerColumn(), + TextColumn("[bold cyan] Processing impedance measurements [/bold cyan]"), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TimeElapsedColumn(), + ) as progress: + task = progress.add_task("Measuring impedance", total=electrode_count) + + for response in self.device.stream_query(request): + if not response: + self.console.log("Stream is complete") + break + + # Check if this failed + if response.code != 0 or not response.impedance: + failed_batch = response.impedance.measurements + failed_measurements.extend(failed_batch) + + failed_ids = [m.electrode_id for m in failed_batch] + self.console.log( + f"Failed to measure impedance for {failed_ids}, why: {response.message}" + ) + continue + + measurement_batch = response.impedance.measurements + + # Figure out how many we processed in this batch + measurements_received += len(measurement_batch) + progress.update( + task, completed=min(measurements_received, electrode_count) + ) + + # Add these to our batch + all_measurements.extend(measurement_batch) + + if args.verbose: + for measurement in measurement_batch: + progress.console.print( + f"Electrode {measurement.electrode_id}: {measurement.magnitude}Ω" + ) + + if all_measurements: + self.display_impedance_results(all_measurements) + self.save_impedance_results(all_measurements) + if self.plot: + self.plot_impedance_results(all_measurements) + else: + self.console.log("[bold red] All impedance measurements failed") + + if failed_measurements: + failed_ids = [m.electrode_id for m in failed_measurements] + self.console.log(f"[bold red]Failed impedance electrodes\n{failed_ids}") + return True + + def display_impedance_results(self, measurements): + table = Table(title="Impedance Measurements") + table.add_column("Electorde ID", justify="right") + table.add_column("Magnitude", justify="right") + table.add_column("Phase", justify="right") + + for measurement in measurements: + table.add_row( + str(measurement.electrode_id), + f"{measurement.magnitude:.2f}", + f"{measurement.phase:.2f}", + ) + self.console.print(table) + + def save_impedance_results(self, measurements): + # just match the original implementations filename + filename = f"impedance_measurements_{time.strftime('%Y%m%d-%H%M%S')}.csv" + + # probably won't have a duplicate file here + with open(filename, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["Electrode ID", "Magnitude", "Phase"]) + for measurement in measurements: + writer.writerow( + [measurement.electrode_id, measurement.magnitude, measurement.phase] + ) + + self.console.print(f"[green] Measurements saved to {filename}") + + def plot_impedance_results(self, measurements): + electrode_ids = [measurement.electrode_id for measurement in measurements] + + # Convert the magnitudes to kilo ohms + magnitudes = [measurement.magnitude / 1000 for measurement in measurements] + phases = [measurement.phase for measurement in measurements] + + # Sort by the electrode id + sorted_indices = np.argsort(electrode_ids) + electrode_ids = [electrode_ids[i] for i in sorted_indices] + magnitudes = [magnitudes[i] for i in sorted_indices] + phases = [phases[i] for i in sorted_indices] + fig, ax = plt.subplots(figsize=(10, 6)) + x_positions = np.arange(len(electrode_ids)) + + # Add phase values as text annotations on top of each bar + for i, (pos, y, phase) in enumerate(zip(x_positions, magnitudes, phases)): + ax.annotate( + f"{phase:.1f}°", + (pos, y), + xytext=(0, 3), + textcoords="offset points", + ha="center", + fontsize=9, + ) + + # Add labels and title + ax.set_xlabel("Electrode ID", fontsize=12) + ax.set_ylabel("Impedance Magnitude (kΩ)", fontsize=12) + ax.set_title("Electrode Impedance Measurements", fontsize=14) + ax.set_xticks(x_positions) + ax.set_xticklabels(electrode_ids) + ax.grid(axis="y", linestyle="--", alpha=0.7) + + plt.tight_layout() + plt.show() + + +def load_config_from_file(path_to_config): + try: + with open(path_to_config, "r") as f: + data = f.read() + proto = Parse(data, DeviceConfiguration()) + return syn.Config.from_proto(proto) + except Exception: + print(f"Failed to open {path_to_config}") + return None + + +def get_electrode_ids_from_config(config): + # Check if we have a broadband config + broadband = next( + (n for n in config.nodes if n.type == NodeType.kBroadbandSource), None + ) + if not broadband: + return None + channels = broadband.signal.electrode.channels + return [i.electrode_id for i in channels] + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="Stream Query Client Test") + parser.add_argument("--uri", default="localhost:50051", help="Synapse server URI") + parser.add_argument("--verbose", action="store_true", help="Use verbose output") + parser.add_argument( + "--plot", action="store_true", help="Plot the output after the run" + ) + parser.add_argument( + "--config", + type=str, + help="Path to the configuration with the electrode ids", + required=True, + ) + + args = parser.parse_args() + + config_path = args.config + config = load_config_from_file(config_path) + if not config: + sys.exit(1) + + electrode_ids = get_electrode_ids_from_config(config) + if not electrode_ids: + print("No electrode IDs present in the broadband configuration") + sys.exit(1) + + client = StreamingQueryClient(args.uri, args.verbose, args.plot) + + # request = StreamQueryRequest( + # request=QueryRequest( + # query_type=QueryRequest.kImpedance, + # impedance_query=ImpedanceQuery( + # electrode_ids=electrode_ids + # ) + # ) + # ) + + request = StreamQueryRequest( + request=QueryRequest( + query_type=QueryRequest.kSelfTest, + self_test_query=SelfTestQuery(peripheral_id=2), + ) + ) + if not client.stream_query(request): + print("Failed to stream query for device") + sys.exit(1) From 728c8a2ba417e3389525dca26aa70043cf78598e Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 31 Mar 2025 10:50:37 -0700 Subject: [PATCH 02/13] added logs while self test is running in verbose mode --- synapse/client/query.py | 103 +++++++++++++++++++++++----------------- 1 file changed, 60 insertions(+), 43 deletions(-) diff --git a/synapse/client/query.py b/synapse/client/query.py index 397d22e2..858c41bd 100644 --- a/synapse/client/query.py +++ b/synapse/client/query.py @@ -1,12 +1,12 @@ #!/usr/bin/env python3 +import asyncio import csv import numpy as np +from threading import Thread import time import sys import synapse as syn -from synapse.api.node_pb2 import NodeType -from synapse.api.query_pb2 import QueryRequest, StreamQueryRequest, SelfTestQuery -from synapse.api.synapse_pb2 import DeviceConfiguration +from synapse.api.query_pb2 import QueryRequest, StreamQueryRequest from google.protobuf.json_format import Parse from rich.progress import ( @@ -34,6 +34,26 @@ def __init__(self, uri, verbose=False, plot=False): info = self.device.info() self.console.log(info) + # We tail the logs in the background with verbose set + self.log_stop_event = asyncio.Event() + self.new_log_event = asyncio.Event() + self.log_thread = Thread(target=self.tail_logs_background, daemon=True) + self.log_thread.start() + + def close(self): + if self.log_thread.is_alive(): + self.log_stop_event.set() + self.log_thread.join() + + def tail_logs_background(self): + self.last_log_line = None + for log in self.device.tail_logs(): + if self.last_log_line != log.message: + self.last_log_line = log.message + self.new_log_event.set() + if self.log_stop_event.is_set(): + break + def stream_query(self, request): query_type = request.request.query_type if query_type == QueryRequest.kImpedance: @@ -53,9 +73,24 @@ def handle_self_test_stream(self, request): self.console.log("[cyan] Starting self test stream") all_responses = [] + with self.console.status( "Running Self Test", spinner="bouncingBall", spinner_style="green" - ): + ) as status: + # If we are verbose, we want to show the latest log + stop_tailing_logs = asyncio.Event() + + def update_status(): + while not stop_tailing_logs.is_set(): + if self.new_log_event.is_set(): + status.update(self.last_log_line) + self.new_log_event.clear() + + status_thread = None + if self.verbose: + status_thread = Thread(target=update_status, daemon=True) + status_thread.start() + for response in self.device.stream_query(request): if not response: self.console.log("Stream is complete") @@ -69,6 +104,10 @@ def handle_self_test_stream(self, request): all_responses.append(response.self_test) + if status_thread: + stop_tailing_logs.set() + status_thread.join() + if not all_responses: return False @@ -79,7 +118,14 @@ def handle_self_test_stream(self, request): for response in all_responses: for test in response.tests: - print(test) + if test.passed: + table.add_row( + test.test_name, "[green]Passed[/green]", test.test_report + ) + else: + table.add_row(test.test_name, "[red]Failed[/red]", test.test_report) + + self.console.print(table) def handle_impedance_stream(self, request): query = request.request.impedance_query @@ -154,8 +200,8 @@ def handle_impedance_stream(self, request): def display_impedance_results(self, measurements): table = Table(title="Impedance Measurements") table.add_column("Electorde ID", justify="right") - table.add_column("Magnitude", justify="right") - table.add_column("Phase", justify="right") + table.add_column("Magnitude (kΩ)", justify="right") + table.add_column("Phase (°)", justify="right") for measurement in measurements: table.add_row( @@ -222,24 +268,13 @@ def load_config_from_file(path_to_config): try: with open(path_to_config, "r") as f: data = f.read() - proto = Parse(data, DeviceConfiguration()) - return syn.Config.from_proto(proto) + proto = Parse(data, QueryRequest()) + return proto except Exception: print(f"Failed to open {path_to_config}") return None -def get_electrode_ids_from_config(config): - # Check if we have a broadband config - broadband = next( - (n for n in config.nodes if n.type == NodeType.kBroadbandSource), None - ) - if not broadband: - return None - channels = broadband.signal.electrode.channels - return [i.electrode_id for i in channels] - - if __name__ == "__main__": import argparse @@ -252,39 +287,21 @@ def get_electrode_ids_from_config(config): parser.add_argument( "--config", type=str, - help="Path to the configuration with the electrode ids", + help="Path to the QueryRequest configuration, in JSON format", required=True, ) args = parser.parse_args() config_path = args.config - config = load_config_from_file(config_path) - if not config: - sys.exit(1) - - electrode_ids = get_electrode_ids_from_config(config) - if not electrode_ids: - print("No electrode IDs present in the broadband configuration") + request_config = load_config_from_file(config_path) + if not request_config: sys.exit(1) client = StreamingQueryClient(args.uri, args.verbose, args.plot) - # request = StreamQueryRequest( - # request=QueryRequest( - # query_type=QueryRequest.kImpedance, - # impedance_query=ImpedanceQuery( - # electrode_ids=electrode_ids - # ) - # ) - # ) - - request = StreamQueryRequest( - request=QueryRequest( - query_type=QueryRequest.kSelfTest, - self_test_query=SelfTestQuery(peripheral_id=2), - ) - ) + request = StreamQueryRequest(request=request_config) + if not client.stream_query(request): print("Failed to stream query for device") sys.exit(1) From a81421d4e5c97a54bd29593be3f0fc58b3d85f85 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 31 Mar 2025 10:54:10 -0700 Subject: [PATCH 03/13] added logs while self test is running in verbose mode --- synapse/client/query.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/client/query.py b/synapse/client/query.py index 858c41bd..a6710e0b 100644 --- a/synapse/client/query.py +++ b/synapse/client/query.py @@ -126,6 +126,7 @@ def update_status(): table.add_row(test.test_name, "[red]Failed[/red]", test.test_report) self.console.print(table) + return True def handle_impedance_stream(self, request): query = request.request.impedance_query From 28eef2681f0b3389e6cdd405e2b076285531e192 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 31 Mar 2025 10:59:43 -0700 Subject: [PATCH 04/13] Added cli --- synapse/cli/rpc.py | 38 +++++++++++++++++++++++++++++++++----- synapse/client/query.py | 1 - 2 files changed, 33 insertions(+), 6 deletions(-) diff --git a/synapse/cli/rpc.py b/synapse/cli/rpc.py index 9d91712d..0e70f386 100644 --- a/synapse/cli/rpc.py +++ b/synapse/cli/rpc.py @@ -5,7 +5,7 @@ import synapse as syn from synapse.api.synapse_pb2 import DeviceConfiguration -from synapse.api.query_pb2 import QueryRequest, QueryResponse +from synapse.api.query_pb2 import QueryRequest, QueryResponse, StreamQueryRequest from synapse.api.status_pb2 import StatusCode from google.protobuf import text_format @@ -15,6 +15,7 @@ from rich.pretty import pprint from synapse.utils.logging import log_entry_to_str +from synapse.client.query import StreamingQueryClient def add_commands(subparsers): @@ -25,6 +26,10 @@ def add_commands(subparsers): b = subparsers.add_parser("query", help="Execute a query on the device") b.add_argument("uri", type=str) b.add_argument("query_file", type=str) + b.add_argument("--verbose", "-v", action="store_true", help="Verbose output") + b.add_argument("--plot", "-p", action="store_true", help="Plot the output") + b.add_argument("--stream", "-s", action="store_true", help="Stream the output") + b.set_defaults(func=query) c = subparsers.add_parser("start", help="Start the device") @@ -64,7 +69,8 @@ def add_commands(subparsers): help="Follow log output", ) f.add_argument( - "--since", "-S", + "--since", + "-S", type=int, help="Get logs from the last N milliseconds", metavar="N", @@ -94,6 +100,23 @@ def info(args): def query(args): + def load_query_request(path_to_config): + try: + with open(path_to_config, "r") as f: + data = f.read() + proto = Parse(data, QueryRequest()) + return proto + except Exception: + print(f"Failed to open {path_to_config}") + return None + + if args.stream: + client = StreamingQueryClient(args.uri, args.verbose, args.plot) + query_proto = load_query_request(args.query_file) + if not query_proto: + return False + return client.stream_query(StreamQueryRequest(request=query_proto)) + if Path(args.query_file).suffix != ".json": print("Query file must be a JSON file") return False @@ -119,6 +142,7 @@ def query(args): f"{measurement.electrode_id},{measurement.magnitude},{measurement.phase},1\n" ) + def start(args): console = Console() with console.status("Starting device...", spinner="bouncingBall"): @@ -194,12 +218,16 @@ def parse_datetime(time_str: Optional[str]) -> Optional[datetime]: start_time = parse_datetime(args.start_time) if args.start_time and not start_time: - console.print("[bold red]Invalid start time format. Use ISO format (e.g., '2024-03-14T15:30:00')") + console.print( + "[bold red]Invalid start time format. Use ISO format (e.g., '2024-03-14T15:30:00')" + ) return end_time = parse_datetime(args.end_time) if args.end_time and not end_time: - console.print("[bold red]Invalid end time format. Use ISO format (e.g., '2024-03-14T15:30:00')") + console.print( + "[bold red]Invalid end time format. Use ISO format (e.g., '2024-03-14T15:30:00')" + ) return with console.status("Getting logs...", spinner="bouncingBall"): @@ -207,7 +235,7 @@ def parse_datetime(time_str: Optional[str]) -> Optional[datetime]: log_level=args.log_level, since_ms=args.since, start_time=start_time, - end_time=end_time + end_time=end_time, ) if not res or not res.entries: diff --git a/synapse/client/query.py b/synapse/client/query.py index a6710e0b..fae51337 100644 --- a/synapse/client/query.py +++ b/synapse/client/query.py @@ -300,7 +300,6 @@ def load_config_from_file(path_to_config): sys.exit(1) client = StreamingQueryClient(args.uri, args.verbose, args.plot) - request = StreamQueryRequest(request=request_config) if not client.stream_query(request): From 87170ce61426c6c2682f26ea4943df28348ad6ff Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 31 Mar 2025 11:26:30 -0700 Subject: [PATCH 05/13] Added better options for device log --- synapse/client/query.py | 51 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/synapse/client/query.py b/synapse/client/query.py index fae51337..4a7fa0c9 100644 --- a/synapse/client/query.py +++ b/synapse/client/query.py @@ -17,7 +17,9 @@ TimeElapsedColumn, ) from rich.table import Table -from rich.console import Console +from rich.console import Console, Group +from rich.live import Live +from rich.panel import Panel import matplotlib.pyplot as plt @@ -46,7 +48,7 @@ def close(self): self.log_thread.join() def tail_logs_background(self): - self.last_log_line = None + self.last_log_line = "" for log in self.device.tail_logs(): if self.last_log_line != log.message: self.last_log_line = log.message @@ -143,15 +145,44 @@ def handle_impedance_stream(self, request): all_measurements = [] failed_measurements = [] - with Progress( + progress = Progress( SpinnerColumn(), TextColumn("[bold cyan] Processing impedance measurements [/bold cyan]"), BarColumn(), TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), TimeElapsedColumn(), - ) as progress: + ) + + def get_renderable(): + if self.verbose: + return Group( + Panel( + self.last_log_line, + title="Latest Device Log", + border_style="cyan", + ), + progress, + ) + else: + return progress + + with Live(get_renderable(), refresh_per_second=10) as live: task = progress.add_task("Measuring impedance", total=electrode_count) + # If we are verbose, we want to show the latest log + stop_tailing_logs = asyncio.Event() + + def update_progress(): + while not stop_tailing_logs.is_set(): + if self.new_log_event.is_set(): + live.update(get_renderable()) + self.new_log_event.clear() + + progress_thread = None + if self.verbose: + progress_thread = Thread(target=update_progress, daemon=True) + progress_thread.start() + for response in self.device.stream_query(request): if not response: self.console.log("Stream is complete") @@ -163,9 +194,13 @@ def handle_impedance_stream(self, request): failed_measurements.extend(failed_batch) failed_ids = [m.electrode_id for m in failed_batch] - self.console.log( + progress.console.log( f"Failed to measure impedance for {failed_ids}, why: {response.message}" ) + measurements_received += len(failed_batch) + progress.update( + task, completed=min(measurements_received, electrode_count) + ) continue measurement_batch = response.impedance.measurements @@ -181,10 +216,14 @@ def handle_impedance_stream(self, request): if args.verbose: for measurement in measurement_batch: - progress.console.print( + progress.console.log( f"Electrode {measurement.electrode_id}: {measurement.magnitude}Ω" ) + if progress_thread: + stop_tailing_logs.set() + progress_thread.join() + if all_measurements: self.display_impedance_results(all_measurements) self.save_impedance_results(all_measurements) From 4dba0c8837b70686a1452a56a30865bc516d91fc Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 31 Mar 2025 11:31:26 -0700 Subject: [PATCH 06/13] updated the submodule --- synapse-api | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse-api b/synapse-api index 84cc8415..5eba5695 160000 --- a/synapse-api +++ b/synapse-api @@ -1 +1 @@ -Subproject commit 84cc84153354110960a4cb0cf4d3a1bc063c5ed6 +Subproject commit 5eba5695c3d91907315353452203fbf144690ca7 From 97525923e299aea2aedaed796f69ca7fcdde4245 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Wed, 9 Apr 2025 15:04:16 -0700 Subject: [PATCH 07/13] Updated synapse api --- synapse-api | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse-api b/synapse-api index 5eba5695..84cc8415 160000 --- a/synapse-api +++ b/synapse-api @@ -1 +1 @@ -Subproject commit 5eba5695c3d91907315353452203fbf144690ca7 +Subproject commit 84cc84153354110960a4cb0cf4d3a1bc063c5ed6 From 64d60d10275b75127d3117af2764bad928007b21 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Tue, 15 Apr 2025 14:54:48 -0700 Subject: [PATCH 08/13] Updated api to main --- synapse-api | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse-api b/synapse-api index 84cc8415..ef2f04ca 160000 --- a/synapse-api +++ b/synapse-api @@ -1 +1 @@ -Subproject commit 84cc84153354110960a4cb0cf4d3a1bc063c5ed6 +Subproject commit ef2f04ca41a0469c4c33e77548967d724f9531e7 From 69b224e17b19fa4e859de3ff4c483330efc6e249 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Tue, 15 Apr 2025 15:00:34 -0700 Subject: [PATCH 09/13] Feedback from review --- synapse/{client => cli}/query.py | 2 +- synapse/cli/rpc.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename synapse/{client => cli}/query.py (99%) diff --git a/synapse/client/query.py b/synapse/cli/query.py similarity index 99% rename from synapse/client/query.py rename to synapse/cli/query.py index 4a7fa0c9..69f6f53d 100644 --- a/synapse/client/query.py +++ b/synapse/cli/query.py @@ -214,7 +214,7 @@ def update_progress(): # Add these to our batch all_measurements.extend(measurement_batch) - if args.verbose: + if self.verbose: for measurement in measurement_batch: progress.console.log( f"Electrode {measurement.electrode_id}: {measurement.magnitude}Ω" diff --git a/synapse/cli/rpc.py b/synapse/cli/rpc.py index 0e70f386..6556b07f 100644 --- a/synapse/cli/rpc.py +++ b/synapse/cli/rpc.py @@ -15,7 +15,7 @@ from rich.pretty import pprint from synapse.utils.logging import log_entry_to_str -from synapse.client.query import StreamingQueryClient +from synapse.cli.query import StreamingQueryClient def add_commands(subparsers): From 7c418a1bc282fa40150a13976d22d63284d73a09 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 21 Apr 2025 10:47:32 -0700 Subject: [PATCH 10/13] increase sleep --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6866505f..e4d25242 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -58,7 +58,7 @@ jobs: synapse-sim --iface-ip 127.0.0.1 --rpc-port 50051 & - sleep 2 + sleep 5 python synapse/examples/stream_out.py 127.0.0.1:50051 From eab10bf99565a1fc6bf4159b814cc9d1527be74a Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Tue, 22 Apr 2025 10:12:23 -0700 Subject: [PATCH 11/13] Updated from feedback --- synapse/cli/query.py | 74 ++++++++++---------------------------------- synapse/cli/rpc.py | 3 +- test_query.json | 2 +- 3 files changed, 19 insertions(+), 60 deletions(-) diff --git a/synapse/cli/query.py b/synapse/cli/query.py index 69f6f53d..db101bc2 100644 --- a/synapse/cli/query.py +++ b/synapse/cli/query.py @@ -1,7 +1,6 @@ #!/usr/bin/env python3 import asyncio import csv -import numpy as np from threading import Thread import time import sys @@ -21,14 +20,11 @@ from rich.live import Live from rich.panel import Panel -import matplotlib.pyplot as plt - class StreamingQueryClient: - def __init__(self, uri, verbose=False, plot=False): + def __init__(self, uri, verbose=False): self.uri = uri self.verbose = verbose - self.plot = plot self.console = Console() self.device = syn.Device(self.uri, self.verbose) @@ -145,6 +141,13 @@ def handle_impedance_stream(self, request): all_measurements = [] failed_measurements = [] + # Create a CSV file to read from at the beginning + filename = f"impedance_measurements_{time.strftime('%Y%m%d-%H%M%S')}.csv" + with open(filename, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["Electrode ID", "Magnitude", "Phase"]) + self.console.print(f"[green] Started saving measurements to {filename}") + progress = Progress( SpinnerColumn(), TextColumn("[bold cyan] Processing impedance measurements [/bold cyan]"), @@ -197,6 +200,10 @@ def update_progress(): progress.console.log( f"Failed to measure impedance for {failed_ids}, why: {response.message}" ) + for sample in failed_batch: + progress.console.log( + f"electrode id (mag, phase): {sample.electrode_id}\t {sample.magnitude},{sample.phase}" + ) measurements_received += len(failed_batch) progress.update( task, completed=min(measurements_received, electrode_count) @@ -213,6 +220,7 @@ def update_progress(): # Add these to our batch all_measurements.extend(measurement_batch) + self.save_measurement_batch(filename, measurement_batch) if self.verbose: for measurement in measurement_batch: @@ -227,8 +235,6 @@ def update_progress(): if all_measurements: self.display_impedance_results(all_measurements) self.save_impedance_results(all_measurements) - if self.plot: - self.plot_impedance_results(all_measurements) else: self.console.log("[bold red] All impedance measurements failed") @@ -251,58 +257,15 @@ def display_impedance_results(self, measurements): ) self.console.print(table) - def save_impedance_results(self, measurements): - # just match the original implementations filename - filename = f"impedance_measurements_{time.strftime('%Y%m%d-%H%M%S')}.csv" - - # probably won't have a duplicate file here - with open(filename, "w", newline="") as f: + def save_measurement_batch(self, filename, measurements): + # Save a batch of measurements as they come in + with open(filename, "a", newline="") as f: writer = csv.writer(f) - writer.writerow(["Electrode ID", "Magnitude", "Phase"]) for measurement in measurements: writer.writerow( [measurement.electrode_id, measurement.magnitude, measurement.phase] ) - self.console.print(f"[green] Measurements saved to {filename}") - - def plot_impedance_results(self, measurements): - electrode_ids = [measurement.electrode_id for measurement in measurements] - - # Convert the magnitudes to kilo ohms - magnitudes = [measurement.magnitude / 1000 for measurement in measurements] - phases = [measurement.phase for measurement in measurements] - - # Sort by the electrode id - sorted_indices = np.argsort(electrode_ids) - electrode_ids = [electrode_ids[i] for i in sorted_indices] - magnitudes = [magnitudes[i] for i in sorted_indices] - phases = [phases[i] for i in sorted_indices] - fig, ax = plt.subplots(figsize=(10, 6)) - x_positions = np.arange(len(electrode_ids)) - - # Add phase values as text annotations on top of each bar - for i, (pos, y, phase) in enumerate(zip(x_positions, magnitudes, phases)): - ax.annotate( - f"{phase:.1f}°", - (pos, y), - xytext=(0, 3), - textcoords="offset points", - ha="center", - fontsize=9, - ) - - # Add labels and title - ax.set_xlabel("Electrode ID", fontsize=12) - ax.set_ylabel("Impedance Magnitude (kΩ)", fontsize=12) - ax.set_title("Electrode Impedance Measurements", fontsize=14) - ax.set_xticks(x_positions) - ax.set_xticklabels(electrode_ids) - ax.grid(axis="y", linestyle="--", alpha=0.7) - - plt.tight_layout() - plt.show() - def load_config_from_file(path_to_config): try: @@ -321,9 +284,6 @@ def load_config_from_file(path_to_config): parser = argparse.ArgumentParser(description="Stream Query Client Test") parser.add_argument("--uri", default="localhost:50051", help="Synapse server URI") parser.add_argument("--verbose", action="store_true", help="Use verbose output") - parser.add_argument( - "--plot", action="store_true", help="Plot the output after the run" - ) parser.add_argument( "--config", type=str, @@ -338,7 +298,7 @@ def load_config_from_file(path_to_config): if not request_config: sys.exit(1) - client = StreamingQueryClient(args.uri, args.verbose, args.plot) + client = StreamingQueryClient(args.uri, args.verbose) request = StreamQueryRequest(request=request_config) if not client.stream_query(request): diff --git a/synapse/cli/rpc.py b/synapse/cli/rpc.py index 6a103106..f955c5dc 100644 --- a/synapse/cli/rpc.py +++ b/synapse/cli/rpc.py @@ -25,7 +25,6 @@ def add_commands(subparsers): b = subparsers.add_parser("query", help="Execute a query on the device") b.add_argument("query_file", type=str) b.add_argument("--verbose", "-v", action="store_true", help="Verbose output") - b.add_argument("--plot", "-p", action="store_true", help="Plot the output") b.add_argument("--stream", "-s", action="store_true", help="Stream the output") b.set_defaults(func=query) @@ -105,7 +104,7 @@ def load_query_request(path_to_config): return None if args.stream: - client = StreamingQueryClient(args.uri, args.verbose, args.plot) + client = StreamingQueryClient(args.uri, args.verbose) query_proto = load_query_request(args.query_file) if not query_proto: return False diff --git a/test_query.json b/test_query.json index 2dcad13b..8b2ea1d2 100644 --- a/test_query.json +++ b/test_query.json @@ -1,6 +1,6 @@ { "query_type": "kImpedance", "impedance_query": { - "electrode_ids": [1] + "electrode_ids": [116, 118] } } From 3bd8a91cda08839601f0f8e7bd4f5d2d9deee8c0 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Wed, 23 Apr 2025 15:08:59 -0700 Subject: [PATCH 12/13] Feedback from review --- synapse/cli/query.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/synapse/cli/query.py b/synapse/cli/query.py index db101bc2..189d7578 100644 --- a/synapse/cli/query.py +++ b/synapse/cli/query.py @@ -133,6 +133,10 @@ def handle_impedance_stream(self, request): return False electrode_count = len(query.electrode_ids) + if electrode_count <= 0: + self.console.log("[bold red] No electrodes to query") + return False + self.console.log( f"[cyan] Starting impedance_stream with {electrode_count} electordes" ) @@ -153,6 +157,7 @@ def handle_impedance_stream(self, request): TextColumn("[bold cyan] Processing impedance measurements [/bold cyan]"), BarColumn(), TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TextColumn("[cyan]({task.completed}/{task.total})[/cyan]"), TimeElapsedColumn(), ) @@ -234,7 +239,6 @@ def update_progress(): if all_measurements: self.display_impedance_results(all_measurements) - self.save_impedance_results(all_measurements) else: self.console.log("[bold red] All impedance measurements failed") @@ -301,6 +305,10 @@ def load_config_from_file(path_to_config): client = StreamingQueryClient(args.uri, args.verbose) request = StreamQueryRequest(request=request_config) - if not client.stream_query(request): - print("Failed to stream query for device") + try: + if not client.stream_query(request): + print("Failed to stream query for device") + sys.exit(1) + except Exception as e: + print(f"Failed to stream query. Why: {e}") sys.exit(1) From 642916b8d6612d5713a7551a2c8d1cae4f31b420 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Wed, 23 Apr 2025 15:19:12 -0700 Subject: [PATCH 13/13] Fixes during streaming --- synapse/cli/__main__.py | 7 ++++++- synapse/cli/query.py | 19 +++++++++++++------ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/synapse/cli/__main__.py b/synapse/cli/__main__.py index 61e48703..5a7ba7d5 100755 --- a/synapse/cli/__main__.py +++ b/synapse/cli/__main__.py @@ -2,6 +2,7 @@ import argparse import logging import ipaddress +import sys from importlib import metadata from synapse.cli import discover, rpc, streaming, offline_plot, files @@ -77,4 +78,8 @@ def main(): if __name__ == "__main__": - main() + try: + main() + except Exception as e: + print(f"Uncaught error in CLI. Why: {e}") + sys.exit(1) diff --git a/synapse/cli/query.py b/synapse/cli/query.py index 189d7578..b26a11ae 100644 --- a/synapse/cli/query.py +++ b/synapse/cli/query.py @@ -54,12 +54,19 @@ def tail_logs_background(self): def stream_query(self, request): query_type = request.request.query_type - if query_type == QueryRequest.kImpedance: - return self.handle_impedance_stream(request) - elif query_type == QueryRequest.kSelfTest: - return self.handle_self_test_stream(request) - else: - self.console.log(f"[bold red]Unknown stream request: {query_type}") + try: + if query_type == QueryRequest.kImpedance: + return self.handle_impedance_stream(request) + elif query_type == QueryRequest.kSelfTest: + return self.handle_self_test_stream(request) + else: + self.console.log(f"[bold red]Unknown stream request: {query_type}") + return False + except Exception as e: + self.console.log(f"[bold red] Uncaught exception during stream: {e}") + return False + except KeyboardInterrupt: + self.console.log("[yellow] Operation cancelled by user") return False def handle_self_test_stream(self, request):