@@ -155,34 +155,53 @@ def _read(self, transport, n: int) -> bytes:
155155 if len (data ) != n :
156156 raise TimeoutError (f"serial read short: expected { n } , got { len (data )} " )
157157 return data
158-
159- def _send_and_receive (self , cmd , ethernet_socket = None ):
158+ def _send_and_receive (self , cmd , ethernet_socket = None , * , return_timings : bool = False ):
160159 """Send a command and wait for a binary response.
161160
162161 If no socket is provided and we're in Ethernet mode, this reuses a
163162 persistent TCP connection to avoid per-command connect overhead.
163+
164+ Parameters
165+ ----------
166+ return_timings:
167+ If True, return a tuple: (payload_bytes, send_ms, recv_ms), where
168+ send_ms is time spent in send/write calls and recv_ms is time spent
169+ waiting for and reading the response bytes.
164170 """
165171 if self ._serial :
172+ t0 = time .perf_counter_ns ()
166173 if isinstance (cmd , str ):
167174 self ._serial .write (cmd .encode ())
168175 else :
169176 self ._serial .write (cmd )
177+ t1 = time .perf_counter_ns ()
170178 resp_len = self ._serial .read (1 )
171179 response = resp_len + self ._serial .read (int (resp_len [0 ]))
172- return response [3 :]
180+ t2 = time .perf_counter_ns ()
181+ payload = response [3 :]
182+ if return_timings :
183+ return payload , (t1 - t0 ) / 1e6 , (t2 - t1 ) / 1e6
184+ return payload
173185
174186 # Ethernet
175187 sock = ethernet_socket if (ethernet_socket is not None ) else self ._connect_ethernet_socket (reuse = True )
176188
177189 def _do_io (s : socket .socket ):
190+ t0 = time .perf_counter_ns ()
178191 if isinstance (cmd , str ):
179192 s .sendall (cmd .encode ())
180193 else :
181194 s .sendall (cmd )
195+ t1 = time .perf_counter_ns ()
182196
183197 resp_len = self ._recv_exact (s , 1 )
184198 payload = self ._recv_exact (s , int (resp_len [0 ]))
185- return (resp_len + payload )[3 :]
199+ t2 = time .perf_counter_ns ()
200+
201+ out = (resp_len + payload )[3 :]
202+ if return_timings :
203+ return out , (t1 - t0 ) / 1e6 , (t2 - t1 ) / 1e6
204+ return out
186205
187206 # If we're using the persistent socket, allow one reconnect attempt.
188207 attempts = 1 if (ethernet_socket is not None ) else 2
@@ -199,16 +218,59 @@ def _do_io(s: socket.socket):
199218 sock = self ._connect_ethernet_socket (reuse = True )
200219
201220 raise ConnectionError ("failed to send/receive over Ethernet after reconnect" )
221+ def _send_and_receive_stream (
222+ self ,
223+ stream_header : bytes ,
224+ frame_chunked : list [bytes ],
225+ ethernet_socket : socket .socket | None = None ,
226+ * ,
227+ return_timings : bool = False ,
228+ ):
229+ """Send a stream frame (header + payload) and wait for response.
202230
203- def _send_and_receive_stream (self , stream_header , frame_chunked , ethernet_socket ):
204- """Send a stream frame (header + frame payload) and wait for response."""
205- ethernet_socket .sendall (stream_header )
206- for chunk in frame_chunked :
207- ethernet_socket .sendall (chunk )
231+ This mirrors `_send_and_receive()` but supports chunked payload sends.
208232
209- resp_len = self ._recv_exact (ethernet_socket , 1 )
210- payload = self ._recv_exact (ethernet_socket , int (resp_len [0 ]))
211- return (resp_len + payload )[3 :]
233+ Parameters
234+ ----------
235+ ethernet_socket:
236+ Optional explicit socket. If omitted, uses (and can reconnect) the
237+ instance's persistent Ethernet socket.
238+ return_timings:
239+ If True, return (payload_bytes, send_ms, recv_ms).
240+ """
241+ sock = ethernet_socket if (ethernet_socket is not None ) else self ._connect_ethernet_socket (reuse = True )
242+
243+ def _do_io (s : socket .socket ):
244+ t0 = time .perf_counter_ns ()
245+ s .sendall (stream_header )
246+ for chunk in frame_chunked :
247+ s .sendall (chunk )
248+ t1 = time .perf_counter_ns ()
249+
250+ resp_len = self ._recv_exact (s , 1 )
251+ payload = self ._recv_exact (s , int (resp_len [0 ]))
252+ t2 = time .perf_counter_ns ()
253+
254+ out = (resp_len + payload )[3 :]
255+ if return_timings :
256+ return out , (t1 - t0 ) / 1e6 , (t2 - t1 ) / 1e6
257+ return out
258+
259+ # If we're using the persistent socket, allow one reconnect attempt.
260+ attempts = 1 if (ethernet_socket is not None ) else 2
261+ for _ in range (attempts ):
262+ try :
263+ return _do_io (sock )
264+ except (OSError , ConnectionError ) as e :
265+ if ethernet_socket is not None :
266+ raise
267+ self ._socket_reconnects += 1
268+ self ._socket_last_error = repr (e )
269+ self ._debug_print (f"socket error ({ e } ), reconnecting" )
270+ self ._close_ethernet_socket ()
271+ sock = self ._connect_ethernet_socket (reuse = True )
272+
273+ raise ConnectionError ("failed to send/receive STREAM_FRAME over Ethernet after reconnect" )
212274
213275 def set_ethernet_mode (self , ip_address ):
214276 """Set ethernet mode."""
@@ -485,6 +547,7 @@ def stream_frames(
485547 analog_frequency ,
486548 stream_cmd_coalesced = False ,
487549 progress_interval_s = 1.0 ,
550+ collect_timings : bool = False ,
488551 ):
489552 """Stream a `.pattern` file's frames at a fixed rate for a fixed duration.
490553
@@ -571,11 +634,16 @@ def analog_waveform_for(name: str):
571634 return lambda x : 0.0
572635 raise ValueError (f'Invalid analog output waveform: { name } ' )
573636
574- ethernet_socket = self ._connect_ethernet_socket (reuse = True )
637+ # Ensure persistent socket is established once for the run.
638+ self ._connect_ethernet_socket (reuse = True )
575639
576640 bytes_sent = 0
577641 frames_streamed = 0
578642
643+ send_ms_samples : list [float ] = []
644+ resp_wait_ms_samples : list [float ] = []
645+ cmd_rtt_ms_samples : list [float ] = []
646+
579647 wf = analog_waveform_for (str (analog_out_waveform ))
580648 last_analog_update_ns = 0
581649 analog_value_cached = int (round (analog_offset ))
@@ -638,11 +706,27 @@ def analog_waveform_for(name: str):
638706 stream_header = struct .pack ('<BHHH' , 0x32 , data_len , analog_output_value , 0 )
639707
640708 if stream_cmd_coalesced :
641- self ._send_and_receive (stream_header + frame , ethernet_socket )
709+ if collect_timings :
710+ _ , send_ms , recv_ms = self ._send_and_receive (stream_header + frame , return_timings = True )
711+ send_ms_samples .append (float (send_ms ))
712+ resp_wait_ms_samples .append (float (recv_ms ))
713+ cmd_rtt_ms_samples .append (float (send_ms ) + float (recv_ms ))
714+ else :
715+ self ._send_and_receive (stream_header + frame )
642716 else :
643717 # Chunk the frame payload for better control over send sizes.
644718 frame_chunked = [frame [i :i + CHUNK_SIZE ] for i in range (0 , len (frame ), CHUNK_SIZE )]
645- self ._send_and_receive_stream (stream_header , frame_chunked , ethernet_socket )
719+ if collect_timings :
720+ _ , send_ms , recv_ms = self ._send_and_receive_stream (
721+ stream_header ,
722+ frame_chunked ,
723+ return_timings = True ,
724+ )
725+ send_ms_samples .append (float (send_ms ))
726+ resp_wait_ms_samples .append (float (recv_ms ))
727+ cmd_rtt_ms_samples .append (float (send_ms ) + float (recv_ms ))
728+ else :
729+ self ._send_and_receive_stream (stream_header , frame_chunked )
646730
647731 frames_streamed += 1
648732 bytes_sent += (len (stream_header ) + data_len )
@@ -662,25 +746,34 @@ def analog_waveform_for(name: str):
662746 if frame_period_ns :
663747 next_frame_deadline_ns += frame_period_ns
664748 i += 1
665-
666- # End the mode
667- self ._send_and_receive (bytes ([1 , 0 ]), ethernet_socket )
749+ # End the mode
750+ self ._send_and_receive (bytes ([1 , 0 ]))
668751
669752 elapsed_s = (time .perf_counter_ns () - start_time_ns ) / 1e9
670753 rate_hz = frames_streamed / elapsed_s if elapsed_s > 0 else 0.0
671754 mbps = (bytes_sent * 8 ) / (elapsed_s * 1e6 ) if elapsed_s > 0 else 0.0
672755 print (f'frames streamed: { frames_streamed } , elapsed_s: { elapsed_s :.3f} , rate: { rate_hz :.1f} Hz, tx: { mbps :.2f} Mb/s' )
673756
674- return {
757+ result = {
675758 "frames" : frames_streamed ,
676759 "elapsed_s" : elapsed_s ,
677760 "rate_hz" : rate_hz ,
678761 "bytes_sent" : bytes_sent ,
679762 "tx_mbps" : mbps ,
680763 "duration_requested_s" : runtime_duration_s ,
681764 "frames_target" : frames_target ,
765+ "frame_bytes" : int (frame_size ),
766+ "stream_header_bytes" : 7 ,
767+ "bytes_per_frame" : int (frame_size ) + 7 ,
682768 }
683769
770+ if collect_timings and cmd_rtt_ms_samples :
771+ result ["cmd_rtt_ms" ] = self ._bench_summarize_ms (cmd_rtt_ms_samples )
772+ result ["send_ms" ] = self ._bench_summarize_ms (send_ms_samples )
773+ result ["response_wait_ms" ] = self ._bench_summarize_ms (resp_wait_ms_samples )
774+
775+ return result
776+
684777 def all_off_str (self ):
685778 """Turn all panels off with string."""
686779 self ._send_and_receive ('ALL_OFF' )
@@ -1101,6 +1194,7 @@ def bench_stream_frames(
11011194 analog_out_waveform : str = "constant" ,
11021195 analog_update_rate : float = 1.0 ,
11031196 analog_frequency : float = 0.0 ,
1197+ collect_timings : bool = True ,
11041198 ) -> dict :
11051199 """Benchmark STREAM_FRAME throughput using `stream_frames()`.
11061200
@@ -1128,6 +1222,7 @@ def bench_stream_frames(
11281222 float (analog_frequency ),
11291223 stream_cmd_coalesced = bool (stream_cmd_coalesced ),
11301224 progress_interval_s = float (progress_interval_s ),
1225+ collect_timings = bool (collect_timings ),
11311226 )
11321227 stats .update (
11331228 {
0 commit comments