From d3d1bef114eed10518b17ca0895a5f82a926b4e8 Mon Sep 17 00:00:00 2001 From: wokron Date: Thu, 21 May 2026 15:09:36 +0800 Subject: [PATCH 1/5] simplify pend/resume work --- include/condy/channel.hpp | 4 ++-- include/condy/futex.hpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/include/condy/channel.hpp b/include/condy/channel.hpp index 73adae0b..da1ab595 100644 --- a/include/condy/channel.hpp +++ b/include/condy/channel.hpp @@ -212,7 +212,6 @@ template class Channel { } assert(pop_awaiters_.empty()); push_awaiters_.push_back(finish_handle); - detail::Context::current().runtime()->pend_work(); return -EAGAIN; } @@ -233,7 +232,6 @@ template class Channel { return {-EPIPE, T()}; } pop_awaiters_.push_back(finish_handle); - detail::Context::current().runtime()->pend_work(); return {-EAGAIN, T()}; } @@ -401,6 +399,7 @@ class Channel::PushFinishHandle std::move(receiver_)(r); return; } + runtime->pend_work(); auto stop_token = receiver_.get_stop_token(); if (stop_token.stop_possible()) { @@ -487,6 +486,7 @@ class Channel::PopFinishHandle std::move(receiver_)(std::move(item)); return; } + runtime->pend_work(); auto stop_token = receiver_.get_stop_token(); if (stop_token.stop_possible()) { diff --git a/include/condy/futex.hpp b/include/condy/futex.hpp index 37576a47..be53341d 100644 --- a/include/condy/futex.hpp +++ b/include/condy/futex.hpp @@ -90,7 +90,6 @@ template class Futex { return 0; // No need to wait } wait_awaiters_.push_back(handle); - detail::Context::current().runtime()->pend_work(); return -EAGAIN; // Need to wait } @@ -151,6 +150,7 @@ class Futex::WaitFinishHandle std::move(receiver_)(r); return; } + runtime->pend_work(); auto stop_token = receiver_.get_stop_token(); if (stop_token.stop_possible()) { From a4334843d82bb5f773af65bfd124a4acafda4c3b Mon Sep 17 00:00:00 2001 From: wokron Date: Thu, 21 May 2026 15:14:57 +0800 Subject: [PATCH 2/5] add pend/resume work assert --- include/condy/runtime.hpp | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/include/condy/runtime.hpp b/include/condy/runtime.hpp index 6989af63..f98c3b59 100644 --- a/include/condy/runtime.hpp +++ b/include/condy/runtime.hpp @@ -200,9 +200,15 @@ class Runtime { request.wait(); } - void pend_work() noexcept { pending_works_++; } + void pend_work() noexcept { + assert(detail::Context::current().runtime() == this); + pending_works_++; + } - void resume_work() noexcept { pending_works_--; } + void resume_work() noexcept { + assert(detail::Context::current().runtime() == this); + pending_works_--; + } /** * @brief Run the runtime event loop in the current thread. From 85ca91a2a34a9b05037713a05c59c95f439898b0 Mon Sep 17 00:00:00 2001 From: wokron Date: Thu, 21 May 2026 15:23:44 +0800 Subject: [PATCH 3/5] make pending_works_ non-atomic --- include/condy/runtime.hpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/include/condy/runtime.hpp b/include/condy/runtime.hpp index f98c3b59..4a2b7cbe 100644 --- a/include/condy/runtime.hpp +++ b/include/condy/runtime.hpp @@ -138,7 +138,7 @@ class Runtime { * @note This function is thread-safe and can be called from any thread. */ void allow_exit() noexcept { - pending_works_--; + exit_allowed_.store(true); wakeup_(); } @@ -261,7 +261,7 @@ class Runtime { continue; } - if (pending_works_ == 0) { + if (pending_works_ == 0 && exit_allowed_.load()) { break; } flush_ring_wait_(); @@ -501,7 +501,8 @@ class Runtime { // Global state std::mutex mutex_; WorkListQueue global_queue_; - std::atomic_size_t pending_works_ = 1; + size_t pending_works_ = 0; + std::atomic_bool exit_allowed_ = false; std::atomic state_ = State::Idle; // Local state From 587c147f85a72a192a8a390d37521278690e74f9 Mon Sep 17 00:00:00 2001 From: wokron Date: Thu, 21 May 2026 15:52:00 +0800 Subject: [PATCH 4/5] use acquire release --- include/condy/runtime.hpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/include/condy/runtime.hpp b/include/condy/runtime.hpp index 4a2b7cbe..471d7829 100644 --- a/include/condy/runtime.hpp +++ b/include/condy/runtime.hpp @@ -138,7 +138,7 @@ class Runtime { * @note This function is thread-safe and can be called from any thread. */ void allow_exit() noexcept { - exit_allowed_.store(true); + exit_allowed_.store(true, std::memory_order_release); wakeup_(); } @@ -261,7 +261,8 @@ class Runtime { continue; } - if (pending_works_ == 0 && exit_allowed_.load()) { + if (pending_works_ == 0 && + exit_allowed_.load(std::memory_order_acquire)) { break; } flush_ring_wait_(); From 66db49aeafbd9a3c5211d4eb7e1852d68f831069 Mon Sep 17 00:00:00 2001 From: wokron Date: Thu, 21 May 2026 15:54:20 +0800 Subject: [PATCH 5/5] use resume_work in process_cqe_ --- include/condy/runtime.hpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/condy/runtime.hpp b/include/condy/runtime.hpp index 471d7829..2c66ee5d 100644 --- a/include/condy/runtime.hpp +++ b/include/condy/runtime.hpp @@ -463,7 +463,7 @@ class Runtime { panic_on(std::format("io_uring_prep_msg_ring: {}", std::strerror(-cqe->res))); } - pending_works_--; + resume_work(); } else { auto *work = static_cast(data); tsan_acquire(work); @@ -480,7 +480,7 @@ class Runtime { auto *handle = static_cast(data); auto op_finish = handle->handle(cqe); if (op_finish) { - pending_works_--; + resume_work(); } } else { unreachable();