From c6ad6057f0ce488f483f2b00a8af7a15e3a73787 Mon Sep 17 00:00:00 2001 From: hulxv Date: Thu, 19 Mar 2026 21:53:38 +0200 Subject: [PATCH] fix: `EventLoop::post()` deadlocks when posted function throws --- src/mp/proxy.cpp | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index da22ae6..b5383e6 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -213,6 +213,12 @@ EventLoop::~EventLoop() { if (m_async_thread.joinable()) m_async_thread.join(); const Lock lock(m_mutex); + + if (m_post_fd != -1) { + KJ_SYSCALL(::close(m_post_fd)); + m_post_fd = -1; + } + KJ_ASSERT(m_post_fn == nullptr); KJ_ASSERT(!m_async_fns); KJ_ASSERT(m_wait_fd == -1); @@ -239,13 +245,26 @@ void EventLoop::loop() kj::Own wait_stream{ m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)}; int post_fd{m_post_fd}; + KJ_DEFER({ + Lock lock(m_mutex); + m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_num_clients == 0; }); + m_wait_fd = -1; + m_async_fns.reset(); + m_cv.notify_all(); + }); char buffer = 0; for (;;) { const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope); if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly"); Lock lock(m_mutex); if (m_post_fn) { - Unlock(lock, *m_post_fn); + try { + Unlock(lock, *m_post_fn); + } catch (...) { + m_post_fn = nullptr; + m_cv.notify_all(); + throw; + } m_post_fn = nullptr; m_cv.notify_all(); } else if (done()) { @@ -262,10 +281,7 @@ void EventLoop::loop() wait_stream = nullptr; KJ_SYSCALL(::close(post_fd)); const Lock lock(m_mutex); - m_wait_fd = -1; m_post_fd = -1; - m_async_fns.reset(); - m_cv.notify_all(); } void EventLoop::post(kj::Function fn)