From ae2f262adc2de329f6679592b073cd62e909218f Mon Sep 17 00:00:00 2001 From: Sage Date: Mon, 31 Mar 2025 14:00:33 -0700 Subject: [PATCH 1/2] Fix streaming getting stuck with no socket timeout --- synapse/cli/streaming.py | 9 +++++---- synapse/client/nodes/stream_out.py | 12 ++++++++++-- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index d824af62..1ffd4f62 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -4,6 +4,7 @@ import time import traceback import os +import logging from typing import Optional from operator import itemgetter import copy @@ -263,7 +264,7 @@ def read_packets( node: syn.StreamOut, q: queue.Queue, plot_q: queue.Queue, - stop, + stop: threading.Event, duration: Optional[int] = None, num_ch: int = 32, ): @@ -277,12 +278,12 @@ def read_packets( while not stop.is_set(): read_ret = node.read() if read_ret is None: - print("Could not get a valid read from the node") + logging.error("Could not get a valid read from the node") continue synapse_data, bytes_read = read_ret if synapse_data is None or bytes_read == 0: - print("Could not read data from node") + logging.error("Could not read data from node") continue header, data = synapse_data monitor.process_packet(header, data, bytes_read) @@ -297,7 +298,7 @@ def read_packets( break -def _binary_writer(stop, q, num_ch, output_base): +def _binary_writer(stop, q: queue.Queue, num_ch, output_base): filename = f"{output_base}.dat" full_path = os.path.join(output_base, filename) if filename: diff --git a/synapse/client/nodes/stream_out.py b/synapse/client/nodes/stream_out.py index 327eaac0..f76f6b67 100644 --- a/synapse/client/nodes/stream_out.py +++ b/synapse/client/nodes/stream_out.py @@ -16,6 +16,7 @@ ) DEFAULT_STREAM_OUT_PORT = 50038 +STREAM_OUT_TIMEOUT_SEC = 1 # seconds # Try to get the current user's ip for setting the destination address @@ -55,8 +56,12 @@ def read(self) -> Tuple[Optional[Tuple[NDTPHeader, SynapseData]], int]: if self.__socket is None: if self.open_socket() is None: return None - data, _ = self.__socket.recvfrom(8192) - bytes_read = len(data) + try: + data, _ = self.__socket.recvfrom(8192) + bytes_read = len(data) + except socket.timeout: + logging.warning("StreamOut socket timed out.") + return None return self._unpack(data), bytes_read def open_socket(self): @@ -86,6 +91,9 @@ def open_socket(self): f"Could not set socket buffer size to {SOCKET_BUFSIZE_BYTES}. Current size is {recvbuf}. Consider increasing the system limit." ) + # Set a timeout + self.__socket.settimeout(STREAM_OUT_TIMEOUT_SEC) + # Bind to the destination address (our ip) and port try: self.__socket.bind((self.__destination_address, self.__destination_port)) From 775678be7fa8d502df40432e576c4aca8c7bf9a5 Mon Sep 17 00:00:00 2001 From: Sage Date: Tue, 1 Apr 2025 15:49:59 -0700 Subject: [PATCH 2/2] Add timeout param to StreamOut constructor --- synapse/client/nodes/stream_out.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/synapse/client/nodes/stream_out.py b/synapse/client/nodes/stream_out.py index f76f6b67..626c5009 100644 --- a/synapse/client/nodes/stream_out.py +++ b/synapse/client/nodes/stream_out.py @@ -37,9 +37,16 @@ def get_client_ip(): class StreamOut(Node): type = NodeType.kStreamOut - def __init__(self, label=None, destination_address=None, destination_port=None): + def __init__( + self, + label=None, + destination_address=None, + destination_port=None, + read_timeout=STREAM_OUT_TIMEOUT_SEC, + ): self.__socket = None self.__label = label + self.__read_timeout = read_timeout # If we have been passed a None for destination address, try to resolve it if not destination_address: @@ -92,7 +99,7 @@ def open_socket(self): ) # Set a timeout - self.__socket.settimeout(STREAM_OUT_TIMEOUT_SEC) + self.__socket.settimeout(self.__read_timeout) # Bind to the destination address (our ip) and port try: