From bb078cc40da3646d43ebb960ab8df5314c216be8 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Fri, 27 Jun 2025 16:25:36 -0700 Subject: [PATCH 01/10] feature: improvements from testing --- synapse/cli/streaming.py | 356 ++++++++++++++++++++------------- synapse/cli/synapse_plotter.py | 88 ++++++-- synapse/client/taps.py | 28 ++- 3 files changed, 301 insertions(+), 171 deletions(-) diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index 73d8fe8..9f1b7a9 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -25,7 +25,7 @@ def __init__(self, console: Console): self.last_count = 0 self.last_sequence = 0 self.total_dropped = 0 - self.queue = queue.Queue(maxsize=100) + self.queue = queue.Queue(maxsize=1000) # Increased for high data rate self.stop_event = threading.Event() self.monitor_thread = None @@ -55,6 +55,16 @@ def put(self, frame: BroadbandFrame): # Drop frame if queue is full to prevent blocking pass + def put_batch(self, frames: list): + """Add multiple frames to monitoring queue efficiently (non-blocking)""" + for frame in frames: + try: + self.queue.put(frame, block=False) + except queue.Full: + # Drop frame if queue is full to prevent blocking + # Only break if queue is full to avoid flooding + break + def _monitor_loop(self): """Process frames for monitoring in separate thread""" while not self.stop_event.is_set(): @@ -116,9 +126,20 @@ def get_current_stats(self) -> Text: class BroadbandFrameWriter: + """ + Threaded HDF5 writer for broadband data streams + + Features: + - Single writer thread with bounded queue (prevents blocking the reader) + - Non-blocking puts with frame dropping if queue is full + - Batch writes to HDF5 for better I/O performance + - Compressed datasets to reduce disk space and I/O + - Periodic flushes for optimal performance + """ + def __init__(self, output_dir: str): self.output_dir = output_dir - self.data_queue = queue.Queue(maxsize=2000) # Increased queue size + self.data_queue = queue.Queue(maxsize=1000) # Simple bounded queue self.stop_event = threading.Event() self.writer_thread = None @@ -134,22 +155,35 @@ def __init__(self, output_dir: str): self.filename = os.path.join(output_dir, f"broadband_data_{timestamp}.h5") self.file = h5py.File(self.filename, "w") - # Create datasets + # Create datasets with chunking and compression for better performance self.timestamp_dataset = self.file.create_dataset( - "/acquisition/timestamp", shape=(0,), maxshape=(None,), dtype="uint64" + "/acquisition/timestamp", + shape=(0,), + maxshape=(None,), + dtype="uint64", + chunks=True, + compression="gzip", + compression_opts=1, # Fast compression ) self.sequence_dataset = self.file.create_dataset( - "/acquisition/sequence_number", shape=(0,), maxshape=(None,), dtype="uint64" + "/acquisition/sequence_number", + shape=(0,), + maxshape=(None,), + dtype="uint64", + chunks=True, + compression="gzip", + compression_opts=1, ) - # Create frame data dataset as a flat array of samples self.frame_data_dataset = self.file.create_dataset( - "/acquisition/ElectricalSeries", shape=(0,), maxshape=(None,), dtype="int32" + "/acquisition/ElectricalSeries", + shape=(0,), + maxshape=(None,), + dtype="int32", + chunks=True, + compression="gzip", + compression_opts=1, ) - # Buffer for collecting frames before writing - self.frame_buffer = [] - self.buffer_size = 500 # Reduced buffer size for more frequent writes - def get_stats(self): """Get current statistics""" elapsed = time.time() - self.start_time @@ -161,8 +195,12 @@ def get_stats(self): "total_samples": 0, "dropped_frames": 0, "last_sequence": 0, + "queue_size": 0, + "queue_utilization": 0.0, + "memory_pressure": "Low", } + queue_size = self.data_queue.qsize() return { "frames_per_sec": self.frames_received / elapsed, "samples_per_sec": self.samples_received / elapsed, @@ -170,34 +208,38 @@ def get_stats(self): "total_samples": self.samples_received, "dropped_frames": self.dropped_frames, "last_sequence": self.last_sequence, + "queue_size": queue_size, + "queue_utilization": queue_size / self.data_queue.maxsize, + "memory_pressure": "High" + if queue_size > 800 + else "Medium" + if queue_size > 500 + else "Low", } def set_attributes( self, sample_rate_hz: float, channels: list, session_description: str = "" ): - """Set HDF5 attributes similar to C++ implementation""" - # Set basic attributes + """Set HDF5 attributes""" self.file.attrs["sample_rate_hz"] = sample_rate_hz if session_description: self.file.attrs["session_description"] = session_description - - # Set session start time self.file.attrs["session_start_time"] = datetime.now().isoformat() - # Set device type device_group = self.file.create_group("general/device") device_group.attrs["device_type"] = "SciFi" - # Create electrodes group and write channel IDs electrodes_group = self.file.create_group( "general/extracellular_ephys/electrodes" ) - channel_ids = channels - electrodes_group.create_dataset("id", data=channel_ids, dtype="uint32") + electrodes_group.create_dataset("id", data=channels, dtype="uint32") def start(self): """Start the writer thread""" - self.writer_thread = threading.Thread(target=self._write_loop) + self.stop_event.clear() + self.writer_thread = threading.Thread( + target=self._write_loop, name="HDF5Writer" + ) self.writer_thread.start() def stop(self): @@ -205,11 +247,10 @@ def stop(self): self.stop_event.set() if self.writer_thread: self.writer_thread.join() - self.flush() self.file.close() def put(self, frame: BroadbandFrame): - """Add frame to the write queue (non-blocking)""" + """Add frame to write queue (non-blocking)""" # Update stats self.frames_received += 1 self.samples_received += len(frame.frame_data) @@ -221,106 +262,102 @@ def put(self, frame: BroadbandFrame): self.dropped_frames += frame.sequence_number - expected_sequence self.last_sequence = frame.sequence_number + # Try to put in queue, drop if full (non-blocking) try: self.data_queue.put(frame, block=False) except queue.Full: - # If queue is full, we'll drop the oldest data - try: - self.data_queue.get_nowait() - self.data_queue.put(frame, block=False) - except queue.Empty: - pass + # Queue is full, drop this frame to prevent blocking the reader + pass def put_batch(self, frames: list): - """Add multiple frames to the write queue efficiently""" + """Add multiple frames efficiently""" for frame in frames: - self.frames_received += 1 - self.samples_received += len(frame.frame_data) - - # Check for dropped frames - if self.last_sequence != 0: - expected_sequence = self.last_sequence + 1 - if frame.sequence_number != expected_sequence: - self.dropped_frames += frame.sequence_number - expected_sequence - self.last_sequence = frame.sequence_number - - # Try to add all frames to queue - for frame in frames: - try: - self.data_queue.put(frame, block=False) - except queue.Full: - # If queue is full, drop oldest and try again - try: - self.data_queue.get_nowait() - self.data_queue.put(frame, block=False) - except queue.Empty: - pass + self.put(frame) def _write_loop(self): - """Main writing loop that consumes data from the queue""" + """Simple writer thread loop - based on proven pattern""" + frame_buffer = [] + buffer_size = 100 # Smaller, more frequent writes + last_flush_time = time.time() + while not self.stop_event.is_set() or not self.data_queue.empty(): try: - frame = self.data_queue.get(timeout=0.1) - self.frame_buffer.append(frame) - - # Write when buffer is full - if len(self.frame_buffer) >= self.buffer_size: - self._write_buffer() + # Get frame from queue with timeout + frame = self.data_queue.get(timeout=0.5) + frame_buffer.append(frame) + + current_time = time.time() + # Write buffer if it's full or if enough time has passed + if len(frame_buffer) >= buffer_size or ( + frame_buffer and current_time - last_flush_time > 1.0 + ): + self._write_buffer(frame_buffer) + frame_buffer = [] + last_flush_time = current_time except queue.Empty: + current_time = time.time() + # Flush any remaining data if timeout occurred + if frame_buffer and current_time - last_flush_time > 1.0: + self._write_buffer(frame_buffer) + frame_buffer = [] + last_flush_time = current_time continue except Exception as e: - print(f"Error writing data: {e}") + print(f"Error in writer thread: {e}") continue - def _write_buffer(self): - """Write the buffered frames to disk""" - if not self.frame_buffer: - return - - # Get current sizes - current_timestamp_size = self.timestamp_dataset.shape[0] - current_frame_size = self.frame_data_dataset.shape[0] - num_frames = len(self.frame_buffer) - - # Resize datasets - new_timestamp_size = current_timestamp_size + num_frames - new_frame_size = current_frame_size + ( - num_frames * len(self.frame_buffer[0].frame_data) - ) - - self.timestamp_dataset.resize(new_timestamp_size, axis=0) - self.sequence_dataset.resize(new_timestamp_size, axis=0) - self.frame_data_dataset.resize(new_frame_size, axis=0) + # Write any remaining frames when stopping + if frame_buffer: + self._write_buffer(frame_buffer) - # Write data - for i, frame in enumerate(self.frame_buffer): - idx = current_timestamp_size + i - self.timestamp_dataset[idx] = frame.timestamp_ns - self.sequence_dataset[idx] = frame.sequence_number - - # Write frame data - frame_start = current_frame_size + (i * len(frame.frame_data)) - frame_end = frame_start + len(frame.frame_data) - self.frame_data_dataset[frame_start:frame_end] = frame.frame_data + def _write_buffer(self, frame_buffer: list): + """Write buffered frames to HDF5""" + if not frame_buffer: + return - # Clear buffer - self.frame_buffer = [] + try: + # Get current dataset sizes + current_timestamp_size = self.timestamp_dataset.shape[0] + current_frame_size = self.frame_data_dataset.shape[0] + + num_frames = len(frame_buffer) + samples_per_frame = len(frame_buffer[0].frame_data) + + # Resize datasets + new_timestamp_size = current_timestamp_size + num_frames + new_frame_size = current_frame_size + (num_frames * samples_per_frame) + + self.timestamp_dataset.resize(new_timestamp_size, axis=0) + self.sequence_dataset.resize(new_timestamp_size, axis=0) + self.frame_data_dataset.resize(new_frame_size, axis=0) + + # Write data in batch + timestamps = [] + sequences = [] + all_frame_data = [] + + for frame in frame_buffer: + timestamps.append(frame.timestamp_ns) + sequences.append(frame.sequence_number) + all_frame_data.extend(frame.frame_data) + + # Write all data at once (more efficient) + self.timestamp_dataset[current_timestamp_size:new_timestamp_size] = ( + timestamps + ) + self.sequence_dataset[current_timestamp_size:new_timestamp_size] = sequences + self.frame_data_dataset[current_frame_size:new_frame_size] = all_frame_data - # Flush to disk - self.flush() + # Flush to disk periodically (not every write) + if current_timestamp_size % 1000 == 0: # Flush every 1000 frames + self.file.flush() - def flush(self): - """Flush all datasets to disk""" - if self.frame_buffer: - self._write_buffer() - self.timestamp_dataset.flush() - self.sequence_dataset.flush() - self.frame_data_dataset.flush() - self.file.flush() + except Exception as e: + print(f"Error writing buffer to HDF5: {e}") -def create_status_table(writer: BroadbandFrameWriter) -> Table: +def create_status_table(writer) -> Table: """Create a status table for display""" stats = writer.get_stats() table = Table(title="Streaming Status") @@ -335,12 +372,35 @@ def create_status_table(writer: BroadbandFrameWriter) -> Table: table.add_row("Dropped Frames", str(stats["dropped_frames"])) table.add_row("Last Sequence", str(stats["last_sequence"])) + # Add queue health information + utilization = stats["queue_utilization"] + util_color = ( + "green" if utilization < 0.5 else "yellow" if utilization < 0.8 else "red" + ) + + max_size = writer.data_queue.maxsize + table.add_row( + "Queue Usage", + f"{stats['queue_size']}/{max_size} ({utilization:.1%})", + style=util_color, + ) + + # Add memory pressure indicator + pressure_color = ( + "green" + if stats["memory_pressure"] == "Low" + else "yellow" + if stats["memory_pressure"] == "Medium" + else "red" + ) + table.add_row("Memory Pressure", stats["memory_pressure"], style=pressure_color) + return table def add_commands(subparsers): read_parser = subparsers.add_parser( - "read", help="Read from a device's Broadband Tap" + "read", help="Read from a device's Broadband Tap and save to HDF5" ) read_parser.add_argument( @@ -522,6 +582,54 @@ def get_broadband_tap(args, device, console): return None +def stream_data(broadband_tap, writer, plotter, monitor, first_frame, console, args): + """Simple streaming function using threaded writer""" + try: + # Use batch streaming for better throughput + with Live(monitor.get_current_stats(), refresh_per_second=4) as live: + # Process the first frame that we already read for parameter detection + if first_frame: + if writer: + writer.put(first_frame) + if plotter: + plotter.put(first_frame) + monitor.put(first_frame) + + # Continue with batch streaming for remaining frames + for message_batch in broadband_tap.stream_batch(batch_size=50): + frames = [] + for message in message_batch: + frame = BroadbandFrame() + frame.ParseFromString(message) + frames.append(frame) + + # Send batch to monitor and writer for better performance + monitor.put_batch(frames) + if writer and frames: + writer.put_batch(frames) + + # Send to plotter individually (plotter might need individual frames) + if plotter: + for frame in frames: + plotter.put(frame) + + live.update(monitor.get_current_stats()) + + except KeyboardInterrupt: + console.print("\n[yellow]Stopping data collection...[/yellow]") + finally: + if writer: + writer.stop() + if plotter: + plotter.stop() + if monitor: + monitor.stop() + if args.output: + console.print(f"[green]Data saved to {args.output}[/green]") + if args.plot: + console.print("[green]Plotter stopped[/green]") + + def read(args): console = Console() @@ -578,6 +686,7 @@ def read(args): writer = BroadbandFrameWriter(args.output) writer.set_attributes(sample_rate_hz=sample_rate, channels=available_channels) writer.start() + console.log("[cyan]Using threaded writer for serializing data[/cyan]") # Setup plotter if requested plotter = None @@ -604,46 +713,5 @@ def read(args): monitor = StreamMonitor(console) monitor.start() - try: - # Use batch streaming for better throughput - with Live(monitor.get_current_stats(), refresh_per_second=4) as live: - # Process the first frame that we already read for parameter detection - if first_frame: - if writer: - writer.put(first_frame) - if plotter: - plotter.put(first_frame) - monitor.put(first_frame) - - # Continue with batch streaming for remaining frames - for message_batch in broadband_tap.stream_batch(batch_size=10): - frames = [] - for message in message_batch: - frame = BroadbandFrame() - frame.ParseFromString(message) - frames.append(frame) - - # Send to monitor (non-blocking) - monitor.put(frame) - if plotter: - plotter.put(frame) - - # Batch write for better performance - if writer and frames: - writer.put_batch(frames) - - live.update(monitor.get_current_stats()) - - except KeyboardInterrupt: - console.print("\n[yellow]Stopping data collection...[/yellow]") - finally: - if writer: - writer.stop() - if plotter: - plotter.stop() - if monitor: - monitor.stop() - if args.output: - console.print(f"[green]Data saved to {args.output}[/green]") - if args.plot: - console.print("[green]Plotter stopped[/green]") + # Run the streaming function + stream_data(broadband_tap, writer, plotter, monitor, first_frame, console, args) diff --git a/synapse/cli/synapse_plotter.py b/synapse/cli/synapse_plotter.py index 1c98dbe..a238974 100644 --- a/synapse/cli/synapse_plotter.py +++ b/synapse/cli/synapse_plotter.py @@ -49,10 +49,19 @@ def __init__(self, sample_rate: int, window_size: int, channel_ids): self.zoom_y_max = 4096 self.signal_separation = 1000 + self.downsample_factor = 4 # Default downsample factor for performance + self.center_data = ( + True # Whether to center data around zero by removing DC offset + ) # Dictionary to store line series for plotted channels self.active_lines = {} + # Running statistics for centering data + self.running_means = [0.0] * self.num_channels + self.sample_counts = [0] * self.num_channels + self.alpha = 0.001 # Low-pass filter coefficient for running mean + # Queue and threading for BroadbandFrame processing self.data_queue = queue.Queue(maxsize=2000) self.stop_event = Event() @@ -120,19 +129,32 @@ def setup_gui(self): callback=self.set_signal_separation, ) - # Zoomed Channel Y-range - dpg.add_text("Zoomed Y-Axis Range (Manual):") - dpg.add_input_float( - label="Min", - default_value=self.zoom_y_min, - callback=self.set_zoom_y_min, - tag="zoom_y_min_input", + dpg.add_text("Downsample Factor:") + dpg.add_input_int( + label="", + default_value=self.downsample_factor, + min_value=1, + max_value=20, + tag="downsample_factor_input", + callback=self.set_downsample_factor, ) + + dpg.add_separator() + dpg.add_checkbox( + label="Center Data Around Zero", + default_value=self.center_data, + callback=self.set_center_data, + tag="center_data_checkbox", + ) + + # Zoomed Channel Y-range + dpg.add_text("Zoomed Y-Axis Range (±):") dpg.add_input_float( - label="Max", - default_value=self.zoom_y_max, - callback=self.set_zoom_y_max, - tag="zoom_y_max_input", + label="Range", + default_value=abs(self.zoom_y_max), + callback=self.set_zoom_y_range, + tag="zoom_y_range_input", + min_value=1.0, ) # ----------------------------- @@ -234,15 +256,26 @@ def remove_line_series(self, ch_id): dpg.delete_item(line_tag) del self.active_lines[ch_id] - def set_zoom_y_min(self, sender, app_data): - self.zoom_y_min = app_data - - def set_zoom_y_max(self, sender, app_data): - self.zoom_y_max = app_data + def set_zoom_y_range(self, sender, app_data): + # Ensure the range is centered around zero + abs_range = abs(app_data) + self.zoom_y_min = -abs_range + self.zoom_y_max = abs_range def set_signal_separation(self, sender, app_data): self.signal_separation = app_data + def set_downsample_factor(self, sender, app_data): + # Ensure the downsample factor is at least 1 + self.downsample_factor = max(1, app_data) + + def set_center_data(self, sender, app_data): + self.center_data = app_data + if not app_data: + # Reset running means when centering is disabled + self.running_means = [0.0] * self.num_channels + self.sample_counts = [0] * self.num_channels + def put(self, frame: BroadbandFrame): """Add a BroadbandFrame to the processing queue""" try: @@ -334,9 +367,8 @@ def update_plot(self): Update both the 'all channels' plot and the 'single channel' zoom plot. We 'roll' each channel's data so that the newest sample is on the right. """ - # Downsample factor for performance - # Note(gilbert): we should probably make this configurable, it is arbitrary - ds_factor = 4 + # Use configurable downsample factor for performance + ds_factor = self.downsample_factor # Get the current time window for x-axis limits based on latest data # Use latest data timestamp instead of wall clock for better sync @@ -432,7 +464,23 @@ def process_broadband_frame(self, frame: BroadbandFrame): # Distribute data to each channel buffer for ch_idx, ch_id in enumerate(self.channel_ids): if ch_idx < len(frame_data): - sample = frame_data[ch_idx] + raw_sample = frame_data[ch_idx] + + # Center data around zero if enabled + if self.center_data: + # Update running mean with exponential moving average + self.sample_counts[ch_idx] += 1 + if self.sample_counts[ch_idx] == 1: + self.running_means[ch_idx] = raw_sample + else: + self.running_means[ch_idx] = ( + 1 - self.alpha + ) * self.running_means[ch_idx] + self.alpha * raw_sample + + # Subtract the running mean to center around zero + sample = raw_sample - self.running_means[ch_idx] + else: + sample = raw_sample # Add sample to this channel's ring buffer pos = self.buffer_positions[ch_idx] diff --git a/synapse/client/taps.py b/synapse/client/taps.py index 4feb38d..3b75bdc 100644 --- a/synapse/client/taps.py +++ b/synapse/client/taps.py @@ -2,6 +2,7 @@ import time import zmq from typing import Optional, Generator +import sys from synapse.api.query_pb2 import QueryRequest from synapse.api.status_pb2 import StatusCode @@ -88,10 +89,14 @@ def connect(self, name: str) -> bool: # Optimize ZMQ for high-throughput data # Increase receive buffer size significantly for high-speed data - self.zmq_socket.setsockopt( - zmq.RCVHWM, 10000 - ) # High water mark - buffer up to 10K messages - self.zmq_socket.setsockopt(zmq.RCVBUF, 16 * 1024 * 1024) # 16MB receive buffer + if sys.platform == "win32": + # Use smaller, more achievable buffer sizes for Windows + self.zmq_socket.setsockopt(zmq.RCVBUF, 2 * 1024 * 1024) # 2MB + self.zmq_socket.setsockopt(zmq.RCVHWM, 5000) # Lower HWM + else: + # Linux can handle larger buffers + self.zmq_socket.setsockopt(zmq.RCVBUF, 16 * 1024 * 1024) + self.zmq_socket.setsockopt(zmq.RCVHWM, 10000) # Set TCP keepalive for connection stability self.zmq_socket.setsockopt(zmq.TCP_KEEPALIVE, 1) @@ -112,7 +117,10 @@ def connect(self, name: str) -> bool: # Reduce connection wait time to minimize startup delay self.logger.info("Connecting to tap...") - time.sleep(0.1) # Reduced from 1 second + if sys.platform == "win32": + time.sleep(0.001) # 1ms for Windows + else: + time.sleep(0.0001) # 0.1ms for Linux # Only set subscription options for subscriber sockets if selected_tap.tap_type != TapType.TAP_TYPE_CONSUMER: @@ -201,7 +209,10 @@ def stream(self, timeout_ms: int = 100) -> Generator[bytes, None, None]: yield data except zmq.Again: # No data available right now, yield control briefly - time.sleep(0.0001) # 0.1ms sleep to prevent busy waiting + if sys.platform == "win32": + time.sleep(0.001) # 1ms for Windows + else: + time.sleep(0.0001) # 0.1ms for Linux continue except KeyboardInterrupt: self.logger.info("Stream interrupted") @@ -244,7 +255,10 @@ def stream_batch( if batch: yield batch batch = [] - time.sleep(0.0001) + if sys.platform == "win32": + time.sleep(0.001) # 1ms for Windows + else: + time.sleep(0.0001) # 0.1ms for Linux continue except KeyboardInterrupt: self.logger.info("Stream interrupted") From e2004dba491fd632984ed44b755c4586985b8834 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Fri, 27 Jun 2025 16:42:06 -0700 Subject: [PATCH 02/10] feature: improvements from testing --- synapse/cli/offline_plot.py | 3 +++ synapse/cli/streaming.py | 49 +++++++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/synapse/cli/offline_plot.py b/synapse/cli/offline_plot.py index 0d520a7..8670e87 100644 --- a/synapse/cli/offline_plot.py +++ b/synapse/cli/offline_plot.py @@ -172,6 +172,9 @@ def plot(args): console.print( "[yellow bold]Legacy plotting is deprecated, please use the hdf5 files going forward[/yellow bold]" ) + console.print( + "[yellow bold]Use --data to plot hdf5 files[/yellow bold]" + ) app = QtWidgets.QApplication.instance() if not app: diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index 9f1b7a9..9ef2268 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -423,6 +423,11 @@ def add_commands(subparsers): read_parser.add_argument( "--list-taps", action="store_true", help="List all available taps and exit" ) + read_parser.add_argument( + "--duration", + type=float, + help="Duration in seconds to stream data (if not specified, streams until Ctrl+C)", + ) read_parser.set_defaults(func=read) @@ -464,6 +469,17 @@ def start_device(device, console): return True +def stop_device(device, console): + with console.status("Stopping device...", spinner="bouncingBall"): + stop_status = device.stop_with_status() + if stop_status.code != StatusCode.kOk: + console.print( + f"[bold red]Failed to stop device: {stop_status.message}[/bold red]" + ) + return False + return True + + def setup_output(args, console): if not args.output: console.print("[bold red]No output directory specified[/bold red]") @@ -584,6 +600,9 @@ def get_broadband_tap(args, device, console): def stream_data(broadband_tap, writer, plotter, monitor, first_frame, console, args): """Simple streaming function using threaded writer""" + duration_exceeded = False + start_time = time.time() + try: # Use batch streaming for better throughput with Live(monitor.get_current_stats(), refresh_per_second=4) as live: @@ -597,6 +616,16 @@ def stream_data(broadband_tap, writer, plotter, monitor, first_frame, console, a # Continue with batch streaming for remaining frames for message_batch in broadband_tap.stream_batch(batch_size=50): + # Check if duration limit has been reached + if hasattr(args, "duration") and args.duration is not None: + elapsed_time = time.time() - start_time + if elapsed_time >= args.duration: + duration_exceeded = True + console.print( + f"\n[yellow]Duration limit of {args.duration:.1f} seconds reached. Stopping data collection...[/yellow]" + ) + break + frames = [] for message in message_batch: frame = BroadbandFrame() @@ -629,6 +658,17 @@ def stream_data(broadband_tap, writer, plotter, monitor, first_frame, console, a if args.plot: console.print("[green]Plotter stopped[/green]") + # Show final duration info + final_elapsed = time.time() - start_time + if duration_exceeded: + console.print( + f"[blue]Streaming completed after {final_elapsed:.1f} seconds (duration limit reached)[/blue]" + ) + else: + console.print( + f"[blue]Total streaming time: {final_elapsed:.1f} seconds[/blue]" + ) + def read(args): console = Console() @@ -715,3 +755,12 @@ def read(args): # Run the streaming function stream_data(broadband_tap, writer, plotter, monitor, first_frame, console, args) + + # Stop the device after streaming is complete + console.log("[cyan]Stopping device...[/cyan]") + if not stop_device(device, console): + console.print( + "[bold yellow]Warning: Failed to stop device cleanly[/bold yellow]" + ) + else: + console.log("[green]Device stopped successfully[/green]") From 0e6ace9edcfef4e6d4ba1b97c1b5db934de8974f Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Fri, 27 Jun 2025 17:40:12 -0700 Subject: [PATCH 03/10] feature: improvements from testing --- synapse/cli/streaming.py | 39 +++++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index 9ef2268..a869aff 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -25,6 +25,7 @@ def __init__(self, console: Console): self.last_count = 0 self.last_sequence = 0 self.total_dropped = 0 + self.queue_overflow_drops = 0 # Track frames dropped due to queue being full self.queue = queue.Queue(maxsize=1000) # Increased for high data rate self.stop_event = threading.Event() self.monitor_thread = None @@ -37,6 +38,7 @@ def start(self): self.last_count = 0 self.last_sequence = 0 self.total_dropped = 0 + self.queue_overflow_drops = 0 self.stop_event.clear() self.monitor_thread = threading.Thread(target=self._monitor_loop) self.monitor_thread.start() @@ -53,7 +55,7 @@ def put(self, frame: BroadbandFrame): self.queue.put(frame, block=False) except queue.Full: # Drop frame if queue is full to prevent blocking - pass + self.queue_overflow_drops += 1 def put_batch(self, frames: list): """Add multiple frames to monitoring queue efficiently (non-blocking)""" @@ -63,6 +65,7 @@ def put_batch(self, frames: list): except queue.Full: # Drop frame if queue is full to prevent blocking # Only break if queue is full to avoid flooding + self.queue_overflow_drops += 1 break def _monitor_loop(self): @@ -117,6 +120,8 @@ def get_current_stats(self) -> Text: stats_text.append(f"{rate:.1f}/s", style="green") stats_text.append(" | Dropped: ", style="bold") stats_text.append(f"{self.total_dropped:,}", style="red") + stats_text.append(" | Queue Drops: ", style="bold") + stats_text.append(f"{self.queue_overflow_drops:,}", style="magenta") stats_text.append(" | Loss: ", style="bold") stats_text.append(f"{loss_percent:.2f}%", style="yellow") stats_text.append(" | Runtime: ", style="bold") @@ -149,6 +154,7 @@ def __init__(self, output_dir: str): self.samples_received = 0 self.last_sequence = 0 self.dropped_frames = 0 + self.queue_overflow_drops = 0 # Track frames dropped due to queue being full # Create HDF5 file timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") @@ -194,6 +200,7 @@ def get_stats(self): "total_frames": 0, "total_samples": 0, "dropped_frames": 0, + "queue_overflow_drops": 0, "last_sequence": 0, "queue_size": 0, "queue_utilization": 0.0, @@ -207,6 +214,7 @@ def get_stats(self): "total_frames": self.frames_received, "total_samples": self.samples_received, "dropped_frames": self.dropped_frames, + "queue_overflow_drops": self.queue_overflow_drops, "last_sequence": self.last_sequence, "queue_size": queue_size, "queue_utilization": queue_size / self.data_queue.maxsize, @@ -251,23 +259,25 @@ def stop(self): def put(self, frame: BroadbandFrame): """Add frame to write queue (non-blocking)""" - # Update stats - self.frames_received += 1 - self.samples_received += len(frame.frame_data) - - # Check for dropped frames - if self.last_sequence != 0: - expected_sequence = self.last_sequence + 1 - if frame.sequence_number != expected_sequence: - self.dropped_frames += frame.sequence_number - expected_sequence - self.last_sequence = frame.sequence_number - - # Try to put in queue, drop if full (non-blocking) + # Try to put in queue first, drop if full (non-blocking) try: self.data_queue.put(frame, block=False) + + # Only update stats for frames that actually made it into the queue + self.frames_received += 1 + self.samples_received += len(frame.frame_data) + + # Check for dropped frames in the data stream (not queue drops) + if self.last_sequence != 0: + expected_sequence = self.last_sequence + 1 + if frame.sequence_number != expected_sequence: + self.dropped_frames += frame.sequence_number - expected_sequence + self.last_sequence = frame.sequence_number + except queue.Full: # Queue is full, drop this frame to prevent blocking the reader - pass + # Note: This is a queue overflow drop, not a network/source drop + self.queue_overflow_drops += 1 def put_batch(self, frames: list): """Add multiple frames efficiently""" @@ -370,6 +380,7 @@ def create_status_table(writer) -> Table: table.add_row("Total Frames", str(stats["total_frames"])) table.add_row("Total Samples", str(stats["total_samples"])) table.add_row("Dropped Frames", str(stats["dropped_frames"])) + table.add_row("Queue Overflow Drops", str(stats["queue_overflow_drops"])) table.add_row("Last Sequence", str(stats["last_sequence"])) # Add queue health information From 06d4d273d94916bd7239920e0efb88f9aff51394 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Fri, 27 Jun 2025 17:53:53 -0700 Subject: [PATCH 04/10] feature: improvements from testing --- synapse/cli/streaming.py | 226 ++++++++++++++++++++++++++++++++------- 1 file changed, 187 insertions(+), 39 deletions(-) diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index a869aff..9d6c6b7 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -8,6 +8,8 @@ from rich.table import Table from rich.console import Console from rich.text import Text +from rich.layout import Layout +from rich.panel import Panel import synapse as syn from synapse.api.status_pb2 import DeviceState, StatusCode @@ -88,8 +90,8 @@ def _update_stats(self, frame: BroadbandFrame): self.total_dropped += frame.sequence_number - expected_sequence self.last_sequence = frame.sequence_number - def get_current_stats(self) -> Text: - """Get current statistics as formatted text""" + def get_current_stats(self) -> dict: + """Get current statistics as dictionary""" current_time = time.time() # Calculate message rate @@ -112,22 +114,14 @@ def get_current_stats(self) -> Text: (self.total_dropped / total_expected * 100) if total_expected > 0 else 0 ) - # Create styled text - stats_text = Text() - stats_text.append("Messages: ", style="bold") - stats_text.append(f"{self.message_count:,}", style="cyan") - stats_text.append(" | msgs/sec: ", style="bold") - stats_text.append(f"{rate:.1f}/s", style="green") - stats_text.append(" | Dropped: ", style="bold") - stats_text.append(f"{self.total_dropped:,}", style="red") - stats_text.append(" | Queue Drops: ", style="bold") - stats_text.append(f"{self.queue_overflow_drops:,}", style="magenta") - stats_text.append(" | Loss: ", style="bold") - stats_text.append(f"{loss_percent:.2f}%", style="yellow") - stats_text.append(" | Runtime: ", style="bold") - stats_text.append(f"{current_time - self.start_time:.1f}s", style="blue") - - return stats_text + return { + "messages": self.message_count, + "rate": rate, + "dropped": self.total_dropped, + "queue_drops": self.queue_overflow_drops, + "loss_percent": loss_percent, + "runtime": current_time - self.start_time, + } class BroadbandFrameWriter: @@ -148,13 +142,17 @@ def __init__(self, output_dir: str): self.stop_event = threading.Event() self.writer_thread = None - # Stats tracking + # Stats tracking - separate queued vs actually written self.start_time = time.time() - self.frames_received = 0 - self.samples_received = 0 + self.frames_queued = 0 # Frames successfully added to queue + self.samples_queued = 0 # Samples successfully added to queue + self.frames_written = 0 # Frames actually written to disk + self.samples_written = 0 # Samples actually written to disk self.last_sequence = 0 - self.dropped_frames = 0 - self.queue_overflow_drops = 0 # Track frames dropped due to queue being full + self.dropped_frames = 0 # Missing sequence numbers in stream + self.queue_overflow_drops = 0 # Frames dropped due to queue being full + self.write_errors = 0 # Count of write errors + self.last_write_error = None # Last write error message # Create HDF5 file timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") @@ -195,26 +193,40 @@ def get_stats(self): elapsed = time.time() - self.start_time if elapsed == 0: return { - "frames_per_sec": 0, - "samples_per_sec": 0, - "total_frames": 0, - "total_samples": 0, + "frames_queued_per_sec": 0, + "samples_queued_per_sec": 0, + "frames_written_per_sec": 0, + "samples_written_per_sec": 0, + "total_frames_queued": 0, + "total_samples_queued": 0, + "total_frames_written": 0, + "total_samples_written": 0, "dropped_frames": 0, "queue_overflow_drops": 0, + "write_errors": 0, "last_sequence": 0, "queue_size": 0, "queue_utilization": 0.0, "memory_pressure": "Low", + "write_lag": 0, + "last_write_error": None, } queue_size = self.data_queue.qsize() + write_lag = self.frames_queued - self.frames_written + return { - "frames_per_sec": self.frames_received / elapsed, - "samples_per_sec": self.samples_received / elapsed, - "total_frames": self.frames_received, - "total_samples": self.samples_received, + "frames_queued_per_sec": self.frames_queued / elapsed, + "samples_queued_per_sec": self.samples_queued / elapsed, + "frames_written_per_sec": self.frames_written / elapsed, + "samples_written_per_sec": self.samples_written / elapsed, + "total_frames_queued": self.frames_queued, + "total_samples_queued": self.samples_queued, + "total_frames_written": self.frames_written, + "total_samples_written": self.samples_written, "dropped_frames": self.dropped_frames, "queue_overflow_drops": self.queue_overflow_drops, + "write_errors": self.write_errors, "last_sequence": self.last_sequence, "queue_size": queue_size, "queue_utilization": queue_size / self.data_queue.maxsize, @@ -223,6 +235,8 @@ def get_stats(self): else "Medium" if queue_size > 500 else "Low", + "write_lag": write_lag, + "last_write_error": self.last_write_error, } def set_attributes( @@ -264,8 +278,8 @@ def put(self, frame: BroadbandFrame): self.data_queue.put(frame, block=False) # Only update stats for frames that actually made it into the queue - self.frames_received += 1 - self.samples_received += len(frame.frame_data) + self.frames_queued += 1 + self.samples_queued += len(frame.frame_data) # Check for dropped frames in the data stream (not queue drops) if self.last_sequence != 0: @@ -314,6 +328,8 @@ def _write_loop(self): last_flush_time = current_time continue except Exception as e: + self.write_errors += 1 + self.last_write_error = str(e) print(f"Error in writer thread: {e}") continue @@ -359,14 +375,116 @@ def _write_buffer(self, frame_buffer: list): self.sequence_dataset[current_timestamp_size:new_timestamp_size] = sequences self.frame_data_dataset[current_frame_size:new_frame_size] = all_frame_data + # Update written stats AFTER successful write + self.frames_written += num_frames + self.samples_written += len(all_frame_data) + # Flush to disk periodically (not every write) if current_timestamp_size % 1000 == 0: # Flush every 1000 frames self.file.flush() except Exception as e: + self.write_errors += 1 + self.last_write_error = str(e) print(f"Error writing buffer to HDF5: {e}") +def create_combined_display(monitor, writer=None) -> Layout: + """Create a combined display showing both monitor and writer statistics""" + layout = Layout() + + # Create monitor stats + monitor_stats = monitor.get_current_stats() + monitor_text = Text() + monitor_text.append("Stream Monitor\n", style="bold cyan") + monitor_text.append(f"Messages: {monitor_stats['messages']:,} ", style="cyan") + monitor_text.append(f"({monitor_stats['rate']:.1f}/s)\n", style="green") + monitor_text.append(f"Dropped: {monitor_stats['dropped']:,} ", style="red") + monitor_text.append( + f"Queue Drops: {monitor_stats['queue_drops']:,}\n", style="magenta" + ) + monitor_text.append(f"Loss: {monitor_stats['loss_percent']:.2f}% ", style="yellow") + monitor_text.append(f"Runtime: {monitor_stats['runtime']:.1f}s", style="blue") + + if writer: + writer_stats = writer.get_stats() + writer_text = Text() + writer_text.append("HDF5 Writer\n", style="bold yellow") + + # Queue status + util_color = ( + "green" + if writer_stats["queue_utilization"] < 0.5 + else "yellow" + if writer_stats["queue_utilization"] < 0.8 + else "red" + ) + writer_text.append( + f"Queue: {writer_stats['queue_size']}/{writer.data_queue.maxsize} ", + style=util_color, + ) + writer_text.append( + f"({writer_stats['queue_utilization']:.1%})\n", style=util_color + ) + + # Write performance + write_lag = writer_stats["write_lag"] + lag_color = ( + "green" if write_lag < 50 else "yellow" if write_lag < 200 else "red" + ) + writer_text.append( + f"Queued: {writer_stats['total_frames_queued']:,} frames\n", style="cyan" + ) + writer_text.append( + f"Written: {writer_stats['total_frames_written']:,} frames\n", style="green" + ) + writer_text.append(f"Write Lag: {write_lag:,} frames\n", style=lag_color) + writer_text.append( + f"Write Rate: {writer_stats['frames_written_per_sec']:.1f}/s\n", + style="green", + ) + + # Error tracking + if writer_stats["write_errors"] > 0: + writer_text.append( + f"Write Errors: {writer_stats['write_errors']}\n", style="bold red" + ) + if writer_stats["last_write_error"]: + error_msg = ( + writer_stats["last_write_error"][:50] + "..." + if len(writer_stats["last_write_error"]) > 50 + else writer_stats["last_write_error"] + ) + writer_text.append(f"Last Error: {error_msg}\n", style="red") + else: + writer_text.append("Write Errors: 0\n", style="green") + + # Memory pressure + pressure_color = ( + "green" + if writer_stats["memory_pressure"] == "Low" + else "yellow" + if writer_stats["memory_pressure"] == "Medium" + else "red" + ) + writer_text.append( + f"Memory: {writer_stats['memory_pressure']}", style=pressure_color + ) + + # Split layout to show both + layout.split_row( + Layout(Panel(monitor_text, title="Stream Monitor", border_style="cyan")), + Layout(Panel(writer_text, title="HDF5 Writer", border_style="yellow")), + ) + else: + # Only monitor stats + layout.add_split( + Layout(Panel(monitor_text, title="Stream Monitor", border_style="cyan")) + ) + + return layout + + def create_status_table(writer) -> Table: """Create a status table for display""" stats = writer.get_stats() @@ -375,10 +493,10 @@ def create_status_table(writer) -> Table: table.add_column("Metric", style="cyan") table.add_column("Value", style="green") - table.add_row("Frames/sec", f"{stats['frames_per_sec']:.1f}") - table.add_row("Samples/sec", f"{stats['samples_per_sec']:.1f}") - table.add_row("Total Frames", str(stats["total_frames"])) - table.add_row("Total Samples", str(stats["total_samples"])) + table.add_row("Frames/sec", f"{stats['frames_written_per_sec']:.1f}") + table.add_row("Samples/sec", f"{stats['samples_written_per_sec']:.1f}") + table.add_row("Total Frames", str(stats["total_frames_written"])) + table.add_row("Total Samples", str(stats["total_samples_written"])) table.add_row("Dropped Frames", str(stats["dropped_frames"])) table.add_row("Queue Overflow Drops", str(stats["queue_overflow_drops"])) table.add_row("Last Sequence", str(stats["last_sequence"])) @@ -616,7 +734,9 @@ def stream_data(broadband_tap, writer, plotter, monitor, first_frame, console, a try: # Use batch streaming for better throughput - with Live(monitor.get_current_stats(), refresh_per_second=4) as live: + # Create combined display showing both monitor and writer stats + initial_display = create_combined_display(monitor, writer) + with Live(initial_display, refresh_per_second=4) as live: # Process the first frame that we already read for parameter detection if first_frame: if writer: @@ -653,7 +773,8 @@ def stream_data(broadband_tap, writer, plotter, monitor, first_frame, console, a for frame in frames: plotter.put(frame) - live.update(monitor.get_current_stats()) + # Update the combined display + live.update(create_combined_display(monitor, writer)) except KeyboardInterrupt: console.print("\n[yellow]Stopping data collection...[/yellow]") @@ -680,6 +801,33 @@ def stream_data(broadband_tap, writer, plotter, monitor, first_frame, console, a f"[blue]Total streaming time: {final_elapsed:.1f} seconds[/blue]" ) + # Show final statistics summary + if writer: + final_stats = writer.get_stats() + console.print("\n[bold cyan]Final Statistics:[/bold cyan]") + console.print( + f"[green]Frames Written to Disk: {final_stats['total_frames_written']:,}[/green]" + ) + console.print( + f"[green]Samples Written to Disk: {final_stats['total_samples_written']:,}[/green]" + ) + console.print( + f"[cyan]Frames Queued: {final_stats['total_frames_queued']:,}[/cyan]" + ) + if final_stats["write_errors"] > 0: + console.print(f"[red]Write Errors: {final_stats['write_errors']}[/red]") + if final_stats["last_write_error"]: + console.print( + f"[red]Last Error: {final_stats['last_write_error']}[/red]" + ) + final_lag = final_stats["write_lag"] + if final_lag > 0: + console.print( + f"[yellow]Unwritten Frames in Queue: {final_lag:,}[/yellow]" + ) + else: + console.print("[green]All queued frames written to disk[/green]") + def read(args): console = Console() From 0eb7f8119ac86df561033d4d957a738f591ef4ca Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Fri, 27 Jun 2025 18:14:37 -0700 Subject: [PATCH 05/10] Jack up the buffers --- synapse/cli/streaming.py | 4 +- synapse/cli/synapse_plotter.py | 226 +++++++++++++++++++++------------ 2 files changed, 146 insertions(+), 84 deletions(-) diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index 9d6c6b7..91a43cd 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -28,7 +28,7 @@ def __init__(self, console: Console): self.last_sequence = 0 self.total_dropped = 0 self.queue_overflow_drops = 0 # Track frames dropped due to queue being full - self.queue = queue.Queue(maxsize=1000) # Increased for high data rate + self.queue = queue.Queue(maxsize=10000) # Increased for high data rate self.stop_event = threading.Event() self.monitor_thread = None @@ -138,7 +138,7 @@ class BroadbandFrameWriter: def __init__(self, output_dir: str): self.output_dir = output_dir - self.data_queue = queue.Queue(maxsize=1000) # Simple bounded queue + self.data_queue = queue.Queue(maxsize=10000) # Simple bounded queue self.stop_event = threading.Event() self.writer_thread = None diff --git a/synapse/cli/synapse_plotter.py b/synapse/cli/synapse_plotter.py index a238974..78b79df 100644 --- a/synapse/cli/synapse_plotter.py +++ b/synapse/cli/synapse_plotter.py @@ -22,18 +22,17 @@ def __init__(self, sample_rate: int, window_size: int, channel_ids): # Track which channels are selected for plotting (start with first 5) self.selected_channels = set(self.channel_ids[:5]) - # One ring buffer (of length BUFFER_SIZE) per channel - self.data_buffers = [ - np.zeros(self.buffer_size) for _ in range(self.num_channels) - ] - - # Timestamp buffer for each channel (in seconds, relative to start) - self.timestamp_buffers = [ - np.zeros(self.buffer_size) for _ in range(self.num_channels) - ] + # Optimized ring buffers - use circular indexing instead of rolling + self.data_buffers = np.zeros( + (self.num_channels, self.buffer_size), dtype=np.float32 + ) + self.timestamp_buffers = np.zeros( + (self.num_channels, self.buffer_size), dtype=np.float64 + ) - # A separate ring-buffer pointer for each channel - self.buffer_positions = [0] * self.num_channels + # Single shared write position for all channels (assuming synchronized data) + self.write_position = 0 + self.buffer_filled = False # Track if we've wrapped around once # Track which channel to display in the "zoom" (single channel) plot self.selected_channel_idx = 0 @@ -57,13 +56,27 @@ def __init__(self, sample_rate: int, window_size: int, channel_ids): # Dictionary to store line series for plotted channels self.active_lines = {} - # Running statistics for centering data - self.running_means = [0.0] * self.num_channels - self.sample_counts = [0] * self.num_channels + # Optimized running statistics for centering data + self.running_means = np.zeros(self.num_channels, dtype=np.float32) + self.sample_counts = np.zeros(self.num_channels, dtype=np.int64) self.alpha = 0.001 # Low-pass filter coefficient for running mean + # Pre-allocated arrays for plotting to avoid memory allocations + self.plot_x_buffer = np.zeros( + self.buffer_size // self.downsample_factor, dtype=np.float64 + ) + self.plot_y_buffer = np.zeros( + self.buffer_size // self.downsample_factor, dtype=np.float32 + ) + + # Adaptive update frequency + self.last_update_time = 0 + self.min_update_interval = 1.0 / 60.0 # Max 60 FPS + self.frames_since_update = 0 + self.update_every_n_frames = 5 # Update every N frames by default + # Queue and threading for BroadbandFrame processing - self.data_queue = queue.Queue(maxsize=2000) + self.data_queue = queue.Queue(maxsize=5000) # Larger queue self.stop_event = Event() self.plot_thread = None self.running = False @@ -268,13 +281,18 @@ def set_signal_separation(self, sender, app_data): def set_downsample_factor(self, sender, app_data): # Ensure the downsample factor is at least 1 self.downsample_factor = max(1, app_data) + # Reallocate plot buffers if needed + new_size = self.buffer_size // self.downsample_factor + if len(self.plot_x_buffer) != new_size: + self.plot_x_buffer = np.zeros(new_size, dtype=np.float64) + self.plot_y_buffer = np.zeros(new_size, dtype=np.float32) def set_center_data(self, sender, app_data): self.center_data = app_data if not app_data: # Reset running means when centering is disabled - self.running_means = [0.0] * self.num_channels - self.sample_counts = [0] * self.num_channels + self.running_means.fill(0.0) + self.sample_counts.fill(0) def put(self, frame: BroadbandFrame): """Add a BroadbandFrame to the processing queue""" @@ -283,7 +301,7 @@ def put(self, frame: BroadbandFrame): except queue.Full: # If queue is full, drop multiple old frames and add the new one dropped = 0 - while dropped < 5: # Drop up to 5 old frames + while dropped < 10: # Drop up to 10 old frames try: self.data_queue.get_nowait() dropped += 1 @@ -335,15 +353,10 @@ def _plot_thread_main(self): # Record start time self.start_time = time.time() - # Main loop - fps_limit = 30 - frame_duration = 1.0 / fps_limit - last_time = time.time() - while dpg.is_dearpygui_running() and not self.stop_event.is_set(): - # Process multiple frames per iteration for better throughput + # Process ALL available frames per iteration for maximum throughput frames_processed = 0 - max_frames_per_iter = 10 + max_frames_per_iter = 50 # Increased batch size while frames_processed < max_frames_per_iter: try: @@ -353,25 +366,77 @@ def _plot_thread_main(self): except queue.Empty: break - # Throttle rendering to the fps limit + self.frames_since_update += frames_processed + + # Adaptive update frequency - only update when necessary now = time.time() - if (now - last_time) >= frame_duration: + should_update = ( + (now - self.last_update_time) >= self.min_update_interval + and self.frames_since_update >= self.update_every_n_frames + ) + + if should_update or frames_processed == 0: self.update_plot() dpg.render_dearpygui_frame() - last_time = now + self.last_update_time = now + self.frames_since_update = 0 + + # Adaptive update frequency based on processing load + if frames_processed > 30: + self.update_every_n_frames = min(20, self.update_every_n_frames + 1) + elif frames_processed < 5: + self.update_every_n_frames = max(1, self.update_every_n_frames - 1) + else: + # Still need to render DearPyGui even if not updating plots + dpg.render_dearpygui_frame() dpg.destroy_context() + def _get_circular_data(self, channel_idx, downsample=True): + """Efficiently get data from circular buffer without copying the entire array""" + # Get the data and timestamp buffers for this channel + data_buf = self.data_buffers[channel_idx] + time_buf = self.timestamp_buffers[channel_idx] + + if not self.buffer_filled: + # Buffer hasn't wrapped yet, just use data from 0 to write_position + end_idx = self.write_position + if downsample: + step = self.downsample_factor + data_slice = data_buf[:end_idx:step] + time_slice = time_buf[:end_idx:step] + else: + data_slice = data_buf[:end_idx] + time_slice = time_buf[:end_idx] + else: + # Buffer has wrapped, need to get data in correct time order + if downsample: + step = self.downsample_factor + # Get newer data (from write_position to end) + newer_data = data_buf[self.write_position :: step] + newer_time = time_buf[self.write_position :: step] + # Get older data (from start to write_position) + older_data = data_buf[: self.write_position : step] + older_time = time_buf[: self.write_position : step] + # Concatenate in chronological order + data_slice = np.concatenate([newer_data, older_data]) + time_slice = np.concatenate([newer_time, older_time]) + else: + newer_data = data_buf[self.write_position :] + newer_time = time_buf[self.write_position :] + older_data = data_buf[: self.write_position] + older_time = time_buf[: self.write_position] + data_slice = np.concatenate([newer_data, older_data]) + time_slice = np.concatenate([newer_time, older_time]) + + return time_slice, data_slice + def update_plot(self): """ Update both the 'all channels' plot and the 'single channel' zoom plot. - We 'roll' each channel's data so that the newest sample is on the right. + Optimized to avoid expensive operations. """ - # Use configurable downsample factor for performance - ds_factor = self.downsample_factor - # Get the current time window for x-axis limits based on latest data - # Use latest data timestamp instead of wall clock for better sync current_data_time = self.latest_data_time x_min = max(0, current_data_time - self.window_size_seconds) x_max = current_data_time @@ -385,23 +450,20 @@ def update_plot(self): continue idx = self.channel_to_index[ch_id] - pos = self.buffer_positions[idx] - # Roll data so that index -1 corresponds to the newest sample - rolled_y = np.roll(self.data_buffers[idx], -pos) - rolled_x = np.roll(self.timestamp_buffers[idx], -pos) + # Get data efficiently using circular buffer indexing + time_data, signal_data = self._get_circular_data(idx, downsample=True) - # Downsample - ds_x = rolled_x[::ds_factor] - ds_y = rolled_y[::ds_factor] + if len(signal_data) == 0: + continue # Apply vertical offset for each active channel to avoid overlap offset = active_channel_idx * self.signal_separation - ds_y_offset = ds_y + offset + signal_data_offset = signal_data + offset # Update the line series for this channel line_tag = self.active_lines[ch_id] - dpg.set_value(line_tag, [ds_x.tolist(), ds_y_offset.tolist()]) + dpg.set_value(line_tag, [time_data.tolist(), signal_data_offset.tolist()]) active_channel_idx += 1 @@ -419,16 +481,13 @@ def update_plot(self): # Update Zoomed Channel Plot # ----------------------------- idx = self.selected_channel_idx - pos = self.buffer_positions[idx] + time_data_zoom, signal_data_zoom = self._get_circular_data(idx, downsample=True) - rolled_y_ch = np.roll(self.data_buffers[idx], -pos) - rolled_x_ch = np.roll(self.timestamp_buffers[idx], -pos) - - ds_x_ch = rolled_x_ch[::ds_factor] - ds_y_ch = rolled_y_ch[::ds_factor] - - # Update the single "zoomed_line" series - dpg.set_value("zoomed_line", [ds_x_ch.tolist(), ds_y_ch.tolist()]) + if len(signal_data_zoom) > 0: + # Update the single "zoomed_line" series + dpg.set_value( + "zoomed_line", [time_data_zoom.tolist(), signal_data_zoom.tolist()] + ) # Set axis limits for both plots dpg.set_axis_limits("x_axis_zoom", x_min, x_max) @@ -444,7 +503,7 @@ def update_plot(self): def process_broadband_frame(self, frame: BroadbandFrame): """ Process a BroadbandFrame and distribute the data to channel buffers. - Uses actual timestamps from the frame for proper time synchronization. + Optimized for high throughput. """ # Set start timestamp on first frame if self.start_timestamp_ns is None: @@ -458,38 +517,41 @@ def process_broadband_frame(self, frame: BroadbandFrame): self.latest_data_time = relative_time_s # frame_data is a flat array with one sample per channel - # We assume the data is organized as: [ch0_sample, ch1_sample, ch2_sample, ...] frame_data = frame.frame_data + num_samples = min(len(frame_data), self.num_channels) + + if num_samples == 0: + return - # Distribute data to each channel buffer - for ch_idx, ch_id in enumerate(self.channel_ids): - if ch_idx < len(frame_data): - raw_sample = frame_data[ch_idx] - - # Center data around zero if enabled - if self.center_data: - # Update running mean with exponential moving average - self.sample_counts[ch_idx] += 1 - if self.sample_counts[ch_idx] == 1: - self.running_means[ch_idx] = raw_sample - else: - self.running_means[ch_idx] = ( - 1 - self.alpha - ) * self.running_means[ch_idx] + self.alpha * raw_sample - - # Subtract the running mean to center around zero - sample = raw_sample - self.running_means[ch_idx] - else: - sample = raw_sample - - # Add sample to this channel's ring buffer - pos = self.buffer_positions[ch_idx] - self.data_buffers[ch_idx][pos] = sample - - # Add actual timestamp to this channel's timestamp buffer - self.timestamp_buffers[ch_idx][pos] = relative_time_s - - self.buffer_positions[ch_idx] = (pos + 1) % self.buffer_size + # Vectorized processing for all channels at once + raw_samples = np.array(frame_data[:num_samples], dtype=np.float32) + + # Center data around zero if enabled - vectorized operation + if self.center_data: + # Update running means with exponential moving average - vectorized + mask = self.sample_counts[:num_samples] == 0 + self.running_means[:num_samples][mask] = raw_samples[mask] + self.running_means[:num_samples][~mask] = ( + 1 - self.alpha + ) * self.running_means[:num_samples][~mask] + self.alpha * raw_samples[ + ~mask + ] + self.sample_counts[:num_samples] += 1 + + # Subtract the running mean to center around zero - vectorized + processed_samples = raw_samples - self.running_means[:num_samples] + else: + processed_samples = raw_samples + + # Add samples to ring buffers - vectorized write + pos = self.write_position + self.data_buffers[:num_samples, pos] = processed_samples + self.timestamp_buffers[:num_samples, pos] = relative_time_s + + # Update write position + self.write_position = (pos + 1) % self.buffer_size + if pos + 1 >= self.buffer_size: + self.buffer_filled = True def select_all_channels(self): """Select all channels for plotting.""" From f4e3162ad6c50880671226d97966f273de651755 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 30 Jun 2025 10:51:50 -0700 Subject: [PATCH 06/10] tuning things --- synapse/cli/streaming.py | 71 +++++++++++++++++++++++++++------------- 1 file changed, 49 insertions(+), 22 deletions(-) diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index 91a43cd..ef67ed1 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -28,7 +28,7 @@ def __init__(self, console: Console): self.last_sequence = 0 self.total_dropped = 0 self.queue_overflow_drops = 0 # Track frames dropped due to queue being full - self.queue = queue.Queue(maxsize=10000) # Increased for high data rate + self.queue = queue.Queue(maxsize=10000) self.stop_event = threading.Event() self.monitor_thread = None @@ -138,7 +138,7 @@ class BroadbandFrameWriter: def __init__(self, output_dir: str): self.output_dir = output_dir - self.data_queue = queue.Queue(maxsize=10000) # Simple bounded queue + self.data_queue = queue.Queue(maxsize=32000) self.stop_event = threading.Event() self.writer_thread = None @@ -299,9 +299,8 @@ def put_batch(self, frames: list): self.put(frame) def _write_loop(self): - """Simple writer thread loop - based on proven pattern""" frame_buffer = [] - buffer_size = 100 # Smaller, more frequent writes + buffer_size = 1000 last_flush_time = time.time() while not self.stop_event.is_set() or not self.data_queue.empty(): @@ -379,8 +378,8 @@ def _write_buffer(self, frame_buffer: list): self.frames_written += num_frames self.samples_written += len(all_frame_data) - # Flush to disk periodically (not every write) - if current_timestamp_size % 1000 == 0: # Flush every 1000 frames + # Flush to disk periodically + if current_timestamp_size % 10000 == 0: self.file.flush() except Exception as e: @@ -662,7 +661,7 @@ def detect_stream_parameters(broadband_tap, console): try: # Get the first message to detect parameters - first_message = broadband_tap.read(timeout_ms=5000) # 5 second timeout + first_message = broadband_tap.read(timeout_ms=5000) if not first_message: console.print( "[bold red]Failed to receive first message for parameter detection[/bold red]" @@ -727,26 +726,54 @@ def get_broadband_tap(args, device, console): return None +def create_status_line(monitor, writer=None) -> Text: + """Create a single status line with all the important metrics""" + monitor_stats = monitor.get_current_stats() + + # Stream monitor stats + text = Text() + text.append(f"Messages: {monitor_stats['messages']:,} ", style="cyan") + text.append(f"({monitor_stats['rate']:.1f}/s) ", style="green") + text.append(f"Dropped: {monitor_stats['dropped']:,} ", style="red") + text.append(f"Queue Drops: {monitor_stats['queue_drops']:,} ", style="magenta") + text.append(f"Loss: {monitor_stats['loss_percent']:.2f}% ", style="yellow") + text.append(f"Runtime: {monitor_stats['runtime']:.1f}s", style="blue") + + if writer: + writer_stats = writer.get_stats() + text.append(" | ", style="dim") + text.append( + f"Written: {writer_stats['total_frames_written']:,} ", style="yellow" + ) + text.append(f"({writer_stats['frames_written_per_sec']:.1f}/s) ", style="green") + text.append( + f"Queue: {writer_stats['queue_size']}/{writer.data_queue.maxsize}", + style="cyan", + ) + + return text + + def stream_data(broadband_tap, writer, plotter, monitor, first_frame, console, args): """Simple streaming function using threaded writer""" duration_exceeded = False start_time = time.time() try: - # Use batch streaming for better throughput - # Create combined display showing both monitor and writer stats - initial_display = create_combined_display(monitor, writer) - with Live(initial_display, refresh_per_second=4) as live: - # Process the first frame that we already read for parameter detection - if first_frame: - if writer: - writer.put(first_frame) - if plotter: - plotter.put(first_frame) - monitor.put(first_frame) - + console.print("[cyan]Starting data streaming... (Ctrl+C to stop)[/cyan]") + + # Process the first frame that we already read for parameter detection + if first_frame: + if writer: + writer.put(first_frame) + if plotter: + plotter.put(first_frame) + monitor.put(first_frame) + + # Use live display for updating status line + with Live(create_status_line(monitor, writer), refresh_per_second=4) as live: # Continue with batch streaming for remaining frames - for message_batch in broadband_tap.stream_batch(batch_size=50): + for message_batch in broadband_tap.stream_batch(batch_size=500): # Check if duration limit has been reached if hasattr(args, "duration") and args.duration is not None: elapsed_time = time.time() - start_time @@ -773,8 +800,8 @@ def stream_data(broadband_tap, writer, plotter, monitor, first_frame, console, a for frame in frames: plotter.put(frame) - # Update the combined display - live.update(create_combined_display(monitor, writer)) + # Update the live status line + live.update(create_status_line(monitor, writer)) except KeyboardInterrupt: console.print("\n[yellow]Stopping data collection...[/yellow]") From 631a395e9c69856f733b4b51d3026646e2163921 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 30 Jun 2025 12:32:59 -0700 Subject: [PATCH 07/10] tuning things --- synapse/cli/synapse_plotter.py | 278 ++++++++++----------------------- 1 file changed, 86 insertions(+), 192 deletions(-) diff --git a/synapse/cli/synapse_plotter.py b/synapse/cli/synapse_plotter.py index 78b79df..91e29a3 100644 --- a/synapse/cli/synapse_plotter.py +++ b/synapse/cli/synapse_plotter.py @@ -22,17 +22,18 @@ def __init__(self, sample_rate: int, window_size: int, channel_ids): # Track which channels are selected for plotting (start with first 5) self.selected_channels = set(self.channel_ids[:5]) - # Optimized ring buffers - use circular indexing instead of rolling - self.data_buffers = np.zeros( - (self.num_channels, self.buffer_size), dtype=np.float32 - ) - self.timestamp_buffers = np.zeros( - (self.num_channels, self.buffer_size), dtype=np.float64 - ) - - # Single shared write position for all channels (assuming synchronized data) - self.write_position = 0 - self.buffer_filled = False # Track if we've wrapped around once + # One ring buffer (of length BUFFER_SIZE) per channel + self.data_buffers = [ + np.zeros(self.buffer_size) for _ in range(self.num_channels) + ] + + # Timestamp buffer for each channel (in seconds, relative to start) + self.timestamp_buffers = [ + np.zeros(self.buffer_size) for _ in range(self.num_channels) + ] + + # A separate ring-buffer pointer for each channel + self.buffer_positions = [0] * self.num_channels # Track which channel to display in the "zoom" (single channel) plot self.selected_channel_idx = 0 @@ -44,39 +45,16 @@ def __init__(self, sample_rate: int, window_size: int, channel_ids): self.latest_data_time = 0 # Track the most recent data timestamp in seconds # Defaults for the zoomed channel plot - self.zoom_y_min = -4096 - self.zoom_y_max = 4096 + self.zoom_y_min = -1000 + self.zoom_y_max = 1000 self.signal_separation = 1000 - self.downsample_factor = 4 # Default downsample factor for performance - self.center_data = ( - True # Whether to center data around zero by removing DC offset - ) # Dictionary to store line series for plotted channels self.active_lines = {} - # Optimized running statistics for centering data - self.running_means = np.zeros(self.num_channels, dtype=np.float32) - self.sample_counts = np.zeros(self.num_channels, dtype=np.int64) - self.alpha = 0.001 # Low-pass filter coefficient for running mean - - # Pre-allocated arrays for plotting to avoid memory allocations - self.plot_x_buffer = np.zeros( - self.buffer_size // self.downsample_factor, dtype=np.float64 - ) - self.plot_y_buffer = np.zeros( - self.buffer_size // self.downsample_factor, dtype=np.float32 - ) - - # Adaptive update frequency - self.last_update_time = 0 - self.min_update_interval = 1.0 / 60.0 # Max 60 FPS - self.frames_since_update = 0 - self.update_every_n_frames = 5 # Update every N frames by default - # Queue and threading for BroadbandFrame processing - self.data_queue = queue.Queue(maxsize=5000) # Larger queue + self.data_queue = queue.Queue(maxsize=32000) self.stop_event = Event() self.plot_thread = None self.running = False @@ -142,32 +120,19 @@ def setup_gui(self): callback=self.set_signal_separation, ) - dpg.add_text("Downsample Factor:") - dpg.add_input_int( - label="", - default_value=self.downsample_factor, - min_value=1, - max_value=20, - tag="downsample_factor_input", - callback=self.set_downsample_factor, - ) - - dpg.add_separator() - dpg.add_checkbox( - label="Center Data Around Zero", - default_value=self.center_data, - callback=self.set_center_data, - tag="center_data_checkbox", - ) - # Zoomed Channel Y-range - dpg.add_text("Zoomed Y-Axis Range (±):") + dpg.add_text("Zoomed Y-Axis Range (Manual):") + dpg.add_input_float( + label="Min", + default_value=self.zoom_y_min, + callback=self.set_zoom_y_min, + tag="zoom_y_min_input", + ) dpg.add_input_float( - label="Range", - default_value=abs(self.zoom_y_max), - callback=self.set_zoom_y_range, - tag="zoom_y_range_input", - min_value=1.0, + label="Max", + default_value=self.zoom_y_max, + callback=self.set_zoom_y_max, + tag="zoom_y_max_input", ) # ----------------------------- @@ -199,7 +164,7 @@ def setup_gui(self): self.y_axis_all = dpg.add_plot_axis( dpg.mvYAxis, label="Amplitude", tag="y_axis_all" ) - dpg.set_axis_limits("y_axis_all", -4096, 4096 * 10) + dpg.set_axis_limits("y_axis_all", -1096, 4096 * 10) # Create line series for initially selected channels for ch_id in self.selected_channels: @@ -269,31 +234,15 @@ def remove_line_series(self, ch_id): dpg.delete_item(line_tag) del self.active_lines[ch_id] - def set_zoom_y_range(self, sender, app_data): - # Ensure the range is centered around zero - abs_range = abs(app_data) - self.zoom_y_min = -abs_range - self.zoom_y_max = abs_range + def set_zoom_y_min(self, sender, app_data): + self.zoom_y_min = app_data + + def set_zoom_y_max(self, sender, app_data): + self.zoom_y_max = app_data def set_signal_separation(self, sender, app_data): self.signal_separation = app_data - def set_downsample_factor(self, sender, app_data): - # Ensure the downsample factor is at least 1 - self.downsample_factor = max(1, app_data) - # Reallocate plot buffers if needed - new_size = self.buffer_size // self.downsample_factor - if len(self.plot_x_buffer) != new_size: - self.plot_x_buffer = np.zeros(new_size, dtype=np.float64) - self.plot_y_buffer = np.zeros(new_size, dtype=np.float32) - - def set_center_data(self, sender, app_data): - self.center_data = app_data - if not app_data: - # Reset running means when centering is disabled - self.running_means.fill(0.0) - self.sample_counts.fill(0) - def put(self, frame: BroadbandFrame): """Add a BroadbandFrame to the processing queue""" try: @@ -301,7 +250,8 @@ def put(self, frame: BroadbandFrame): except queue.Full: # If queue is full, drop multiple old frames and add the new one dropped = 0 - while dropped < 10: # Drop up to 10 old frames + while dropped < 5: # Drop up to 5 old frames + print(f"Dropping frame {dropped}") try: self.data_queue.get_nowait() dropped += 1 @@ -353,11 +303,15 @@ def _plot_thread_main(self): # Record start time self.start_time = time.time() + # Main loop + fps_limit = 10 + frame_duration = 1.0 / fps_limit + last_time = time.time() + while dpg.is_dearpygui_running() and not self.stop_event.is_set(): - # Process ALL available frames per iteration for maximum throughput + # Process multiple frames per iteration for better throughput frames_processed = 0 - max_frames_per_iter = 50 # Increased batch size - + max_frames_per_iter = 10 while frames_processed < max_frames_per_iter: try: frame = self.data_queue.get_nowait() @@ -366,77 +320,26 @@ def _plot_thread_main(self): except queue.Empty: break - self.frames_since_update += frames_processed - - # Adaptive update frequency - only update when necessary + # Throttle rendering to the fps limit now = time.time() - should_update = ( - (now - self.last_update_time) >= self.min_update_interval - and self.frames_since_update >= self.update_every_n_frames - ) - - if should_update or frames_processed == 0: + if (now - last_time) >= frame_duration: self.update_plot() dpg.render_dearpygui_frame() - self.last_update_time = now - self.frames_since_update = 0 - - # Adaptive update frequency based on processing load - if frames_processed > 30: - self.update_every_n_frames = min(20, self.update_every_n_frames + 1) - elif frames_processed < 5: - self.update_every_n_frames = max(1, self.update_every_n_frames - 1) - else: - # Still need to render DearPyGui even if not updating plots - dpg.render_dearpygui_frame() + last_time = now dpg.destroy_context() - def _get_circular_data(self, channel_idx, downsample=True): - """Efficiently get data from circular buffer without copying the entire array""" - # Get the data and timestamp buffers for this channel - data_buf = self.data_buffers[channel_idx] - time_buf = self.timestamp_buffers[channel_idx] - - if not self.buffer_filled: - # Buffer hasn't wrapped yet, just use data from 0 to write_position - end_idx = self.write_position - if downsample: - step = self.downsample_factor - data_slice = data_buf[:end_idx:step] - time_slice = time_buf[:end_idx:step] - else: - data_slice = data_buf[:end_idx] - time_slice = time_buf[:end_idx] - else: - # Buffer has wrapped, need to get data in correct time order - if downsample: - step = self.downsample_factor - # Get newer data (from write_position to end) - newer_data = data_buf[self.write_position :: step] - newer_time = time_buf[self.write_position :: step] - # Get older data (from start to write_position) - older_data = data_buf[: self.write_position : step] - older_time = time_buf[: self.write_position : step] - # Concatenate in chronological order - data_slice = np.concatenate([newer_data, older_data]) - time_slice = np.concatenate([newer_time, older_time]) - else: - newer_data = data_buf[self.write_position :] - newer_time = time_buf[self.write_position :] - older_data = data_buf[: self.write_position] - older_time = time_buf[: self.write_position] - data_slice = np.concatenate([newer_data, older_data]) - time_slice = np.concatenate([newer_time, older_time]) - - return time_slice, data_slice - def update_plot(self): """ Update both the 'all channels' plot and the 'single channel' zoom plot. - Optimized to avoid expensive operations. + We 'roll' each channel's data so that the newest sample is on the right. """ + # Downsample factor for performance + # Note(gilbert): we should probably make this configurable, it is arbitrary + ds_factor = 4 + # Get the current time window for x-axis limits based on latest data + # Use latest data timestamp instead of wall clock for better sync current_data_time = self.latest_data_time x_min = max(0, current_data_time - self.window_size_seconds) x_max = current_data_time @@ -450,20 +353,23 @@ def update_plot(self): continue idx = self.channel_to_index[ch_id] + pos = self.buffer_positions[idx] - # Get data efficiently using circular buffer indexing - time_data, signal_data = self._get_circular_data(idx, downsample=True) + # Roll data so that index -1 corresponds to the newest sample + rolled_y = np.roll(self.data_buffers[idx], -pos) + rolled_x = np.roll(self.timestamp_buffers[idx], -pos) - if len(signal_data) == 0: - continue + # Downsample + ds_x = rolled_x[::ds_factor] + ds_y = rolled_y[::ds_factor] # Apply vertical offset for each active channel to avoid overlap offset = active_channel_idx * self.signal_separation - signal_data_offset = signal_data + offset + ds_y_offset = ds_y + offset # Update the line series for this channel line_tag = self.active_lines[ch_id] - dpg.set_value(line_tag, [time_data.tolist(), signal_data_offset.tolist()]) + dpg.set_value(line_tag, [ds_x.tolist(), ds_y_offset.tolist()]) active_channel_idx += 1 @@ -481,13 +387,20 @@ def update_plot(self): # Update Zoomed Channel Plot # ----------------------------- idx = self.selected_channel_idx - time_data_zoom, signal_data_zoom = self._get_circular_data(idx, downsample=True) + pos = self.buffer_positions[idx] - if len(signal_data_zoom) > 0: - # Update the single "zoomed_line" series - dpg.set_value( - "zoomed_line", [time_data_zoom.tolist(), signal_data_zoom.tolist()] - ) + rolled_y_ch = np.roll(self.data_buffers[idx], -pos) + rolled_x_ch = np.roll(self.timestamp_buffers[idx], -pos) + + ds_x_ch = rolled_x_ch[::ds_factor] + ds_y_ch = rolled_y_ch[::ds_factor] + + # Remove DC offset by subtracting the mean + if len(ds_y_ch) > 0: + ds_y_ch = ds_y_ch - np.mean(ds_y_ch) + + # Update the single "zoomed_line" series + dpg.set_value("zoomed_line", [ds_x_ch.tolist(), ds_y_ch.tolist()]) # Set axis limits for both plots dpg.set_axis_limits("x_axis_zoom", x_min, x_max) @@ -503,7 +416,7 @@ def update_plot(self): def process_broadband_frame(self, frame: BroadbandFrame): """ Process a BroadbandFrame and distribute the data to channel buffers. - Optimized for high throughput. + Uses actual timestamps from the frame for proper time synchronization. """ # Set start timestamp on first frame if self.start_timestamp_ns is None: @@ -517,41 +430,22 @@ def process_broadband_frame(self, frame: BroadbandFrame): self.latest_data_time = relative_time_s # frame_data is a flat array with one sample per channel + # We assume the data is organized as: [ch0_sample, ch1_sample, ch2_sample, ...] frame_data = frame.frame_data - num_samples = min(len(frame_data), self.num_channels) - if num_samples == 0: - return + # Distribute data to each channel buffer + for ch_idx, ch_id in enumerate(self.channel_ids): + if ch_idx < len(frame_data): + sample = frame_data[ch_idx] + + # Add sample to this channel's ring buffer + pos = self.buffer_positions[ch_idx] + self.data_buffers[ch_idx][pos] = sample + + # Add actual timestamp to this channel's timestamp buffer + self.timestamp_buffers[ch_idx][pos] = relative_time_s - # Vectorized processing for all channels at once - raw_samples = np.array(frame_data[:num_samples], dtype=np.float32) - - # Center data around zero if enabled - vectorized operation - if self.center_data: - # Update running means with exponential moving average - vectorized - mask = self.sample_counts[:num_samples] == 0 - self.running_means[:num_samples][mask] = raw_samples[mask] - self.running_means[:num_samples][~mask] = ( - 1 - self.alpha - ) * self.running_means[:num_samples][~mask] + self.alpha * raw_samples[ - ~mask - ] - self.sample_counts[:num_samples] += 1 - - # Subtract the running mean to center around zero - vectorized - processed_samples = raw_samples - self.running_means[:num_samples] - else: - processed_samples = raw_samples - - # Add samples to ring buffers - vectorized write - pos = self.write_position - self.data_buffers[:num_samples, pos] = processed_samples - self.timestamp_buffers[:num_samples, pos] = relative_time_s - - # Update write position - self.write_position = (pos + 1) % self.buffer_size - if pos + 1 >= self.buffer_size: - self.buffer_filled = True + self.buffer_positions[ch_idx] = (pos + 1) % self.buffer_size def select_all_channels(self): """Select all channels for plotting.""" From 6d47f0f532169a517c6ee84338c19b7c33c7bd4d Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 30 Jun 2025 12:40:17 -0700 Subject: [PATCH 08/10] Fix plotter --- synapse/cli/synapse_plotter.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/cli/synapse_plotter.py b/synapse/cli/synapse_plotter.py index 91e29a3..263db73 100644 --- a/synapse/cli/synapse_plotter.py +++ b/synapse/cli/synapse_plotter.py @@ -250,8 +250,7 @@ def put(self, frame: BroadbandFrame): except queue.Full: # If queue is full, drop multiple old frames and add the new one dropped = 0 - while dropped < 5: # Drop up to 5 old frames - print(f"Dropping frame {dropped}") + while dropped < 100: try: self.data_queue.get_nowait() dropped += 1 From 85f7e47591c776cb5aa493d1700a8b8596153389 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 30 Jun 2025 13:49:26 -0700 Subject: [PATCH 09/10] Add obvious plotting --- synapse/cli/streaming.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index ef67ed1..d1e51ec 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -855,6 +855,14 @@ def stream_data(broadband_tap, writer, plotter, monitor, first_frame, console, a else: console.print("[green]All queued frames written to disk[/green]") + # Make the plot command more prominent + console.print("\n" + "=" * 60) + console.print("[bold green]Plot the data with:[/bold green]") + console.print( + f"[bold yellow]synapsectl plot --data {writer.filename}[/bold yellow]" + ) + console.print("=" * 60) + def read(args): console = Console() From 8e4447d1f5ab3a863d0a530d44b3c79d3b944489 Mon Sep 17 00:00:00 2001 From: Gilbert Montague Date: Mon, 30 Jun 2025 14:08:58 -0700 Subject: [PATCH 10/10] Add obvious plotting --- synapse/cli/offline_hdf5_plotter.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/synapse/cli/offline_hdf5_plotter.py b/synapse/cli/offline_hdf5_plotter.py index 5ece7be..7bba75d 100644 --- a/synapse/cli/offline_hdf5_plotter.py +++ b/synapse/cli/offline_hdf5_plotter.py @@ -242,9 +242,11 @@ def plot(plot_data, console): plot_single.showGrid(x=True, y=True) # Create a curve for the single channel + initial_data = plot_data.data.iloc[:, 0].to_numpy() + initial_data_centered = initial_data - np.mean(initial_data) curve_single = plot_single.plot( time_arr, - plot_data.data.iloc[:, 0].to_numpy(), + initial_data_centered, pen=pg.intColor(0, hues=plot_data.num_channels), name=f"Ch {plot_data.channel_ids[0]}", ) @@ -280,13 +282,15 @@ def update_single_channel(channel_id): return # Update time domain plot - curve_single.setData(time_arr, plot_data.data.iloc[:, channel_index].to_numpy()) + channel_data = plot_data.data.iloc[:, channel_index].to_numpy() + channel_data_centered = channel_data - np.mean(channel_data) + curve_single.setData(time_arr, channel_data_centered) curve_single.setPen(pg.intColor(channel_index, hues=plot_data.num_channels)) # Update FFT plot fft_plot.clear() fft_freq, fft_magnitude = compute_fft( - plot_data.data.iloc[:, channel_index].to_numpy(), plot_data.sample_rate + channel_data_centered, plot_data.sample_rate ) # Plot FFT with improved visibility