diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 9413b86d6..5b91e5d93 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -608,6 +608,9 @@ def init_kernel(self): """Create the Kernel object itself""" if self.shell_channel_thread: shell_stream = ZMQStream(self.shell_socket, self.shell_channel_thread.io_loop) + # Hand the stream to the shell-channel thread so SubshellManager can re-arm + # the read after each out-of-band reply send (the wedge fix). + self.shell_channel_thread.shell_stream = shell_stream else: shell_stream = ZMQStream(self.shell_socket) control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop) diff --git a/ipykernel/shellchannel.py b/ipykernel/shellchannel.py index 8205840d1..8425dab54 100644 --- a/ipykernel/shellchannel.py +++ b/ipykernel/shellchannel.py @@ -29,6 +29,10 @@ def __init__( self._manager: SubshellManager | None = None self._zmq_context = context # Avoid use of self._context self._shell_socket = shell_socket + # Set by kernelapp.init_kernel after it builds the shell ZMQStream (this thread + # is created before the stream). Threaded into SubshellManager so it can re-arm + # the read after each out-of-band reply send (the wedge fix). + self.shell_stream = None # Record the parent thread - the thread that started the app (usually the main thread) self.parent_thread = current_thread() @@ -43,6 +47,7 @@ def manager(self) -> SubshellManager: self._zmq_context, self.io_loop, self._shell_socket, + self.shell_stream, ) return self._manager diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py index 3305bc67e..9fbc2711c 100644 --- a/ipykernel/subshell_manager.py +++ b/ipykernel/subshell_manager.py @@ -40,6 +40,7 @@ def __init__( context: zmq.Context[t.Any], shell_channel_io_loop: IOLoop, shell_socket: zmq.Socket[t.Any], + shell_stream=None, ): """Initialize the subshell manager.""" self._parent_thread = current_thread() @@ -47,6 +48,10 @@ def __init__( self._context: zmq.Context[t.Any] = context self._shell_channel_io_loop = shell_channel_io_loop self._shell_socket = shell_socket + # ZMQStream that reads `shell_socket`. Replies are sent on this same socket + # out-of-band below, draining its edge-triggered ZMQ_FD read edge; re-arm the + # stream after each send so a concurrently-arrived request cannot strand unread. + self._shell_stream = shell_stream self._cache: dict[str, SubshellThread] = {} self._lock_cache = Lock() # Sync lock across threads when accessing cache. @@ -226,6 +231,14 @@ def _process_control_request( def _send_on_shell_channel(self, msg) -> None: assert current_thread().name == SHELL_CHANNEL_THREAD_NAME self._shell_socket.send_multipart(msg) + # Re-arm the shell ZMQStream read: this out-of-band send drained the ROUTER's + # edge-triggered ZMQ_FD read edge; reschedule the stream's handler so a + # concurrently-arrived request cannot strand unread (the wedge fix). + stream = self._shell_stream + if stream is not None and stream.socket is self._shell_socket: + self._shell_channel_io_loop.add_callback( + lambda: stream._handle_events(stream.socket, 0) + ) def _stop_subshell(self, subshell_thread: SubshellThread) -> None: """Stop a subshell thread and close all of its resources."""