diff --git a/synapse/cli/offline_hdf5_plotter.py b/synapse/cli/offline_hdf5_plotter.py index 5ece7be..7bba75d 100644 --- a/synapse/cli/offline_hdf5_plotter.py +++ b/synapse/cli/offline_hdf5_plotter.py @@ -242,9 +242,11 @@ def plot(plot_data, console): plot_single.showGrid(x=True, y=True) # Create a curve for the single channel + initial_data = plot_data.data.iloc[:, 0].to_numpy() + initial_data_centered = initial_data - np.mean(initial_data) curve_single = plot_single.plot( time_arr, - plot_data.data.iloc[:, 0].to_numpy(), + initial_data_centered, pen=pg.intColor(0, hues=plot_data.num_channels), name=f"Ch {plot_data.channel_ids[0]}", ) @@ -280,13 +282,15 @@ def update_single_channel(channel_id): return # Update time domain plot - curve_single.setData(time_arr, plot_data.data.iloc[:, channel_index].to_numpy()) + channel_data = plot_data.data.iloc[:, channel_index].to_numpy() + channel_data_centered = channel_data - np.mean(channel_data) + curve_single.setData(time_arr, channel_data_centered) curve_single.setPen(pg.intColor(channel_index, hues=plot_data.num_channels)) # Update FFT plot fft_plot.clear() fft_freq, fft_magnitude = compute_fft( - plot_data.data.iloc[:, channel_index].to_numpy(), plot_data.sample_rate + channel_data_centered, plot_data.sample_rate ) # Plot FFT with improved visibility diff --git a/synapse/cli/offline_plot.py b/synapse/cli/offline_plot.py index 0d520a7..8670e87 100644 --- a/synapse/cli/offline_plot.py +++ b/synapse/cli/offline_plot.py @@ -172,6 +172,9 @@ def plot(args): console.print( "[yellow bold]Legacy plotting is deprecated, please use the hdf5 files going forward[/yellow bold]" ) + console.print( + "[yellow bold]Use --data to plot hdf5 files[/yellow bold]" + ) app = QtWidgets.QApplication.instance() if not app: diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index 73d8fe8..d1e51ec 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -8,6 +8,8 @@ from rich.table import Table from rich.console import Console from rich.text import Text +from rich.layout import Layout +from rich.panel import Panel import synapse as syn from synapse.api.status_pb2 import DeviceState, StatusCode @@ -25,7 +27,8 @@ def __init__(self, console: Console): self.last_count = 0 self.last_sequence = 0 self.total_dropped = 0 - self.queue = queue.Queue(maxsize=100) + self.queue_overflow_drops = 0 # Track frames dropped due to queue being full + self.queue = queue.Queue(maxsize=10000) self.stop_event = threading.Event() self.monitor_thread = None @@ -37,6 +40,7 @@ def start(self): self.last_count = 0 self.last_sequence = 0 self.total_dropped = 0 + self.queue_overflow_drops = 0 self.stop_event.clear() self.monitor_thread = threading.Thread(target=self._monitor_loop) self.monitor_thread.start() @@ -53,7 +57,18 @@ def put(self, frame: BroadbandFrame): self.queue.put(frame, block=False) except queue.Full: # Drop frame if queue is full to prevent blocking - pass + self.queue_overflow_drops += 1 + + def put_batch(self, frames: list): + """Add multiple frames to monitoring queue efficiently (non-blocking)""" + for frame in frames: + try: + self.queue.put(frame, block=False) + except queue.Full: + # Drop frame if queue is full to prevent blocking + # Only break if queue is full to avoid flooding + self.queue_overflow_drops += 1 + break def _monitor_loop(self): """Process frames for monitoring in separate thread""" @@ -75,8 +90,8 @@ def _update_stats(self, frame: BroadbandFrame): self.total_dropped += frame.sequence_number - expected_sequence self.last_sequence = frame.sequence_number - def get_current_stats(self) -> Text: - """Get current statistics as formatted text""" + def get_current_stats(self) -> dict: + """Get current statistics as dictionary""" current_time = time.time() # Calculate message rate @@ -99,105 +114,154 @@ def get_current_stats(self) -> Text: (self.total_dropped / total_expected * 100) if total_expected > 0 else 0 ) - # Create styled text - stats_text = Text() - stats_text.append("Messages: ", style="bold") - stats_text.append(f"{self.message_count:,}", style="cyan") - stats_text.append(" | msgs/sec: ", style="bold") - stats_text.append(f"{rate:.1f}/s", style="green") - stats_text.append(" | Dropped: ", style="bold") - stats_text.append(f"{self.total_dropped:,}", style="red") - stats_text.append(" | Loss: ", style="bold") - stats_text.append(f"{loss_percent:.2f}%", style="yellow") - stats_text.append(" | Runtime: ", style="bold") - stats_text.append(f"{current_time - self.start_time:.1f}s", style="blue") - - return stats_text + return { + "messages": self.message_count, + "rate": rate, + "dropped": self.total_dropped, + "queue_drops": self.queue_overflow_drops, + "loss_percent": loss_percent, + "runtime": current_time - self.start_time, + } class BroadbandFrameWriter: + """ + Threaded HDF5 writer for broadband data streams + + Features: + - Single writer thread with bounded queue (prevents blocking the reader) + - Non-blocking puts with frame dropping if queue is full + - Batch writes to HDF5 for better I/O performance + - Compressed datasets to reduce disk space and I/O + - Periodic flushes for optimal performance + """ + def __init__(self, output_dir: str): self.output_dir = output_dir - self.data_queue = queue.Queue(maxsize=2000) # Increased queue size + self.data_queue = queue.Queue(maxsize=32000) self.stop_event = threading.Event() self.writer_thread = None - # Stats tracking + # Stats tracking - separate queued vs actually written self.start_time = time.time() - self.frames_received = 0 - self.samples_received = 0 + self.frames_queued = 0 # Frames successfully added to queue + self.samples_queued = 0 # Samples successfully added to queue + self.frames_written = 0 # Frames actually written to disk + self.samples_written = 0 # Samples actually written to disk self.last_sequence = 0 - self.dropped_frames = 0 + self.dropped_frames = 0 # Missing sequence numbers in stream + self.queue_overflow_drops = 0 # Frames dropped due to queue being full + self.write_errors = 0 # Count of write errors + self.last_write_error = None # Last write error message # Create HDF5 file timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") self.filename = os.path.join(output_dir, f"broadband_data_{timestamp}.h5") self.file = h5py.File(self.filename, "w") - # Create datasets + # Create datasets with chunking and compression for better performance self.timestamp_dataset = self.file.create_dataset( - "/acquisition/timestamp", shape=(0,), maxshape=(None,), dtype="uint64" + "/acquisition/timestamp", + shape=(0,), + maxshape=(None,), + dtype="uint64", + chunks=True, + compression="gzip", + compression_opts=1, # Fast compression ) self.sequence_dataset = self.file.create_dataset( - "/acquisition/sequence_number", shape=(0,), maxshape=(None,), dtype="uint64" + "/acquisition/sequence_number", + shape=(0,), + maxshape=(None,), + dtype="uint64", + chunks=True, + compression="gzip", + compression_opts=1, ) - # Create frame data dataset as a flat array of samples self.frame_data_dataset = self.file.create_dataset( - "/acquisition/ElectricalSeries", shape=(0,), maxshape=(None,), dtype="int32" + "/acquisition/ElectricalSeries", + shape=(0,), + maxshape=(None,), + dtype="int32", + chunks=True, + compression="gzip", + compression_opts=1, ) - # Buffer for collecting frames before writing - self.frame_buffer = [] - self.buffer_size = 500 # Reduced buffer size for more frequent writes - def get_stats(self): """Get current statistics""" elapsed = time.time() - self.start_time if elapsed == 0: return { - "frames_per_sec": 0, - "samples_per_sec": 0, - "total_frames": 0, - "total_samples": 0, + "frames_queued_per_sec": 0, + "samples_queued_per_sec": 0, + "frames_written_per_sec": 0, + "samples_written_per_sec": 0, + "total_frames_queued": 0, + "total_samples_queued": 0, + "total_frames_written": 0, + "total_samples_written": 0, "dropped_frames": 0, + "queue_overflow_drops": 0, + "write_errors": 0, "last_sequence": 0, + "queue_size": 0, + "queue_utilization": 0.0, + "memory_pressure": "Low", + "write_lag": 0, + "last_write_error": None, } + queue_size = self.data_queue.qsize() + write_lag = self.frames_queued - self.frames_written + return { - "frames_per_sec": self.frames_received / elapsed, - "samples_per_sec": self.samples_received / elapsed, - "total_frames": self.frames_received, - "total_samples": self.samples_received, + "frames_queued_per_sec": self.frames_queued / elapsed, + "samples_queued_per_sec": self.samples_queued / elapsed, + "frames_written_per_sec": self.frames_written / elapsed, + "samples_written_per_sec": self.samples_written / elapsed, + "total_frames_queued": self.frames_queued, + "total_samples_queued": self.samples_queued, + "total_frames_written": self.frames_written, + "total_samples_written": self.samples_written, "dropped_frames": self.dropped_frames, + "queue_overflow_drops": self.queue_overflow_drops, + "write_errors": self.write_errors, "last_sequence": self.last_sequence, + "queue_size": queue_size, + "queue_utilization": queue_size / self.data_queue.maxsize, + "memory_pressure": "High" + if queue_size > 800 + else "Medium" + if queue_size > 500 + else "Low", + "write_lag": write_lag, + "last_write_error": self.last_write_error, } def set_attributes( self, sample_rate_hz: float, channels: list, session_description: str = "" ): - """Set HDF5 attributes similar to C++ implementation""" - # Set basic attributes + """Set HDF5 attributes""" self.file.attrs["sample_rate_hz"] = sample_rate_hz if session_description: self.file.attrs["session_description"] = session_description - - # Set session start time self.file.attrs["session_start_time"] = datetime.now().isoformat() - # Set device type device_group = self.file.create_group("general/device") device_group.attrs["device_type"] = "SciFi" - # Create electrodes group and write channel IDs electrodes_group = self.file.create_group( "general/extracellular_ephys/electrodes" ) - channel_ids = channels - electrodes_group.create_dataset("id", data=channel_ids, dtype="uint32") + electrodes_group.create_dataset("id", data=channels, dtype="uint32") def start(self): """Start the writer thread""" - self.writer_thread = threading.Thread(target=self._write_loop) + self.stop_event.clear() + self.writer_thread = threading.Thread( + target=self._write_loop, name="HDF5Writer" + ) self.writer_thread.start() def stop(self): @@ -205,122 +269,222 @@ def stop(self): self.stop_event.set() if self.writer_thread: self.writer_thread.join() - self.flush() self.file.close() def put(self, frame: BroadbandFrame): - """Add frame to the write queue (non-blocking)""" - # Update stats - self.frames_received += 1 - self.samples_received += len(frame.frame_data) - - # Check for dropped frames - if self.last_sequence != 0: - expected_sequence = self.last_sequence + 1 - if frame.sequence_number != expected_sequence: - self.dropped_frames += frame.sequence_number - expected_sequence - self.last_sequence = frame.sequence_number - + """Add frame to write queue (non-blocking)""" + # Try to put in queue first, drop if full (non-blocking) try: self.data_queue.put(frame, block=False) - except queue.Full: - # If queue is full, we'll drop the oldest data - try: - self.data_queue.get_nowait() - self.data_queue.put(frame, block=False) - except queue.Empty: - pass - def put_batch(self, frames: list): - """Add multiple frames to the write queue efficiently""" - for frame in frames: - self.frames_received += 1 - self.samples_received += len(frame.frame_data) + # Only update stats for frames that actually made it into the queue + self.frames_queued += 1 + self.samples_queued += len(frame.frame_data) - # Check for dropped frames + # Check for dropped frames in the data stream (not queue drops) if self.last_sequence != 0: expected_sequence = self.last_sequence + 1 if frame.sequence_number != expected_sequence: self.dropped_frames += frame.sequence_number - expected_sequence self.last_sequence = frame.sequence_number - # Try to add all frames to queue + except queue.Full: + # Queue is full, drop this frame to prevent blocking the reader + # Note: This is a queue overflow drop, not a network/source drop + self.queue_overflow_drops += 1 + + def put_batch(self, frames: list): + """Add multiple frames efficiently""" for frame in frames: - try: - self.data_queue.put(frame, block=False) - except queue.Full: - # If queue is full, drop oldest and try again - try: - self.data_queue.get_nowait() - self.data_queue.put(frame, block=False) - except queue.Empty: - pass + self.put(frame) def _write_loop(self): - """Main writing loop that consumes data from the queue""" + frame_buffer = [] + buffer_size = 1000 + last_flush_time = time.time() + while not self.stop_event.is_set() or not self.data_queue.empty(): try: - frame = self.data_queue.get(timeout=0.1) - self.frame_buffer.append(frame) - - # Write when buffer is full - if len(self.frame_buffer) >= self.buffer_size: - self._write_buffer() + # Get frame from queue with timeout + frame = self.data_queue.get(timeout=0.5) + frame_buffer.append(frame) + + current_time = time.time() + # Write buffer if it's full or if enough time has passed + if len(frame_buffer) >= buffer_size or ( + frame_buffer and current_time - last_flush_time > 1.0 + ): + self._write_buffer(frame_buffer) + frame_buffer = [] + last_flush_time = current_time except queue.Empty: + current_time = time.time() + # Flush any remaining data if timeout occurred + if frame_buffer and current_time - last_flush_time > 1.0: + self._write_buffer(frame_buffer) + frame_buffer = [] + last_flush_time = current_time continue except Exception as e: - print(f"Error writing data: {e}") + self.write_errors += 1 + self.last_write_error = str(e) + print(f"Error in writer thread: {e}") continue - def _write_buffer(self): - """Write the buffered frames to disk""" - if not self.frame_buffer: - return + # Write any remaining frames when stopping + if frame_buffer: + self._write_buffer(frame_buffer) - # Get current sizes - current_timestamp_size = self.timestamp_dataset.shape[0] - current_frame_size = self.frame_data_dataset.shape[0] - num_frames = len(self.frame_buffer) + def _write_buffer(self, frame_buffer: list): + """Write buffered frames to HDF5""" + if not frame_buffer: + return - # Resize datasets - new_timestamp_size = current_timestamp_size + num_frames - new_frame_size = current_frame_size + ( - num_frames * len(self.frame_buffer[0].frame_data) + try: + # Get current dataset sizes + current_timestamp_size = self.timestamp_dataset.shape[0] + current_frame_size = self.frame_data_dataset.shape[0] + + num_frames = len(frame_buffer) + samples_per_frame = len(frame_buffer[0].frame_data) + + # Resize datasets + new_timestamp_size = current_timestamp_size + num_frames + new_frame_size = current_frame_size + (num_frames * samples_per_frame) + + self.timestamp_dataset.resize(new_timestamp_size, axis=0) + self.sequence_dataset.resize(new_timestamp_size, axis=0) + self.frame_data_dataset.resize(new_frame_size, axis=0) + + # Write data in batch + timestamps = [] + sequences = [] + all_frame_data = [] + + for frame in frame_buffer: + timestamps.append(frame.timestamp_ns) + sequences.append(frame.sequence_number) + all_frame_data.extend(frame.frame_data) + + # Write all data at once (more efficient) + self.timestamp_dataset[current_timestamp_size:new_timestamp_size] = ( + timestamps + ) + self.sequence_dataset[current_timestamp_size:new_timestamp_size] = sequences + self.frame_data_dataset[current_frame_size:new_frame_size] = all_frame_data + + # Update written stats AFTER successful write + self.frames_written += num_frames + self.samples_written += len(all_frame_data) + + # Flush to disk periodically + if current_timestamp_size % 10000 == 0: + self.file.flush() + + except Exception as e: + self.write_errors += 1 + self.last_write_error = str(e) + print(f"Error writing buffer to HDF5: {e}") + + +def create_combined_display(monitor, writer=None) -> Layout: + """Create a combined display showing both monitor and writer statistics""" + layout = Layout() + + # Create monitor stats + monitor_stats = monitor.get_current_stats() + monitor_text = Text() + monitor_text.append("Stream Monitor\n", style="bold cyan") + monitor_text.append(f"Messages: {monitor_stats['messages']:,} ", style="cyan") + monitor_text.append(f"({monitor_stats['rate']:.1f}/s)\n", style="green") + monitor_text.append(f"Dropped: {monitor_stats['dropped']:,} ", style="red") + monitor_text.append( + f"Queue Drops: {monitor_stats['queue_drops']:,}\n", style="magenta" + ) + monitor_text.append(f"Loss: {monitor_stats['loss_percent']:.2f}% ", style="yellow") + monitor_text.append(f"Runtime: {monitor_stats['runtime']:.1f}s", style="blue") + + if writer: + writer_stats = writer.get_stats() + writer_text = Text() + writer_text.append("HDF5 Writer\n", style="bold yellow") + + # Queue status + util_color = ( + "green" + if writer_stats["queue_utilization"] < 0.5 + else "yellow" + if writer_stats["queue_utilization"] < 0.8 + else "red" + ) + writer_text.append( + f"Queue: {writer_stats['queue_size']}/{writer.data_queue.maxsize} ", + style=util_color, + ) + writer_text.append( + f"({writer_stats['queue_utilization']:.1%})\n", style=util_color ) - self.timestamp_dataset.resize(new_timestamp_size, axis=0) - self.sequence_dataset.resize(new_timestamp_size, axis=0) - self.frame_data_dataset.resize(new_frame_size, axis=0) - - # Write data - for i, frame in enumerate(self.frame_buffer): - idx = current_timestamp_size + i - self.timestamp_dataset[idx] = frame.timestamp_ns - self.sequence_dataset[idx] = frame.sequence_number - - # Write frame data - frame_start = current_frame_size + (i * len(frame.frame_data)) - frame_end = frame_start + len(frame.frame_data) - self.frame_data_dataset[frame_start:frame_end] = frame.frame_data + # Write performance + write_lag = writer_stats["write_lag"] + lag_color = ( + "green" if write_lag < 50 else "yellow" if write_lag < 200 else "red" + ) + writer_text.append( + f"Queued: {writer_stats['total_frames_queued']:,} frames\n", style="cyan" + ) + writer_text.append( + f"Written: {writer_stats['total_frames_written']:,} frames\n", style="green" + ) + writer_text.append(f"Write Lag: {write_lag:,} frames\n", style=lag_color) + writer_text.append( + f"Write Rate: {writer_stats['frames_written_per_sec']:.1f}/s\n", + style="green", + ) - # Clear buffer - self.frame_buffer = [] + # Error tracking + if writer_stats["write_errors"] > 0: + writer_text.append( + f"Write Errors: {writer_stats['write_errors']}\n", style="bold red" + ) + if writer_stats["last_write_error"]: + error_msg = ( + writer_stats["last_write_error"][:50] + "..." + if len(writer_stats["last_write_error"]) > 50 + else writer_stats["last_write_error"] + ) + writer_text.append(f"Last Error: {error_msg}\n", style="red") + else: + writer_text.append("Write Errors: 0\n", style="green") + + # Memory pressure + pressure_color = ( + "green" + if writer_stats["memory_pressure"] == "Low" + else "yellow" + if writer_stats["memory_pressure"] == "Medium" + else "red" + ) + writer_text.append( + f"Memory: {writer_stats['memory_pressure']}", style=pressure_color + ) - # Flush to disk - self.flush() + # Split layout to show both + layout.split_row( + Layout(Panel(monitor_text, title="Stream Monitor", border_style="cyan")), + Layout(Panel(writer_text, title="HDF5 Writer", border_style="yellow")), + ) + else: + # Only monitor stats + layout.add_split( + Layout(Panel(monitor_text, title="Stream Monitor", border_style="cyan")) + ) - def flush(self): - """Flush all datasets to disk""" - if self.frame_buffer: - self._write_buffer() - self.timestamp_dataset.flush() - self.sequence_dataset.flush() - self.frame_data_dataset.flush() - self.file.flush() + return layout -def create_status_table(writer: BroadbandFrameWriter) -> Table: +def create_status_table(writer) -> Table: """Create a status table for display""" stats = writer.get_stats() table = Table(title="Streaming Status") @@ -328,19 +492,43 @@ def create_status_table(writer: BroadbandFrameWriter) -> Table: table.add_column("Metric", style="cyan") table.add_column("Value", style="green") - table.add_row("Frames/sec", f"{stats['frames_per_sec']:.1f}") - table.add_row("Samples/sec", f"{stats['samples_per_sec']:.1f}") - table.add_row("Total Frames", str(stats["total_frames"])) - table.add_row("Total Samples", str(stats["total_samples"])) + table.add_row("Frames/sec", f"{stats['frames_written_per_sec']:.1f}") + table.add_row("Samples/sec", f"{stats['samples_written_per_sec']:.1f}") + table.add_row("Total Frames", str(stats["total_frames_written"])) + table.add_row("Total Samples", str(stats["total_samples_written"])) table.add_row("Dropped Frames", str(stats["dropped_frames"])) + table.add_row("Queue Overflow Drops", str(stats["queue_overflow_drops"])) table.add_row("Last Sequence", str(stats["last_sequence"])) + # Add queue health information + utilization = stats["queue_utilization"] + util_color = ( + "green" if utilization < 0.5 else "yellow" if utilization < 0.8 else "red" + ) + + max_size = writer.data_queue.maxsize + table.add_row( + "Queue Usage", + f"{stats['queue_size']}/{max_size} ({utilization:.1%})", + style=util_color, + ) + + # Add memory pressure indicator + pressure_color = ( + "green" + if stats["memory_pressure"] == "Low" + else "yellow" + if stats["memory_pressure"] == "Medium" + else "red" + ) + table.add_row("Memory Pressure", stats["memory_pressure"], style=pressure_color) + return table def add_commands(subparsers): read_parser = subparsers.add_parser( - "read", help="Read from a device's Broadband Tap" + "read", help="Read from a device's Broadband Tap and save to HDF5" ) read_parser.add_argument( @@ -363,6 +551,11 @@ def add_commands(subparsers): read_parser.add_argument( "--list-taps", action="store_true", help="List all available taps and exit" ) + read_parser.add_argument( + "--duration", + type=float, + help="Duration in seconds to stream data (if not specified, streams until Ctrl+C)", + ) read_parser.set_defaults(func=read) @@ -404,6 +597,17 @@ def start_device(device, console): return True +def stop_device(device, console): + with console.status("Stopping device...", spinner="bouncingBall"): + stop_status = device.stop_with_status() + if stop_status.code != StatusCode.kOk: + console.print( + f"[bold red]Failed to stop device: {stop_status.message}[/bold red]" + ) + return False + return True + + def setup_output(args, console): if not args.output: console.print("[bold red]No output directory specified[/bold red]") @@ -457,7 +661,7 @@ def detect_stream_parameters(broadband_tap, console): try: # Get the first message to detect parameters - first_message = broadband_tap.read(timeout_ms=5000) # 5 second timeout + first_message = broadband_tap.read(timeout_ms=5000) if not first_message: console.print( "[bold red]Failed to receive first message for parameter detection[/bold red]" @@ -522,6 +726,144 @@ def get_broadband_tap(args, device, console): return None +def create_status_line(monitor, writer=None) -> Text: + """Create a single status line with all the important metrics""" + monitor_stats = monitor.get_current_stats() + + # Stream monitor stats + text = Text() + text.append(f"Messages: {monitor_stats['messages']:,} ", style="cyan") + text.append(f"({monitor_stats['rate']:.1f}/s) ", style="green") + text.append(f"Dropped: {monitor_stats['dropped']:,} ", style="red") + text.append(f"Queue Drops: {monitor_stats['queue_drops']:,} ", style="magenta") + text.append(f"Loss: {monitor_stats['loss_percent']:.2f}% ", style="yellow") + text.append(f"Runtime: {monitor_stats['runtime']:.1f}s", style="blue") + + if writer: + writer_stats = writer.get_stats() + text.append(" | ", style="dim") + text.append( + f"Written: {writer_stats['total_frames_written']:,} ", style="yellow" + ) + text.append(f"({writer_stats['frames_written_per_sec']:.1f}/s) ", style="green") + text.append( + f"Queue: {writer_stats['queue_size']}/{writer.data_queue.maxsize}", + style="cyan", + ) + + return text + + +def stream_data(broadband_tap, writer, plotter, monitor, first_frame, console, args): + """Simple streaming function using threaded writer""" + duration_exceeded = False + start_time = time.time() + + try: + console.print("[cyan]Starting data streaming... (Ctrl+C to stop)[/cyan]") + + # Process the first frame that we already read for parameter detection + if first_frame: + if writer: + writer.put(first_frame) + if plotter: + plotter.put(first_frame) + monitor.put(first_frame) + + # Use live display for updating status line + with Live(create_status_line(monitor, writer), refresh_per_second=4) as live: + # Continue with batch streaming for remaining frames + for message_batch in broadband_tap.stream_batch(batch_size=500): + # Check if duration limit has been reached + if hasattr(args, "duration") and args.duration is not None: + elapsed_time = time.time() - start_time + if elapsed_time >= args.duration: + duration_exceeded = True + console.print( + f"\n[yellow]Duration limit of {args.duration:.1f} seconds reached. Stopping data collection...[/yellow]" + ) + break + + frames = [] + for message in message_batch: + frame = BroadbandFrame() + frame.ParseFromString(message) + frames.append(frame) + + # Send batch to monitor and writer for better performance + monitor.put_batch(frames) + if writer and frames: + writer.put_batch(frames) + + # Send to plotter individually (plotter might need individual frames) + if plotter: + for frame in frames: + plotter.put(frame) + + # Update the live status line + live.update(create_status_line(monitor, writer)) + + except KeyboardInterrupt: + console.print("\n[yellow]Stopping data collection...[/yellow]") + finally: + if writer: + writer.stop() + if plotter: + plotter.stop() + if monitor: + monitor.stop() + if args.output: + console.print(f"[green]Data saved to {args.output}[/green]") + if args.plot: + console.print("[green]Plotter stopped[/green]") + + # Show final duration info + final_elapsed = time.time() - start_time + if duration_exceeded: + console.print( + f"[blue]Streaming completed after {final_elapsed:.1f} seconds (duration limit reached)[/blue]" + ) + else: + console.print( + f"[blue]Total streaming time: {final_elapsed:.1f} seconds[/blue]" + ) + + # Show final statistics summary + if writer: + final_stats = writer.get_stats() + console.print("\n[bold cyan]Final Statistics:[/bold cyan]") + console.print( + f"[green]Frames Written to Disk: {final_stats['total_frames_written']:,}[/green]" + ) + console.print( + f"[green]Samples Written to Disk: {final_stats['total_samples_written']:,}[/green]" + ) + console.print( + f"[cyan]Frames Queued: {final_stats['total_frames_queued']:,}[/cyan]" + ) + if final_stats["write_errors"] > 0: + console.print(f"[red]Write Errors: {final_stats['write_errors']}[/red]") + if final_stats["last_write_error"]: + console.print( + f"[red]Last Error: {final_stats['last_write_error']}[/red]" + ) + final_lag = final_stats["write_lag"] + if final_lag > 0: + console.print( + f"[yellow]Unwritten Frames in Queue: {final_lag:,}[/yellow]" + ) + else: + console.print("[green]All queued frames written to disk[/green]") + + # Make the plot command more prominent + console.print("\n" + "=" * 60) + console.print("[bold green]Plot the data with:[/bold green]") + console.print( + f"[bold yellow]synapsectl plot --data {writer.filename}[/bold yellow]" + ) + console.print("=" * 60) + + def read(args): console = Console() @@ -578,6 +920,7 @@ def read(args): writer = BroadbandFrameWriter(args.output) writer.set_attributes(sample_rate_hz=sample_rate, channels=available_channels) writer.start() + console.log("[cyan]Using threaded writer for serializing data[/cyan]") # Setup plotter if requested plotter = None @@ -604,46 +947,14 @@ def read(args): monitor = StreamMonitor(console) monitor.start() - try: - # Use batch streaming for better throughput - with Live(monitor.get_current_stats(), refresh_per_second=4) as live: - # Process the first frame that we already read for parameter detection - if first_frame: - if writer: - writer.put(first_frame) - if plotter: - plotter.put(first_frame) - monitor.put(first_frame) - - # Continue with batch streaming for remaining frames - for message_batch in broadband_tap.stream_batch(batch_size=10): - frames = [] - for message in message_batch: - frame = BroadbandFrame() - frame.ParseFromString(message) - frames.append(frame) - - # Send to monitor (non-blocking) - monitor.put(frame) - if plotter: - plotter.put(frame) - - # Batch write for better performance - if writer and frames: - writer.put_batch(frames) + # Run the streaming function + stream_data(broadband_tap, writer, plotter, monitor, first_frame, console, args) - live.update(monitor.get_current_stats()) - - except KeyboardInterrupt: - console.print("\n[yellow]Stopping data collection...[/yellow]") - finally: - if writer: - writer.stop() - if plotter: - plotter.stop() - if monitor: - monitor.stop() - if args.output: - console.print(f"[green]Data saved to {args.output}[/green]") - if args.plot: - console.print("[green]Plotter stopped[/green]") + # Stop the device after streaming is complete + console.log("[cyan]Stopping device...[/cyan]") + if not stop_device(device, console): + console.print( + "[bold yellow]Warning: Failed to stop device cleanly[/bold yellow]" + ) + else: + console.log("[green]Device stopped successfully[/green]") diff --git a/synapse/cli/synapse_plotter.py b/synapse/cli/synapse_plotter.py index 1c98dbe..263db73 100644 --- a/synapse/cli/synapse_plotter.py +++ b/synapse/cli/synapse_plotter.py @@ -45,8 +45,8 @@ def __init__(self, sample_rate: int, window_size: int, channel_ids): self.latest_data_time = 0 # Track the most recent data timestamp in seconds # Defaults for the zoomed channel plot - self.zoom_y_min = -4096 - self.zoom_y_max = 4096 + self.zoom_y_min = -1000 + self.zoom_y_max = 1000 self.signal_separation = 1000 @@ -54,7 +54,7 @@ def __init__(self, sample_rate: int, window_size: int, channel_ids): self.active_lines = {} # Queue and threading for BroadbandFrame processing - self.data_queue = queue.Queue(maxsize=2000) + self.data_queue = queue.Queue(maxsize=32000) self.stop_event = Event() self.plot_thread = None self.running = False @@ -164,7 +164,7 @@ def setup_gui(self): self.y_axis_all = dpg.add_plot_axis( dpg.mvYAxis, label="Amplitude", tag="y_axis_all" ) - dpg.set_axis_limits("y_axis_all", -4096, 4096 * 10) + dpg.set_axis_limits("y_axis_all", -1096, 4096 * 10) # Create line series for initially selected channels for ch_id in self.selected_channels: @@ -250,7 +250,7 @@ def put(self, frame: BroadbandFrame): except queue.Full: # If queue is full, drop multiple old frames and add the new one dropped = 0 - while dropped < 5: # Drop up to 5 old frames + while dropped < 100: try: self.data_queue.get_nowait() dropped += 1 @@ -303,7 +303,7 @@ def _plot_thread_main(self): self.start_time = time.time() # Main loop - fps_limit = 30 + fps_limit = 10 frame_duration = 1.0 / fps_limit last_time = time.time() @@ -311,7 +311,6 @@ def _plot_thread_main(self): # Process multiple frames per iteration for better throughput frames_processed = 0 max_frames_per_iter = 10 - while frames_processed < max_frames_per_iter: try: frame = self.data_queue.get_nowait() @@ -395,6 +394,10 @@ def update_plot(self): ds_x_ch = rolled_x_ch[::ds_factor] ds_y_ch = rolled_y_ch[::ds_factor] + # Remove DC offset by subtracting the mean + if len(ds_y_ch) > 0: + ds_y_ch = ds_y_ch - np.mean(ds_y_ch) + # Update the single "zoomed_line" series dpg.set_value("zoomed_line", [ds_x_ch.tolist(), ds_y_ch.tolist()]) diff --git a/synapse/client/taps.py b/synapse/client/taps.py index 4feb38d..3b75bdc 100644 --- a/synapse/client/taps.py +++ b/synapse/client/taps.py @@ -2,6 +2,7 @@ import time import zmq from typing import Optional, Generator +import sys from synapse.api.query_pb2 import QueryRequest from synapse.api.status_pb2 import StatusCode @@ -88,10 +89,14 @@ def connect(self, name: str) -> bool: # Optimize ZMQ for high-throughput data # Increase receive buffer size significantly for high-speed data - self.zmq_socket.setsockopt( - zmq.RCVHWM, 10000 - ) # High water mark - buffer up to 10K messages - self.zmq_socket.setsockopt(zmq.RCVBUF, 16 * 1024 * 1024) # 16MB receive buffer + if sys.platform == "win32": + # Use smaller, more achievable buffer sizes for Windows + self.zmq_socket.setsockopt(zmq.RCVBUF, 2 * 1024 * 1024) # 2MB + self.zmq_socket.setsockopt(zmq.RCVHWM, 5000) # Lower HWM + else: + # Linux can handle larger buffers + self.zmq_socket.setsockopt(zmq.RCVBUF, 16 * 1024 * 1024) + self.zmq_socket.setsockopt(zmq.RCVHWM, 10000) # Set TCP keepalive for connection stability self.zmq_socket.setsockopt(zmq.TCP_KEEPALIVE, 1) @@ -112,7 +117,10 @@ def connect(self, name: str) -> bool: # Reduce connection wait time to minimize startup delay self.logger.info("Connecting to tap...") - time.sleep(0.1) # Reduced from 1 second + if sys.platform == "win32": + time.sleep(0.001) # 1ms for Windows + else: + time.sleep(0.0001) # 0.1ms for Linux # Only set subscription options for subscriber sockets if selected_tap.tap_type != TapType.TAP_TYPE_CONSUMER: @@ -201,7 +209,10 @@ def stream(self, timeout_ms: int = 100) -> Generator[bytes, None, None]: yield data except zmq.Again: # No data available right now, yield control briefly - time.sleep(0.0001) # 0.1ms sleep to prevent busy waiting + if sys.platform == "win32": + time.sleep(0.001) # 1ms for Windows + else: + time.sleep(0.0001) # 0.1ms for Linux continue except KeyboardInterrupt: self.logger.info("Stream interrupted") @@ -244,7 +255,10 @@ def stream_batch( if batch: yield batch batch = [] - time.sleep(0.0001) + if sys.platform == "win32": + time.sleep(0.001) # 1ms for Windows + else: + time.sleep(0.0001) # 0.1ms for Linux continue except KeyboardInterrupt: self.logger.info("Stream interrupted")