diff --git a/.github/workflows/tsan.yml b/.github/workflows/tsan.yml
new file mode 100644
index 00000000..a9fa9356
--- /dev/null
+++ b/.github/workflows/tsan.yml
@@ -0,0 +1,52 @@
+name: TSAN Race Check
+
+on:
+ workflow_dispatch:
+
+jobs:
+ tsan:
+ strategy:
+ fail-fast: false
+ matrix:
+ include:
+ - ros_distro: humble
+ container_image: rostooling/setup-ros-docker:ubuntu-jammy-latest
+ - ros_distro: jazzy
+ container_image: rostooling/setup-ros-docker:ubuntu-noble-latest
+ - ros_distro: kilted
+ container_image: rostooling/setup-ros-docker:ubuntu-noble-latest
+ runs-on: ubuntu-latest
+ container:
+ image: ${{ matrix.container_image }}
+ env:
+ RMW_IMPLEMENTATION: rmw_cyclonedds_cpp
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v4
+
+ - name: Install tooling
+ shell: bash
+ run: |
+ apt-get update
+ apt-get install -y --no-install-recommends util-linux
+
+ - name: Build with TSAN
+ shell: bash
+ run: |
+ source /opt/ros/${{ matrix.ros_distro }}/setup.bash
+ MAKEFLAGS="-j6" colcon build \
+ --packages-select icey icey_examples \
+ --cmake-args \
+ -DCMAKE_BUILD_TYPE=RelWithDebInfo \
+ -DICEY_ENABLE_TSAN=ON \
+ -DICEY_ASYNC_AWAIT_THREAD_SAFE=1
+
+ - name: Run TSAN race repro tests
+ shell: bash
+ run: |
+ source /opt/ros/${{ matrix.ros_distro }}/setup.bash
+ source install/setup.bash
+ TSAN_OPTIONS="halt_on_error=1:detect_deadlocks=1:suppressions=${GITHUB_WORKSPACE}/icey/test/tsan_dds.supp" \
+ setarch x86_64 -R \
+ ${GITHUB_WORKSPACE}/build/icey/test_main \
+ --gtest_filter=NodeTasksThreadSafety.TSanRaceRepro*
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 35a55b3b..f45f54f8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## 0.4.0
+### Changed
+
+- Timer callback signature no longer takes a `size_t` argument
+- Added CallbackGroup arguments to API
+
### Added
- Support for actions
@@ -22,7 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- allowing coroutines without suspension points (i.e. co_await) but only a single co_return
- Correct result type implementation
- Missing request cleanup in ServiceClientImpl::our_to_real_req_id_
-- Added missing cancellation of timeout timer in Promise destruction
+- Missing cancellation of timeout timer in Promise destruction
## 0.3.0
diff --git a/README.md b/README.md
index 0c54175c..b4f266bd 100644
--- a/README.md
+++ b/README.md
@@ -10,6 +10,11 @@
+### Updates:
+- v0.4.0:
+ - Adds support for the multi-threaded executor; the async/await API is now thread-safe.
+ - Also adds support for actions.
+
ICEY is a new client API for modern asynchronous programming in the Robot Operating System (ROS) 2. It uses C++20 coroutines with async/await syntax for service calls and TF lookups. ICEY allows you to model data flows based on streams and promises. These features simplify application code and make asynchronous data flows clearly visible.
### Problems ICEY solves:
@@ -22,7 +27,7 @@ ICEY is a new client API for modern asynchronous programming in the Robot Operat
ICEY is fully compatible with the ROS 2 API since it is built on top of rclcpp. This allows for gradual adoption. It supports all major ROS features: parameters, subscriptions, publishers, timers, services, clients, actions and TF. Additionally, ICEY supports lifecycle nodes using a single API.
ICEY operates smoothly with the message_filters package, using it for synchronization. ICEY is also extensible, as demonstrated by its support for image transport camera subscription/publishers.
-ICEY supports ROS 2 Humble and ROS 2 Jazzy.
+ICEY supports ROS 2 Humble/Jazzy/Kilted.
The [icey_examples](icey_examples) package contains many different example nodes, demonstrating the capabilities of ICEY.
diff --git a/icey/CMakeLists.txt b/icey/CMakeLists.txt
index ad3eee6d..5c01a640 100644
--- a/icey/CMakeLists.txt
+++ b/icey/CMakeLists.txt
@@ -4,6 +4,9 @@ project(icey)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_EXTENSIONS OFF)
+option(ICEY_ASYNC_AWAIT_THREAD_SAFE "Enable synchronization in async/await internals" ON)
+option(ICEY_ENABLE_TSAN "Build tests with ThreadSanitizer instrumentation" OFF)
+
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -pedantic -Werror=return-type -Werror=init-self)
endif()
@@ -25,11 +28,14 @@ include_directories(
)
ament_auto_add_library(icey
+ src/icey_async_await.cpp
src/actions/client.cpp
src/actions/server.cpp
src/actions/server_goal_handle.cpp
)
+
+
if(BUILD_TESTING)
find_package(ament_cmake_gtest REQUIRED)
@@ -50,6 +56,21 @@ if(BUILD_TESTING)
target_link_libraries(test_main fmt::fmt)
set_tests_properties(test_main PROPERTIES TIMEOUT 180)
+
+
+ if(ICEY_ENABLE_TSAN)
+ if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")
+ target_compile_options(icey PUBLIC -fsanitize=thread -fno-omit-frame-pointer)
+ target_link_options(icey PUBLIC -fsanitize=thread)
+ target_compile_options(test_main PRIVATE -fsanitize=thread -fno-omit-frame-pointer)
+ target_link_options(test_main PRIVATE -fsanitize=thread)
+ set_tests_properties(test_main PROPERTIES
+ ENVIRONMENT "TSAN_OPTIONS=halt_on_error=1:suppressions=${CMAKE_CURRENT_SOURCE_DIR}/test/tsan_dds.supp")
+ else()
+ message(WARNING "ICEY_ENABLE_TSAN is ON but compiler does not support TSAN flags")
+ endif()
+ endif()
+
endif()
if(ament_cmake_auto_VERSION EQUAL 2.7.3)
diff --git a/icey/benchmark/src/events_throughput.cpp b/icey/benchmark/src/events_throughput.cpp
index 2398c056..e3b37595 100644
--- a/icey/benchmark/src/events_throughput.cpp
+++ b/icey/benchmark/src/events_throughput.cpp
@@ -56,7 +56,9 @@ int main(int argc, char** argv) {
// auto exec = rclcpp::experimental::executors::EventsExecutor()
// exec.add(node);
// exec.spin();
- rclcpp::spin(node);
+ rclcpp::executors::MultiThreadedExecutor exec{rclcpp::ExecutorOptions(), 8};
+ exec.add_node(node->get_node_base_interface());
+ exec.spin();
rclcpp::shutdown();
return 0;
}
\ No newline at end of file
diff --git a/icey/benchmark/src/tf_lookup_async_ref.cpp b/icey/benchmark/src/tf_lookup_async_ref.cpp
index bca26890..cf6c085a 100644
--- a/icey/benchmark/src/tf_lookup_async_ref.cpp
+++ b/icey/benchmark/src/tf_lookup_async_ref.cpp
@@ -35,5 +35,7 @@ int main(int argc, char **argv) {
}
});
- rclcpp::spin(node);
+ rclcpp::executors::MultiThreadedExecutor exec{rclcpp::ExecutorOptions(), 8};
+ exec.add_node(node->get_node_base_interface());
+ exec.spin();
}
\ No newline at end of file
diff --git a/icey/doc/source/api_ros.md b/icey/doc/source/api_ros.md
index 8dc8edbd..abff19ae 100644
--- a/icey/doc/source/api_ros.md
+++ b/icey/doc/source/api_ros.md
@@ -21,9 +21,6 @@ Convenience node wrappers containing the Icey context
```{doxygenclass} icey::NodeWithIceyContext
```
-```{doxygenstruct} icey::TransformBufferImpl
-```
-
```{doxygenstruct} icey::NodeBase
```
diff --git a/icey/doc/source/development.md b/icey/doc/source/development.md
index 9b4e2445..67aaa3e1 100644
--- a/icey/doc/source/development.md
+++ b/icey/doc/source/development.md
@@ -50,6 +50,32 @@ Then, do not use FastDDS since it uses exceptions as part of it's regular (inste
RMW_IMPLEMENTATION=rmw_cyclonedds_cpp gdb ./build/icey/test_main
```
+
+## TSAN (ThreadSanitizer)
+
+TSAN is used to detect data races with multi-threaded executors.
+Build both `icey` and `icey_examples` with TSAN instrumentation:
+
+```sh
+cd ~/colcon_ws  # or the root of your colcon workspace
+source /opt/ros/jazzy/setup.bash
+MAKEFLAGS="-j6" colcon build --packages-select icey icey_examples --cmake-args -DCMAKE_BUILD_TYPE=RelWithDebInfo -DICEY_ENABLE_TSAN=ON -DICEY_ASYNC_AWAIT_THREAD_SAFE=1
+```
+
+Run an example binary under TSAN:
+
+```sh
+TSAN_OPTIONS="halt_on_error=1:detect_deadlocks=1:suppressions=/home/ivo/colcon_ws/src/icey/icey/test/tsan_dds.supp" setarch x86_64 -R /home/ivo/colcon_ws/install/icey_examples/lib/icey_examples/service_client_async_await_example
+```
+
+Why these workarounds are needed:
+
+- `setarch x86_64 -R`: disables ASLR for the process. Without this, TSAN can fail early with `unexpected memory mapping` on Ubuntu 24 (https://bugs.launchpad.net/ubuntu/+source/linux/+bug/2056762, https://github.com/google/sanitizers/issues/1716#issuecomment-2010399341)
+- `suppressions=.../tsan_dds.supp`: filters out known DDS races and deadlock reports that would only cause noise.
+- `halt_on_error=1`: fail fast
+- `detect_deadlocks=1`: also detect lock-order inversions. A known lock inversion in rclcpp_actions is currently filtered out via the suppressions file.
+
+By setting `-DICEY_ASYNC_AWAIT_THREAD_SAFE=0` during the build, you can verify that TSAN is indeed able to detect the data races.
## Run clang-tidy:
diff --git a/icey/doc/source/first_icey_node.md b/icey/doc/source/first_icey_node.md
index 6182bb8b..ed1ad097 100644
--- a/icey/doc/source/first_icey_node.md
+++ b/icey/doc/source/first_icey_node.md
@@ -67,9 +67,4 @@ ICEY represents ROS primitives such as timers as a `Stream`, an abstraction over
We also do not need to store the timer object anywhere, because the lifetime of entities in ICEY is bound to the lifetime of the node. In ICEY, you do not need to store subscriptions/timers/services as members of the class, ICEY does this bookkeeping for you.
-
-```{warning}
-ICEY-nodes can currently only be used with a single-threaded executor.
-```
-
In the following, we will look more closely into how Subscriptions and Timers follow the `Stream` concept and how this changes the way of asynchronous programming.
diff --git a/icey/include/icey/action/client_goal_handle.hpp b/icey/include/icey/action/client_goal_handle.hpp
index c50b2ca9..1c89e0b1 100644
--- a/icey/include/icey/action/client_goal_handle.hpp
+++ b/icey/include/icey/action/client_goal_handle.hpp
@@ -23,22 +23,11 @@
#include "rcl_action/action_client.h"
#include "rclcpp/macros.hpp"
#include "rclcpp/time.hpp"
+#include "rclcpp_action/client_goal_handle.hpp"
#include "rclcpp_action/exceptions.hpp"
#include "rclcpp_action/types.hpp"
#include "rclcpp_action/visibility_control.hpp"
-// I'm defining this in namespace rclcpp_action so that people don't have to change their code unnecessarily.
-// I could also just include the rclcpp_action header, but just defining the enum here is faster to compile.
-namespace rclcpp_action {
-/// The possible statuses that an action goal can finish with.
-enum class ResultCode : int8_t {
- UNKNOWN = action_msgs::msg::GoalStatus::STATUS_UNKNOWN,
- SUCCEEDED = action_msgs::msg::GoalStatus::STATUS_SUCCEEDED,
- CANCELED = action_msgs::msg::GoalStatus::STATUS_CANCELED,
- ABORTED = action_msgs::msg::GoalStatus::STATUS_ABORTED
-};
-} // namespace rclcpp_action
-
namespace icey::rclcpp_action {
using GoalUUID = std::array;
@@ -66,14 +55,7 @@ class ClientGoalHandle {
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(ClientGoalHandle)
// A wrapper that defines the result of an action
- struct WrappedResult {
- /// The unique identifier of the goal
- GoalUUID goal_id;
- /// A status to indicate if the goal was canceled, aborted, or succeeded
- ResultCode code;
- /// User defined fields sent back with an action
- typename ActionT::Result::SharedPtr result;
- };
+ using WrappedResult = typename ::rclcpp_action::ClientGoalHandle::WrappedResult;
using Feedback = typename ActionT::Feedback;
using Result = typename ActionT::Result;
diff --git a/icey/include/icey/icey_async_await.hpp b/icey/include/icey/icey_async_await.hpp
index 2fb7d1ed..93915b0a 100644
--- a/icey/include/icey/icey_async_await.hpp
+++ b/icey/include/icey/icey_async_await.hpp
@@ -9,34 +9,40 @@
/// can include this header only and get faster compile times.
#pragma once
+#include
#include
#include
#include
+#include
#include
#include /// for ID
#include
+#include
+#include
#include "rclcpp/rclcpp.hpp"
#include "rclcpp/version.h"
#include "rclcpp_lifecycle/lifecycle_node.hpp"
#include "tf2_msgs/msg/tf_message.hpp"
#include "tf2_ros/buffer.hpp"
-#include "tf2_ros/create_timer_ros.hpp"
-#include "tf2_ros/qos.hpp"
namespace icey {
+#ifndef ICEY_ASYNC_AWAIT_THREAD_SAFE
+#define ICEY_ASYNC_AWAIT_THREAD_SAFE 1
+#endif
+
using Clock = std::chrono::system_clock;
using Time = std::chrono::time_point;
using Duration = Clock::duration;
-static rclcpp::Time rclcpp_from_chrono(const Time &time_point) {
+[[maybe_unused]] static rclcpp::Time rclcpp_from_chrono(const Time &time_point) {
return rclcpp::Time(std::chrono::time_point_cast(time_point)
.time_since_epoch()
.count());
}
-static Time rclcpp_to_chrono(const rclcpp::Time &time_point) {
+[[maybe_unused]] static Time rclcpp_to_chrono(const rclcpp::Time &time_point) {
return Time(std::chrono::nanoseconds(time_point.nanoseconds()));
}
@@ -113,18 +119,25 @@ struct NodeBase {
template
void add_task_for(uint64_t id, const Duration &timeout, CallbackT on_timeout,
rclcpp::CallbackGroup::SharedPtr group = nullptr) {
+#if ICEY_ASYNC_AWAIT_THREAD_SAFE
+ std::lock_guard lock{oneoff_tasks_mutex_};
+#endif
oneoff_cancelled_timers_.clear();
oneoff_active_timers_.emplace(id, create_wall_timer(
timeout,
[this, id, on_timeout]() {
- cancel_task_for(id);
- on_timeout();
+ if (cancel_task_for(id)) {
+ on_timeout();
+ }
},
group));
}
/// Cancel a previously scheduled task by key (no-op if not present)
bool cancel_task_for(uint64_t id) {
+#if ICEY_ASYNC_AWAIT_THREAD_SAFE
+ std::lock_guard lock{oneoff_tasks_mutex_};
+#endif
auto it = oneoff_active_timers_.find(id);
if (it == oneoff_active_timers_.end()) return false;
auto timer = it->second;
@@ -201,304 +214,42 @@ struct NodeBase {
std::unordered_set> oneoff_cancelled_timers_;
/// Active one-off tasks keyed by a stable uintptr_t key
std::unordered_map> oneoff_active_timers_;
+#if ICEY_ASYNC_AWAIT_THREAD_SAFE
+ mutable std::mutex oneoff_tasks_mutex_;
+#endif
public:
// Test helpers (introspection)
- std::size_t oneoff_active_task_count() const { return oneoff_active_timers_.size(); }
- std::size_t oneoff_cancelled_task_count() const { return oneoff_cancelled_timers_.size(); }
-};
-
-/// A subscription + buffer for transforms that allows for asynchronous lookups and to subscribe on
-/// a single transform between two coordinate systems. It is otherwise implemented similarly to the
-/// tf2_ros::TransformListener but offering a well-developed asynchronous API.
-/// It subscribes on the topic /tf and listens for relevant transforms (i.e. ones we subscribed to
-/// or the ones we requested a lookup). If any relevant transform was received, it notifies via a
-/// callback.
-///
-/// Why not using `tf2_ros::AsyncBuffer` ? Due to multiple issues with it: (1) It uses
-/// the `std::future/std::promise` primitives that are effectively useless. (2)
-/// tf2_ros::AsyncBuffer::waitForTransform has a bug: We cannot make another lookup in the callback
-/// of a lookup (it holds a (non-reentrant) mutex locked while calling the user callback), making it
-/// impossible to chain asynchronous operations.
-///
-/// @sa This class is used to implement the `TransformSubscriptionStream` and the `TransformBuffer`.
-struct TransformBufferImpl {
- using TransformMsg = geometry_msgs::msg::TransformStamped;
- using TransformsMsg = tf2_msgs::msg::TFMessage::ConstSharedPtr;
- using OnTransform = std::function;
- using OnError = std::function;
- using GetFrame = std::function;
-
- /// We make this class non-copyable since it captures the this-pointer in clojures.
- TransformBufferImpl(const TransformBufferImpl &) = delete;
- TransformBufferImpl(TransformBufferImpl &&) = delete;
- TransformBufferImpl &operator=(const TransformBufferImpl &) = delete;
- TransformBufferImpl &operator=(TransformBufferImpl &&) = delete;
-
- /// A transform request stores a request for looking up a transform between two coordinate
- /// systems. Either (1) at a particular time with a timeout, or (2) as a subscription. When
- /// "subscribing" to a transform, we yield a transform each time it changes.
- struct TransformRequest {
- TransformRequest(const GetFrame &target_frame, const GetFrame &source_frame,
- OnTransform on_transform, OnError on_error,
- std::optional _maybe_time = {})
- : target_frame(target_frame),
- source_frame(source_frame),
- on_transform(on_transform),
- on_error(on_error),
- maybe_time(_maybe_time) {}
- GetFrame target_frame;
- GetFrame source_frame;
- std::optional last_received_transform;
- OnTransform on_transform;
- OnError on_error;
- std::optional maybe_time;
- };
-
- /// A request for a transform lookup: It represents effectively a single call to the async_lookup
- /// function. Even if you call async_lookup with the exact same arguments (same frames and time),
- /// this will count as two separate requests.
- using RequestHandle = std::shared_ptr;
-
- explicit TransformBufferImpl(std::weak_ptr node) : node_(node) { init(); }
-
- /// @brief Subscribe to a single transform between two coordinate systems. Every time this
- /// transform changes, the `on_transform` callback function is called. More precisely, every time
- /// a message arrives on the `/tf` or `/tf_static` topic, a lookup is attempted. If the lookup
- /// fails, `on_error` is called. If the lookup succeeds and if the looked up transform is
- // different to the previously emitted one, the `on_transform` callback is called.
- void add_subscription(const GetFrame &target_frame, const GetFrame &source_frame,
- const OnTransform &on_transform, const OnError &on_error) {
- std::lock_guard lock{mutex_};
- requests_.emplace(
- std::make_shared(target_frame, source_frame, on_transform, on_error));
- }
-
- /// @brief Queries the TF buffer for a transform at the given time between the given frames. It
- /// does not wait but instead only returns something if the transform is already in the buffer.
- ///
- /// @param target_frame
- /// @param source_frame
- /// @param time
- /// @return The transform or the TF error if the transform is not in the buffer.
- Result get_from_buffer(
- std::string target_frame, std::string source_frame, const Time &time) const {
- try {
- const tf2::TimePoint legacy_timepoint = tf2_ros::fromRclcpp(rclcpp_from_chrono(time));
- // Note that this call does not wait, the transform must already have arrived.
- auto tf = buffer_->lookupTransform(target_frame, source_frame, legacy_timepoint);
- return Ok(tf);
- } catch (const tf2::TransformException &e) {
- return Err(std::string(e.what()));
- }
- }
-
- /// @brief Makes an asynchronous lookup for a for a transform at the given time between the given
- /// frames. Callback-based API.
- ///
- /// @param target_frame
- /// @param source_frame
- /// @param time
- /// @param timeout
- /// @param on_transform The callback that is called with the requested transform after it becomes
- /// available
- /// @param on_error The callback that is called if a timeout occurs.
- /// @return The "handle", identifying this request. You can use this handle to call
- /// `cancel_request` if you want to cancel the request.
- /// @warning Currently the timeout is measured with wall-time, i.e. does not consider sim-time due
- /// to the limitation of ROS 2 Humble only offering wall-timers
- RequestHandle lookup(const std::string &target_frame, const std::string &source_frame, Time time,
- const Duration &timeout, OnTransform on_transform, OnError on_error) {
- auto request{std::make_shared([target_frame]() { return target_frame; },
- [source_frame]() { return source_frame; },
- on_transform, on_error, time)};
-
- auto weak_request = std::weak_ptr(request);
- requests_.emplace(request);
- node_.lock()->add_task_for(
- uint64_t(request.get()), timeout, [this, on_error, request = weak_request]() {
- if (auto req = request.lock()) {
- requests_.erase(req); // Destroy the request
- on_error(tf2::TimeoutException{"Timed out waiting for transform"});
- }
- });
- return request;
- }
-
- /// @brief Does an asynchronous lookup for a single transform that can be awaited using `co_await`
- ///
- /// @param target_frame
- /// @param source_frame
- /// @param time At which time to get the transform
- /// @param timeout How long to wait for the transform
- /// @return A future that resolves with the transform or with an error if a timeout occurs
- impl::Promise lookup(
- const std::string &target_frame, const std::string &source_frame, const Time &time,
- const Duration &timeout) {
- return impl::Promise(
- [this, target_frame, source_frame, time, timeout](auto &promise) {
- auto request_handle = this->lookup(
- target_frame, source_frame, time, timeout,
- [&promise](const geometry_msgs::msg::TransformStamped &tf) { promise.resolve(tf); },
- [&promise](const tf2::TransformException &ex) { promise.reject(ex.what()); });
- });
- }
-
- /// @brief Same as `lookup`, but accepts a ROS time point
- ///
- /// @param target_frame
- /// @param source_frame
- /// @param time At which time to get the transform
- /// @param timeout How long to wait for the transform
- /// @return A future that resolves with the transform or with an error if a timeout occurs
- impl::Promise lookup(
- const std::string &target_frame, const std::string &source_frame, const rclcpp::Time &time,
- const Duration &timeout) {
- return lookup(target_frame, source_frame, icey::rclcpp_to_chrono(time), timeout);
- }
-
- /// Cancel a transform request: This means that the registered callbacks will no longer be called.
- /// If the given request does not exist, this function does nothing.
- bool cancel_request(RequestHandle request) {
- std::lock_guard lock{mutex_};
- requests_.erase(request);
- return node_.lock()->cancel_task_for(uint64_t(request.get()));
- }
-
- /// We take a tf2_ros::Buffer instead of a tf2::BufferImpl only to be able to use ROS-time API
- /// (internally TF2 has it's own timestamps...), not because we need to wait on anything (that's
- /// what tf2_ros::Buffer does in addition to tf2::BufferImpl).
- std::shared_ptr buffer_;
-
-protected:
- void init() {
- init_tf_buffer();
- message_subscription_tf_ = node_.lock()->create_subscription(
- "/tf", [this](TransformsMsg msg) { on_tf_message(msg, false); },
- tf2_ros::DynamicListenerQoS());
- message_subscription_tf_static_ = node_.lock()->create_subscription(
- "/tf_static", [this](TransformsMsg msg) { on_tf_message(msg, true); },
- tf2_ros::StaticListenerQoS());
- }
-
- void init_tf_buffer() {
- /// Mind that the official example regarding the piece of code below is incomplete:
- /// ([official](https://docs.ros.org/en/jazzy/Tutorials/Intermediate/Tf2/Writing-A-Tf2-Listener-Cpp.html))
- /// . I'm following this example instead:
- /// https://github.com/ros-perception/imu_pipeline/blob/ros2/imu_transformer/src/imu_transformer.cpp#L16
- /// See also the following discussions on why this code needs to be this way:
- /// https://answers.ros.org/question/372608/?sort=votes
- /// https://github.com/ros-navigation/navigation2/issues/1182
- /// https://github.com/ros2/geometry2/issues/446
- buffer_ =
- std::make_shared(node_.lock()->get_node_clock_interface()->get_clock());
- auto timer_interface = std::make_shared(
- node_.lock()->get_node_base_interface(), node_.lock()->get_node_timers_interface());
- buffer_->setCreateTimerInterface(timer_interface);
- }
-
- /// Store the received transforms in the buffer.
- void store_in_buffer(const tf2_msgs::msg::TFMessage &msg_in, bool is_static) {
- std::string authority = "Authority undetectable";
- for (const auto &transform : msg_in.transforms) {
- try {
- buffer_->setTransform(transform, authority, is_static);
- } catch (const tf2::TransformException &ex) {
- std::string temp = ex.what();
- RCLCPP_ERROR(node_.lock()->get_node_logging_interface()->get_logger(),
- "Failure to set received transform from %s to %s with error: %s\n",
- transform.child_frame_id.c_str(), transform.header.frame_id.c_str(),
- temp.c_str());
- }
- }
- }
-
- /// This simply looks up the transform in the buffer at the latest stamp and checks if it
- /// changed with respect to the previously received one. If the transform has changed, we know
- /// we have to notify. Returns true when it notified about a changed transform (called
- /// on_transform) and false otherwise.
- bool maybe_notify(TransformRequest &info) {
- try {
- /// Note that this does not wait/thread-sleep etc. This is simply a lookup in a
- /// std::vector/tree.
- geometry_msgs::msg::TransformStamped tf_msg =
- buffer_->lookupTransform(info.target_frame(), info.source_frame(), tf2::TimePointZero);
- if (!info.last_received_transform || tf_msg != *info.last_received_transform) {
- info.last_received_transform = tf_msg;
- info.on_transform(tf_msg);
- return true;
- }
- } catch (const tf2::TransformException &e) {
- info.on_error(e);
- }
- return false;
- }
-
- bool maybe_notify_specific_time(RequestHandle request) {
- try {
- /// Note that this does not wait/thread-sleep etc. This is simply a lookup in a
- /// std::vector/tree.
- geometry_msgs::msg::TransformStamped tf_msg = buffer_->lookupTransform(
- request->target_frame(), request->source_frame(), request->maybe_time.value());
- request->on_transform(tf_msg);
- /// If the request is destroyed gracefully (lookup succeeded), cancel the associated task
- /// timer
- node_.lock()->cancel_task_for(uint64_t(request.get()));
- return true;
- } catch (
- const tf2::TransformException &e) { /// TODO we could catch for extrapolation here as well
- /// For any other error, we continue waiting
- }
- return false;
- }
-
- void notify_if_any_relevant_transform_was_received() {
- /// Iterate all requests, notify and maybe erase them if they are requests for a specific time
- std::vector requests_to_delete;
- mutex_.lock();
- auto requests = requests_;
- mutex_.unlock();
-
- for (auto req : requests) {
- if (req->maybe_time) {
- if (maybe_notify_specific_time(req)) {
- requests_to_delete.push_back(req);
- }
- } else {
- // If it is a regular subscription, is is persistend and never erased
- maybe_notify(*req);
- }
- }
- mutex_.lock();
- for (auto k : requests_to_delete) requests_.erase(k);
- mutex_.unlock();
+ std::size_t oneoff_active_task_count() const {
+#if ICEY_ASYNC_AWAIT_THREAD_SAFE
+ std::lock_guard lock{oneoff_tasks_mutex_};
+#endif
+ return oneoff_active_timers_.size();
}
-
- void on_tf_message(const TransformsMsg &msg, bool is_static) {
- store_in_buffer(*msg, is_static);
- notify_if_any_relevant_transform_was_received();
+ std::size_t oneoff_cancelled_task_count() const {
+#if ICEY_ASYNC_AWAIT_THREAD_SAFE
+ std::lock_guard lock{oneoff_tasks_mutex_};
+#endif
+ return oneoff_cancelled_timers_.size();
}
-
- std::weak_ptr node_; /// Hold weak reference to the NodeBase because the Context owns
- /// the NodeBase as well, so we avoid circular reference
-
- rclcpp::Subscription::SharedPtr message_subscription_tf_;
- rclcpp::Subscription::SharedPtr message_subscription_tf_static_;
-
- std::recursive_mutex mutex_;
- std::unordered_set requests_;
};
-/// A TransformBuffer offers a modern, asynchronous interface for looking up transforms at a given
+struct TransformBufferImpl;
+/// A TransformBuffer offers an async/await interface for looking up transforms at a given
/// point in time. This class is lightweight pointer to underlying implementation and can therefore
/// be copied and passed around by value (PIMPL idiom)
struct TransformBuffer {
- using RequestHandle = TransformBufferImpl::RequestHandle;
+ using RequestHandle = uint64_t;
using OnTransform = std::function;
using OnError = std::function;
+ using GetFrame = std::function;
- TransformBuffer() = default;
- TransformBuffer(std::shared_ptr impl) : impl_(impl) {}
+ /// Initializes the transform buffer by starting the subscriptions on TF. Returns *this;
+ /// Does nothing if already initialized.
+ TransformBuffer &init(std::weak_ptr node_base);
+
+ /// Return the underlying tf2_ros::Buffer.
+ std::shared_ptr get_buffer() const;
/// @brief Makes an asynchronous lookup for a for a transform at the given time between the given
/// frames. Callback-based API.
@@ -510,21 +261,25 @@ struct TransformBuffer {
/// @param on_transform The callback that is called with the requested transform after it becomes
/// available
/// @param on_error The callback that is called if a timeout occurs.
- /// @return The "handle", identifying this request. You can use this handle to call
+ /// @return The request id, identifying this request. You can use this request id to call
/// `cancel_request` if you want to cancel the request.
/// @warning Currently the timeout is measured with wall-time, i.e. does not consider sim-time due
/// to the limitation of ROS 2 Humble only offering wall-timers
- RequestHandle lookup(const std::string &target_frame, const std::string &source_frame, Time time,
- const Duration &timeout, OnTransform on_transform, OnError on_error) {
- return impl_.lock()->lookup(target_frame, source_frame, time, timeout, on_transform, on_error);
- }
+ uint64_t lookup(const std::string &target_frame, const std::string &source_frame, Time time,
+ const Duration &timeout, OnTransform on_transform, OnError on_error);
+
+ /// @brief Subscribe to a single transform between two coordinate systems. Every time this
+ /// transform changes, the `on_transform` callback function is called. More precisely, every time
+ /// a message arrives on the `/tf` or `/tf_static` topic, a lookup is attempted. If the lookup
+ /// fails, `on_error` is called. If the lookup succeeds and if the looked up transform is
+ /// different to the previously emitted one, the `on_transform` callback is called.
+ void add_subscription(const GetFrame &target_frame, const GetFrame &source_frame,
+ const OnTransform &on_transform, const OnError &on_error);
/// Cancel a transform request: This means that the registered callbacks will no longer be called.
/// If the given request does not exist, this function does nothing, in this case it returns
/// false. If a request was cleaned up, this function returns true.
- bool cancel_request(RequestHandle request_handle) {
- return impl_.lock()->cancel_request(request_handle);
- }
+ bool cancel_request(uint64_t request_id);
/// @brief Does an asynchronous lookup for a single transform that can be awaited using `co_await`
///
@@ -535,9 +290,7 @@ struct TransformBuffer {
/// @return A future that resolves with the transform or with an error if a timeout occurs
impl::Promise lookup(
const std::string &target_frame, const std::string &source_frame, const Time &time,
- const Duration &timeout) {
- return impl_.lock()->lookup(target_frame, source_frame, time, timeout);
- }
+ const Duration &timeout);
/// @brief Same as `lookup`, but accepts a ROS time point
///
@@ -548,12 +301,10 @@ struct TransformBuffer {
/// @return A future that resolves with the transform or with an error if a timeout occurs
impl::Promise lookup(
const std::string &target_frame, const std::string &source_frame, const rclcpp::Time &time,
- const Duration &timeout) {
- return impl_.lock()->lookup(target_frame, source_frame, time, timeout);
- }
+ const Duration &timeout);
private:
- std::weak_ptr impl_;
+ std::shared_ptr impl_;
};
template
@@ -570,19 +321,21 @@ struct ServiceClientImpl {
ServiceClientImpl &operator=(ServiceClientImpl &&) = delete;
ServiceClientImpl(std::weak_ptr node, const std::string &service_name,
- const rclcpp::QoS &qos = rclcpp::ServicesQoS())
- : node_(node), client(node.lock()->create_client(service_name, qos)) {}
+ const rclcpp::QoS &qos = rclcpp::ServicesQoS(),
+ rclcpp::CallbackGroup::SharedPtr group = nullptr)
+ : node_(node), client(node.lock()->create_client(service_name, qos, group)) {}
impl::Promise call(Request request, const Duration &timeout) {
return impl::Promise([this, request, timeout](auto &promise) {
auto future_and_req_id = client->async_send_request(
request, [this, &promise](typename Client::SharedFuture result) {
/// Cancel the timeout task since we got a response
- node_.lock()->cancel_task_for(uint64_t(&promise));
- if (!result.valid()) {
- promise.reject(rclcpp::ok() ? "TIMEOUT" : "INTERRUPTED");
- } else {
- promise.resolve(result.get());
+ if (node_.lock()->cancel_task_for(uint64_t(&promise))) {
+ if (!result.valid()) {
+ promise.reject(rclcpp::ok() ? "TIMEOUT" : "INTERRUPTED");
+ } else {
+ promise.resolve(result.get());
+ }
}
});
auto request_id = future_and_req_id.request_id;
@@ -620,8 +373,9 @@ struct ServiceClient {
/// Constructs the service client. A node has to be provided because it is needed to create
/// timeout timers for every service call.
ServiceClient(std::weak_ptr node, const std::string &service_name,
- const rclcpp::QoS &qos = rclcpp::ServicesQoS())
- : impl_(std::make_shared>(node, service_name, qos)) {}
+ const rclcpp::QoS &qos = rclcpp::ServicesQoS(),
+ rclcpp::CallbackGroup::SharedPtr group = nullptr)
+ : impl_(std::make_shared>(node, service_name, qos, group)) {}
// clang-format off
/*! Make an asynchronous call to the service. Returns a impl::Promise that can be awaited using `co_await`.
@@ -686,10 +440,11 @@ struct AsyncGoalHandle {
try {
auto request_id =
client_.lock()->async_get_result(goal_handle_, [this, &promise](const Result &result) {
- node_.lock()->cancel_task_for(uint64_t(&promise));
- promise.resolve(result);
+ if (node_.lock()->cancel_task_for(uint64_t(&promise))) {
+ promise.resolve(result);
+ }
});
- if (!request_id) {
+ if (!request_id) { /// This can only happen in case the rcl/rmw function fails
promise.reject("FAILED");
} else {
node_.lock()->add_task_for(uint64_t(&promise), timeout,
@@ -706,15 +461,14 @@ struct AsyncGoalHandle {
/// Cancel this goal. Warning: Not co_awaiting this promise leads to use-after free !
// The promise is rejected in case the goal was already reached.
- /// TODO check in gdb whether it is a problem that this promise might resolve synchronously, i.e.
- /// do we get a stack overflow ?
impl::Promise cancel(const Duration &timeout) const {
return impl::Promise([this, timeout](auto &promise) {
try {
auto request_id = client_.lock()->async_cancel_goal(
goal_handle_, [this, &promise](CancelResponse cancel_response) {
- node_.lock()->cancel_task_for(uint64_t(&promise));
- promise.resolve(cancel_response);
+ if (node_.lock()->cancel_task_for(uint64_t(&promise))) {
+ promise.resolve(cancel_response);
+ }
});
/// Add timeout task
node_.lock()->add_task_for(uint64_t(&promise), timeout, [this, &promise, request_id] {
@@ -752,11 +506,21 @@ struct ActionClient {
using RequestID = int64_t;
using AsyncGoalHandleT = AsyncGoalHandle;
- ActionClient(std::weak_ptr node, const std::string &action_name) : node_(node) {
+ /// Construct an async action client wrapper.
+ /// @param node Node base used to create the underlying rclcpp_action client.
+ /// @param action_name Name of the action topic (e.g. "/fibonacci").
+ /// @param group Optional callback group for the action client waitable. If nullptr, the node's
+ /// default callback group is used.
+ /// @param options rcl_action client options. Defaults to
+ /// rcl_action_client_get_default_options().
+ ActionClient(std::weak_ptr node, const std::string &action_name,
+ rclcpp::CallbackGroup::SharedPtr group = nullptr,
+ const rcl_action_client_options_t &options = rcl_action_client_get_default_options())
+ : node_(node) {
client_ = icey::rclcpp_action::create_client(
node.lock()->get_node_base_interface(), node.lock()->get_node_graph_interface(),
node.lock()->get_node_logging_interface(), node.lock()->get_node_waitables_interface(),
- action_name);
+ action_name, group, options);
}
/// Send asynchronously an action goal: if it is accepted, then you get a goal handle.
@@ -767,11 +531,12 @@ struct ActionClient {
[this, goal, timeout, feedback_callback](auto &promise) {
typename Client::SendGoalOptions options;
options.goal_response_callback = [this, &promise](auto goal_handle) {
- node_.lock()->cancel_task_for(uint64_t(&promise));
- if (goal_handle == nullptr) {
- promise.reject("GOAL REJECTED"); /// TODO error type
- } else {
- promise.resolve(AsyncGoalHandle(node_, client_, goal_handle));
+ if (node_.lock()->cancel_task_for(uint64_t(&promise))) {
+ if (goal_handle == nullptr) {
+ promise.reject("GOAL REJECTED"); /// TODO error type
+ } else {
+ promise.resolve(AsyncGoalHandle(node_, client_, goal_handle));
+ }
}
};
/// HINT: Wrap inside a lambda to support feedback_callback being a coroutine
@@ -814,8 +579,16 @@ class ContextAsyncAwait {
const std::string &topic_name, Callback &&callback,
const rclcpp::QoS &qos = rclcpp::SystemDefaultsQoS(),
const rclcpp::SubscriptionOptions &options = rclcpp::SubscriptionOptions()) {
+ auto callback_wrapper = [callback = std::forward(callback)](auto... args) {
+ using ReturnType = decltype(callback(args...));
+ if constexpr (!impl::HasPromiseType) {
+ callback(args...);
+ } else {
+ callback(args...).detach();
+ }
+ };
auto subscription = node_base()->create_subscription(
- topic_name, [callback](typename MessageT::SharedPtr msg) { callback(msg); }, qos, options);
+ topic_name, std::move(callback_wrapper), qos, options);
std::lock_guard lock{bookkeeping_mutex_};
subscriptions_.push_back(std::dynamic_pointer_cast(subscription));
return subscription;
@@ -829,8 +602,22 @@ class ContextAsyncAwait {
/// \note A callback signature that accepts a TimerInfo argument is not implemented yet
/// Works otherwise the same as [rclcpp::Node::create_timer].
template
- std::shared_ptr create_timer_async(const Duration &period, Callback callback) {
- auto timer = node_base()->create_wall_timer(period, [callback]() { callback(std::size_t{}); });
+ std::shared_ptr create_timer_async(
+ const Duration &period, Callback &&callback,
+ rclcpp::CallbackGroup::SharedPtr group = nullptr) {
+ auto timer = node_base()->create_wall_timer(
+ period,
+ [callback = std::forward(callback)]() {
+ static_assert(std::is_invocable_v &>,
+ "create_timer_async callback must be invocable as ()");
+ using ReturnType = decltype(callback());
+ if constexpr (!impl::HasPromiseType) {
+ callback();
+ } else {
+ callback().detach();
+ }
+ },
+ group);
std::lock_guard lock{bookkeeping_mutex_};
timers_.push_back(std::dynamic_pointer_cast(timer));
return timer;
@@ -849,8 +636,9 @@ class ContextAsyncAwait {
/// icey::Promise>
template
std::shared_ptr> create_service(
- const std::string &service_name, Callback callback,
- const rclcpp::QoS &qos = rclcpp::ServicesQoS()) {
+ const std::string &service_name, Callback &&callback,
+ const rclcpp::QoS &qos = rclcpp::ServicesQoS(),
+ rclcpp::CallbackGroup::SharedPtr group = nullptr) {
using Request = std::shared_ptr;
using RequestID = std::shared_ptr;
/// The type of the user callback that can response synchronously (i.e. immediately): It
@@ -862,8 +650,9 @@ class ContextAsyncAwait {
/// [rcl_send_response](http://docs.ros.org/en/jazzy/p/rcl/generated/function_service_8h_1a8631f47c48757228b813d0849d399d81.html#_CPPv417rcl_send_responsePK13rcl_service_tP16rmw_request_id_tPv)
auto service = node_base()->create_service(
service_name,
- [callback](std::shared_ptr> server, RequestID request_id,
- Request request) {
+ [callback = std::forward(callback)](
+ std::shared_ptr> server, RequestID request_id,
+ Request request) {
using ReturnType = decltype(callback(request));
if constexpr (!impl::HasPromiseType) {
auto response = callback(request);
@@ -880,7 +669,7 @@ class ContextAsyncAwait {
continuation(server, callback, request_id, request).detach();
}
},
- qos);
+ qos, group);
std::lock_guard lock{bookkeeping_mutex_};
services_.push_back(std::dynamic_pointer_cast(service));
return service;
@@ -890,8 +679,9 @@ class ContextAsyncAwait {
/// Works otherwise the same as [rclcpp::Node::create_client].
template
ServiceClient create_client(const std::string &service_name,
- const rclcpp::QoS &qos = rclcpp::ServicesQoS()) {
- return ServiceClient(node_base(), service_name, qos);
+ const rclcpp::QoS &qos = rclcpp::ServicesQoS(),
+ rclcpp::CallbackGroup::SharedPtr group = nullptr) {
+ return ServiceClient(node_base(), service_name, qos, group);
}
/// Create an action server with a synchronous or asynchronous callbacks.
@@ -955,36 +745,31 @@ class ContextAsyncAwait {
return server;
}
- /// Create an action client that supports async/await send_goal
+ /// Create an action client that supports async/await send_goal.
+ /// @param action_name Name of the action topic (e.g. "/fibonacci").
+ /// @param group Optional callback group for the action client waitable. If nullptr, the node's
+ /// default callback group is used.
+ /// @param options rcl_action client options. Defaults to
+ /// rcl_action_client_get_default_options().
template
- ActionClient create_action_client(const std::string &action_name) {
- return ActionClient(node_base(), action_name);
+ ActionClient create_action_client(
+ const std::string &action_name, rclcpp::CallbackGroup::SharedPtr group = nullptr,
+ const rcl_action_client_options_t &options = rcl_action_client_get_default_options()) {
+ return ActionClient(node_base(), action_name, group, options);
}
/// Creates a transform buffer that works like the usual combination of a tf2_ros::Buffer and a
/// tf2_ros::TransformListener. It is used to `lookup()` transforms asynchronously at a specific
/// point in time.
- TransformBuffer create_transform_buffer() { return TransformBuffer{add_tf_listener_if_needed()}; }
+ TransformBuffer create_transform_buffer() { return tf_buffer_.init(this->node_base()); }
/// Get the NodeBase, i.e. the ROS node using which this Context was created.
std::shared_ptr node_base() { return node_base_; }
- std::shared_ptr add_tf_listener_if_needed() {
- std::call_once(tf_buffer_init_flag_, [this]() {
- /// We need only one subscription on /tf, but we can have multiple transforms on which we
- /// listen to
- tf_buffer_impl_ = std::make_shared(this->node_base());
- });
- return tf_buffer_impl_;
- }
-
- /// The TF async interface impl
- std::shared_ptr tf_buffer_impl_;
- /// We need bookkeeping for the service servers.
protected:
+ /// The TF async interface impl
+ TransformBuffer tf_buffer_;
std::shared_ptr node_base_;
- std::once_flag
- tf_buffer_init_flag_; /// Needed for atomic, i.e. thread-safe initialization of tf buffer.
std::recursive_mutex bookkeeping_mutex_;
std::vector> timers_;
std::vector> services_;
diff --git a/icey/include/icey/icey_rx.hpp b/icey/include/icey/icey_rx.hpp
index 959b6249..8dc3a2a6 100644
--- a/icey/include/icey/icey_rx.hpp
+++ b/icey/include/icey/icey_rx.hpp
@@ -274,8 +274,8 @@ class Context : public ContextAsyncAwait,
template
void add_tf_subscription(GetValue target_frame, GetValue source_frame,
OnTransform &&on_transform, OnError &&on_error) {
- this->add_tf_listener_if_needed();
- tf_buffer_impl_->add_subscription(target_frame, source_frame, on_transform, on_error);
+ this->create_transform_buffer().add_subscription(target_frame, source_frame, on_transform,
+ on_error);
}
auto add_tf_broadcaster_if_needed() {
@@ -1316,7 +1316,7 @@ template
struct TransformSynchronizerImpl {
using Self = TransformSynchronizerImpl;
/// The book owns it.
- Weak tf_listener;
+ std::weak_ptr tf_buffer;
std::string target_frame;
std::shared_ptr> input_filter;
std::shared_ptr> synchronizer;
@@ -1327,13 +1327,14 @@ struct TransformSynchronizerImpl {
void create_synchronizer(Context &context, const std::string &_target_frame,
const Duration &lookup_timeout) {
- tf_listener = context.add_tf_listener_if_needed();
+ auto buf = context.create_transform_buffer();
+ tf_buffer = buf.get_buffer();
target_frame = _target_frame;
input_filter = std::make_shared>(context);
/// The argument "0" means here infinite message queue size. We set it so
/// because we must buffer every message for as long as we are waiting for a transform.
synchronizer = std::make_shared>(
- *input_filter->impl(), *tf_listener->buffer_, target_frame, 0,
+ *input_filter->impl(), *tf_buffer.lock(), target_frame, 0,
context.node_base()->get_node_logging_interface(),
context.node_base()->get_node_clock_interface(), lookup_timeout);
@@ -1341,16 +1342,18 @@ struct TransformSynchronizerImpl {
}
void on_message(typename Message::SharedPtr message) {
- const auto timestamp = rclcpp_to_chrono(rclcpp::Time(message->header.stamp));
- auto maybe_transform =
- this->tf_listener->get_from_buffer(target_frame, message->header.frame_id, timestamp);
- if (maybe_transform.has_value())
- static_cast(this)->put_value(std::make_tuple(message, maybe_transform.value()));
- else /// Invariant trap since I don't own the tf2_ros code and therefore can't prove it is
- /// correct:
+ try {
+ // Note that this call does not wait, the transform must already have arrived.
+ auto transform = tf_buffer.lock()->lookupTransform(target_frame, message->header.frame_id,
+ rclcpp::Time(message->header.stamp));
+ static_cast(this)->put_value(std::make_tuple(message, transform));
+ } catch (const tf2::TransformException &e) {
+ /// Invariant trap since I don't own the tf2_ros code and therefore can't prove it is
+ /// correct:
throw std::logic_error(
"Invariant broke: tf2_ros::MessageFilter broke the promise that the transform is "
"available");
+ }
}
};
diff --git a/icey/include/icey/impl/promise.hpp b/icey/include/icey/impl/promise.hpp
index 474594d9..8f22214d 100644
--- a/icey/include/icey/impl/promise.hpp
+++ b/icey/include/icey/impl/promise.hpp
@@ -29,7 +29,6 @@ static std::string get_type(T &t) {
#endif
/// This header defines a promise type supporting async/await (C++ 20 coroutines) only.
-/// Not thread-safe.
namespace icey {
inline bool icey_coro_debug_print = false;
@@ -37,7 +36,6 @@ inline bool icey_coro_debug_print = false;
template
class Promise;
-/// Alias used where "no value"/"no error" is needed while preserving existing API spelling.
using Nothing = std::monostate;
namespace impl {
@@ -182,10 +180,6 @@ class PromiseBase : public PromiseTag {
/// as continuation. Suspends the current coroutine, i.e. returns true. Used only when wrapping a
/// callback-based API.
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept {
- this->continuation_ = awaiting_coroutine;
- if (this->launch_async_) {
- this->launch_async_(*this);
- }
#ifdef ICEY_CORO_DEBUG_PRINT
std::cout << fmt::format(
"PromiseBase await_suspend(), awaiting coroutine is 0x{:x}, "
@@ -193,6 +187,20 @@ class PromiseBase : public PromiseTag {
std::size_t(awaiting_coroutine.address()), get_type(*this))
<< std::endl;
#endif
+ this->continuation_ = awaiting_coroutine;
+ if (this->launch_async_) {
+ // copy the lambda to avoid a race between two threads, one resuming the coroutine that holds
+ // this promise object and therefore eventually destroying this promise, while this
+ // thread still has not finished calling launch_async (such a race is detected by TSAN
+ // otherwise)
+ /// Note that it is perfectly legal that when calling launch, another thread might immediately
+ /// resume this coroutine. Citing cppreference
+ /// (https://en.cppreference.com/w/cpp/language/coroutines.html): "Note that the coroutine is
+ /// fully suspended before entering awaiter.await_suspend(). Its handle can be shared with
+ /// another thread and resumed before the await_suspend() function returns."
+ auto launch = launch_async_;
+ launch(*this);
+ }
return true;
}
@@ -255,6 +263,9 @@ class PromiseBase : public PromiseTag {
/// State of the promise: May be nothing, value or error.
State state_;
+ /// An exception that may be present additionally
+ std::exception_ptr exception_ptr_{nullptr};
+
/// A function called on await_suspend that starts the asynchronous operation
LaunchAsync launch_async_;
@@ -265,8 +276,6 @@ class PromiseBase : public PromiseTag {
/// The continuation that is registered when co_awaiting this promise
std::coroutine_handle<> continuation_{nullptr};
- /// An exception that may be present additionally
- std::exception_ptr exception_ptr_{nullptr};
};
/// A Promise is used in ICEY for async/await. It is returned from asynchronous operations such as a
@@ -379,20 +388,29 @@ class [[nodiscard("Promise must be used")]] Promise {
std::size_t(coroutine_.address()))
<< std::endl;
#endif
- if (coroutine_ && coroutine_.done()) {
- coroutine_.destroy();
- } else if (coroutine_) {
- /// TODO to enforce users explicitly detaching, this should probably throw
- coroutine_.promise().detach();
+ if (coroutine_) {
+ if (coroutine_.done()) {
+ coroutine_.destroy();
+ } else {
+ /// TODO to enforce users explicitly detaching, this should probably throw
+ detach();
+ }
}
}
/// You need to detach a Promise if you are not awaiting it but it should still continue to run.
/// Detaching this outer promise means the coroutine state won't be destroyed on destruction of
/// outer promise. This also avoid the discarting warning
- void detach() { coroutine_.promise().detach(); }
+ void detach() {
+ coroutine_.promise()
+ .detach(); // Tell the promise it must continue after final suspend so that it reaches the
+ // final state where the coroutine state is destroyed.
+ coroutine_ = nullptr; /// set to nullptr so that we do not access coroutine and cause a race on
+ /// destruction where potentially another thread continues the coroutine
+ /// that this thread has just detached
+ }
- bool await_ready() const noexcept { return coroutine_ && coroutine_.done(); }
+ bool await_ready() const noexcept { return !coroutine_ || coroutine_.done(); }
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept {
if (coroutine_.done()) {
diff --git a/icey/src/actions/client.cpp b/icey/src/actions/client.cpp
index 22c9391f..c81aaa93 100644
--- a/icey/src/actions/client.cpp
+++ b/icey/src/actions/client.cpp
@@ -158,6 +158,7 @@ class ClientBaseImpl {
std::map pending_cancel_responses;
std::mutex cancel_requests_mutex;
+ std::mutex goal_id_rng_mutex;
std::independent_bits_engine random_bytes_generator;
};
@@ -260,7 +261,7 @@ size_t ClientBase::get_number_of_ready_services() { return pimpl_->num_services;
void ClientBase::add_to_wait_set(rcl_wait_set_t &wait_set) {
std::lock_guard lock(pimpl_->action_client_mutex_);
rcl_ret_t ret = rcl_action_wait_set_add_action_client(&wait_set, pimpl_->client_handle.get(),
- nullptr, nullptr);
+ nullptr, nullptr);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "ClientBase::add_to_wait_set() failed");
}
@@ -274,7 +275,7 @@ void ClientBase::add_to_wait_set(rcl_wait_set_t *wait_set) {
}
std::lock_guard lock(pimpl_->action_client_mutex_);
rcl_ret_t ret = rcl_action_wait_set_add_action_client(wait_set, pimpl_->client_handle.get(),
- nullptr, nullptr);
+ nullptr, nullptr);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "ClientBase::add_to_wait_set() failed");
}
@@ -340,22 +341,23 @@ bool ClientBase::is_ready(rcl_wait_set_t *wait_set) {
void ClientBase::handle_goal_response(const rmw_request_id_t &response_header,
std::shared_ptr response) {
- pimpl_->goal_requests_mutex.lock();
- auto it = pimpl_->pending_goal_responses.find(response_header.sequence_number);
- if (it == pimpl_->pending_goal_responses.end()) {
- pimpl_->goal_requests_mutex.unlock();
- return;
- }
- auto callback = it->second;
- pimpl_->goal_requests_mutex.unlock();
+ const int64_t sequence_number = response_header.sequence_number;
+ ResponseCallback callback;
+ {
+ std::lock_guard lock(pimpl_->goal_requests_mutex);
+ auto it = pimpl_->pending_goal_responses.find(sequence_number);
+ if (it == pimpl_->pending_goal_responses.end()) return;
+ callback = it->second;
+ }
callback(response);
- pimpl_->goal_requests_mutex.lock();
- pimpl_->pending_goal_responses.erase(it);
- pimpl_->goal_requests_mutex.unlock();
+ {
+ std::lock_guard lock(pimpl_->goal_requests_mutex);
+ pimpl_->pending_goal_responses.erase(sequence_number);
+ }
}
int64_t ClientBase::send_goal_request(std::shared_ptr request, ResponseCallback callback) {
- std::unique_lock guard(pimpl_->goal_requests_mutex);
+ std::lock_guard guard(pimpl_->goal_requests_mutex);
int64_t sequence_number;
rcl_ret_t ret =
rcl_action_send_goal_request(pimpl_->client_handle.get(), request.get(), &sequence_number);
@@ -369,18 +371,19 @@ int64_t ClientBase::send_goal_request(std::shared_ptr request, ResponseCal
void ClientBase::handle_result_response(const rmw_request_id_t &response_header,
std::shared_ptr response) {
- pimpl_->result_requests_mutex.lock();
- auto it = pimpl_->pending_result_responses.find(response_header.sequence_number);
- if (it == pimpl_->pending_result_responses.end()) {
- pimpl_->result_requests_mutex.unlock();
- return;
- }
- auto callback = it->second;
- pimpl_->result_requests_mutex.unlock();
+ const int64_t sequence_number = response_header.sequence_number;
+ ResponseCallback callback;
+ {
+ std::lock_guard lock(pimpl_->result_requests_mutex);
+ auto it = pimpl_->pending_result_responses.find(sequence_number);
+ if (it == pimpl_->pending_result_responses.end()) return;
+ callback = it->second;
+ }
callback(response);
- pimpl_->result_requests_mutex.lock();
- pimpl_->pending_result_responses.erase(it);
- pimpl_->result_requests_mutex.unlock();
+ {
+ std::lock_guard lock(pimpl_->result_requests_mutex);
+ pimpl_->pending_result_responses.erase(sequence_number);
+ }
}
int64_t ClientBase::send_result_request(std::shared_ptr request, ResponseCallback callback) {
@@ -398,19 +401,19 @@ int64_t ClientBase::send_result_request(std::shared_ptr request, ResponseC
void ClientBase::handle_cancel_response(const rmw_request_id_t &response_header,
std::shared_ptr response) {
- pimpl_->cancel_requests_mutex.lock();
- const int64_t &sequence_number = response_header.sequence_number;
- auto it = pimpl_->pending_cancel_responses.find(sequence_number);
- if (it == pimpl_->pending_cancel_responses.end()) {
- return;
- }
- auto callback = it->second;
- pimpl_->goal_requests_mutex.unlock();
+ const int64_t sequence_number = response_header.sequence_number;
+ ResponseCallback callback;
+ {
+ std::lock_guard lock(pimpl_->cancel_requests_mutex);
+ auto it = pimpl_->pending_cancel_responses.find(sequence_number);
+ if (it == pimpl_->pending_cancel_responses.end()) return;
+ callback = it->second;
+ }
callback(response);
- pimpl_->cancel_requests_mutex.lock();
- pimpl_->pending_cancel_responses.erase(
- it); /// imagine thread-safe datastructures like tbb::concurrent_hash_map would have been discovered already
- pimpl_->cancel_requests_mutex.unlock();
+ {
+ std::lock_guard lock(pimpl_->cancel_requests_mutex);
+ pimpl_->pending_cancel_responses.erase(sequence_number);
+ }
}
int64_t ClientBase::send_cancel_request(std::shared_ptr request, ResponseCallback callback) {
@@ -428,6 +431,7 @@ int64_t ClientBase::send_cancel_request(std::shared_ptr request, ResponseC
GoalUUID ClientBase::generate_goal_id() {
GoalUUID goal_id;
+ std::lock_guard lock(pimpl_->goal_id_rng_mutex);
// TODO(hidmic): Do something better than this for UUID generation.
// std::generate(
// goal_id.uuid.begin(), goal_id.uuid.end(),
@@ -697,14 +701,17 @@ void ClientBase::execute(std::shared_ptr &data_in) {
}
bool ClientBase::cancel_cancellation_request(int64_t request_id) {
+ std::lock_guard lock(pimpl_->cancel_requests_mutex);
return pimpl_->pending_cancel_responses.erase(request_id);
}
bool ClientBase::cancel_goal_request(int64_t request_id) {
+ std::lock_guard lock(pimpl_->goal_requests_mutex);
return pimpl_->pending_goal_responses.erase(request_id);
}
bool ClientBase::cancel_result_request(int64_t request_id) {
+ std::lock_guard lock(pimpl_->result_requests_mutex);
return pimpl_->pending_result_responses.erase(request_id);
}
diff --git a/icey/src/icey_async_await.cpp b/icey/src/icey_async_await.cpp
new file mode 100644
index 00000000..6ee7974e
--- /dev/null
+++ b/icey/src/icey_async_await.cpp
@@ -0,0 +1,375 @@
+/// Copyright © 2025 Technische Hochschule Augsburg
+/// Copyright 2018 Open Source Robotics Foundation, Inc.
+/// All rights reserved.
+/// Author: Ivo Ivanov
+/// This software is licensed under the Apache License, Version 2.0.
+
+#include
+
+#include "tf2_ros/create_timer_ros.hpp"
+#include "tf2_ros/qos.hpp"
+
+namespace icey {
+
+/// A subscription + buffer for transforms that allows for asynchronous lookups and to subscribe on
+/// a single transform between two coordinate systems. It is otherwise implemented similarly to the
+/// tf2_ros::TransformListener but offering a well-developed asynchronous API.
+/// It subscribes on the topic /tf and listens for relevant transforms (i.e. ones we subscribed to
+/// or the ones we requested a lookup). If any relevant transform was received, it notifies via a
+/// callback.
+///
+/// Why not using `tf2_ros::AsyncBuffer` ? Due to multiple issues with it: (1) It uses
+/// the `std::future/std::promise` primitives that are effectively useless. (2)
+/// tf2_ros::AsyncBuffer::waitForTransform has a bug: We cannot make another lookup in the callback
+/// of a lookup (it holds a (non-reentrant) mutex locked while calling the user callback), making it
+/// impossible to chain asynchronous operations.
+///
+/// @sa This class is used to implement the `TransformSubscriptionStream` and the `TransformBuffer`.
+struct TransformBufferImpl {
+ using TransformMsg = geometry_msgs::msg::TransformStamped;
+ using TransformsMsg = tf2_msgs::msg::TFMessage::ConstSharedPtr;
+ using OnTransform = std::function;
+ using OnError = std::function;
+ using GetFrame = std::function;
+ /// A transform request stores a request for looking up a transform between two coordinate
+ /// systems. Either (1) at a particular time with a timeout, or (2) as a subscription. When
+ /// "subscribing" to a transform, we yield a transform each time it changes.
+ struct TransformRequest {
+ TransformRequest(const GetFrame &target_frame, const GetFrame &source_frame,
+ OnTransform on_transform, OnError on_error,
+ std::optional _maybe_time = {})
+ : target_frame(target_frame),
+ source_frame(source_frame),
+ on_transform(on_transform),
+ on_error(on_error),
+ maybe_time(_maybe_time) {}
+ GetFrame target_frame;
+ GetFrame source_frame;
+ std::optional last_received_transform;
+ OnTransform on_transform;
+ OnError on_error;
+ std::optional maybe_time;
+#if ICEY_ASYNC_AWAIT_THREAD_SAFE
+ std::atomic_bool finished{false};
+ mutable std::mutex state_mutex;
+#endif
+ };
+
+ /// A request for a transform lookup: It represents effectively a single call to the async_lookup
+ /// function. Even if you call async_lookup with the exact same arguments (same frames and time),
+ /// this will count as two separate requests.
+ /// TODO do not use the address, ABA problem !
+ using RequestHandle = std::shared_ptr;
+
+ /// We make this class non-copyable since it captures the this-pointer in closures.
+ TransformBufferImpl(const TransformBufferImpl &) = delete;
+ TransformBufferImpl(TransformBufferImpl &&) = delete;
+ TransformBufferImpl &operator=(const TransformBufferImpl &) = delete;
+ TransformBufferImpl &operator=(TransformBufferImpl &&) = delete;
+
+ explicit TransformBufferImpl(std::weak_ptr node) : node_(node) { init(); }
+
+ /// @brief Subscribe to a single transform between two coordinate systems. Every time this
+ /// transform changes, the `on_transform` callback function is called. More precisely, every time
+ /// a message arrives on the `/tf` or `/tf_static` topic, a lookup is attempted. If the lookup
+ /// fails, `on_error` is called. If the lookup succeeds and if the looked up transform is
+ /// different from the previously emitted one, the `on_transform` callback is called.
+ void add_subscription(const GetFrame &target_frame, const GetFrame &source_frame,
+ const OnTransform &on_transform, const OnError &on_error) {
+ std::lock_guard lock{mutex_};
+ auto req =
+ std::make_shared(target_frame, source_frame, on_transform, on_error);
+ requests_.emplace(uint64_t(req.get()), req);
+ }
+
+ /// @brief Queries the TF buffer for a transform at the given time between the given frames. It
+ /// does not wait but instead only returns something if the transform is already in the buffer.
+ ///
+ /// @param target_frame
+ /// @param source_frame
+ /// @param time
+ /// @return The transform or the TF error if the transform is not in the buffer.
+ Result get_from_buffer(
+ std::string target_frame, std::string source_frame, const Time &time) const {
+ try {
+ const tf2::TimePoint legacy_timepoint = tf2_ros::fromRclcpp(rclcpp_from_chrono(time));
+ // Note that this call does not wait, the transform must already have arrived.
+ auto tf = buffer_->lookupTransform(target_frame, source_frame, legacy_timepoint);
+ return Ok(tf);
+ } catch (const tf2::TransformException &e) {
+ return Err(std::string(e.what()));
+ }
+ }
+
+ /// @brief Makes an asynchronous lookup for a transform at the given time between the given
+ /// frames. Callback-based API.
+ ///
+ /// @param target_frame
+ /// @param source_frame
+ /// @param time
+ /// @param timeout
+ /// @param on_transform The callback that is called with the requested transform after it becomes
+ /// available
+ /// @param on_error The callback that is called if a timeout occurs.
+ /// @return The "handle", identifying this request. You can use this handle to call
+ /// `cancel_request` if you want to cancel the request.
+ /// @warning Currently the timeout is measured with wall-time, i.e. does not consider sim-time due
+ /// to the limitation of ROS 2 Humble only offering wall-timers
+ uint64_t lookup(const std::string &target_frame, const std::string &source_frame, Time time,
+ const Duration &timeout, OnTransform on_transform, OnError on_error) {
+ auto request{std::make_shared([target_frame]() { return target_frame; },
+ [source_frame]() { return source_frame; },
+ on_transform, on_error, time)};
+
+ auto weak_request = std::weak_ptr(request);
+#if ICEY_ASYNC_AWAIT_THREAD_SAFE
+ std::lock_guard lock{mutex_};
+#endif
+ requests_.emplace(uint64_t(request.get()), request);
+ node_.lock()->add_task_for(
+ uint64_t(request.get()), timeout, [this, on_error, request = weak_request]() {
+ if (auto req = request.lock()) {
+#if ICEY_ASYNC_AWAIT_THREAD_SAFE
+ bool expected = false;
+ if (!req->finished.compare_exchange_strong(expected, true)) {
+ return;
+ }
+ std::lock_guard lock{mutex_};
+#endif
+ requests_.erase(uint64_t(req.get())); // Destroy the request
+ on_error(tf2::TimeoutException{"Timed out waiting for transform"});
+ }
+ });
+ return uint64_t(request.get());
+ }
+
+ /// @brief Does an asynchronous lookup for a single transform that can be awaited using `co_await`
+ ///
+ /// @param target_frame
+ /// @param source_frame
+ /// @param time At which time to get the transform
+ /// @param timeout How long to wait for the transform
+ /// @return A future that resolves with the transform or with an error if a timeout occurs
+ impl::Promise lookup(
+ const std::string &target_frame, const std::string &source_frame, const Time &time,
+ const Duration &timeout) {
+ return impl::Promise(
+ [this, target_frame, source_frame, time, timeout](auto &promise) {
+ (void)this->lookup(
+ target_frame, source_frame, time, timeout,
+ [&promise](const geometry_msgs::msg::TransformStamped &tf) { promise.resolve(tf); },
+ [&promise](const tf2::TransformException &ex) { promise.reject(ex.what()); });
+ });
+ }
+
+ /// @brief Same as `lookup`, but accepts a ROS time point
+ ///
+ /// @param target_frame
+ /// @param source_frame
+ /// @param time At which time to get the transform
+ /// @param timeout How long to wait for the transform
+ /// @return A future that resolves with the transform or with an error if a timeout occurs
+ impl::Promise lookup(
+ const std::string &target_frame, const std::string &source_frame, const rclcpp::Time &time,
+ const Duration &timeout) {
+ return lookup(target_frame, source_frame, icey::rclcpp_to_chrono(time), timeout);
+ }
+
+ /// Cancel a transform request: This means that the registered callbacks will no longer be called.
+ /// If the given request does not exist, this function does nothing.
+ bool cancel_request(uint64_t request_id) {
+ std::lock_guard lock{mutex_};
+ requests_.erase(request_id);
+ return node_.lock()->cancel_task_for(request_id);
+ }
+
+ /// We take a tf2_ros::Buffer instead of a tf2::BufferImpl only to be able to use ROS-time API
+ /// (internally TF2 has its own timestamps...), not because we need to wait on anything (that's
+ /// what tf2_ros::Buffer does in addition to tf2::BufferImpl).
+ std::shared_ptr buffer_;
+
+protected:
+ /// Create the TF buffer and subscribe to /tf and /tf_static, so incoming transforms are
+ /// stored in the buffer and pending requests are checked on every received message.
+ void init() {
+ init_tf_buffer();
+ message_subscription_tf_ = node_.lock()->create_subscription(
+ "/tf", [this](TransformsMsg msg) { on_tf_message(msg, false); },
+ tf2_ros::DynamicListenerQoS());
+ message_subscription_tf_static_ = node_.lock()->create_subscription(
+ "/tf_static", [this](TransformsMsg msg) { on_tf_message(msg, true); },
+ tf2_ros::StaticListenerQoS());
+ }
+
+ /// Construct the TF2 buffer with the node's clock and attach a create-timer interface,
+ /// which tf2_ros requires for its internal timeout handling.
+ void init_tf_buffer() {
+ /// Mind that the official example regarding the piece of code below is incomplete:
+ /// ([official](https://docs.ros.org/en/jazzy/Tutorials/Intermediate/Tf2/Writing-A-Tf2-Listener-Cpp.html))
+ /// . I'm following this example instead:
+ /// https://github.com/ros-perception/imu_pipeline/blob/ros2/imu_transformer/src/imu_transformer.cpp#L16
+ /// See also the following discussions on why this code needs to be this way:
+ /// https://answers.ros.org/question/372608/?sort=votes
+ /// https://github.com/ros-navigation/navigation2/issues/1182
+ /// https://github.com/ros2/geometry2/issues/446
+ buffer_ =
+ std::make_shared(node_.lock()->get_node_clock_interface()->get_clock());
+ auto timer_interface = std::make_shared(
+ node_.lock()->get_node_base_interface(), node_.lock()->get_node_timers_interface());
+ buffer_->setCreateTimerInterface(timer_interface);
+ }
+
+ /// Store the received transforms in the buffer. Transforms that the buffer rejects are
+ /// logged as errors and skipped — an invalid transform never aborts processing of the
+ /// remaining transforms in the message.
+ void store_in_buffer(const tf2_msgs::msg::TFMessage &msg_in, bool is_static) {
+ std::string authority = "Authority undetectable";
+ for (const auto &transform : msg_in.transforms) {
+ try {
+ buffer_->setTransform(transform, authority, is_static);
+ } catch (const tf2::TransformException &ex) {
+ std::string temp = ex.what();
+ RCLCPP_ERROR(node_.lock()->get_node_logging_interface()->get_logger(),
+ "Failure to set received transform from %s to %s with error: %s\n",
+ transform.child_frame_id.c_str(), transform.header.frame_id.c_str(),
+ temp.c_str());
+ }
+ }
+ }
+
+ /// This simply looks up the transform in the buffer at the latest stamp and checks if it
+ /// changed with respect to the previously received one. If the transform has changed, we know
+ /// we have to notify. Returns true when it notified about a changed transform (called
+ /// on_transform) and false otherwise. On lookup failure, on_error is called instead.
+ bool maybe_notify(TransformRequest &info) {
+ try {
+ /// Note that this does not wait/thread-sleep etc. This is simply a lookup in a
+ /// std::vector/tree.
+ geometry_msgs::msg::TransformStamped tf_msg =
+ buffer_->lookupTransform(info.target_frame(), info.source_frame(), tf2::TimePointZero);
+ bool changed;
+#if ICEY_ASYNC_AWAIT_THREAD_SAFE
+ /// Guard the compare-and-update of last_received_transform with the per-request mutex;
+ /// note that on_transform below is deliberately invoked outside the lock.
+ {
+ std::lock_guard lock{info.state_mutex};
+ changed = !info.last_received_transform || tf_msg != *info.last_received_transform;
+ if (changed) info.last_received_transform = tf_msg;
+ }
+#else
+ changed = !info.last_received_transform || tf_msg != *info.last_received_transform;
+ if (changed) info.last_received_transform = tf_msg;
+#endif
+ if (changed) {
+ info.on_transform(tf_msg);
+ return true;
+ }
+ } catch (const tf2::TransformException &e) {
+ info.on_error(e);
+ }
+ return false;
+ }
+
+ /// Try to resolve a request for a transform at a specific time: if the lookup succeeds,
+ /// call on_transform exactly once and cancel the associated timeout task. Returns true if
+ /// the request was fulfilled (so the caller may erase it) and false otherwise.
+ bool maybe_notify_specific_time(RequestHandle request) {
+ try {
+ /// Note that this does not wait/thread-sleep etc. This is simply a lookup in a
+ /// std::vector/tree.
+ geometry_msgs::msg::TransformStamped tf_msg = buffer_->lookupTransform(
+ request->target_frame(), request->source_frame(), request->maybe_time.value());
+#if ICEY_ASYNC_AWAIT_THREAD_SAFE
+ /// Atomically claim the request: only the first thread to flip `finished` from false to
+ /// true delivers the result; all others bail out, preventing double notification.
+ bool expected = false;
+ if (!request->finished.compare_exchange_strong(expected, true, std::memory_order_acq_rel,
+ std::memory_order_acquire)) {
+ return false;
+ }
+#endif
+ request->on_transform(tf_msg);
+ /// If the request is destroyed gracefully (lookup succeeded), cancel the associated task
+ /// timer
+ node_.lock()->cancel_task_for(uint64_t(request.get()));
+ return true;
+ } catch (
+ const tf2::TransformException &e) { /// TODO: we could handle extrapolation errors here as well
+ /// For any other error, we continue waiting
+ }
+ return false;
+ }
+
+ /// Check every pending request against the updated buffer. One-shot requests (those with a
+ /// specific time) are erased once fulfilled; persistent subscriptions are only notified.
+ void notify_if_any_relevant_transform_was_received() {
+/// Iterate all requests, notify and maybe erase them if they are requests for a specific time
+#if ICEY_ASYNC_AWAIT_THREAD_SAFE
+ std::vector requests_to_delete;
+ std::unordered_map requests_copy;
+ /// TODO do something more clever than copying the entire hashtable under a lock
+ {
+ std::lock_guard lock{mutex_};
+ requests_copy = requests_;
+ }
+ /// Iterate over the copy so callbacks can add/cancel requests without deadlocking on mutex_.
+ for (auto [req_id, req] : requests_copy) {
+ if (req->maybe_time) {
+ if (maybe_notify_specific_time(req)) {
+ requests_to_delete.push_back(req_id);
+ }
+ } else {
+ // If it is a regular subscription, it is persistent and never erased
+ maybe_notify(*req);
+ }
+ }
+ std::lock_guard lock{mutex_};
+ for (const auto &k : requests_to_delete) requests_.erase(k);
+#else
+ std::erase_if(requests_, [this](auto req) {
+ if (req->maybe_time) {
+ return maybe_notify_specific_time(req);
+ } else {
+ // If it is a regular subscription, it is persistent and never erased
+ maybe_notify(*req);
+ return false;
+ }
+ });
+#endif
+ }
+
+ /// Callback for both /tf and /tf_static: store all transforms of the message in the buffer,
+ /// then check every pending request against the updated buffer.
+ /// (TransformsMsg is dereferenced here, so it is presumably a message pointer alias.)
+ void on_tf_message(const TransformsMsg &msg, bool is_static) {
+ store_in_buffer(*msg, is_static);
+ notify_if_any_relevant_transform_was_received();
+ }
+
+ std::weak_ptr node_; /// Hold weak reference to the NodeBase because the Context owns
+ /// the NodeBase as well, so we avoid circular reference
+
+ rclcpp::Subscription::SharedPtr message_subscription_tf_;
+ rclcpp::Subscription::SharedPtr message_subscription_tf_static_;
+
+ std::recursive_mutex mutex_;
+ std::unordered_map requests_;
+};
+
+/// Lazily create the implementation (pimpl) with the given node handle; repeated calls keep
+/// the existing implementation. Returns *this to allow chaining.
+TransformBuffer &TransformBuffer::init(std::weak_ptr node_base) {
+ if (!impl_) {
+ impl_ = std::make_shared(node_base);
+ }
+ return *this;
+}
+
+/// Callback-based lookup; forwards to the implementation.
+/// @return The request id, which can later be passed to cancel_request().
+uint64_t TransformBuffer::lookup(const std::string &target_frame, const std::string &source_frame,
+ Time time, const Duration &timeout, OnTransform on_transform,
+ OnError on_error) {
+ return impl_->lookup(target_frame, source_frame, time, timeout, on_transform, on_error);
+}
+
+/// Cancel a pending lookup request by id; forwards to the implementation.
+bool TransformBuffer::cancel_request(uint64_t request_id) {
+ return impl_->cancel_request(request_id);
+}
+
+/// Register a persistent transform subscription (callbacks fire whenever the transform
+/// between the two frames changes); forwards to the implementation.
+void TransformBuffer::add_subscription(const GetFrame &target_frame, const GetFrame &source_frame,
+ const OnTransform &on_transform, const OnError &on_error) {
+ impl_->add_subscription(target_frame, source_frame, on_transform, on_error);
+}
+
+/// Awaitable (promise-based) lookup taking a std::chrono time point; forwards to the
+/// implementation.
+impl::Promise TransformBuffer::lookup(
+ const std::string &target_frame, const std::string &source_frame, const Time &time,
+ const Duration &timeout) {
+ return impl_->lookup(target_frame, source_frame, time, timeout);
+}
+
+/// Awaitable (promise-based) lookup taking a ROS time point; forwards to the implementation.
+impl::Promise TransformBuffer::lookup(
+ const std::string &target_frame, const std::string &source_frame, const rclcpp::Time &time,
+ const Duration &timeout) {
+ return impl_->lookup(target_frame, source_frame, time, timeout);
+}
+
+/// Expose the underlying TF2 buffer, e.g. for direct synchronous lookups.
+std::shared_ptr TransformBuffer::get_buffer() const { return impl_->buffer_; }
+} // namespace icey
\ No newline at end of file
diff --git a/icey/test/entities_async_await_test.cpp b/icey/test/entities_async_await_test.cpp
index 519bdb3d..23dfca71 100644
--- a/icey/test/entities_async_await_test.cpp
+++ b/icey/test/entities_async_await_test.cpp
@@ -102,13 +102,12 @@ TEST_F(AsyncAwaitTwoNodeTest, PubSubTest) {
TEST_F(AsyncAwaitTwoNodeTest, PubSubTest2) {
const auto l = [this]() -> icey::Promise {
- std::size_t received_cnt{0};
-
+ auto received_cnt = std::make_shared(0);
receiver_->icey()
.create_subscription("/icey_test/sine_signal")
- .then([&](auto msg) {
- EXPECT_EQ(received_cnt, std::size_t(msg->data));
- received_cnt++;
+ .then([received_cnt](auto msg) {
+ EXPECT_EQ(*received_cnt, std::size_t(msg->data));
+ (*received_cnt)++;
});
auto timer = sender_->icey().create_timer(100ms);
@@ -126,7 +125,7 @@ TEST_F(AsyncAwaitTwoNodeTest, PubSubTest2) {
/// We do not await till the published message was received.
/// Since the ACK from DDS is exposed in rclcpp via PublisherBase::wait_for_all_acked, we could
/// implement a co_await publish(msg, timeout).
- EXPECT_EQ(received_cnt, 9);
+ EXPECT_EQ(*received_cnt, 9);
async_completed = true;
co_return 0;
};
@@ -244,7 +243,7 @@ TEST_F(AsyncAwaitTwoNodeTest, TFAsyncLookupTest) {
sender_->icey()
.create_timer(100ms)
- .then([&](size_t ticks) -> std::optional {
+ .then([base_time](size_t ticks) -> std::optional {
geometry_msgs::msg::TransformStamped tf1;
tf1.header.stamp = icey::rclcpp_from_chrono(base_time + ticks * 100ms);
tf1.header.frame_id = "icey_test_frame1";
@@ -257,7 +256,7 @@ TEST_F(AsyncAwaitTwoNodeTest, TFAsyncLookupTest) {
sender_->icey()
.create_timer(100ms)
- .then([&](size_t ticks) -> std::optional {
+ .then([base_time](size_t ticks) -> std::optional {
geometry_msgs::msg::TransformStamped tf1;
tf1.header.stamp = icey::rclcpp_from_chrono(base_time + ticks * 100ms);
tf1.header.frame_id = "icey_test_frame2";
diff --git a/icey/test/node_tasks_test.cpp b/icey/test/node_tasks_test.cpp
index e69de29b..7e534d64 100644
--- a/icey/test/node_tasks_test.cpp
+++ b/icey/test/node_tasks_test.cpp
@@ -0,0 +1,146 @@
+#include
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+using namespace std::chrono_literals;
+
+namespace {
+
+using ExampleService = std_srvs::srv::SetBool;
+using Fibonacci = example_interfaces::action::Fibonacci;
+using GoalHandleFibonacci = icey::rclcpp_action::ServerGoalHandle;
+
+/// Test fixture owning a sender node, a receiver node, their icey contexts, and a
+/// multi-threaded executor (8 threads) spinning both nodes — the multi-threaded executor
+/// is what makes the data races reproducible under TSAN.
+struct Harness {
+ std::shared_ptr sender{std::make_shared("node_tasks_sender")};
+ std::shared_ptr receiver{std::make_shared("node_tasks_receiver")};
+ std::shared_ptr sender_ctx{
+ std::make_shared(sender.get())};
+ std::shared_ptr receiver_ctx{
+ std::make_shared(receiver.get())};
+ rclcpp::executors::MultiThreadedExecutor exec{rclcpp::ExecutorOptions(), 8};
+
+ Harness() {
+ exec.add_node(sender->get_node_base_interface());
+ exec.add_node(receiver->get_node_base_interface());
+ }
+
+ /// Stop spinning before detaching the nodes from the executor.
+ ~Harness() {
+ exec.cancel();
+ exec.remove_node(sender->get_node_base_interface());
+ exec.remove_node(receiver->get_node_base_interface());
+ }
+};
+
+/// Spin the executor until pred() returns true or the timeout elapses, yielding 1ms between
+/// iterations. Currently unused by the repro tests (hence [[maybe_unused]]).
+template
+[[maybe_unused]]
+void spin_until(Harness &h, Pred pred, std::chrono::milliseconds timeout) {
+ const auto deadline = std::chrono::steady_clock::now() + timeout;
+ while (std::chrono::steady_clock::now() < deadline && !pred()) {
+ h.exec.spin_some();
+ std::this_thread::sleep_for(1ms);
+ }
+}
+
+/// Spin the executor on a background thread for a fixed duration, then cancel and join.
+void spin_executor_for(Harness &h, std::chrono::seconds duration) {
+ std::thread spin_thread([&]() { h.exec.spin(); });
+ std::this_thread::sleep_for(duration);
+ h.exec.cancel();
+ if (spin_thread.joinable()) spin_thread.join();
+}
+
+/// TSAN repro for TF lookups: the sender publishes /tf at ~100 Hz while the receiver runs
+/// 100 concurrent async timers in a reentrant callback group, each awaiting a transform
+/// lookup with a short (3ms) timeout — so lookups, timeouts and TF message handling race.
+void run_tf_race_repro() {
+ Harness h;
+ auto tf_pub = h.sender->create_publisher("/tf", 10);
+
+ /// Continuously publish a map->base_link identity-rotation transform with a fresh stamp.
+ auto pub_timer = h.sender->create_wall_timer(10ms, [&]() {
+ geometry_msgs::msg::TransformStamped t;
+ t.header.stamp = icey::rclcpp_from_chrono(icey::Clock::now());
+ t.header.frame_id = "map";
+ t.child_frame_id = "base_link";
+ t.transform.rotation.w = 1.0;
+ tf2_msgs::msg::TFMessage msg;
+ msg.transforms.push_back(t);
+ tf_pub->publish(msg);
+ });
+
+ auto tf = h.receiver_ctx->create_transform_buffer();
+ /// Reentrant group (added to the executor manually) so coroutines may run concurrently.
+ auto cb_group{h.receiver->create_callback_group(rclcpp::CallbackGroupType::Reentrant, false)};
+ h.exec.add_callback_group(cb_group, h.receiver->get_node_base_interface());
+ for (size_t t = 0; t < 100; ++t) {
+ h.receiver_ctx->create_timer_async(
+ 50ms,
+ [&]() -> icey::Promise {
+ /// The 3ms timeout is intentionally tight so many lookups time out and race
+ /// against late-arriving transforms.
+ (void)co_await tf.lookup("map", "base_link", icey::Clock::now(), 3ms);
+ },
+ cb_group);
+ }
+ (void)pub_timer;
+ spin_executor_for(h, 10s);
+}
+
+/// TSAN repro for service clients: 100 concurrent async timers in a reentrant callback group
+/// each call a trivial SetBool service with a tight (3ms) timeout, so responses race against
+/// client-side timeouts.
+void run_service_race_repro() {
+ Harness h;
+ /// Trivial server: echo the request flag back as the success field.
+ h.sender_ctx->create_service(
+ "race_set_bool", [](std::shared_ptr req) {
+ auto resp = std::make_shared();
+ resp->success = req->data;
+ return resp;
+ });
+ auto cb_group{h.receiver->create_callback_group(rclcpp::CallbackGroupType::Reentrant, false)};
+ h.exec.add_callback_group(cb_group, h.receiver->get_node_base_interface());
+ auto client = h.receiver_ctx->create_client("race_set_bool",
+ rclcpp::ServicesQoS(), cb_group);
+ for (size_t t = 0; t < 100; ++t) {
+ h.receiver_ctx->create_timer_async(
+ 1ms,
+ [&]() -> icey::Promise {
+ auto req = std::make_shared();
+ req->data = true;
+ (void)co_await client.call(req, 3ms);
+ },
+ cb_group);
+ }
+ spin_executor_for(h, 10s);
+}
+
+/// TSAN repro for action clients: 100 concurrent async timers in a reentrant callback group
+/// each send a Fibonacci goal and await its result with tight (3ms) timeouts, so goal
+/// acceptance, results and timeouts race.
+void run_action_race_repro() {
+ Harness h;
+ /// Minimal action server: accept every goal and immediately succeed with a fixed sequence.
+ h.sender_ctx->create_action_server(
+ "race_fib",
+ [](const rclcpp_action::GoalUUID &, std::shared_ptr) {
+ return rclcpp_action::GoalResponse::ACCEPT_AND_EXECUTE;
+ },
+ [](std::shared_ptr) { return rclcpp_action::CancelResponse::ACCEPT; },
+ [](std::shared_ptr gh) {
+ auto result = std::make_shared();
+ result->sequence = {0, 1, 1, 2};
+ gh->succeed(result);
+ });
+ auto cb_group{h.receiver->create_callback_group(rclcpp::CallbackGroupType::Reentrant, false)};
+ h.exec.add_callback_group(cb_group, h.receiver->get_node_base_interface());
+ auto client = h.receiver_ctx->create_action_client("race_fib", cb_group);
+ for (size_t t = 0; t < 100; ++t) {
+ h.receiver_ctx->create_timer_async(
+ 5ms,
+ [&]() -> icey::Promise {
+ Fibonacci::Goal goal;
+ goal.order = 4;
+ auto sent = co_await client.send_goal(goal, 3ms, [](auto, auto) {});
+ /// Only await the result if the goal was accepted within the timeout.
+ if (sent.has_value()) (void)co_await sent.value().result(3ms);
+ },
+ cb_group);
+ }
+ spin_executor_for(h, 10s);
+}
+
+} // namespace
+
+/// TSAN repro entry points: each spins many concurrent async tasks for 10 s.
+/// The names must match the CI filter `--gtest_filter=NodeTasksThreadSafety.TSanRaceRepro*`
+/// (see .github/workflows/tsan.yml) — with the previous `ThreadSafety.*` names the TSAN
+/// workflow selected zero tests and silently passed.
+TEST(NodeTasksThreadSafety, TSanRaceReproTF) { run_tf_race_repro(); }
+TEST(NodeTasksThreadSafety, TSanRaceReproServices) { run_service_race_repro(); }
+TEST(NodeTasksThreadSafety, TSanRaceReproActions) { run_action_race_repro(); }
diff --git a/icey/test/tsan_dds.supp b/icey/test/tsan_dds.supp
new file mode 100644
index 00000000..4879f422
--- /dev/null
+++ b/icey/test/tsan_dds.supp
@@ -0,0 +1,5 @@
+# Suppress all known CycloneDDS / DDS runtime races so TSAN reports focus on icey.
+race:libddsc.so
+race:dds_
+race:ddsi_
+race:ddsrt_
diff --git a/icey_examples/CMakeLists.txt b/icey_examples/CMakeLists.txt
index e66df8f4..ca5296ab 100644
--- a/icey_examples/CMakeLists.txt
+++ b/icey_examples/CMakeLists.txt
@@ -3,6 +3,7 @@ project(icey_examples)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_EXTENSIONS OFF)
+option(ICEY_ENABLE_TSAN "Build examples with ThreadSanitizer instrumentation" OFF)
set(USE_ASAN_CHECKS OFF)
if(USE_ASAN_CHECKS)
@@ -71,6 +72,41 @@ target_link_libraries(action_client_async_await_example fmt::fmt)
ament_auto_add_executable(action_server_async_await_example src/action_server_async_await.cpp)
target_link_libraries(action_server_async_await_example fmt::fmt)
+if(ICEY_ENABLE_TSAN)
+ if(CMAKE_CXX_COMPILER_ID MATCHES "GNU|Clang")
+ set(ICEY_EXAMPLE_TARGETS
+ service_server_example
+ signal_generator_example
+ parameters_struct_example
+ tf_subscription_example
+ tf_broadcaster_example
+ using_icey_context_example
+ signal_generator_async_await_example
+ listener_async_await_example
+ talker_async_await_example
+ tf_pub_test_example
+ tf_sychronization_example
+ service_server_async_await_example
+ using_async_await_only_context_example
+ tf_lookup_async_await_example
+ using_image_transport_example
+ listener_example
+ synchronization_example
+ lifecycle_node_example
+ coroutine_example
+ service_client_async_await_example
+ action_client_async_await_example
+ action_server_async_await_example
+ )
+ foreach(target_name IN LISTS ICEY_EXAMPLE_TARGETS)
+ target_compile_options(${target_name} PRIVATE -fsanitize=thread -fno-omit-frame-pointer)
+ target_link_options(${target_name} PRIVATE -fsanitize=thread)
+ endforeach()
+ else()
+ message(WARNING "ICEY_ENABLE_TSAN is ON but compiler does not support TSAN flags")
+ endif()
+endif()
+
if(ament_cmake_auto_VERSION EQUAL 2.7.3)
ament_auto_package()
else()
@@ -78,4 +114,3 @@ else()
USE_SCOPED_HEADER_INSTALL_DIR # can be removed after Jazzy EOL
)
endif()
-
diff --git a/icey_examples/src/action_client_async_await.cpp b/icey_examples/src/action_client_async_await.cpp
index 53cb1301..cc08a466 100644
--- a/icey_examples/src/action_client_async_await.cpp
+++ b/icey_examples/src/action_client_async_await.cpp
@@ -3,18 +3,21 @@
/// Author: Ivo Ivanov
/// This software is licensed under the Apache License, Version 2.0.
-/// Action client example using ICEY async/await.
-///
-/// Key differences vs regular rclcpp_action API:
-/// - No manual SendGoalOptions wiring for result/feedback callbacks when using co_await.
-/// - Per-goal timeout is explicit and automatic cleanup on timeout.
-/// - Looks synchronous (co_await), but uses rclcpp under the hood without threads.
+/// Action client example using ICEY async/await. It demonstrates an async/await API for sending the
+/// goal and obtaining the result. Feedback messages are received via a callback because ROS does
+/// not guarantee that feedback cannot be received before or after the result request
+/// (https://github.com/ros2/rclcpp/issues/2782#issuecomment-2761123049). To prevent losing any
+/// feedback messages, ICEY does not provide an async/await API for obtaining feedback and instead
+/// uses a callback. See https://github.com/iv461/icey/pull/17 for an alternative API proposal once
+/// this issue is fixed in rclcpp.
#include
#include
using namespace std::chrono_literals;
using Fibonacci = example_interfaces::action::Fibonacci;
+using GoalHandle = rclcpp_action::ClientGoalHandle;
+using ActionResult = typename GoalHandle::WrappedResult;
icey::Promise call_action(std::shared_ptr node) {
auto ctx = std::make_shared(node.get());
@@ -23,10 +26,14 @@ icey::Promise call_action(std::shared_ptr node) {
Fibonacci::Goal goal;
goal.order = 7;
+ const auto on_feedback = [node](auto, auto) {
+ RCLCPP_WARN(node->get_logger(), "Got some feedback");
+ };
+
RCLCPP_INFO_STREAM(node->get_logger(), "Sending goal.. ");
- // Request a goal with 2 seconds timeout
- auto maybe_goal_handle = co_await client.send_goal(
- goal, 5s, [node](auto, auto) { RCLCPP_WARN(node->get_logger(), "Got some feedback"); });
+ // Request a goal with 2 seconds timeout, pass a callback for obtaining feedback messages
+ icey::Result, std::string> maybe_goal_handle =
+ co_await client.send_goal(goal, 5s, on_feedback);
if (maybe_goal_handle.has_error()) {
RCLCPP_WARN_STREAM(node->get_logger(),
@@ -35,14 +42,14 @@ icey::Promise call_action(std::shared_ptr node) {
}
RCLCPP_INFO_STREAM(node->get_logger(),
"Goal request was accepted, starting to wait for the result..");
- auto goal_handle = maybe_goal_handle.value();
+ icey::AsyncGoalHandle goal_handle = maybe_goal_handle.value();
/// Now wait for the result of the action for 20 seconds.
- auto maybe_result = co_await goal_handle.result(20s);
+ icey::Result maybe_result = co_await goal_handle.result(20s);
if (maybe_result.has_error()) {
RCLCPP_WARN_STREAM(node->get_logger(), "Action failed: " << maybe_result.error());
} else {
- auto const &result = maybe_result.value();
+ const ActionResult &result = maybe_result.value();
if (result.code == rclcpp_action::ResultCode::SUCCEEDED) {
RCLCPP_INFO(node->get_logger(), "Fibonacci done. Size: %zu", result.result->sequence.size());
} else {
diff --git a/icey_examples/src/action_server_async_await.cpp b/icey_examples/src/action_server_async_await.cpp
index b9f1110b..0e995479 100644
--- a/icey_examples/src/action_server_async_await.cpp
+++ b/icey_examples/src/action_server_async_await.cpp
@@ -3,10 +3,10 @@
/// Author: Ivo Ivanov
/// This software is licensed under the Apache License, Version 2.0.
-/// Action server example that demonstrates how callbacks of the action server can be coroutines. In
-/// this case, the handle_goal-callback is a coroutine and calls a service. Once the service
-/// response, the goal is accepted. The handle_cancel and handle_accepted could be coroutines as
-/// well.
+/// This is an action server example that demonstrates how callbacks of the action server can be
+/// coroutines. In this case, the handle_goal callback is a coroutine that calls a service. Once the
+/// service responds, the goal is accepted. handle_cancel and handle_accepted could also be
+/// coroutines.
#include
#include
@@ -15,7 +15,6 @@
using namespace std::chrono_literals;
using Fibonacci = example_interfaces::action::Fibonacci;
-using GoalHandleFibonacci = icey::rclcpp_action::ClientGoalHandle;
using ServerGoalHandleFibonacci = icey::rclcpp_action::ServerGoalHandle;
int main(int argc, char **argv) {
diff --git a/icey_examples/src/filters.cpp b/icey_examples/src/filters.cpp
index 572353a4..31fd196f 100644
--- a/icey_examples/src/filters.cpp
+++ b/icey_examples/src/filters.cpp
@@ -1 +1 @@
-/// TODO
\ No newline at end of file
+/// TODO
\ No newline at end of file
diff --git a/icey_examples/src/service_client_async_await.cpp b/icey_examples/src/service_client_async_await.cpp
index b18e782e..a34d7e4a 100644
--- a/icey_examples/src/service_client_async_await.cpp
+++ b/icey_examples/src/service_client_async_await.cpp
@@ -24,7 +24,7 @@ int main(int argc, char **argv) {
/// Create the service client beforehand
auto client = ctx->create_client