diff --git a/synapse/cli/streaming.py b/synapse/cli/streaming.py index d824af62..1ffd4f62 100644 --- a/synapse/cli/streaming.py +++ b/synapse/cli/streaming.py @@ -4,6 +4,7 @@ import time import traceback import os +import logging from typing import Optional from operator import itemgetter import copy @@ -263,7 +264,7 @@ def read_packets( node: syn.StreamOut, q: queue.Queue, plot_q: queue.Queue, - stop, + stop: threading.Event, duration: Optional[int] = None, num_ch: int = 32, ): @@ -277,12 +278,12 @@ def read_packets( while not stop.is_set(): read_ret = node.read() if read_ret is None: - print("Could not get a valid read from the node") + logging.error("Could not get a valid read from the node") continue synapse_data, bytes_read = read_ret if synapse_data is None or bytes_read == 0: - print("Could not read data from node") + logging.error("Could not read data from node") continue header, data = synapse_data monitor.process_packet(header, data, bytes_read) @@ -297,7 +298,7 @@ def read_packets( break -def _binary_writer(stop, q, num_ch, output_base): +def _binary_writer(stop, q: queue.Queue, num_ch, output_base): filename = f"{output_base}.dat" full_path = os.path.join(output_base, filename) if filename: diff --git a/synapse/client/nodes/stream_out.py b/synapse/client/nodes/stream_out.py index 327eaac0..626c5009 100644 --- a/synapse/client/nodes/stream_out.py +++ b/synapse/client/nodes/stream_out.py @@ -16,6 +16,7 @@ ) DEFAULT_STREAM_OUT_PORT = 50038 +STREAM_OUT_TIMEOUT_SEC = 1 # seconds # Try to get the current user's ip for setting the destination address @@ -36,9 +37,16 @@ def get_client_ip(): class StreamOut(Node): type = NodeType.kStreamOut - def __init__(self, label=None, destination_address=None, destination_port=None): + def __init__( + self, + label=None, + destination_address=None, + destination_port=None, + read_timeout=STREAM_OUT_TIMEOUT_SEC, + ): self.__socket = None self.__label = label + self.__read_timeout = read_timeout # If we have been passed a None for destination address, try to resolve it if not destination_address: @@ -55,8 +63,12 @@ def read(self) -> Tuple[Optional[Tuple[NDTPHeader, SynapseData]], int]: if self.__socket is None: if self.open_socket() is None: return None - data, _ = self.__socket.recvfrom(8192) - bytes_read = len(data) + try: + data, _ = self.__socket.recvfrom(8192) + bytes_read = len(data) + except socket.timeout: + logging.warning("StreamOut socket timed out.") + return None return self._unpack(data), bytes_read def open_socket(self): @@ -86,6 +98,9 @@ def open_socket(self): f"Could not set socket buffer size to {SOCKET_BUFSIZE_BYTES}. Current size is {recvbuf}. Consider increasing the system limit." ) + # Set a timeout + self.__socket.settimeout(self.__read_timeout) + # Bind to the destination address (our ip) and port try: self.__socket.bind((self.__destination_address, self.__destination_port))