diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 87e0f3efdd..0290016176 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -567,6 +567,9 @@ class Executor /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. std::atomic_bool spinning; + /// Tracks a pending cancel request that has not yet been consumed by a spin. + std::atomic_bool cancel_requested_; + /// Guard condition for signaling the rmw layer to wake up for special events. std::shared_ptr interrupt_guard_condition_; diff --git a/rclcpp/include/rclcpp/executors/events_cbg_executor/events_cbg_executor.hpp b/rclcpp/include/rclcpp/executors/events_cbg_executor/events_cbg_executor.hpp index b791f6f5da..5260b3411f 100644 --- a/rclcpp/include/rclcpp/executors/events_cbg_executor/events_cbg_executor.hpp +++ b/rclcpp/include/rclcpp/executors/events_cbg_executor/events_cbg_executor.hpp @@ -205,7 +205,12 @@ class EventsCBGExecutor : public rclcpp::Executor if (spinning.exchange(true)) { throw std::runtime_error("spin_until_future_complete() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + RCPPUTILS_SCOPE_EXIT( + this->spinning.store(false); + this->cancel_requested_.store(false); ); + if (cancel_requested_.load()) { + return FutureReturnCode::INTERRUPTED; + } while (rclcpp::ok(this->context_) && spinning.load()) { // Do one item of work. spin_once_internal(timeout_left); diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 90f3277bc7..8baaa260fd 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -53,6 +53,7 @@ class rclcpp::ExecutorImplementation {}; Executor::Executor(const std::shared_ptr & context) : spinning(false), + cancel_requested_(false), context_(context), entities_need_rebuild_(true), collector_(nullptr), @@ -62,6 +63,7 @@ Executor::Executor(const std::shared_ptr & context) Executor::Executor(const rclcpp::ExecutorOptions & options) : spinning(false), + cancel_requested_(false), interrupt_guard_condition_(std::make_shared(options.context)), shutdown_guard_condition_(std::make_shared(options.context)), context_(options.context), @@ -287,7 +289,13 @@ Executor::spin_until_future_complete_impl( if (spinning.exchange(true)) { throw std::runtime_error("spin_until_future_complete() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false);); + RCPPUTILS_SCOPE_EXIT( + wait_result_.reset(); + this->spinning.store(false); + this->cancel_requested_.store(false);); + if (cancel_requested_.load()) { + return FutureReturnCode::INTERRUPTED; + } while (rclcpp::ok(this->context_) && spinning.load()) { // Do one item of work. spin_once_impl(timeout_left); @@ -378,7 +386,13 @@ Executor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive) if (spinning.exchange(true)) { throw std::runtime_error("spin_some() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false);); + RCPPUTILS_SCOPE_EXIT( + wait_result_.reset(); + this->spinning.store(false); + this->cancel_requested_.store(false);); + if (cancel_requested_.load()) { + return; + } // clear the wait result and wait for work without blocking to collect the work // for the first time @@ -460,13 +474,25 @@ Executor::spin_once(std::chrono::nanoseconds timeout) if (spinning.exchange(true)) { throw std::runtime_error("spin_once() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false);); + RCPPUTILS_SCOPE_EXIT( + wait_result_.reset(); + this->spinning.store(false); + this->cancel_requested_.store(false);); + if (cancel_requested_.load()) { + return; + } spin_once_impl(timeout); } void Executor::cancel() { + // Set the pending-cancel flag BEFORE clearing the spinning flag. + // This ordering ensures that a spin variant which is just entering and is + // about to do `spinning.exchange(true)` will observe the pending cancel and + // bail out early instead of re-arming spinning and blocking forever in + // wait_for_work(). + cancel_requested_.store(true); spinning.store(false); try { interrupt_guard_condition_->trigger(); diff --git a/rclcpp/src/rclcpp/executors/events_cbg_executor/events_cbg_executor.cpp b/rclcpp/src/rclcpp/executors/events_cbg_executor/events_cbg_executor.cpp index 80f87ac57a..98c5943214 100644 --- a/rclcpp/src/rclcpp/executors/events_cbg_executor/events_cbg_executor.cpp +++ b/rclcpp/src/rclcpp/executors/events_cbg_executor/events_cbg_executor.cpp @@ -430,7 +430,12 @@ EventsCBGExecutor::spin_once(std::chrono::nanoseconds timeout) if (spinning.exchange(true) ) { throw std::runtime_error("spin_once() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + RCPPUTILS_SCOPE_EXIT( + this->spinning.store(false); + this->cancel_requested_.store(false);); + if (cancel_requested_.load()) { + return; + } spin_once_internal(timeout); } @@ -458,7 +463,12 @@ bool EventsCBGExecutor::collect_and_execute_ready_events( if (spinning.exchange(true) ) { throw std::runtime_error("collect_and_execute_ready_events() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + RCPPUTILS_SCOPE_EXIT( + this->spinning.store(false); + this->cancel_requested_.store(false);); + if (cancel_requested_.load()) { + return false; + } const auto start = std::chrono::steady_clock::now(); const auto end_time = start + max_duration; @@ -491,7 +501,12 @@ EventsCBGExecutor::spin() if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + RCPPUTILS_SCOPE_EXIT( + this->spinning.store(false); + this->cancel_requested_.store(false);); + if (cancel_requested_.load()) { + return; + } std::vector threads; size_t thread_id = 0; for ( ; thread_id < number_of_threads_ - 1; ++thread_id) { @@ -513,7 +528,12 @@ void EventsCBGExecutor::spin( if (spinning.exchange(true) ) { throw std::runtime_error("spin() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + RCPPUTILS_SCOPE_EXIT( + this->spinning.store(false); + this->cancel_requested_.store(false);); + if (cancel_requested_.load()) { + return; + } std::vector threads; size_t thread_id = 0; for ( ; thread_id < number_of_threads_ - 1; ++thread_id) { @@ -564,6 +584,11 @@ EventsCBGExecutor::cancel() { bool was_spinning = spinning; + // Set the pending-cancel flag BEFORE clearing spinning, so that any spin + // variant that is just entering and is about to do `spinning.exchange(true)` + // observes the pending cancel and bails out early instead of re-arming + // spinning and blocking forever. + cancel_requested_.store(true); spinning.store(false); if(was_spinning) { diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 378fdfb8fa..abb7699a73 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -55,7 +55,13 @@ MultiThreadedExecutor::spin() if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false);); + RCPPUTILS_SCOPE_EXIT( + wait_result_.reset(); + this->spinning.store(false); + this->cancel_requested_.store(false);); + if (cancel_requested_.load()) { + return; + } std::vector threads; size_t thread_id = 0; { diff --git a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp index 689bbae398..bcef06f66e 100644 --- a/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/single_threaded_executor.cpp @@ -30,7 +30,13 @@ SingleThreadedExecutor::spin() if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(wait_result_.reset();this->spinning.store(false);); + RCPPUTILS_SCOPE_EXIT( + wait_result_.reset(); + this->spinning.store(false); + this->cancel_requested_.store(false);); + if (cancel_requested_.load()) { + return; + } // Clear any previous result and rebuild the waitset this->wait_result_.reset(); diff --git a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp index 4a3f8f796f..834d8ea53d 100644 --- a/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp +++ b/rclcpp/src/rclcpp/experimental/executors/events_executor/events_executor.cpp @@ -114,7 +114,12 @@ EventsExecutor::spin() if (spinning.exchange(true)) { throw std::runtime_error("spin() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + RCPPUTILS_SCOPE_EXIT( + this->spinning.store(false); + this->cancel_requested_.store(false);); + if (cancel_requested_.load()) { + return; + } timers_manager_->start(); RCPPUTILS_SCOPE_EXIT(timers_manager_->stop(); ); @@ -151,7 +156,12 @@ EventsExecutor::spin_some_impl(std::chrono::nanoseconds max_duration, bool exhau throw std::runtime_error("spin_some() called while already spinning"); } - RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + RCPPUTILS_SCOPE_EXIT( + this->spinning.store(false); + this->cancel_requested_.store(false);); + if (cancel_requested_.load()) { + return; + } auto start = std::chrono::steady_clock::now(); diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index 8911c0a28c..ad1b24ebae 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -170,6 +171,172 @@ TYPED_TEST(TestExecutors, spinWithTimer) executor.remove_node(this->node, true); } +// If cancel() is called *before* spin() begins (e.g. a test fixture's +// TearDown() races with a background thread that has not yet entered +// spin()), the executor must not block forever on wait_for_work(). +// Each spin variant must consume the pending cancel and return promptly. +TYPED_TEST(TestExecutors, cancelBeforeSpinDoesNotHang) +{ + using ExecutorType = TypeParam; + ExecutorType executor; + executor.add_node(this->node); + + // Cancel before any spin has been issued. The cancel must be "held" until + // the next spin consumes it. + executor.cancel(); + + std::atomic spin_returned{false}; + std::thread spinner( + [&]() { + EXPECT_NO_THROW(executor.spin()); + spin_returned.store(true); + }); + + // Without the fix, spin() would block in wait_for_work() forever and the + // join below would deadlock. With the fix, spin() returns immediately. + auto start = std::chrono::steady_clock::now(); + while (!spin_returned.load() && (std::chrono::steady_clock::now() - start) < 5s) { + std::this_thread::sleep_for(1ms); + } + EXPECT_TRUE(spin_returned.load()) + << "spin() did not return after cancel() was called before spin start"; + + // If spin did not return, cancel again from the main thread to unblock the + // (now broken) test before joining, otherwise the test process hangs. + if (!spin_returned.load()) { + executor.cancel(); + } + spinner.join(); + executor.remove_node(this->node, true); +} + +// If cancel() is called while spin() is racing to enter wait_for_work(), +// the executor must not block forever on wait_for_work(). +TYPED_TEST(TestExecutors, cancelRacingSpinDoesNotHang) +{ + using ExecutorType = TypeParam; + + // Repeat the race a few times to increase the chance of hitting the bad + // interleaving on machines where the threads tend to schedule the same way. + for (int i = 0; i < 10; ++i) { + ExecutorType executor; + executor.add_node(this->node); + + std::atomic spin_returned{false}; + std::thread spinner( + [&]() { + EXPECT_NO_THROW(executor.spin()); + spin_returned.store(true); + }); + + // Issue cancel from the main thread without sleeping; depending on + // scheduling, cancel may fire before or after spin() entered its loop. + executor.cancel(); + + // Spin must return within a short, generous timeout in either case. + auto start = std::chrono::steady_clock::now(); + while (!spin_returned.load() && (std::chrono::steady_clock::now() - start) < 5s) { + std::this_thread::sleep_for(1ms); + } + EXPECT_TRUE(spin_returned.load()) + << "spin() did not return on iteration " << i + << " when cancel() raced with spin()"; + + // Defensive cleanup so a failing iteration does not deadlock the suite. + if (!spin_returned.load()) { + executor.cancel(); + } + spinner.join(); + executor.remove_node(this->node, true); + } +} + +// After cancel() consumed by an early-exit spin(), a subsequent spin() must +// behave normally (no stale cancel state leaks across spin invocations). +TYPED_TEST(TestExecutors, spinAfterCancelBeforeSpin) +{ + using ExecutorType = TypeParam; + ExecutorType executor; + + std::atomic timer_completed{false}; + auto timer = this->node->create_wall_timer( + 1ms, [&]() {timer_completed.store(true);}); + executor.add_node(this->node); + + // First, exercise the cancel-before-spin path: spin() must return promptly. + executor.cancel(); + std::thread first_spinner([&]() {EXPECT_NO_THROW(executor.spin());}); + first_spinner.join(); + EXPECT_FALSE(timer_completed.load()) + << "timer should not have fired during a cancel-before-spin no-op spin"; + + // Now spin again; the previous cancel must have been consumed and the + // executor should run normally. + std::thread second_spinner([&]() {EXPECT_NO_THROW(executor.spin());}); + + auto start = std::chrono::steady_clock::now(); + while (!timer_completed.load() && (std::chrono::steady_clock::now() - start) < 10s) { + std::this_thread::sleep_for(1ms); + } + EXPECT_TRUE(timer_completed.load()) + << "timer did not fire on the spin that followed a cancel-before-spin"; + + executor.cancel(); + second_spinner.join(); + executor.remove_node(this->node, true); +} + +// Cancel-before-spin must also be honored by spin_some(), spin_all(), +// spin_once(), and spin_until_future_complete(). +TYPED_TEST(TestExecutors, cancelBeforeNonBlockingSpinVariantsDoNotHang) +{ + using ExecutorType = TypeParam; + ExecutorType executor; + executor.add_node(this->node); + + // spin_some() + { + executor.cancel(); + auto start = std::chrono::steady_clock::now(); + EXPECT_NO_THROW(executor.spin_some(10s)); + EXPECT_LT(std::chrono::steady_clock::now() - start, 1s) + << "spin_some() blocked after cancel() was called before it"; + } + + // spin_all() + { + executor.cancel(); + auto start = std::chrono::steady_clock::now(); + EXPECT_NO_THROW(executor.spin_all(10s)); + EXPECT_LT(std::chrono::steady_clock::now() - start, 1s) + << "spin_all() blocked after cancel() was called before it"; + } + + // spin_once() + { + executor.cancel(); + auto start = std::chrono::steady_clock::now(); + EXPECT_NO_THROW(executor.spin_once(10s)); + EXPECT_LT(std::chrono::steady_clock::now() - start, 1s) + << "spin_once() blocked after cancel() was called before it"; + } + + // spin_until_future_complete() must report INTERRUPTED, not block. + { + std::promise never_set; + auto future = never_set.get_future().share(); + executor.cancel(); + auto start = std::chrono::steady_clock::now(); + rclcpp::FutureReturnCode ret = rclcpp::FutureReturnCode::TIMEOUT; + EXPECT_NO_THROW(ret = executor.spin_until_future_complete(future, 10s)); + EXPECT_LT(std::chrono::steady_clock::now() - start, 1s) + << "spin_until_future_complete() blocked after cancel() was called before it"; + EXPECT_EQ(ret, rclcpp::FutureReturnCode::INTERRUPTED); + } + + executor.remove_node(this->node, true); +} + TYPED_TEST(TestExecutors, spinWhileAlreadySpinning) { using ExecutorType = TypeParam;