
Add support for rosidl::Buffer-aware per-endpoint pub/sub #867

Draft
nvcyc wants to merge 3 commits into rolling from native_buffer
Conversation

nvcyc commented on Mar 16, 2026

Description

This pull request adds full rosidl::Buffer support to rmw_fastrtps_cpp, enabling per-endpoint DDS DataWriters/DataReaders for zero-copy buffer transport between compatible backends. Backend discovery uses DDS user_data QoS to advertise and detect supported backends.

This pull request modifies the following packages:

rmw_fastrtps_shared_cpp -- shared types and QoS

  • custom_participant_info.hpp: Added BufferDiscoveryCallback type and buffer_discovery_cb_ field for participant-level buffer discovery.
  • custom_publisher_info.hpp: Added BufferPublisherEndpoint struct and buffer-related fields (is_buffer_aware_, backend_aux_info_, local_endpoint_info_, buffer_mutex_, buffer_endpoints_, pending_buffer_endpoints_, buffer_alive_flag_, participant_, dds_publisher_).
  • custom_subscriber_info.hpp: Added BufferSubscriptionEndpoint struct (with owns_topic flag for single-process topic reuse) and buffer-related fields. Added RMWSubscriptionEvent::notify_buffer_data_available().
  • qos.hpp/qos.cpp: Added optional buffer_backends parameter to QoS functions; added encode_buffer_backends_for_user_data() / parse_buffer_backends_from_user_data() (format: BUFBE:name=aux;name2=aux2).
  • rmw_wait.cpp: Extended WaitSet to also check buffer_data_guard_; wakes subscription when buffer data is available.
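The BUFBE user_data wire format described above can be sketched as follows. This is a hypothetical standalone reimplementation of what encode_buffer_backends_for_user_data() / parse_buffer_backends_from_user_data() are described to do, not the PR's actual code:

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <string>

// Sentinel that marks backend info inside the participant's user_data QoS.
static const std::string kSentinel = "BUFBE:";

// Encode {name -> aux} pairs as "BUFBE:name=aux;name2=aux2".
std::string encode_buffer_backends(const std::map<std::string, std::string> & backends)
{
  if (backends.empty()) {
    return "";
  }
  std::ostringstream out;
  out << kSentinel;
  bool first = true;
  for (const auto & [name, aux] : backends) {
    if (!first) {out << ';';}
    out << name << '=' << aux;
    first = false;
  }
  return out.str();
}

// Parse the encoded string back into a map; returns empty if the sentinel is absent.
std::map<std::string, std::string> parse_buffer_backends(const std::string & user_data)
{
  std::map<std::string, std::string> result;
  auto pos = user_data.find(kSentinel);
  if (pos == std::string::npos) {
    return result;
  }
  std::istringstream in(user_data.substr(pos + kSentinel.size()));
  std::string entry;
  while (std::getline(in, entry, ';')) {
    auto eq = entry.find('=');
    if (eq != std::string::npos) {
      result[entry.substr(0, eq)] = entry.substr(eq + 1);
    }
  }
  return result;
}
```

A publisher with no buffer backends would publish empty user_data, so non-buffer-aware peers are unaffected.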

rmw_fastrtps_cpp -- buffer-aware per-endpoint pub/sub

  • Backend lifecycle: Calls initialize_buffer_backends() / shutdown_buffer_backends() during rmw_init/rmw_shutdown.
  • buffer_endpoint_registry: Per-process singleton mapping topic names to discovery callbacks; publishers register subscriber-discovery callbacks and vice versa. Maintains known endpoints so late-registering callbacks see already-discovered peers.
  • Publisher creation: Detects buffer-aware types via callbacks->has_buffer_fields; encodes backend names in DDS user_data with "cpu" always present. Registers subscriber-discovery callbacks that create per-subscriber DataWriters on unique topics ({topic}/_buf/{pub_gid}_{sub_gid}) with reliable QoS, synchronous publish mode, and data sharing off. Discovery callbacks use buffer_alive_flag_ to prevent use-after-free.
  • Subscription creation: Parses acceptable_buffer_backends (NULL/empty/"cpu": CPU-only; "any": all; specific names: filtered). Creates per-publisher DataReaders with BufferDataReaderListener that triggers buffer_data_guard_. Topic reuse: In single-process scenarios, uses lookup_topicdescription() to reuse existing topics (owns_topic = false).
  • Buffer-aware publish (rmw_publish.cpp): Iterates per-subscriber endpoints, calls cdr_serialize_with_endpoint(). Publishes to the main DataWriter only when non-buffer-aware subscribers exist, avoiding unnecessary CPU conversion.
  • Buffer-aware take (rmw_take.cpp): Iterates per-publisher DataReaders, takes raw CDR bytes and calls cdr_deserialize_with_endpoint(). Falls back to main DataReader when no buffer endpoint data is available. Guard condition re-arm: Re-sets buffer_data_guard_ when remaining data exists.
  • Lock-ordering safety: Both publisher and subscriber discovery use a three-phase pattern matching rmw_zenoh_cpp: (1) acquire buffer_mutex_ to check duplicates and mark pending; (2) call DDS create/delete APIs without holding the lock; (3) re-acquire to store endpoints. Destroy functions move endpoints to local lists under lock and clean up DDS resources after releasing.
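The three-phase pattern in the last bullet can be sketched in miniature. All names here (Endpoint, PublisherState, create_writer_unlocked) are illustrative stand-ins, not the PR's actual API:

```cpp
#include <cassert>
#include <mutex>
#include <set>
#include <string>
#include <vector>

struct Endpoint {std::string gid; int writer;};

struct PublisherState
{
  std::mutex buffer_mutex_;
  std::set<std::string> pending_;
  std::vector<Endpoint> endpoints_;
};

int create_writer_unlocked() {static int next = 0; return ++next;}  // stand-in for a DDS create call

bool on_subscriber_discovered(PublisherState & st, const std::string & sub_gid)
{
  // Phase 1: under the lock, reject duplicates and mark this GID pending.
  {
    std::lock_guard<std::mutex> lock(st.buffer_mutex_);
    for (const auto & e : st.endpoints_) {
      if (e.gid == sub_gid) {return false;}
    }
    if (!st.pending_.insert(sub_gid).second) {return false;}
  }
  // Phase 2: call the (potentially re-entrant) DDS API without holding the lock.
  int writer = create_writer_unlocked();
  // Phase 3: re-acquire the lock and store the finished endpoint.
  std::lock_guard<std::mutex> lock(st.buffer_mutex_);
  st.pending_.erase(sub_gid);
  st.endpoints_.push_back({sub_gid, writer});
  return true;
}
```

Because the DDS call in phase 2 runs lock-free, a discovery callback fired from inside Fast DDS cannot deadlock against this mutex; the pending_ set is what prevents two concurrent discoveries of the same GID from both creating a writer.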

Is this a user-facing behavior change?

This pull request does not change existing rmw_fastrtps behavior for standard messages. For messages with uint8[] fields, the per-endpoint transport is transparent -- publishers and subscribers share backend info via DDS user_data, and CPU fallback ensures correctness when backends are incompatible.

Did you use Generative AI?

Yes. Claude (claude-4.6-opus) via Cursor was used to assist with creating an initial prototype version of the changes contained in this PR.

Additional Information

This PR is part of the broader ROS 2 native buffer feature introduced in this post.

Signed-off-by: CY Chen <cyc@nvidia.com>
@hidmic hidmic left a comment

First pass. Another PR that needs many 👀.

Comment on lines +28 to +31
bool gid_equal(const rmw_gid_t & a, const rmw_gid_t & b)
{
return std::memcmp(a.data, b.data, RMW_GID_STORAGE_SIZE) == 0;
}
This still stands.


for (const auto & endpoint : info->buffer_endpoints_) {
uint32_t serialized_size = callbacks->get_serialized_size(ros_message);
size_t buffer_size = serialized_size + 4096;
@nvcyc mind explaining those extra 4KB?

Still needs an explanation.

Comment on lines +147 to +156
auto gid_to_hex = [](const rmw_gid_t & gid, size_t bytes = 8) -> std::string {
static const char hex_chars[] = "0123456789abcdef";
std::string result;
result.reserve(bytes * 2);
for (size_t i = 0; i < bytes && i < RMW_GID_STORAGE_SIZE; ++i) {
result += hex_chars[(gid.data[i] >> 4) & 0xF];
result += hex_chars[gid.data[i] & 0xF];
}
return result;
};
@nvcyc nit: consider making this a free function.
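A free-function version of that lambda might look like the sketch below; rmw_gid_t and RMW_GID_STORAGE_SIZE are stubbed here so it stands alone:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Stand-ins for the rmw types, so the sketch compiles standalone.
constexpr size_t RMW_GID_STORAGE_SIZE = 16;
struct rmw_gid_t {uint8_t data[RMW_GID_STORAGE_SIZE];};

// Free-function version of the quoted gid_to_hex lambda.
std::string gid_to_hex(const rmw_gid_t & gid, size_t bytes = 8)
{
  static const char hex_chars[] = "0123456789abcdef";
  std::string result;
  result.reserve(bytes * 2);
  for (size_t i = 0; i < bytes && i < RMW_GID_STORAGE_SIZE; ++i) {
    result += hex_chars[(gid.data[i] >> 4) & 0xF];
    result += hex_chars[gid.data[i] & 0xF];
  }
  return result;
}
```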

sub_info.gid.data, RMW_GID_STORAGE_SIZE);

eprosima::fastdds::dds::TopicQos topic_qos =
info->participant_->get_default_topic_qos();
@nvcyc meta: we should notify the user somehow about the actual QoS profile used for buffer-aware pub/sub, or about the fallback to CPU only, or something.

const_cast<void **>(data_values.buffer())[0] = &data;
eprosima::fastdds::dds::SampleInfoSeq info_seq{1};

auto ret_code = endpoint->data_reader->take(data_values, info_seq, 1);
@nvcyc meta: how do we enforce that we don't starve some endpoints? This implicitly prioritizes endpoints that were discovered earlier than others.
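Purely as a sketch of one possible mitigation (not code from this PR): rotating the scan's starting index across take() calls gives each endpoint a fair chance to be served first:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Remembers where the next scan should begin.
struct TakeState {size_t next_start = 0;};

// Returns the index of the first endpoint with data, scanning in
// round-robin order from next_start, or -1 if no endpoint has data.
int pick_endpoint(TakeState & st, const std::vector<bool> & has_data)
{
  const size_t n = has_data.size();
  for (size_t i = 0; i < n; ++i) {
    size_t idx = (st.next_start + i) % n;
    if (has_data[idx]) {
      st.next_start = (idx + 1) % n;  // resume after the served endpoint
      return static_cast<int>(idx);
    }
  }
  return -1;
}
```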

if (subscription_options->acceptable_buffer_backends &&
strlen(subscription_options->acceptable_buffer_backends) > 0)
{
std::string acceptable(subscription_options->acceptable_buffer_backends);
@nvcyc there's rcpputils::split() for this.
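A minimal standalone stand-in for the suggested rcpputils::split() (same shape as its signature: input, delimiter, optional skip_empty), to show how the backend list could be tokenized:

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Splits input on delim; when skip_empty is true, empty tokens are dropped.
std::vector<std::string> split(const std::string & input, char delim, bool skip_empty = false)
{
  std::vector<std::string> tokens;
  std::istringstream in(input);
  std::string token;
  while (std::getline(in, token, delim)) {
    if (!skip_empty || !token.empty()) {
      tokens.push_back(token);
    }
  }
  return tokens;
}
```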

if (backends.empty()) {
return {};
}
std::string result = BUFFER_BACKEND_SENTINEL;
@nvcyc fyi: there's std::stringstream.

nvcyc marked this pull request as draft on March 17, 2026 at 03:15
"rmw_fastrtps_cpp",
"BufferEndpointRegistry: firing %zu publisher callback(s) for discovered subscriber on '%s'",
callbacks.size(), info.topic_name.c_str());
for (const auto & cb : callbacks) {
This looks safe from deadlock risk.

"rmw_fastrtps_cpp",
"BufferEndpointRegistry: firing %zu subscriber callback(s) for discovered publisher on '%s'",
callbacks.size(), info.topic_name.c_str());
for (const auto & cb : callbacks) {
This one looks safe too.

Signed-off-by: CY Chen <cyc@nvidia.com>
Signed-off-by: CY Chen <cyc@nvidia.com>
@hidmic hidmic left a comment

Looks like things are in flight. @MiguelCompany @richiware we need your 👀 here.
