diff --git a/.github/workflows/tsan.yml b/.github/workflows/tsan.yml new file mode 100644 index 00000000..a9fa9356 --- /dev/null +++ b/.github/workflows/tsan.yml @@ -0,0 +1,52 @@ +name: TSAN Race Check + +on: + workflow_dispatch: + +jobs: + tsan: + strategy: + fail-fast: false + matrix: + include: + - ros_distro: humble + container_image: rostooling/setup-ros-docker:ubuntu-jammy-latest + - ros_distro: jazzy + container_image: rostooling/setup-ros-docker:ubuntu-noble-latest + - ros_distro: kilted + container_image: rostooling/setup-ros-docker:ubuntu-noble-latest + runs-on: ubuntu-latest + container: + image: ${{ matrix.container_image }} + env: + RMW_IMPLEMENTATION: rmw_cyclonedds_cpp + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install tooling + shell: bash + run: | + apt-get update + apt-get install -y --no-install-recommends util-linux + + - name: Build with TSAN + shell: bash + run: | + source /opt/ros/${{ matrix.ros_distro }}/setup.bash + MAKEFLAGS="-j6" colcon build \ + --packages-select icey icey_examples \ + --cmake-args \ + -DCMAKE_BUILD_TYPE=RelWithDebInfo \ + -DICEY_ENABLE_TSAN=ON \ + -DICEY_ASYNC_AWAIT_THREAD_SAFE=1 + + - name: Run TSAN race repro tests + shell: bash + run: | + source /opt/ros/${{ matrix.ros_distro }}/setup.bash + source install/setup.bash + TSAN_OPTIONS="halt_on_error=1:detect_deadlocks=1:suppressions=${GITHUB_WORKSPACE}/icey/test/tsan_dds.supp" \ + setarch x86_64 -R \ + ${GITHUB_WORKSPACE}/build/icey/test_main \ + --gtest_filter=NodeTasksThreadSafety.TSanRaceRepro* diff --git a/CHANGELOG.md b/CHANGELOG.md index 35a55b3b..f45f54f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## 0.4.0 +### Changed + +- Timer callback signature not longer taking size_t as an argument +- Added CallbackGroup arguments to API + ### Added - Support for actions @@ -22,7 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - allowing coroutines without suspension points (i.e. co_await) but only a single co_return - Correct result type implementation - Missing request cleanup in ServiceClientImpl::our_to_real_req_id_ -- Added missing cancellation of timeout timer in Promise destruction +- Missing cancellation of timeout timer in Promise destruction ## 0.3.0 diff --git a/README.md b/README.md index 0c54175c..b4f266bd 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,11 @@ ROS 2 Humble Build Status

+### Updates: +- v0.4.0: + - Adds support multi-threaded executor, the async/await API is now thread-safe. + - Also adds support for actions. + ICEY is a new client API for modern asynchronous programming in the Robot Operating System (ROS) 2. It uses C++20 coroutines with async/await syntax for service calls and TF lookups. ICEY allows you to model data flows based on streams and promises. These features simplify application code and make asynchronous data flows clearly visible. ### Problems ICEY solves: @@ -22,7 +27,7 @@ ICEY is a new client API for modern asynchronous programming in the Robot Operat ICEY is fully compatible with the ROS 2 API since it is built on top of rclcpp. This allows for gradual adoption. It supports all major ROS features: parameters, subscriptions, publishers, timers, services, clients, actions and TF. Additionally, ICEY supports lifecycle nodes using a single API. ICEY operates smoothly with the message_filters package, using it for synchronization. ICEY is also extensible, as demonstrated by its support for image transport camera subscription/publishers. -ICEY supports ROS 2 Humble and ROS 2 Jazzy. +ICEY supports ROS 2 Humble/Jazzy/Kilted. The [icey_examples](icey_examples) package contains many different example nodes, demonstrating the capabilities of ICEY. diff --git a/icey/CMakeLists.txt b/icey/CMakeLists.txt index ad3eee6d..5c01a640 100644 --- a/icey/CMakeLists.txt +++ b/icey/CMakeLists.txt @@ -4,6 +4,9 @@ project(icey) set(CMAKE_CXX_STANDARD 20) set(CMAKE_CXX_EXTENSIONS OFF) +option(ICEY_ASYNC_AWAIT_THREAD_SAFE "Enable synchronization in async/await internals" ON) +option(ICEY_ENABLE_TSAN "Build tests with ThreadSanitizer instrumentation" OFF) + if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -pedantic -Werror=return-type -Werror=init-self) endif() @@ -25,11 +28,14 @@ include_directories( ) ament_auto_add_library(icey + src/icey_async_await.cpp src/actions/client.cpp src/actions/server.cpp src/actions/server_goal_handle.cpp ) + + if(BUILD_TESTING) find_package(ament_cmake_gtest REQUIRED) @@ -50,6 +56,21 @@ if(BUILD_TESTING) target_link_libraries(test_main fmt::fmt) set_tests_properties(test_main PROPERTIES TIMEOUT 180) + + + if(ICEY_ENABLE_TSAN) + if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang") + target_compile_options(icey PUBLIC -fsanitize=thread -fno-omit-frame-pointer) + target_link_options(icey PUBLIC -fsanitize=thread) + target_compile_options(test_main PRIVATE -fsanitize=thread -fno-omit-frame-pointer) + target_link_options(test_main PRIVATE -fsanitize=thread) + set_tests_properties(test_main PROPERTIES + ENVIRONMENT "TSAN_OPTIONS=halt_on_error=1:suppressions=${CMAKE_CURRENT_SOURCE_DIR}/test/tsan_dds.supp") + else() + message(WARNING "ICEY_ENABLE_TSAN is ON but compiler does not support TSAN flags") + endif() + endif() + endif() if(ament_cmake_auto_VERSION EQUAL 2.7.3) diff --git a/icey/benchmark/src/events_throughput.cpp b/icey/benchmark/src/events_throughput.cpp index 2398c056..e3b37595 100644 --- a/icey/benchmark/src/events_throughput.cpp +++ b/icey/benchmark/src/events_throughput.cpp @@ -56,7 +56,9 @@ int main(int argc, char** argv) { // auto exec = rclcpp::experimental::executors::EventsExecutor() // exec.add(node); // exec.spin(); - rclcpp::spin(node); + rclcpp::executors::MultiThreadedExecutor exec{rclcpp::ExecutorOptions(), 8}; + exec.add_node(node->get_node_base_interface()); + exec.spin(); rclcpp::shutdown(); return 0; } \ No newline at end of file diff --git a/icey/benchmark/src/tf_lookup_async_ref.cpp b/icey/benchmark/src/tf_lookup_async_ref.cpp index bca26890..cf6c085a 100644 --- a/icey/benchmark/src/tf_lookup_async_ref.cpp +++ b/icey/benchmark/src/tf_lookup_async_ref.cpp @@ -35,5 +35,7 @@ int main(int argc, char **argv) { } }); - rclcpp::spin(node); + rclcpp::executors::MultiThreadedExecutor exec{rclcpp::ExecutorOptions(), 8}; + exec.add_node(node->get_node_base_interface()); + exec.spin(); } \ No newline at end of file diff --git a/icey/doc/source/api_ros.md b/icey/doc/source/api_ros.md index 8dc8edbd..abff19ae 100644 --- a/icey/doc/source/api_ros.md +++ b/icey/doc/source/api_ros.md @@ -21,9 +21,6 @@ Convenience node wrappers containing the Icey context ```{doxygenclass} icey::NodeWithIceyContext ``` -```{doxygenstruct} icey::TransformBufferImpl -``` - ```{doxygenstruct} icey::NodeBase ``` diff --git a/icey/doc/source/development.md b/icey/doc/source/development.md index 9b4e2445..67aaa3e1 100644 --- a/icey/doc/source/development.md +++ b/icey/doc/source/development.md @@ -50,6 +50,32 @@ Then, do not use FastDDS since it uses exceptions as part of it's regular (inste RMW_IMPLEMENTATION=rmw_cyclonedds_cpp gdb ./build/icey/test_main ``` + +## TSAN (ThreadSanitizer) + +TSAN is used to detect data races with multi-threaded executors. +Build both `icey` and `icey_examples` with TSAN instrumentation: + +```sh +cd +source /opt/ros/jazzy/setup.bash +MAKEFLAGS="-j6" colcon build --packages-select icey icey_examples --cmake-args -DCMAKE_BUILD_TYPE=RelWithDebInfo -DICEY_ENABLE_TSAN=ON -DICEY_ASYNC_AWAIT_THREAD_SAFE=1 +``` + +Run an example binary under TSAN: + +```sh +TSAN_OPTIONS="halt_on_error=1:detect_deadlocks=1:suppressions=/home/ivo/colcon_ws/src/icey/icey/test/tsan_dds.supp" setarch x86_64 -R /home/ivo/colcon_ws/install/icey_examples/lib/icey_examples/service_client_async_await_example +``` + +Why these workarounds are needed: + +- `setarch x86_64 -R`: disables ASLR for the process. Without this, TSAN can fail early with `unexpected memory mapping` on Ubuntu 24 (https://bugs.launchpad.net/ubuntu/+source/linux/+bug/2056762, https://github.com/google/sanitizers/issues/1716#issuecomment-2010399341) +- `suppressions=.../tsan_dds.supp`: filters for known DDS races and deadlocks that only cause noise. +- `halt_on_error=1`: fail fast +- `detect_deadlocks=0`: There is lock inversion issue in the rclcpp_actions currently + +By setting `-DICEY_ASYNC_AWAIT_THREAD_SAFE=0` during build, you can verify that indeed TSAN is able to detect the data races. ## Run clang-tidy: diff --git a/icey/doc/source/first_icey_node.md b/icey/doc/source/first_icey_node.md index 6182bb8b..ed1ad097 100644 --- a/icey/doc/source/first_icey_node.md +++ b/icey/doc/source/first_icey_node.md @@ -67,9 +67,4 @@ ICEY represents ROS primitives such as timers as a `Stream`, an abstraction over We also do not need to store the timer object anywhere, because the lifetime of entities in ICEY is bound to the lifetime of the node. In ICEY, you do not need to store subscriptions/timers/services as members of the class, ICEY does this bookkeeping for you. - -```{warning} -ICEY-nodes can currently only be used with a single-threaded executor. -``` - In the following, we will look more closely into how Subscriptions and Timers follow the `Stream` concept and how this changes the way of asynchronous programming. diff --git a/icey/include/icey/action/client_goal_handle.hpp b/icey/include/icey/action/client_goal_handle.hpp index c50b2ca9..1c89e0b1 100644 --- a/icey/include/icey/action/client_goal_handle.hpp +++ b/icey/include/icey/action/client_goal_handle.hpp @@ -23,22 +23,11 @@ #include "rcl_action/action_client.h" #include "rclcpp/macros.hpp" #include "rclcpp/time.hpp" +#include "rclcpp_action/client_goal_handle.hpp" #include "rclcpp_action/exceptions.hpp" #include "rclcpp_action/types.hpp" #include "rclcpp_action/visibility_control.hpp" -// I'm defining this in namespace rclcpp_action so that people don't have to change their code unnecessarily. -// I could also just include the rclcpp_action header, but just defining the enum here is faster to compile. -namespace rclcpp_action { -/// The possible statuses that an action goal can finish with. -enum class ResultCode : int8_t { - UNKNOWN = action_msgs::msg::GoalStatus::STATUS_UNKNOWN, - SUCCEEDED = action_msgs::msg::GoalStatus::STATUS_SUCCEEDED, - CANCELED = action_msgs::msg::GoalStatus::STATUS_CANCELED, - ABORTED = action_msgs::msg::GoalStatus::STATUS_ABORTED -}; -} // namespace rclcpp_action - namespace icey::rclcpp_action { using GoalUUID = std::array; @@ -66,14 +55,7 @@ class ClientGoalHandle { RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(ClientGoalHandle) // A wrapper that defines the result of an action - struct WrappedResult { - /// The unique identifier of the goal - GoalUUID goal_id; - /// A status to indicate if the goal was canceled, aborted, or succeeded - ResultCode code; - /// User defined fields sent back with an action - typename ActionT::Result::SharedPtr result; - }; + using WrappedResult = typename ::rclcpp_action::ClientGoalHandle::WrappedResult; using Feedback = typename ActionT::Feedback; using Result = typename ActionT::Result; diff --git a/icey/include/icey/icey_async_await.hpp b/icey/include/icey/icey_async_await.hpp index 2fb7d1ed..93915b0a 100644 --- a/icey/include/icey/icey_async_await.hpp +++ b/icey/include/icey/icey_async_await.hpp @@ -9,34 +9,40 @@ /// can include this header only and get faster compile times. #pragma once +#include #include #include #include +#include #include #include /// for ID #include +#include +#include #include "rclcpp/rclcpp.hpp" #include "rclcpp/version.h" #include "rclcpp_lifecycle/lifecycle_node.hpp" #include "tf2_msgs/msg/tf_message.hpp" #include "tf2_ros/buffer.hpp" -#include "tf2_ros/create_timer_ros.hpp" -#include "tf2_ros/qos.hpp" namespace icey { +#ifndef ICEY_ASYNC_AWAIT_THREAD_SAFE +#define ICEY_ASYNC_AWAIT_THREAD_SAFE 1 +#endif + using Clock = std::chrono::system_clock; using Time = std::chrono::time_point; using Duration = Clock::duration; -static rclcpp::Time rclcpp_from_chrono(const Time &time_point) { +[[maybe_unused]] static rclcpp::Time rclcpp_from_chrono(const Time &time_point) { return rclcpp::Time(std::chrono::time_point_cast(time_point) .time_since_epoch() .count()); } -static Time rclcpp_to_chrono(const rclcpp::Time &time_point) { +[[maybe_unused]] static Time rclcpp_to_chrono(const rclcpp::Time &time_point) { return Time(std::chrono::nanoseconds(time_point.nanoseconds())); } @@ -113,18 +119,25 @@ struct NodeBase { template void add_task_for(uint64_t id, const Duration &timeout, CallbackT on_timeout, rclcpp::CallbackGroup::SharedPtr group = nullptr) { +#if ICEY_ASYNC_AWAIT_THREAD_SAFE + std::lock_guard lock{oneoff_tasks_mutex_}; +#endif oneoff_cancelled_timers_.clear(); oneoff_active_timers_.emplace(id, create_wall_timer( timeout, [this, id, on_timeout]() { - cancel_task_for(id); - on_timeout(); + if (cancel_task_for(id)) { + on_timeout(); + } }, group)); } /// Cancel a previously scheduled task by key (no-op if not present) bool cancel_task_for(uint64_t id) { +#if ICEY_ASYNC_AWAIT_THREAD_SAFE + std::lock_guard lock{oneoff_tasks_mutex_}; +#endif auto it = oneoff_active_timers_.find(id); if (it == oneoff_active_timers_.end()) return false; auto timer = it->second; @@ -201,304 +214,42 @@ struct NodeBase { std::unordered_set> oneoff_cancelled_timers_; /// Active one-off tasks keyed by a stable uintptr_t key std::unordered_map> oneoff_active_timers_; +#if ICEY_ASYNC_AWAIT_THREAD_SAFE + mutable std::mutex oneoff_tasks_mutex_; +#endif public: // Test helpers (introspection) - std::size_t oneoff_active_task_count() const { return oneoff_active_timers_.size(); } - std::size_t oneoff_cancelled_task_count() const { return oneoff_cancelled_timers_.size(); } -}; - -/// A subscription + buffer for transforms that allows for asynchronous lookups and to subscribe on -/// a single transform between two coordinate systems. It is otherwise implemented similarly to the -/// tf2_ros::TransformListener but offering a well-developed asynchronous API. -/// It subscribes on the topic /tf and listens for relevant transforms (i.e. ones we subscribed to -/// or the ones we requested a lookup). If any relevant transform was received, it notifies via a -/// callback. -/// -/// Why not using `tf2_ros::AsyncBuffer` ? Due to multiple issues with it: (1) It uses -/// the `std::future/std::promise` primitives that are effectively useless. (2) -/// tf2_ros::AsyncBuffer::waitForTransform has a bug: We cannot make another lookup in the callback -/// of a lookup (it holds a (non-reentrant) mutex locked while calling the user callback), making it -/// impossible to chain asynchronous operations. -/// -/// @sa This class is used to implement the `TransformSubscriptionStream` and the `TransformBuffer`. -struct TransformBufferImpl { - using TransformMsg = geometry_msgs::msg::TransformStamped; - using TransformsMsg = tf2_msgs::msg::TFMessage::ConstSharedPtr; - using OnTransform = std::function; - using OnError = std::function; - using GetFrame = std::function; - - /// We make this class non-copyable since it captures the this-pointer in clojures. - TransformBufferImpl(const TransformBufferImpl &) = delete; - TransformBufferImpl(TransformBufferImpl &&) = delete; - TransformBufferImpl &operator=(const TransformBufferImpl &) = delete; - TransformBufferImpl &operator=(TransformBufferImpl &&) = delete; - - /// A transform request stores a request for looking up a transform between two coordinate - /// systems. Either (1) at a particular time with a timeout, or (2) as a subscription. When - /// "subscribing" to a transform, we yield a transform each time it changes. - struct TransformRequest { - TransformRequest(const GetFrame &target_frame, const GetFrame &source_frame, - OnTransform on_transform, OnError on_error, - std::optional