Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,9 @@ class Executor
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
std::atomic_bool spinning;

/// Tracks a pending cancel request that has not yet been consumed by a spin.
std::atomic_bool cancel_requested_;

/// Guard condition for signaling the rmw layer to wake up for special events.
std::shared_ptr<rclcpp::GuardCondition> interrupt_guard_condition_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,12 @@ class EventsCBGExecutor : public rclcpp::Executor
if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_future_complete() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
RCPPUTILS_SCOPE_EXIT(
this->spinning.store(false);
this->cancel_requested_.store(false); );
if (cancel_requested_.load()) {
return FutureReturnCode::INTERRUPTED;
}
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_internal(timeout_left);
Expand Down
32 changes: 29 additions & 3 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class rclcpp::ExecutorImplementation {};

Executor::Executor(const std::shared_ptr<rclcpp::Context> & context)
: spinning(false),
cancel_requested_(false),
context_(context),
entities_need_rebuild_(true),
collector_(nullptr),
Expand All @@ -62,6 +63,7 @@ Executor::Executor(const std::shared_ptr<rclcpp::Context> & context)

Executor::Executor(const rclcpp::ExecutorOptions & options)
: spinning(false),
cancel_requested_(false),
interrupt_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
shutdown_guard_condition_(std::make_shared<rclcpp::GuardCondition>(options.context)),
context_(options.context),
Expand Down Expand Up @@ -287,7 +289,13 @@ Executor::spin_until_future_complete_impl(
if (spinning.exchange(true)) {
throw std::runtime_error("spin_until_future_complete() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
RCPPUTILS_SCOPE_EXIT(
wait_result_.reset();
this->spinning.store(false);
this->cancel_requested_.store(false););
if (cancel_requested_.load()) {
return FutureReturnCode::INTERRUPTED;
}
while (rclcpp::ok(this->context_) && spinning.load()) {
// Do one item of work.
spin_once_impl(timeout_left);
Expand Down Expand Up @@ -378,7 +386,13 @@ Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
if (spinning.exchange(true)) {
throw std::runtime_error("spin_some() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
RCPPUTILS_SCOPE_EXIT(
wait_result_.reset();
this->spinning.store(false);
this->cancel_requested_.store(false););
if (cancel_requested_.load()) {
return;
}

// clear the wait result and wait for work without blocking to collect the work
// for the first time
Expand Down Expand Up @@ -460,13 +474,25 @@ Executor::spin_once(std::chrono::nanoseconds timeout)
if (spinning.exchange(true)) {
throw std::runtime_error("spin_once() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
RCPPUTILS_SCOPE_EXIT(
wait_result_.reset();
this->spinning.store(false);
this->cancel_requested_.store(false););
if (cancel_requested_.load()) {
return;
}
spin_once_impl(timeout);
}

void
Executor::cancel()
{
// Set the pending-cancel flag BEFORE clearing the spinning flag.
// This ordering ensures that a spin variant which is just entering and is
// about to do `spinning.exchange(true)` will observe the pending cancel and
// bail out early instead of re-arming spinning and blocking forever in
// wait_for_work().
cancel_requested_.store(true);
spinning.store(false);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now wrong.
The cancel should not touch the spinning at all, it shall be only controlled by the spin functions.

This would also fix a bug, that is_spinning would return false, directly even after the cancel, even though the executor might still be doing stuff.

try {
interrupt_guard_condition_->trigger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,12 @@ EventsCBGExecutor::spin_once(std::chrono::nanoseconds timeout)
if (spinning.exchange(true) ) {
throw std::runtime_error("spin_once() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
RCPPUTILS_SCOPE_EXIT(
this->spinning.store(false);
this->cancel_requested_.store(false););
if (cancel_requested_.load()) {
return;
}

spin_once_internal(timeout);
}
Expand Down Expand Up @@ -458,7 +463,12 @@ bool EventsCBGExecutor::collect_and_execute_ready_events(
if (spinning.exchange(true) ) {
throw std::runtime_error("collect_and_execute_ready_events() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
RCPPUTILS_SCOPE_EXIT(
this->spinning.store(false);
this->cancel_requested_.store(false););
if (cancel_requested_.load()) {
return false;
}

const auto start = std::chrono::steady_clock::now();
const auto end_time = start + max_duration;
Expand Down Expand Up @@ -491,7 +501,12 @@ EventsCBGExecutor::spin()
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
RCPPUTILS_SCOPE_EXIT(
this->spinning.store(false);
this->cancel_requested_.store(false););
if (cancel_requested_.load()) {
return;
}
std::vector<std::thread> threads;
size_t thread_id = 0;
for ( ; thread_id < number_of_threads_ - 1; ++thread_id) {
Expand All @@ -513,7 +528,12 @@ void EventsCBGExecutor::spin(
if (spinning.exchange(true) ) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
RCPPUTILS_SCOPE_EXIT(
this->spinning.store(false);
this->cancel_requested_.store(false););
if (cancel_requested_.load()) {
return;
}
std::vector<std::thread> threads;
size_t thread_id = 0;
for ( ; thread_id < number_of_threads_ - 1; ++thread_id) {
Expand Down Expand Up @@ -564,6 +584,11 @@ EventsCBGExecutor::cancel()
{
bool was_spinning = spinning;

// Set the pending-cancel flag BEFORE clearing spinning, so that any spin
// variant that is just entering and is about to do `spinning.exchange(true)`
// observes the pending cancel and bails out early instead of re-arming
// spinning and blocking forever.
cancel_requested_.store(true);
spinning.store(false);

if(was_spinning) {
Expand Down
8 changes: 7 additions & 1 deletion rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@ MultiThreadedExecutor::spin()
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
RCPPUTILS_SCOPE_EXIT(
wait_result_.reset();
this->spinning.store(false);
this->cancel_requested_.store(false););
if (cancel_requested_.load()) {
return;
}
std::vector<std::thread> threads;
size_t thread_id = 0;
{
Expand Down
8 changes: 7 additions & 1 deletion rclcpp/src/rclcpp/executors/single_threaded_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ SingleThreadedExecutor::spin()
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false););
RCPPUTILS_SCOPE_EXIT(
wait_result_.reset();
this->spinning.store(false);
this->cancel_requested_.store(false););
if (cancel_requested_.load()) {
return;
}

// Clear any previous result and rebuild the waitset
this->wait_result_.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,12 @@ EventsExecutor::spin()
if (spinning.exchange(true)) {
throw std::runtime_error("spin() called while already spinning");
}
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
RCPPUTILS_SCOPE_EXIT(
this->spinning.store(false);
this->cancel_requested_.store(false););
if (cancel_requested_.load()) {
return;
}

timers_manager_->start();
RCPPUTILS_SCOPE_EXIT(timers_manager_->stop(); );
Expand Down Expand Up @@ -151,7 +156,12 @@ EventsExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhau
throw std::runtime_error("spin_some() called while already spinning");
}

RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
RCPPUTILS_SCOPE_EXIT(
this->spinning.store(false);
this->cancel_requested_.store(false););
if (cancel_requested_.load()) {
return;
}

auto start = std::chrono::steady_clock::now();

Expand Down
Loading