From c53bfe6c603741eaea0db34488395bc3608e306f Mon Sep 17 00:00:00 2001 From: BoykoNeov Date: Sun, 14 Jun 2026 17:49:57 +0300 Subject: [PATCH] Re-arm shell ZMQStream read after out-of-band reply send On Windows, ipykernel 7 intermittently drops an execute_request on the shell channel: the kernel goes idle and never replies, and the client times out waiting for execute_reply (~30% of headless notebook runs in our measurements; which cell hangs wanders run to run). Root cause: the shell ROUTER socket is dual-use on the shell-channel thread. A ZMQStream reads execute_requests off it, while replies are sent back over the SAME socket out-of-band via a raw send_multipart in SubshellManager._send_on_shell_channel. That out-of-band send drains the socket's edge-triggered ZMQ_FD read edge (a documented libzmq corollary: after zmq_send the socket may become readable without a new edge). The send is not ZMQStream-mediated, so the stream is never re-armed and a request that arrived concurrently strands unread on a registered-but-non-readable fd. The strand is terminal: no later arrival re-edges it. Fix: after each out-of-band reply send, schedule the shell ZMQStream's read handler on the shell-channel loop -- the same edge-trap reschedule ZMQStream._update_handler already runs internally (add_callback(lambda: stream._handle_events(stream.socket, 0))) -- so the concurrently-arrived request cannot strand. The shell_stream (built in kernelapp.init_kernel) is threaded through ShellChannelThread into SubshellManager so the reply path can reach it. Validated on Windows (Python 3.13/3.14, pyzmq 27.1.0 / libzmq 4.3.5): the wedge went from 6/20 (control) to 0/20 with this patch applied, same machine/session, P(0/20 | p=0.30) ~ 8e-4, with the threaded reference live on every send (551 re-arms, 0 None/mismatch). A sham arm with the same scheduling overhead but no re-arm stayed at the control rate. 
Co-Authored-By: Claude Opus 4.8 --- ipykernel/kernelapp.py | 3 +++ ipykernel/shellchannel.py | 5 +++++ ipykernel/subshell_manager.py | 13 +++++++++++++ 3 files changed, 21 insertions(+) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 9413b86d6..5b91e5d93 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -608,6 +608,9 @@ def init_kernel(self): """Create the Kernel object itself""" if self.shell_channel_thread: shell_stream = ZMQStream(self.shell_socket, self.shell_channel_thread.io_loop) + # Hand the stream to the shell-channel thread so SubshellManager can re-arm + # the read after each out-of-band reply send (the wedge fix). + self.shell_channel_thread.shell_stream = shell_stream else: shell_stream = ZMQStream(self.shell_socket) control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop) diff --git a/ipykernel/shellchannel.py b/ipykernel/shellchannel.py index 8205840d1..8425dab54 100644 --- a/ipykernel/shellchannel.py +++ b/ipykernel/shellchannel.py @@ -29,6 +29,10 @@ def __init__( self._manager: SubshellManager | None = None self._zmq_context = context # Avoid use of self._context self._shell_socket = shell_socket + # Set by kernelapp.init_kernel after it builds the shell ZMQStream (this thread + # is created before the stream). Threaded into SubshellManager so it can re-arm + # the read after each out-of-band reply send (the wedge fix). 
+ self.shell_stream = None # Record the parent thread - the thread that started the app (usually the main thread) self.parent_thread = current_thread() @@ -43,6 +47,7 @@ def manager(self) -> SubshellManager: self._zmq_context, self.io_loop, self._shell_socket, + self.shell_stream, ) return self._manager diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py index 3305bc67e..9fbc2711c 100644 --- a/ipykernel/subshell_manager.py +++ b/ipykernel/subshell_manager.py @@ -40,6 +40,7 @@ def __init__( context: zmq.Context[t.Any], shell_channel_io_loop: IOLoop, shell_socket: zmq.Socket[t.Any], + shell_stream=None, ): """Initialize the subshell manager.""" self._parent_thread = current_thread() @@ -47,6 +48,10 @@ def __init__( self._context: zmq.Context[t.Any] = context self._shell_channel_io_loop = shell_channel_io_loop self._shell_socket = shell_socket + # ZMQStream that reads `shell_socket`. Replies are sent on this same socket + # out-of-band below, draining its edge-triggered ZMQ_FD read edge; re-arm the + # stream after each send so a concurrently-arrived request cannot strand unread. + self._shell_stream = shell_stream self._cache: dict[str, SubshellThread] = {} self._lock_cache = Lock() # Sync lock across threads when accessing cache. @@ -226,6 +231,14 @@ def _process_control_request( def _send_on_shell_channel(self, msg) -> None: assert current_thread().name == SHELL_CHANNEL_THREAD_NAME self._shell_socket.send_multipart(msg) + # Re-arm the shell ZMQStream read: this out-of-band send drained the ROUTER's + # edge-triggered ZMQ_FD read edge; reschedule the stream's handler so a + # concurrently-arrived request cannot strand unread (the wedge fix). 
+ stream = self._shell_stream + if stream is not None and stream.socket is self._shell_socket: + self._shell_channel_io_loop.add_callback( + lambda: stream._handle_events(stream.socket, 0) + ) def _stop_subshell(self, subshell_thread: SubshellThread) -> None: """Stop a subshell thread and close all of its resources."""