diff --git a/include/condy/channel.hpp b/include/condy/channel.hpp index 73adae0b..da1ab595 100644 --- a/include/condy/channel.hpp +++ b/include/condy/channel.hpp @@ -212,7 +212,6 @@ template class Channel { } assert(pop_awaiters_.empty()); push_awaiters_.push_back(finish_handle); - detail::Context::current().runtime()->pend_work(); return -EAGAIN; } @@ -233,7 +232,6 @@ template class Channel { return {-EPIPE, T()}; } pop_awaiters_.push_back(finish_handle); - detail::Context::current().runtime()->pend_work(); return {-EAGAIN, T()}; } @@ -401,6 +399,7 @@ class Channel::PushFinishHandle std::move(receiver_)(r); return; } + runtime->pend_work(); auto stop_token = receiver_.get_stop_token(); if (stop_token.stop_possible()) { @@ -487,6 +486,7 @@ class Channel::PopFinishHandle std::move(receiver_)(std::move(item)); return; } + runtime->pend_work(); auto stop_token = receiver_.get_stop_token(); if (stop_token.stop_possible()) { diff --git a/include/condy/futex.hpp b/include/condy/futex.hpp index 37576a47..be53341d 100644 --- a/include/condy/futex.hpp +++ b/include/condy/futex.hpp @@ -90,7 +90,6 @@ template class Futex { return 0; // No need to wait } wait_awaiters_.push_back(handle); - detail::Context::current().runtime()->pend_work(); return -EAGAIN; // Need to wait } @@ -151,6 +150,7 @@ class Futex::WaitFinishHandle std::move(receiver_)(r); return; } + runtime->pend_work(); auto stop_token = receiver_.get_stop_token(); if (stop_token.stop_possible()) { diff --git a/include/condy/runtime.hpp b/include/condy/runtime.hpp index 6989af63..2c66ee5d 100644 --- a/include/condy/runtime.hpp +++ b/include/condy/runtime.hpp @@ -138,7 +138,7 @@ class Runtime { * @note This function is thread-safe and can be called from any thread. */ void allow_exit() noexcept { - pending_works_--; + exit_allowed_.store(true, std::memory_order_release); wakeup_(); } @@ -200,9 +200,15 @@ class Runtime { request.wait(); } - void pend_work() noexcept { pending_works_++; } + void pend_work() noexcept { + assert(detail::Context::current().runtime() == this); + pending_works_++; + } - void resume_work() noexcept { pending_works_--; } + void resume_work() noexcept { + assert(detail::Context::current().runtime() == this); + pending_works_--; + } /** * @brief Run the runtime event loop in the current thread. @@ -255,7 +261,8 @@ class Runtime { continue; } - if (pending_works_ == 0) { + if (pending_works_ == 0 && + exit_allowed_.load(std::memory_order_acquire)) { break; } flush_ring_wait_(); @@ -456,7 +463,7 @@ class Runtime { panic_on(std::format("io_uring_prep_msg_ring: {}", std::strerror(-cqe->res))); } - pending_works_--; + resume_work(); } else { auto *work = static_cast(data); tsan_acquire(work); @@ -473,7 +480,7 @@ class Runtime { auto *handle = static_cast(data); auto op_finish = handle->handle(cqe); if (op_finish) { - pending_works_--; + resume_work(); } } else { unreachable(); @@ -495,7 +502,8 @@ class Runtime { // Global state std::mutex mutex_; WorkListQueue global_queue_; - std::atomic_size_t pending_works_ = 1; + size_t pending_works_ = 0; + std::atomic_bool exit_allowed_ = false; std::atomic state_ = State::Idle; // Local state