Add support for rosidl::Buffer-aware per-endpoint pub/sub#867
Add support for rosidl::Buffer-aware per-endpoint pub/sub#867
Conversation
Signed-off-by: CY Chen <cyc@nvidia.com>
hidmic
left a comment
There was a problem hiding this comment.
First pass. Another PR that needs many 👀 .
| bool gid_equal(const rmw_gid_t & a, const rmw_gid_t & b) | ||
| { | ||
| return std::memcmp(a.data, b.data, RMW_GID_STORAGE_SIZE) == 0; | ||
| } |
There was a problem hiding this comment.
@nvcyc there's rmw_compare_gids_equal() already
|
|
||
| for (const auto & endpoint : info->buffer_endpoints_) { | ||
| uint32_t serialized_size = callbacks->get_serialized_size(ros_message); | ||
| size_t buffer_size = serialized_size + 4096; |
There was a problem hiding this comment.
@nvcyc mind to explain those extra 4KB?
| auto gid_to_hex = [](const rmw_gid_t & gid, size_t bytes = 8) -> std::string { | ||
| static const char hex_chars[] = "0123456789abcdef"; | ||
| std::string result; | ||
| result.reserve(bytes * 2); | ||
| for (size_t i = 0; i < bytes && i < RMW_GID_STORAGE_SIZE; ++i) { | ||
| result += hex_chars[(gid.data[i] >> 4) & 0xF]; | ||
| result += hex_chars[gid.data[i] & 0xF]; | ||
| } | ||
| return result; | ||
| }; |
There was a problem hiding this comment.
@nvcyc nit: considering making this a free function.
| sub_info.gid.data, RMW_GID_STORAGE_SIZE); | ||
|
|
||
| eprosima::fastdds::dds::TopicQos topic_qos = | ||
| info->participant_->get_default_topic_qos(); |
There was a problem hiding this comment.
@nvcyc meta: we should notify the user somehow about the actual QoS profile used for buffer aware pub/sub, or fallback to CPU only, or something.
| const_cast<void **>(data_values.buffer())[0] = &data; | ||
| eprosima::fastdds::dds::SampleInfoSeq info_seq{1}; | ||
|
|
||
| auto ret_code = endpoint->data_reader->take(data_values, info_seq, 1); |
There was a problem hiding this comment.
@nvcyc meta: how do we enforce that we don't starve some endpoints? This implicitly prioritizes endpoints that were discovered earlier than others.
| if (subscription_options->acceptable_buffer_backends && | ||
| strlen(subscription_options->acceptable_buffer_backends) > 0) | ||
| { | ||
| std::string acceptable(subscription_options->acceptable_buffer_backends); |
There was a problem hiding this comment.
@nvcyc there's rcpputils::split() for this.
| if (backends.empty()) { | ||
| return {}; | ||
| } | ||
| std::string result = BUFFER_BACKEND_SENTINEL; |
| "rmw_fastrtps_cpp", | ||
| "BufferEndpointRegistry: firing %zu publisher callback(s) for discovered subscriber on '%s'", | ||
| callbacks.size(), info.topic_name.c_str()); | ||
| for (const auto & cb : callbacks) { |
There was a problem hiding this comment.
This looks safe from deadlock risk.
| "rmw_fastrtps_cpp", | ||
| "BufferEndpointRegistry: firing %zu subscriber callback(s) for discovered publisher on '%s'", | ||
| callbacks.size(), info.topic_name.c_str()); | ||
| for (const auto & cb : callbacks) { |
Signed-off-by: CY Chen <cyc@nvidia.com>
Signed-off-by: CY Chen <cyc@nvidia.com>
hidmic
left a comment
There was a problem hiding this comment.
Looks like things are in flight. @MiguelCompany @richiware we need your 👀 here.
| bool gid_equal(const rmw_gid_t & a, const rmw_gid_t & b) | ||
| { | ||
| return std::memcmp(a.data, b.data, RMW_GID_STORAGE_SIZE) == 0; | ||
| } |
|
|
||
| for (const auto & endpoint : info->buffer_endpoints_) { | ||
| uint32_t serialized_size = callbacks->get_serialized_size(ros_message); | ||
| size_t buffer_size = serialized_size + 4096; |
Description
This pull request adds full
rosidl::Buffersupport tormw_fastrtps_cpp, enabling per-endpoint DDS DataWriters/DataReaders for zero-copy buffer transport between compatible backends. Backend discovery uses DDSuser_dataQoS to advertise and detect supported backends.This pull request modifies the following packages:
rmw_fastrtps_shared_cpp-- shared types and QoScustom_participant_info.hpp: AddedBufferDiscoveryCallbacktype andbuffer_discovery_cb_field for participant-level buffer discovery.custom_publisher_info.hpp: AddedBufferPublisherEndpointstruct and buffer-related fields (is_buffer_aware_,backend_aux_info_,local_endpoint_info_,buffer_mutex_,buffer_endpoints_,pending_buffer_endpoints_,buffer_alive_flag_,participant_,dds_publisher_).custom_subscriber_info.hpp: AddedBufferSubscriptionEndpointstruct (withowns_topicflag for single-process topic reuse) and buffer-related fields. AddedRMWSubscriptionEvent::notify_buffer_data_available().qos.hpp/qos.cpp: Added optionalbuffer_backendsparameter to QoS functions; addedencode_buffer_backends_for_user_data()/parse_buffer_backends_from_user_data()(format:BUFBE:name=aux;name2=aux2).rmw_wait.cpp: Extended WaitSet to also checkbuffer_data_guard_; wakes subscription when buffer data is available.rmw_fastrtps_cpp-- buffer-aware per-endpoint pub/subinitialize_buffer_backends()/shutdown_buffer_backends()duringrmw_init/rmw_shutdown.buffer_endpoint_registry: Per-process singleton mapping topic names to discovery callbacks; publishers register subscriber-discovery callbacks and vice versa. Maintains known endpoints so late-registering callbacks see already-discovered peers.callbacks->has_buffer_fields; encodes backend names in DDSuser_datawith"cpu"always present. Registers subscriber-discovery callbacks that create per-subscriber DataWriters on unique topics ({topic}/_buf/{pub_gid}_{sub_gid}) with reliable QoS, synchronous publish mode, and data sharing off. Discovery callbacks usebuffer_alive_flag_to prevent use-after-free.acceptable_buffer_backends(NULL/empty/"cpu": CPU-only;"any": all; specific names: filtered). Creates per-publisher DataReaders withBufferDataReaderListenerthat triggersbuffer_data_guard_. Topic reuse: In single-process scenarios, useslookup_topicdescription()to reuse existing topics (owns_topic = false).rmw_publish.cpp): Iterates per-subscriber endpoints, callscdr_serialize_with_endpoint(). Conditionally publishes to the main DataWriter only when non-buffer-aware subscribers exist, avoiding unnecessary CPU conversion.rmw_take.cpp): Iterates per-publisher DataReaders, takes raw CDR bytes and callscdr_deserialize_with_endpoint(). Falls back to main DataReader when no buffer endpoint data is available. Guard condition re-arm: Re-setsbuffer_data_guard_when remaining data exists.rmw_zenoh_cpp: (1) acquirebuffer_mutex_to check duplicates and mark pending; (2) call DDS create/delete APIs without holding the lock; (3) re-acquire to store endpoints. Destroy functions move endpoints to local lists under lock and clean up DDS resources after releasing.Is this user-facing behavior change?
This pull request does not change existing
rmw_fastrtpsbehavior for standard messages. For messages withuint8[]fields, the per-endpoint transport is transparent -- publishers and subscribers share backend info via DDSuser_data, and CPU fallback ensures correctness when backends are incompatible.Did you use Generative AI?
Yes. Claude (claude-4.6-opus) via Cursor was used to assist with creating an initial prototype version of the changes contained in this PR.
Additional Information
This PR is part of the broader ROS 2 native buffer feature introduced in this post.