Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/condy/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ template <typename T, size_t N = 2> class Channel {
}
assert(pop_awaiters_.empty());
push_awaiters_.push_back(finish_handle);
detail::Context::current().runtime()->pend_work();
return -EAGAIN;
}

Expand All @@ -233,7 +232,6 @@ template <typename T, size_t N = 2> class Channel {
return {-EPIPE, T()};
}
pop_awaiters_.push_back(finish_handle);
detail::Context::current().runtime()->pend_work();
return {-EAGAIN, T()};
}

Expand Down Expand Up @@ -401,6 +399,7 @@ class Channel<T, N>::PushFinishHandle
std::move(receiver_)(r);
return;
}
runtime->pend_work();

auto stop_token = receiver_.get_stop_token();
if (stop_token.stop_possible()) {
Expand Down Expand Up @@ -487,6 +486,7 @@ class Channel<T, N>::PopFinishHandle
std::move(receiver_)(std::move(item));
return;
}
runtime->pend_work();

auto stop_token = receiver_.get_stop_token();
if (stop_token.stop_possible()) {
Expand Down
2 changes: 1 addition & 1 deletion include/condy/futex.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ template <typename T> class Futex {
return 0; // No need to wait
}
wait_awaiters_.push_back(handle);
detail::Context::current().runtime()->pend_work();
return -EAGAIN; // Need to wait
}

Expand Down Expand Up @@ -151,6 +150,7 @@ class Futex<T>::WaitFinishHandle
std::move(receiver_)(r);
return;
}
runtime->pend_work();

auto stop_token = receiver_.get_stop_token();
if (stop_token.stop_possible()) {
Expand Down
22 changes: 15 additions & 7 deletions include/condy/runtime.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class Runtime {
* @note This function is thread-safe and can be called from any thread.
*/
void allow_exit() noexcept {
pending_works_--;
exit_allowed_.store(true, std::memory_order_release);
wakeup_();
}

Expand Down Expand Up @@ -200,9 +200,15 @@ class Runtime {
request.wait();
}

void pend_work() noexcept { pending_works_++; }
void pend_work() noexcept {
assert(detail::Context::current().runtime() == this);
pending_works_++;
}

void resume_work() noexcept { pending_works_--; }
void resume_work() noexcept {
assert(detail::Context::current().runtime() == this);
pending_works_--;
}
Comment thread
wokron marked this conversation as resolved.

/**
* @brief Run the runtime event loop in the current thread.
Expand Down Expand Up @@ -255,7 +261,8 @@ class Runtime {
continue;
}

if (pending_works_ == 0) {
if (pending_works_ == 0 &&
exit_allowed_.load(std::memory_order_acquire)) {
break;
}
flush_ring_wait_();
Expand Down Expand Up @@ -456,7 +463,7 @@ class Runtime {
panic_on(std::format("io_uring_prep_msg_ring: {}",
std::strerror(-cqe->res)));
}
pending_works_--;
resume_work();
} else {
auto *work = static_cast<WorkInvoker *>(data);
tsan_acquire(work);
Expand All @@ -473,7 +480,7 @@ class Runtime {
auto *handle = static_cast<OpFinishHandleBase *>(data);
auto op_finish = handle->handle(cqe);
if (op_finish) {
pending_works_--;
resume_work();
}
} else {
unreachable();
Expand All @@ -495,7 +502,8 @@ class Runtime {
// Global state
std::mutex mutex_;
WorkListQueue global_queue_;
std::atomic_size_t pending_works_ = 1;
size_t pending_works_ = 0;
std::atomic_bool exit_allowed_ = false;
std::atomic<State> state_ = State::Idle;

// Local state
Expand Down
Loading