diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index da22ae6..b5383e6 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -213,6 +213,12 @@ EventLoop::~EventLoop() { if (m_async_thread.joinable()) m_async_thread.join(); const Lock lock(m_mutex); + + if (m_post_fd != -1) { + KJ_SYSCALL(::close(m_post_fd)); + m_post_fd = -1; + } + KJ_ASSERT(m_post_fn == nullptr); KJ_ASSERT(!m_async_fns); KJ_ASSERT(m_wait_fd == -1); @@ -239,13 +245,26 @@ void EventLoop::loop() kj::Own wait_stream{ m_io_context.lowLevelProvider->wrapSocketFd(m_wait_fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP)}; int post_fd{m_post_fd}; + KJ_DEFER({ + Lock lock(m_mutex); + m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_num_clients == 0; }); + m_wait_fd = -1; + m_async_fns.reset(); + m_cv.notify_all(); + }); char buffer = 0; for (;;) { const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope); if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly"); Lock lock(m_mutex); if (m_post_fn) { - Unlock(lock, *m_post_fn); + try { + Unlock(lock, *m_post_fn); + } catch (...) { + m_post_fn = nullptr; + m_cv.notify_all(); + throw; + } m_post_fn = nullptr; m_cv.notify_all(); } else if (done()) { @@ -262,10 +281,7 @@ void EventLoop::loop() wait_stream = nullptr; KJ_SYSCALL(::close(post_fd)); const Lock lock(m_mutex); - m_wait_fd = -1; m_post_fd = -1; - m_async_fns.reset(); - m_cv.notify_all(); } void EventLoop::post(kj::Function fn)