Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions synapse/cli/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
import traceback
import os
import logging
from typing import Optional
from operator import itemgetter
import copy
Expand Down Expand Up @@ -263,7 +264,7 @@ def read_packets(
node: syn.StreamOut,
q: queue.Queue,
plot_q: queue.Queue,
stop,
stop: threading.Event,
duration: Optional[int] = None,
num_ch: int = 32,
):
Expand All @@ -277,12 +278,12 @@ def read_packets(
while not stop.is_set():
read_ret = node.read()
if read_ret is None:
print("Could not get a valid read from the node")
logging.error("Could not get a valid read from the node")
continue

synapse_data, bytes_read = read_ret
if synapse_data is None or bytes_read == 0:
print("Could not read data from node")
logging.error("Could not read data from node")
continue
header, data = synapse_data
monitor.process_packet(header, data, bytes_read)
Expand All @@ -297,7 +298,7 @@ def read_packets(
break


def _binary_writer(stop, q, num_ch, output_base):
def _binary_writer(stop, q: queue.Queue, num_ch, output_base):
filename = f"{output_base}.dat"
full_path = os.path.join(output_base, filename)
if filename:
Expand Down
21 changes: 18 additions & 3 deletions synapse/client/nodes/stream_out.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)

DEFAULT_STREAM_OUT_PORT = 50038
STREAM_OUT_TIMEOUT_SEC = 1 # seconds


# Try to get the current user's ip for setting the destination address
Expand All @@ -36,9 +37,16 @@ def get_client_ip():
class StreamOut(Node):
type = NodeType.kStreamOut

def __init__(self, label=None, destination_address=None, destination_port=None):
def __init__(
self,
label=None,
destination_address=None,
destination_port=None,
read_timeout=STREAM_OUT_TIMEOUT_SEC,
):
self.__socket = None
self.__label = label
self.__read_timeout = read_timeout

# If we have been passed a None for destination address, try to resolve it
if not destination_address:
Expand All @@ -55,8 +63,12 @@ def read(self) -> Tuple[Optional[Tuple[NDTPHeader, SynapseData]], int]:
if self.__socket is None:
if self.open_socket() is None:
return None
data, _ = self.__socket.recvfrom(8192)
bytes_read = len(data)
try:
data, _ = self.__socket.recvfrom(8192)
bytes_read = len(data)
except socket.timeout:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could try to reconnect and read again here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like try to receive from the socket again? I don't quite understand why would try again after timing out. Shouldn't it be up to the caller to decide if they want to re-try the read?

logging.warning("StreamOut socket timed out.")
return None
return self._unpack(data), bytes_read

def open_socket(self):
Expand Down Expand Up @@ -86,6 +98,9 @@ def open_socket(self):
f"Could not set socket buffer size to {SOCKET_BUFSIZE_BYTES}. Current size is {recvbuf}. Consider increasing the system limit."
)

# Set a timeout
self.__socket.settimeout(self.__read_timeout)

# Bind to the destination address (our ip) and port
try:
self.__socket.bind((self.__destination_address, self.__destination_port))
Expand Down