From a5b25a140ec21c2214c908e5ca9ca9272727c869 Mon Sep 17 00:00:00 2001 From: William Emfinger Date: Mon, 29 Jun 2026 16:47:33 -0500 Subject: [PATCH 1/2] feat(rtps): Improve implementation to talk to FastRTPS and implement HEARTBEAT/ACKNACK --- .../esp32_p4_function_ev_board_example.cpp | 5 +- components/rtps/RELIABLE_RTPS_PLAN.md | 259 +-- components/rtps/include/rtps.hpp | 158 +- components/rtps/src/rtps.cpp | 1466 ++++++++++++++--- pc/tests/rtps_subscriber.cpp | 2 + 5 files changed, 1568 insertions(+), 322 deletions(-) diff --git a/components/esp32-p4-function-ev-board/example/main/esp32_p4_function_ev_board_example.cpp b/components/esp32-p4-function-ev-board/example/main/esp32_p4_function_ev_board_example.cpp index fb0f1c7bd..5c3216dde 100644 --- a/components/esp32-p4-function-ev-board/example/main/esp32_p4_function_ev_board_example.cpp +++ b/components/esp32-p4-function-ev-board/example/main/esp32_p4_function_ev_board_example.cpp @@ -406,8 +406,8 @@ extern "C" void app_main(void) { const std::string rtps_type = "std_msgs::msg::dds_::UInt32_"; uint32_t value = 0; bool published = false; - static constexpr auto loop_tick = 20ms; // RTPS loop tick - static constexpr int64_t publish_period_us = 500'000; // 2 Hz RTPS publish + static constexpr auto loop_tick = 20ms; // RTPS loop tick + static constexpr int64_t publish_period_us = 50'000'000; int64_t last_publish_us = 0; while (true) { @@ -438,6 +438,7 @@ extern "C" void app_main(void) { logger.info("discovered {} '{}'", endpoint.is_reader ? "reader" : "writer", endpoint.topic_name); }, + .log_level = espp::Logger::Verbosity::DEBUG, }); participant->add_writer({ .topic_name = topic, diff --git a/components/rtps/RELIABLE_RTPS_PLAN.md b/components/rtps/RELIABLE_RTPS_PLAN.md index 1205a05db..490e61bc6 100644 --- a/components/rtps/RELIABLE_RTPS_PLAN.md +++ b/components/rtps/RELIABLE_RTPS_PLAN.md @@ -1,101 +1,172 @@ -# Plan: Reliable RTPS (HEARTBEAT / ACKNACK) — separate PR +# Design: Reliable RTPS (HEARTBEAT / ACKNACK) for user data -Status: **planned, not started.** This is a fast-follow to the best-effort -CDR-over-RTPS data path and should land as its own PR after that work is merged -and tested. +Status: **in progress.** Phase 0 (submessage codecs + dispatch seam), Phase 1 +(reliable writer: history cache + HEARTBEAT emission), and Phase 2 (reliable +reader: dedup + in-order delivery + ACKNACK generation) are implemented; Phase 3 +(writer-side retransmission on ACKNACK) follows as a separate reviewable PR. +Best-effort behavior is unchanged for endpoints that advertise `BEST_EFFORT`. + +With Phases 1–2, an espp reliable reader already recovers lost samples from a +**DDS/ROS 2 reliable writer** (which retransmits on our ACKNACK). espp↔espp +recovery needs Phase 3 (our writer answering NACKs). ## Goal -Add interoperable RELIABLE delivery for user data: a stateful writer that -retains history and retransmits, and a stateful reader that detects gaps and -requests retransmission. Best-effort behavior must remain unchanged for -endpoints that advertise `BEST_EFFORT`. - -## Why this is a bigger change - -The current user-data path is **stateless** — `publish()` builds a `DATA` -message and fires it once; the reader dispatches whatever arrives. Reliability -requires **per-endpoint state machines** and a **writer history cache**, plus -new submessages and timers. The reliable handshake only runs between endpoints -that both advertise `RELIABLE` (discovered via SEDP). - -## What Tier 1 already gives us - -- Per-writer monotonic sequence numbers (`next_user_data_sequence_number`). -- Writer-GUID-based receive routing and discovered-endpoint reliability flags. -- Standard `DATA` submessage build/parse and the CDR payload path. -- `parse_data_submessage` already honors endianness and skips inline QoS. - -## Work breakdown - -### 1. Submessage codecs (~1 day) -- `HEARTBEAT` (0x07): `{readerId, writerId, firstSN, lastSN, count}` + flags - (Final `F`, Liveliness `L`). -- `ACKNACK` (0x06): `{readerId, writerId, readerSNState, count}` + Final flag. -- `GAP` (0x08): `{readerId, writerId, gapStart, gapList}` (can defer to a - follow-up; needed for irrelevant/removed samples). -- **`SequenceNumberSet`** encoding/parsing: `{bitmapBase (SN), numBits (u32), - bitmap (ceil(numBits/32) u32 words)}`. This is the fiddliest piece — unit-test - it in isolation (round-trip + boundary cases: empty set, 256-bit max, base - alignment). -- Wire each new submessage through `Message::serialize` / `Message::parse` - (they already carry arbitrary submessages; only the payload codecs are new). - -### 2. Reliable writer (~2–3 days) -- **History cache**: retain `(SN -> serialized DATA)` until acked by all matched - reliable readers; cap by a configurable depth (KEEP_LAST) and drop oldest. -- **Periodic HEARTBEAT** task (reuse the `Task` pattern): announce - `(firstSN, lastSN)` with an incrementing count to each matched reliable - reader's unicast locator. -- **ReaderProxy** per matched reliable reader: highest contiguous acked SN + - requested (nacked) set. -- **On ACKNACK**: advance the acked watermark, resend the nacked SNs still in - history, dedup by `count`. -- Send a final HEARTBEAT (F flag) after a burst to prompt a prompt ACKNACK. - -### 3. Reliable reader (~2–3 days) -- **WriterProxy** per matched reliable writer: highest contiguous received SN + - missing set + last heartbeat count. -- **On HEARTBEAT**: compute missing SNs in `(firstSN, lastSN)`, reply with an - `ACKNACK` carrying the `SequenceNumberSet` of missing SNs (or an ack-all when - caught up); honor the F/L flags and dedup by count. -- **In-order delivery**: buffer out-of-order samples and release to `on_sample` - in SN order; bound the reorder buffer. -- Send ACKNACK to the writer's unicast locator (from discovery). - -### 4. Glue, timers, safety (~1–2 days) -- Run the handshake only when both endpoints advertise `RELIABLE`. -- Heartbeat period / nack-response / retransmit timers via `Task`; jitter to - avoid sync storms. -- Thread-safety: the history cache and proxy maps are touched by the publish - path, the receive path, and timer tasks — guard with a dedicated mutex - (distinct from `mutex_`/`receivers_mutex_`; document lock ordering). -- Remove the "reliable not implemented; sending best-effort" downgrade warnings - in `publish()` and `handle_user_message`. - -### 5. Interop validation (~2–3 days, often dominates) -- Round-trip against another espp participant first. -- Then Fast DDS / ROS 2 with Wireshark: confirm HEARTBEAT/ACKNACK exchange, - `SequenceNumberSet` bitmaps, retransmission, and that dropped packets recover. -- Test packet loss explicitly (drop a percentage in a test transport or via - `tc netem`). - -## Estimate - -- **Minimal happy-path** (periodic heartbeat, retransmit-on-any-nack, - ack-up-to-lastSN, no GAP): ~3–5 days. -- **Robust** (proper bitmap nacks, GAP, counts, reorder buffer, edge cases): - ~1.5–2.5 weeks including interop debugging. - -## Suggested PR sequencing - -1. Submessage codecs + `SequenceNumberSet` with unit tests (item 1) — small, - reviewable, no behavior change. -2. Reliable writer (item 2). -3. Reliable reader (item 3) + glue (item 4) + interop validation (item 5). +Add interoperable `RELIABLE` delivery for user data: a stateful writer that +retains a history of sent samples and retransmits on request, and a stateful +reader that detects gaps and requests retransmission. The reliable handshake +runs only between endpoints that *both* advertise `RELIABLE` (discovered via +SEDP). DDS/ROS 2 interop is a first-class requirement. + +## Background — what exists today + +The user-data path is **stateless** (see `components/rtps/src/rtps.cpp`): + +- `publish()` (rtps.cpp:1150) builds one `DATA` submessage with a per-writer + monotonic sequence number (`next_user_data_sequence_number`) and sends it once + to each destination. There is no history cache and no retransmit. +- `handle_user_message()` (rtps.cpp:1427) parses `DATA`, resolves the topic via + the remote writer GUID, and invokes matching reader callbacks. For a reliable + writer it logs *"ACKNACK/HEARTBEAT is not implemented yet"* (1473) and delivers + anyway. There is no dedup, ordering, or per-writer receive state. +- `Message::parse` (rtps.cpp:793) decodes the generic submessage frame + (kind/flags/length/payload); only `DATA` is interpreted. `HEARTBEAT`/`ACKNACK` + exist in `SubmessageKind` (rtps.hpp:124) but are never parsed or generated. + `INFO_DST` is never emitted. + +What we can build on: + +- Per-writer monotonic sequence numbers and writer-GUID-based receive routing. +- `EndpointProxy` (rtps.hpp:202) already carries the remote `guid`, + `reliability`, `unicast_locator`, and `multicast_locators`, and + `ParticipantProxy` (rtps.hpp:191) carries the remote `address` + `ports`, so + the addressing data needed for stateful matching already exists in discovery. +- `parse_data_submessage` honors submessage endianness (the `E` flag) and skips + inline QoS — the new parsers mirror that. + +## Wire formats + +All submessages we emit are little-endian (`E` flag set), consistent with the +existing `DATA` path. Parsers honor the `E` flag for the submessage body. + +### SequenceNumber (8 bytes) +`high : int32` then `low : uint32` (each in submessage endianness). Already +handled by `ByteWriter::append_sequence_number_le` / `ByteReader::read_sequence_number`. + +### SequenceNumberSet +`bitmapBase : SequenceNumber (8)`, `numBits : uint32 (4)`, +`bitmap : uint32[ceil(numBits/32)]`. Bit *i* (MSB-first within each word) set ⇒ +`bitmapBase + i` is requested/missing. `numBits` is 0..256. This is the fiddliest +piece — unit-tested in isolation (empty set, single bit, 256-bit max, base +alignment, word boundaries). + +### HEARTBEAT (0x07) +Body: `readerId(4) writerId(4) firstSN(8) lastSN(8) count : uint32(4)`. +Flags: `E` (endian, bit 0), `F` (final — no response required, bit 1), +`L` (liveliness, bit 2). `firstSN..lastSN` is the inclusive range of sequence +numbers currently available in the writer's history. `count` is monotonic per +writer (stale-heartbeat detection). + +### ACKNACK (0x06) +Body: `readerId(4) writerId(4) readerSNState : SequenceNumberSet count : uint32(4)`. +Flags: `E`, `F` (final). `readerSNState.bitmapBase` is the lowest sequence number +the reader still needs; the bitmap marks which of `[base, base+numBits)` are +missing. An empty set with `base = lastSN + 1` is a positive ack of everything. +`count` is monotonic per reader. + +### INFO_DST (0x0e) +Body: `guidPrefix(12)`. Prefixes a directed HEARTBEAT/ACKNACK so the receiving +participant routes it to the right entity. Required for robust DDS interop. + +## Architecture + +### Writer side (`WriterReliableState`, per local reliable writer) +- `history : std::map>` — CDR payloads keyed by SN, + capped to a KEEP_LAST depth (drop oldest, advance `first_sn`). +- `first_sn`, `last_sn`, `heartbeat_count`. +- Per matched reliable reader: highest acked SN (`ReaderProxy`), to know when a + sample can be purged and whether a heartbeat still needs a response. +- On `publish()`: store sample → send `DATA` → send a (non-final) `HEARTBEAT` + (INFO_DST + HEARTBEAT) to each matched reliable reader's unicast locator. +- Periodic `HEARTBEAT` (extend `announce_task_` or a dedicated heartbeat task, + with jitter) while any reader has unacked samples. +- On `ACKNACK`: resend the requested SNs still in history to the requesting + reader's locator; advance that reader's acked watermark (`base - 1`). + +### Reader side (`ReaderReliableState`, per (local reader, remote writer GUID)) +- Highest-contiguous received SN + a bounded out-of-order/reorder set, + `last_heartbeat_count`, `acknack_count`. +- On `DATA` from a reliable writer: record the SN, **dedup**, deliver in SN order + (bounded reorder buffer). +- On `HEARTBEAT`: compute missing SNs in `[firstSN, lastSN]`, reply with `ACKNACK` + (INFO_DST + ACKNACK) addressed to the writer's user-unicast endpoint (resolved + from `discovered_writers_ → participant_guid → ParticipantProxy.address/ports.user_unicast`, + **not** the raw `Socket::Info` source port). Respond to non-final heartbeats + even when caught up so the writer can stop repeating / purge. + +### Addressing +- Writer → reader (HEARTBEAT, DATA resend): `EndpointProxy.unicast_locator` of the + matched remote reader. +- Reader → writer (ACKNACK): the writer's participant user-unicast address+port. +- Both are sent on the existing `user_unicast_receiver_` socket. Phase 1–3 are + unicast-only; reliable-over-multicast is a later extension. + +### Concurrency +The history cache and the reader/writer proxy maps are touched by the publish +path (app thread), the receive path (one or more receive tasks), and the +heartbeat/retransmit timer. Guard them with a dedicated `reliable_mutex_` +(distinct from `mutex_` / `receivers_mutex_` / `sequence_mutex_`); never hold it +nested under `mutex_`. Document the lock ordering as: `mutex_` (discovery state) +is taken to snapshot endpoints, released, then `reliable_mutex_` for reliable +state — matching the existing snapshot-then-act pattern in +`build_user_send_configs`. + +### Config additions +- `WriterConfig.history_depth` (KEEP_LAST, default e.g. 16); replaces the + `kHistoryKeepLast = 0` placeholder (rtps.cpp:54) and feeds the SEDP history QoS. +- `Config.heartbeat_period` (default ~200 ms) and `Config.reliable_reorder_depth`. + +## Phase plan + +- **Phase 0 — submessage codecs + dispatch seam** *(this PR)*. Add the + `SequenceNumberSet` codec, `HEARTBEAT`/`ACKNACK`/`INFO_DST` build + parse + helpers (internal, endian-aware), and dispatch `HEARTBEAT`/`ACKNACK` in + `handle_user_message` to `handle_heartbeat_submessage` / `handle_acknack_submessage` + handlers (initially logging at debug). No behavior change for best-effort; the + reliable downgrade warning stays until Phase 2. Round-trip tests for the codecs. +- **Phase 1 — reliable writer** *(done)*: `WriterReliableState` history cache + (keyed by SN, bounded by `WriterConfig.history_depth`), sample cached on + `publish()`, and HEARTBEAT (INFO_DST + HEARTBEAT) emitted after publish and + periodically (`Config.heartbeat_period`, `heartbeat_task_`) to each matched + reliable reader's unicast locator. SEDP now advertises the real history depth. + Guarded by `reliable_mutex_`. The ACKNACK-driven retransmission response is + Phase 3. +- **Phase 2 — reliable reader** *(done)*: `ReaderReliableState` keyed by + "#"; `deliver_reliable_sample()` dedups and delivers + in order via a bounded reorder buffer (`Config.reliable_reorder_depth`); + `send_acknack_for_heartbeat()` replies to a HEARTBEAT with an `INFO_DST + + ACKNACK` carrying the `SequenceNumberSet` of missing SNs (or a positive ack), + addressed to the writer's unicast locator (fallback: participant user-unicast). + Handles writer-purged gaps (advance past lost SNs) and stale heartbeats. + Reliable handshake runs only when both endpoints advertise RELIABLE; the + downgrade warning is removed. +- **Phase 3 — writer retransmission**: resend NACKed SNs from history on ACKNACK; + per-reader acked watermark + purge. +- **Phase 4 — hardening**: initial heartbeat on reader match, stale-count + rejection, jittered timers, `stop()` cleanup, edge cases. +- **Phase 5 — interop validation**: espp↔espp loopback with induced drops, then + Fast DDS / Cyclone / ROS 2 reliable with Wireshark; explicit packet-loss tests. + +## Design decisions +- **History model**: KEEP_LAST depth per writer (bounded memory on embedded), not + KEEP_ALL, by default. +- **Delivery**: dedup + monotonic in-order with a small bounded reorder buffer. +- **INFO_DST**: emitted from the start (needed for DDS interop). +- **Endianness**: emit little-endian; parse honoring the `E` flag. +- **Scope**: unicast reliable first. ## Out of scope (later) - -- Durability beyond VOLATILE (TRANSIENT_LOCAL history replay to late-joiners). -- Full QoS matching/incompatibility reporting. +- Durability beyond VOLATILE (TRANSIENT_LOCAL replay to late joiners). +- `GAP` submessage (irrelevant/removed samples). - Fragmentation (`DATA_FRAG`) for samples larger than the MTU. +- Full QoS-incompatibility reporting. diff --git a/components/rtps/include/rtps.hpp b/components/rtps/include/rtps.hpp index 4fcdddb86..3df8eeca4 100644 --- a/components/rtps/include/rtps.hpp +++ b/components/rtps/include/rtps.hpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -170,6 +171,10 @@ class RtpsParticipant : public BaseComponent { std::string multicast_group{}; ///< Optional multicast group advertised for this writer and used ///< by `publish()` when set. uint32_t entity_index{0}; ///< Local entity slot used to derive the RTPS entity ID. + uint32_t history_depth{16}; ///< KEEP_LAST history depth advertised through SEDP and used to + ///< bound the reliable-writer history cache (number of recent + ///< samples retained for retransmission). Only used when + ///< reliability is RELIABLE. }; /// @brief Configuration for a locally advertised reader endpoint. @@ -196,6 +201,10 @@ class RtpsParticipant : public BaseComponent { std::string address{}; ///< Preferred remote IPv4 address for user traffic. PortMapping ports{}; ///< Remote participant port mapping derived from discovery data. uint32_t builtin_endpoints{0}; ///< Remote builtin-endpoint bitmask from SPDP. + Locator metatraffic_unicast_locator{}; ///< Full metatraffic unicast locator (address + port). + Locator metatraffic_multicast_locator{}; ///< Full metatraffic multicast locator. + Locator default_unicast_locator{}; ///< Full default (user-data) unicast locator. + Locator default_multicast_locator{}; ///< Full default (user-data) multicast locator. }; /// @brief Cached information about a discovered remote reader or writer endpoint. @@ -233,6 +242,14 @@ class RtpsParticipant : public BaseComponent { .name = "RtpsAnnounce", .stack_size_bytes = 6 * 1024}; ///< Task configuration for periodic discovery announcements. std::chrono::milliseconds announce_period{1000}; ///< Interval between periodic SPDP/SEDP sends. + Task::BaseConfig heartbeat_task_config{ + .name = "RtpsHeartbeat", .stack_size_bytes = 6 * 1024}; ///< Task configuration for periodic + ///< reliable-writer heartbeats. + std::chrono::milliseconds heartbeat_period{200}; ///< Interval between periodic HEARTBEATs sent + ///< for reliable writers with cached samples. + uint32_t reliable_reorder_depth{ + 32}; ///< Max number of out-of-order samples buffered per reliable reader for in-order + ///< delivery. Samples beyond this are dropped and re-requested via ACKNACK. std::string enclave{"/"}; ///< User-data enclave string advertised in SPDP. std::function on_participant_discovered{ nullptr}; ///< Callback invoked when a remote participant is first discovered. @@ -363,8 +380,18 @@ class RtpsParticipant : public BaseComponent { bool handle_metatraffic_message(std::vector &data, const Socket::Info &sender); bool handle_user_message(std::vector &data, const Socket::Info &sender); bool ensure_user_multicast_receivers_started(const std::string &extra_group = {}); - std::vector - build_user_send_configs(std::string_view topic_name, const WriterConfig &writer_config) const; + + /// @brief A user-data send destination plus the RTPS reader entity it targets. + /// @details For unicast sends to a specific matched reader, reader_id is that + /// reader's entity id so the DATA submessage addresses it directly; + /// for multicast sends (which target all matched readers) it is left + /// as ENTITYID_UNKNOWN. + struct UserDataDestination { + UdpSocket::SendConfig send_config{}; + EntityId reader_id{}; + }; + std::vector build_user_send_configs(std::string_view topic_name, + const WriterConfig &writer_config) const; int64_t next_spdp_sequence_number() const; int64_t next_sedp_publication_sequence_number() const; int64_t next_sedp_subscription_sequence_number() const; @@ -373,6 +400,114 @@ class RtpsParticipant : public BaseComponent { bool send_sedp_announcements_to(const ParticipantProxy &participant); bool send_discovery_now(); + // SEDP serialized-payload builders (the parameter list of a publication/subscription sample, + // without the DATA submessage framing) so the same sample can be re-sent under a stable sequence + // number and retransmitted on ACKNACK. + std::vector build_sedp_publication_payload(const WriterConfig &writer_config) const; + std::vector build_sedp_subscription_payload(const ReaderConfig &reader_config) const; + // Build an INFO_DST + DATA message directed at a specific reader (used for both user-data and + // SEDP retransmission). + std::vector + build_directed_data_message(const GuidPrefix &dest_prefix, EntityId reader_id, EntityId writer_id, + int64_t sequence_number, + std::span serialized_payload) const; + void send_sedp_heartbeats_to(const std::string &dest_address, uint16_t dest_port, + const GuidPrefix &dest_prefix, size_t writer_count, + size_t reader_count); + // Writer-side ACKNACK response: resend the requested sequence numbers to the requesting reader. + void retransmit_user_data(const GuidPrefix &reader_prefix, EntityId reader_id, EntityId writer_id, + const std::vector &requested_sequence_numbers); + void retransmit_sedp(const GuidPrefix &reader_prefix, EntityId reader_id, EntityId writer_id, + const std::vector &requested_sequence_numbers); + + /// @brief Per-writer reliable-QoS state: the history of recently-sent samples + /// (keyed by sequence number) plus heartbeat bookkeeping. Guarded by + /// reliable_mutex_. + struct WriterReliableState { + std::map> + history{}; ///< Cached CDR payloads keyed by sequence number. + int64_t last_sequence_number{0}; ///< Highest sequence number written so far (0 = none). + uint32_t heartbeat_count{0}; ///< Monotonic HEARTBEAT count for this writer. + }; + + /// @brief Per-(local reader, remote writer) reliable-QoS receive state used for + /// duplicate suppression, in-order delivery, and ACKNACK generation. + /// Guarded by reliable_mutex_. + struct ReaderReliableState { + int64_t highest_delivered{0}; ///< Highest sequence number delivered in order (0 = none). + std::map> + reorder{}; ///< Out-of-order samples awaiting their predecessors. + uint32_t last_heartbeat_count{0}; ///< Highest HEARTBEAT count seen (stale-heartbeat detection). + uint32_t acknack_count{0}; ///< Monotonic ACKNACK count emitted by this reader. + }; + + /// @brief Hash functor for using a Guid as an unordered_map key. + struct GuidHash { + size_t operator()(const Guid &guid) const; + }; + + /// @brief Owns the discovered-participant and discovered-endpoint records and + /// the lock that guards them. + /// @details Identity is the GUID. Updates *merge* the incoming fields into the + /// existing record (the caller's `apply` mutator sets only the fields + /// actually present in the received message), so a later announcement + /// that omits a locator/QoS/name does not erase previously-learned + /// values. Centralizing storage + merge + locking here keeps the + /// participant's discovery bookkeeping in one place (and is the + /// natural home for future lease-based expiry). + class DiscoveryDb { + public: + /// @brief Result of an upsert: whether the record was newly created and a + /// snapshot of the merged record. + template struct UpsertResult { + bool is_new{false}; + Proxy value{}; + }; + + UpsertResult + upsert_participant(const Guid &participant_guid, + const std::function &apply); + UpsertResult upsert_endpoint(bool is_reader, const Guid &endpoint_guid, + const std::function &apply); + + std::optional find_participant_by_prefix(const GuidPrefix &prefix) const; + std::optional find_writer(const Guid &guid) const; + + std::vector participants() const; + std::vector writers() const; + std::vector readers() const; + + void clear(); + + private: + mutable std::mutex mutex_; + std::unordered_map participants_; + std::unordered_map writers_; + std::unordered_map readers_; + }; + + std::vector build_data_message_with_sequence_number(const WriterConfig &writer_config, + std::span cdr_payload, + int64_t sequence_number, + EntityId reader_id) const; + void store_reliable_sample(const WriterConfig &writer_config, int64_t sequence_number, + std::span cdr_payload); + bool send_heartbeat_for_writer(const WriterConfig &writer_config); + bool send_heartbeats_now(); + void deliver_reliable_sample(uint32_t reader_entity_index, const Guid &writer_guid, + int64_t sequence_number, std::span payload, + const std::function)> &on_sample); + void send_acknack_for_heartbeat(const GuidPrefix &writer_prefix, const EntityId &writer_id, + int64_t first_sn, int64_t last_sn, uint32_t heartbeat_count, + bool heartbeat_final); + // Builtin (SPDP/SEDP) reliability: track received discovery samples and ACKNACK the heartbeats a + // reliable peer (e.g. Fast DDS) sends for its builtin SEDP writers, so it (re)sends us the + // endpoint discovery data we need to match it. + void record_builtin_sample(const Guid &writer_guid, int64_t sequence_number); + void send_builtin_acknack(const GuidPrefix &writer_prefix, const EntityId &writer_id, + int64_t first_sn, int64_t last_sn, uint32_t heartbeat_count, + bool heartbeat_final); + Config config_; GuidPrefix guid_prefix_{}; std::atomic_bool started_{false}; @@ -382,6 +517,7 @@ class RtpsParticipant : public BaseComponent { std::vector user_multicast_receivers_; std::unique_ptr user_unicast_receiver_; std::unique_ptr announce_task_; + std::unique_ptr heartbeat_task_; mutable std::mutex mutex_; mutable std::mutex receivers_mutex_; ///< Guards user_multicast_receivers_ against concurrent @@ -391,10 +527,22 @@ class RtpsParticipant : public BaseComponent { mutable std::atomic sedp_publications_sequence_number_{1}; mutable std::atomic sedp_subscriptions_sequence_number_{1}; mutable std::unordered_map user_data_sequence_numbers_; + mutable std::mutex reliable_mutex_; ///< Guards writer_reliable_states_ against concurrent + ///< publish()/heartbeat/receive access. + std::unordered_map + writer_reliable_states_; ///< Reliable-QoS state keyed by writer entity_index. + std::unordered_map + reader_reliable_states_; ///< Reliable-QoS receive state keyed by "#". + std::unordered_map + builtin_reader_states_; ///< Received-SEDP-sample state keyed by remote builtin writer GUID, + ///< used to ACKNACK a reliable peer's discovery heartbeats. + mutable std::atomic sedp_pub_heartbeat_count_{0}; ///< HEARTBEAT count for our builtin + ///< SEDP publications writer. + mutable std::atomic sedp_sub_heartbeat_count_{0}; ///< HEARTBEAT count for our builtin + ///< SEDP subscriptions writer. std::vector writers_; std::vector readers_; - std::vector discovered_participants_; - std::vector discovered_writers_; - std::vector discovered_readers_; + DiscoveryDb discovery_; ///< Discovered participants + endpoints (owns its own lock). }; } // namespace espp diff --git a/components/rtps/src/rtps.cpp b/components/rtps/src/rtps.cpp index 171efb49d..d4c90dbc2 100644 --- a/components/rtps/src/rtps.cpp +++ b/components/rtps/src/rtps.cpp @@ -24,6 +24,10 @@ constexpr uint16_t kUserUnicastOffset = 11; constexpr uint8_t kSubmessageFlagLittleEndian = 0x01; constexpr uint8_t kSubmessageFlagInlineQos = 0x02; constexpr uint8_t kSubmessageFlagData = 0x04; +// For HEARTBEAT and ACKNACK submessages, bit 1 is the Final flag and bit 2 is +// the Liveliness flag (the InlineQos/Data bits above are DATA-specific). +constexpr uint8_t kSubmessageFlagFinal = 0x02; +constexpr uint8_t kSubmessageFlagLiveliness = 0x04; constexpr uint16_t kDataSubmessageOctetsToInlineQos = 16; constexpr uint32_t kBuiltinEndpointParticipantAnnouncer = 1u << 0; @@ -37,8 +41,7 @@ constexpr uint32_t kBuiltinEndpointParticipantMessageReader = 1u << 11; constexpr uint32_t kBuiltinEndpointSet = kBuiltinEndpointParticipantAnnouncer | kBuiltinEndpointParticipantDetector | kBuiltinEndpointPublicationAnnouncer | kBuiltinEndpointPublicationDetector | - kBuiltinEndpointSubscriptionAnnouncer | kBuiltinEndpointSubscriptionDetector | - kBuiltinEndpointParticipantMessageWriter | kBuiltinEndpointParticipantMessageReader; + kBuiltinEndpointSubscriptionAnnouncer | kBuiltinEndpointSubscriptionDetector; constexpr std::array kEntityIdUnknown{{0x00, 0x00, 0x00, 0x00}}; constexpr std::array kParticipantEntityId{{0x00, 0x00, 0x01, 0xc1}}; @@ -226,6 +229,10 @@ class ByteReader { return true; } + bool read_u32(uint32_t &value, bool little_endian) { + return little_endian ? read_u32_le(value) : read_u32_be(value); + } + bool read_sequence_number(int64_t &value, bool little_endian) { uint32_t high = 0; uint32_t low = 0; @@ -446,10 +453,10 @@ void append_parameter_liveliness(ByteWriter &writer) { writer.append_u32_le(ntp_fraction_from_nanoseconds(kDefaultLeaseDurationNanoseconds)); } -void append_parameter_history(ByteWriter &writer) { +void append_parameter_history(ByteWriter &writer, uint32_t depth = 1) { append_parameter_header(writer, ParameterId::PID_HISTORY, 8); - writer.append_u32_le(kHistoryKeepLast); - writer.append_u32_le(1); + writer.append_u32_le(kHistoryKeepLast); // history kind: KEEP_LAST + writer.append_u32_le(depth); // history depth } void append_parameter_key_hash(ByteWriter &writer, const espp::RtpsParticipant::Guid &guid) { @@ -599,6 +606,19 @@ bool has_valid_locator(const espp::RtpsParticipant::Locator &locator) { [](uint8_t octet) { return octet != 0; }); } +// Human-readable "kind/address:port" for a locator, for discovery logging. +std::string locator_to_string(const espp::RtpsParticipant::Locator &locator) { + if (locator.kind == espp::RtpsParticipant::Locator::Kind::INVALID) { + return ""; + } + return fmt::format("udpv4/{}:{}", locator.address_string(), locator.port); +} + +// Same as above for an optional locator that may not have been present in a message. +std::string locator_to_string(const std::optional &locator) { + return locator ? locator_to_string(*locator) : ""; +} + std::optional parse_reliability(std::span value) { auto maybe_kind = parse_u32_le(value); @@ -737,6 +757,208 @@ espp::RtpsParticipant::Message build_message(const espp::RtpsParticipant::GuidPr serialized_payload), }}}; } + +// --------------------------------------------------------------------------- +// Reliable QoS submessage codecs (HEARTBEAT / ACKNACK / INFO_DST). See +// RELIABLE_RTPS_PLAN.md. These are the Phase 0 wire-format primitives; they are +// wired into the writer/reader state machines in later phases. +// --------------------------------------------------------------------------- + +// RTPS SequenceNumberSet: a bitmapBase plus up to 256 bits marking which of +// [bitmapBase, bitmapBase + numBits) sequence numbers are present in the set +// (used by ACKNACK to mark the missing/requested sequence numbers). Bit i is the +// (31 - i % 32)-th bit (MSB-first) of word (i / 32). +struct SequenceNumberSet { + static constexpr uint32_t kMaxBits = 256; + int64_t base{1}; ///< bitmapBase: lowest sequence number the set can represent. + uint32_t num_bits{0}; ///< Number of valid bits (0..256). + std::array bitmap{}; ///< Up to 256 bits (8 x uint32), MSB-first within each word. + + uint32_t num_words() const { return (num_bits + 31) / 32; } + + // Mark a sequence number as present in the set (no-op if out of [base, base+256)). + void set(int64_t sequence_number) { + if (sequence_number < base) { + return; + } + int64_t delta = sequence_number - base; + if (delta >= static_cast(kMaxBits)) { + return; + } + auto index = static_cast(delta); + num_bits = std::max(num_bits, index + 1); + bitmap[index / 32] |= (1u << (31 - (index % 32))); + } + + bool contains(int64_t sequence_number) const { + if (sequence_number < base) { + return false; + } + int64_t delta = sequence_number - base; + if (delta >= static_cast(num_bits)) { + return false; + } + auto index = static_cast(delta); + return (bitmap[index / 32] >> (31 - (index % 32))) & 1u; + } +}; + +void append_sequence_number_set(ByteWriter &writer, const SequenceNumberSet &set) { + writer.append_sequence_number_le(set.base); + writer.append_u32_le(set.num_bits); + for (uint32_t word = 0; word < set.num_words(); word++) { + writer.append_u32_le(set.bitmap[word]); + } +} + +bool read_sequence_number_set(ByteReader &reader, bool little_endian, SequenceNumberSet &set) { + set = SequenceNumberSet{}; + if (!reader.read_sequence_number(set.base, little_endian) || + !reader.read_u32(set.num_bits, little_endian)) { + return false; + } + if (set.num_bits > SequenceNumberSet::kMaxBits) { + return false; + } + for (uint32_t word = 0; word < set.num_words(); word++) { + if (!reader.read_u32(set.bitmap[word], little_endian)) { + return false; + } + } + return true; +} + +std::vector build_info_dst_payload(const espp::RtpsParticipant::GuidPrefix &dest_prefix) { + ByteWriter writer; + writer.append_bytes(dest_prefix.value); + return writer.take(); +} + +espp::RtpsParticipant::Submessage +build_info_dst_submessage(const espp::RtpsParticipant::GuidPrefix &dest_prefix) { + return {.kind = espp::RtpsParticipant::SubmessageKind::INFO_DST, + .flags = kSubmessageFlagLittleEndian, + .payload = build_info_dst_payload(dest_prefix)}; +} + +std::vector build_heartbeat_payload(const espp::RtpsParticipant::EntityId &reader_id, + const espp::RtpsParticipant::EntityId &writer_id, + int64_t first_sn, int64_t last_sn, uint32_t count) { + ByteWriter writer; + writer.append_bytes(reader_id.value); + writer.append_bytes(writer_id.value); + writer.append_sequence_number_le(first_sn); + writer.append_sequence_number_le(last_sn); + writer.append_u32_le(count); + return writer.take(); +} + +espp::RtpsParticipant::Submessage +build_heartbeat_submessage(const espp::RtpsParticipant::EntityId &reader_id, + const espp::RtpsParticipant::EntityId &writer_id, int64_t first_sn, + int64_t last_sn, uint32_t count, bool final) { + uint8_t flags = kSubmessageFlagLittleEndian | (final ? kSubmessageFlagFinal : 0); + return {.kind = espp::RtpsParticipant::SubmessageKind::HEARTBEAT, + .flags = flags, + .payload = build_heartbeat_payload(reader_id, writer_id, first_sn, last_sn, count)}; +} + +std::vector build_acknack_payload(const espp::RtpsParticipant::EntityId &reader_id, + const espp::RtpsParticipant::EntityId &writer_id, + const SequenceNumberSet &reader_sn_state, + uint32_t count) { + ByteWriter writer; + writer.append_bytes(reader_id.value); + writer.append_bytes(writer_id.value); + append_sequence_number_set(writer, reader_sn_state); + writer.append_u32_le(count); + return writer.take(); +} + +espp::RtpsParticipant::Submessage +build_acknack_submessage(const espp::RtpsParticipant::EntityId &reader_id, + const espp::RtpsParticipant::EntityId &writer_id, + const SequenceNumberSet &reader_sn_state, uint32_t count, bool final) { + uint8_t flags = kSubmessageFlagLittleEndian | (final ? kSubmessageFlagFinal : 0); + return {.kind = espp::RtpsParticipant::SubmessageKind::ACKNACK, + .flags = flags, + .payload = build_acknack_payload(reader_id, writer_id, reader_sn_state, count)}; +} + +struct HeartbeatView { + espp::RtpsParticipant::EntityId reader_id{}; + espp::RtpsParticipant::EntityId writer_id{}; + int64_t first_sn{0}; + int64_t last_sn{0}; + uint32_t count{0}; + bool final{false}; + bool liveliness{false}; + bool valid{false}; +}; + +HeartbeatView parse_heartbeat_submessage(const espp::RtpsParticipant::Submessage &submessage) { + HeartbeatView view; + if (submessage.kind != espp::RtpsParticipant::SubmessageKind::HEARTBEAT) { + return view; + } + const bool little_endian = (submessage.flags & kSubmessageFlagLittleEndian) != 0; + ByteReader reader(std::span{submessage.payload.data(), submessage.payload.size()}); + if (!reader.read_bytes( + std::span{view.reader_id.value.data(), view.reader_id.value.size()}) || + !reader.read_bytes( + std::span{view.writer_id.value.data(), view.writer_id.value.size()}) || + !reader.read_sequence_number(view.first_sn, little_endian) || + !reader.read_sequence_number(view.last_sn, little_endian) || + !reader.read_u32(view.count, little_endian)) { + return view; + } + view.final = (submessage.flags & kSubmessageFlagFinal) != 0; + view.liveliness = (submessage.flags & kSubmessageFlagLiveliness) != 0; + view.valid = true; + return view; +} + +struct AckNackView { + espp::RtpsParticipant::EntityId reader_id{}; + espp::RtpsParticipant::EntityId writer_id{}; + SequenceNumberSet reader_sn_state{}; + uint32_t count{0}; + bool final{false}; + bool valid{false}; +}; + +AckNackView parse_acknack_submessage(const espp::RtpsParticipant::Submessage &submessage) { + AckNackView view; + if (submessage.kind != espp::RtpsParticipant::SubmessageKind::ACKNACK) { + return view; + } + const bool little_endian = (submessage.flags & kSubmessageFlagLittleEndian) != 0; + ByteReader reader(std::span{submessage.payload.data(), submessage.payload.size()}); + if (!reader.read_bytes( + std::span{view.reader_id.value.data(), view.reader_id.value.size()}) || + !reader.read_bytes( + std::span{view.writer_id.value.data(), view.writer_id.value.size()}) || + !read_sequence_number_set(reader, little_endian, view.reader_sn_state) || + !reader.read_u32(view.count, little_endian)) { + return view; + } + view.final = (submessage.flags & kSubmessageFlagFinal) != 0; + view.valid = true; + return view; +} + +// Return the sequence numbers a reader is requesting (the set bits) from an ACKNACK's reader SN +// state. A positive ack (empty set) yields an empty list. +std::vector requested_sequence_numbers(const SequenceNumberSet &set) { + std::vector sequence_numbers; + for (uint32_t bit = 0; bit < set.num_bits; bit++) { + const int64_t sequence_number = set.base + static_cast(bit); + if (set.contains(sequence_number)) { + sequence_numbers.push_back(sequence_number); + } + } + return sequence_numbers; +} } // namespace namespace espp { @@ -852,6 +1074,13 @@ bool RtpsParticipant::start() { } auto port_mapping = ports(); + logger_.info("RTPS participant {} starting: node '{}', domain {}, pid {}, bind {}, advertised {} " + "| ports meta_mc={} meta_uc={} user_mc={} user_uc={} | meta_mc_group {}", + guid_prefix_.to_string(), config_.node_name, config_.domain_id, + config_.participant_id, config_.bind_address, config_.advertised_address, + port_mapping.metatraffic_multicast, port_mapping.metatraffic_unicast, + port_mapping.user_multicast, port_mapping.user_unicast, + config_.metatraffic_multicast_group); metatraffic_multicast_receiver_ = std::make_unique(UdpSocket::Config{.log_level = config_.socket_log_level}); metatraffic_unicast_receiver_ = @@ -866,6 +1095,7 @@ bool RtpsParticipant::start() { .buffer_size = 4096, .is_multicast_endpoint = true, .multicast_group = config_.metatraffic_multicast_group, + .multicast_interface = config_.bind_address, .on_receive_callback = [this](auto &data, const auto &sender) -> std::optional> { handle_metatraffic_message(data, sender); @@ -933,6 +1163,22 @@ bool RtpsParticipant::start() { }); announce_task_->start(); send_discovery_now(); + + // Periodic HEARTBEAT for reliable writers so matched reliable readers can detect gaps and catch + // up (e.g. late joiners). Does nothing while there are no reliable writers / cached samples. + heartbeat_task_ = Task::make_unique({ + .callback = [this](std::mutex &mutex, std::condition_variable &cv, bool ¬ified) -> bool { + send_heartbeats_now(); + std::unique_lock lock(mutex); + auto stop_requested = + cv.wait_for(lock, config_.heartbeat_period, [¬ified] { return notified; }); + notified = false; + return stop_requested; + }, + .task_config = config_.heartbeat_task_config, + .log_level = get_log_level(), + }); + heartbeat_task_->start(); return true; } @@ -942,6 +1188,10 @@ void RtpsParticipant::stop() { announce_task_->stop(); announce_task_.reset(); } + if (heartbeat_task_) { + heartbeat_task_->stop(); + heartbeat_task_.reset(); + } if (metatraffic_multicast_receiver_) { metatraffic_multicast_receiver_->stop_receiving(); metatraffic_multicast_receiver_.reset(); @@ -963,6 +1213,12 @@ void RtpsParticipant::stop() { user_unicast_receiver_->stop_receiving(); user_unicast_receiver_.reset(); } + { + std::lock_guard lock(reliable_mutex_); + writer_reliable_states_.clear(); + reader_reliable_states_.clear(); + builtin_reader_states_.clear(); + } } bool RtpsParticipant::is_started() const { return started_.load(); } @@ -986,19 +1242,109 @@ bool RtpsParticipant::add_reader(const ReaderConfig &reader_config) { return true; } -std::vector RtpsParticipant::discovered_participants() const { +size_t RtpsParticipant::GuidHash::operator()(const Guid &guid) const { + // FNV-1a over the 12-byte prefix + 4-byte entity id. + uint64_t hash = 1469598103934665603ull; + auto mix = [&hash](uint8_t byte) { + hash ^= byte; + hash *= 1099511628211ull; + }; + for (auto byte : guid.prefix.value) { + mix(byte); + } + for (auto byte : guid.entity_id.value) { + mix(byte); + } + return static_cast(hash); +} + +RtpsParticipant::DiscoveryDb::UpsertResult +RtpsParticipant::DiscoveryDb::upsert_participant( + const Guid &participant_guid, const std::function &apply) { std::lock_guard lock(mutex_); - return discovered_participants_; + auto [iterator, inserted] = participants_.try_emplace(participant_guid); + apply(iterator->second); // merge only the fields present in this announcement + return {.is_new = inserted, .value = iterator->second}; } -std::vector RtpsParticipant::discovered_writers() const { +RtpsParticipant::DiscoveryDb::UpsertResult +RtpsParticipant::DiscoveryDb::upsert_endpoint(bool is_reader, const Guid &endpoint_guid, + const std::function &apply) { std::lock_guard lock(mutex_); - return discovered_writers_; + auto &endpoints = is_reader ? readers_ : writers_; + auto [iterator, inserted] = endpoints.try_emplace(endpoint_guid); + apply(iterator->second); // merge only the fields present in this announcement + return {.is_new = inserted, .value = iterator->second}; } -std::vector RtpsParticipant::discovered_readers() const { +std::optional +RtpsParticipant::DiscoveryDb::find_participant_by_prefix(const GuidPrefix &prefix) const { + std::lock_guard lock(mutex_); + for (const auto &[guid, participant] : participants_) { + if (participant.guid_prefix == prefix) { + return participant; + } + } + return std::nullopt; +} + +std::optional +RtpsParticipant::DiscoveryDb::find_writer(const Guid &guid) const { std::lock_guard lock(mutex_); - return discovered_readers_; + auto iterator = writers_.find(guid); + if (iterator == writers_.end()) { + return std::nullopt; + } + return iterator->second; +} + +std::vector RtpsParticipant::DiscoveryDb::participants() const { + std::lock_guard lock(mutex_); + std::vector result; + result.reserve(participants_.size()); + for (const auto &[guid, participant] : participants_) { + result.push_back(participant); + } + return result; +} + +std::vector RtpsParticipant::DiscoveryDb::writers() const { + std::lock_guard lock(mutex_); + std::vector result; + result.reserve(writers_.size()); + for (const auto &[guid, writer] : writers_) { + result.push_back(writer); + } + return result; +} + +std::vector RtpsParticipant::DiscoveryDb::readers() const { + std::lock_guard lock(mutex_); + std::vector result; + result.reserve(readers_.size()); + for (const auto &[guid, reader] : readers_) { + result.push_back(reader); + } + return result; +} + +void RtpsParticipant::DiscoveryDb::clear() { + std::lock_guard lock(mutex_); + participants_.clear(); + writers_.clear(); + readers_.clear(); +} + +std::vector RtpsParticipant::discovered_participants() const { + return discovery_.participants(); +} + +std::vector RtpsParticipant::discovered_writers() const { + return discovery_.writers(); +} + +std::vector RtpsParticipant::discovered_readers() const { + return discovery_.readers(); } std::vector RtpsParticipant::writers() const { @@ -1071,7 +1417,7 @@ std::vector RtpsParticipant::build_spdp_announce_message() const { } std::vector -RtpsParticipant::build_sedp_publication_message(const WriterConfig &writer_config) const { +RtpsParticipant::build_sedp_publication_payload(const WriterConfig &writer_config) const { ByteWriter parameters; auto guid = writer_guid(writer_config.entity_index); append_parameter_guid(parameters, ParameterId::PID_ENDPOINT_GUID, guid); @@ -1093,18 +1439,22 @@ RtpsParticipant::build_sedp_publication_message(const WriterConfig &writer_confi append_parameter_durability(parameters); append_parameter_liveliness(parameters); append_parameter_reliability(parameters, writer_config.reliability); - append_parameter_history(parameters); + append_parameter_history(parameters, writer_config.history_depth); append_parameter_sentinel(parameters); + return build_parameter_list_payload(parameters); +} - auto payload = build_parameter_list_payload(parameters); +std::vector +RtpsParticipant::build_sedp_publication_message(const WriterConfig &writer_config) const { return build_message(guid_prefix_, {.value = kSedpPublicationsReaderEntityId}, {.value = kSedpPublicationsWriterEntityId}, - next_sedp_publication_sequence_number(), payload) + next_sedp_publication_sequence_number(), + build_sedp_publication_payload(writer_config)) .serialize(); } std::vector -RtpsParticipant::build_sedp_subscription_message(const ReaderConfig &reader_config) const { +RtpsParticipant::build_sedp_subscription_payload(const ReaderConfig &reader_config) const { ByteWriter parameters; auto guid = reader_guid(reader_config.entity_index); append_parameter_guid(parameters, ParameterId::PID_ENDPOINT_GUID, guid); @@ -1127,26 +1477,52 @@ RtpsParticipant::build_sedp_subscription_message(const ReaderConfig &reader_conf append_parameter_reliability(parameters, reader_config.reliability); append_parameter_history(parameters); append_parameter_sentinel(parameters); + return build_parameter_list_payload(parameters); +} - auto payload = build_parameter_list_payload(parameters); +std::vector +RtpsParticipant::build_sedp_subscription_message(const ReaderConfig &reader_config) const { return build_message(guid_prefix_, {.value = kSedpSubscriptionsReaderEntityId}, {.value = kSedpSubscriptionsWriterEntityId}, - next_sedp_subscription_sequence_number(), payload) + next_sedp_subscription_sequence_number(), + build_sedp_subscription_payload(reader_config)) .serialize(); } std::vector RtpsParticipant::build_data_message(const WriterConfig &writer_config, std::span cdr_payload) const { + return build_data_message_with_sequence_number( + writer_config, cdr_payload, next_user_data_sequence_number(writer_config.entity_index), + EntityId{}); +} + +std::vector RtpsParticipant::build_data_message_with_sequence_number( + const WriterConfig &writer_config, std::span cdr_payload, + int64_t sequence_number, EntityId reader_id) const { // Standard RTPS: the DATA submessage serializedPayload is exactly the CDR-encapsulated sample. // The topic is identified by the writer GUID (resolved by the receiver via SEDP discovery), so no - // topic name or other framing is embedded in the payload. + // topic name or other framing is embedded in the payload. readerId addresses a specific matched + // reader for unicast sends (ENTITYID_UNKNOWN for multicast, which targets all matched readers). auto guid = writer_guid(writer_config.entity_index); - return build_message(guid_prefix_, {.value = kEntityIdUnknown}, guid.entity_id, - next_user_data_sequence_number(writer_config.entity_index), cdr_payload) + return build_message(guid_prefix_, reader_id, guid.entity_id, sequence_number, cdr_payload) .serialize(); } +void RtpsParticipant::store_reliable_sample(const WriterConfig &writer_config, + int64_t sequence_number, + std::span cdr_payload) { + std::lock_guard lock(reliable_mutex_); + auto &state = writer_reliable_states_[writer_config.entity_index]; + state.history[sequence_number].assign(cdr_payload.begin(), cdr_payload.end()); + state.last_sequence_number = std::max(state.last_sequence_number, sequence_number); + // Bound the cache to the configured KEEP_LAST depth, dropping the oldest samples. + uint32_t depth = std::max(writer_config.history_depth, 1); + while (state.history.size() > depth) { + state.history.erase(state.history.begin()); + } +} + bool RtpsParticipant::publish(std::string_view topic_name, std::span cdr_payload) { WriterConfig writer_config; { @@ -1161,29 +1537,261 @@ bool RtpsParticipant::publish(std::string_view topic_name, std::spansend(payload, destination.send_config) || sent; + } + + // Reliable writers announce the available sequence-number range so matched reliable readers can + // detect gaps and request retransmission (Phase 3). The retransmission response to ACKNACK is not + // wired up yet, but the HEARTBEAT exchange is interoperable with DDS/ROS 2 peers. + if (reliable) { + send_heartbeat_for_writer(writer_config); + } + return sent; +} + +bool RtpsParticipant::send_heartbeat_for_writer(const WriterConfig &writer_config) { + if (!user_unicast_receiver_) { return false; } + // Snapshot the available sequence-number range and bump the heartbeat count under + // reliable_mutex_. An empty history is advertised as firstSN=1, lastSN=0 (no samples available). + int64_t first_sn = 1; + int64_t last_sn = 0; + uint32_t count = 0; + { + std::lock_guard lock(reliable_mutex_); + auto &state = writer_reliable_states_[writer_config.entity_index]; + if (!state.history.empty()) { + first_sn = state.history.begin()->first; + last_sn = state.last_sequence_number; + } + count = ++state.heartbeat_count; + } + + auto writer_entity_id = writer_guid(writer_config.entity_index).entity_id; + + // Snapshot the matched reliable readers for this topic. + std::vector matched_readers; + for (auto &reader : discovery_.readers()) { + if (reader.is_reader && reader.topic_name == writer_config.topic_name && + reader.reliability == ReliabilityKind::RELIABLE) { + matched_readers.push_back(std::move(reader)); + } + } + bool sent = false; - for (const auto &send_config : send_configs) { - sent = user_unicast_receiver_->send(payload, send_config) || sent; + for (const auto &reader : matched_readers) { + if (!has_valid_locator(reader.unicast_locator)) { + continue; + } + Message message; + message.header.guid_prefix = guid_prefix_; + // INFO_DST tells the destination participant which entity the HEARTBEAT is for, so DDS/ROS 2 + // peers route it to the right reader. + message.submessages.push_back(build_info_dst_submessage(reader.guid.prefix)); + message.submessages.push_back(build_heartbeat_submessage( + reader.guid.entity_id, writer_entity_id, first_sn, last_sn, count, /*final=*/false)); + auto bytes = message.serialize(); + UdpSocket::SendConfig send_config{ + .ip_address = reader.unicast_locator.address_string(), + .port = static_cast(reader.unicast_locator.port), + .is_multicast_endpoint = false, + }; + sent = user_unicast_receiver_->send(bytes, send_config) || sent; } return sent; } +bool RtpsParticipant::send_heartbeats_now() { + std::vector reliable_writers; + { + std::lock_guard lock(mutex_); + for (const auto &writer : writers_) { + if (writer.reliability == ReliabilityKind::RELIABLE) { + reliable_writers.push_back(writer); + } + } + } + bool sent = false; + for (const auto &writer : reliable_writers) { + sent = send_heartbeat_for_writer(writer) || sent; + } + return sent; +} + +void RtpsParticipant::deliver_reliable_sample( + uint32_t reader_entity_index, const Guid &writer_guid, int64_t sequence_number, + std::span payload, + const std::function)> &on_sample) { + // Collect the in-order samples to deliver under the lock, then invoke the callback after + // releasing it (the callback may re-enter the participant, e.g. publish a response). + std::vector> to_deliver; + { + std::lock_guard lock(reliable_mutex_); + auto key = fmt::format("{}#{}", reader_entity_index, writer_guid.to_string()); + auto &state = reader_reliable_states_[key]; + if (sequence_number <= state.highest_delivered) { + // Duplicate (e.g. a retransmission of an already-delivered sample); drop it. + return; + } + if (sequence_number == state.highest_delivered + 1) { + // Next expected sample: deliver it, then drain any now-contiguous buffered samples. + to_deliver.emplace_back(payload.begin(), payload.end()); + state.highest_delivered = sequence_number; + while (true) { + auto iterator = state.reorder.find(state.highest_delivered + 1); + if (iterator == state.reorder.end()) { + break; + } + to_deliver.push_back(std::move(iterator->second)); + state.reorder.erase(iterator); + state.highest_delivered++; + } + } else if (state.reorder.find(sequence_number) == state.reorder.end() && + state.reorder.size() < config_.reliable_reorder_depth) { + // Out of order: buffer for in-order delivery once the gap is filled. If the buffer is full + // the sample is dropped and will be re-requested via the next ACKNACK. + state.reorder[sequence_number].assign(payload.begin(), payload.end()); + } + } + for (const auto &sample : to_deliver) { + on_sample(sample); + } +} + +void RtpsParticipant::send_acknack_for_heartbeat(const GuidPrefix &writer_prefix, + const EntityId &writer_id, int64_t first_sn, + int64_t last_sn, uint32_t heartbeat_count, + bool heartbeat_final) { + if (!user_unicast_receiver_) { + return; + } + Guid remote_writer_guid{.prefix = writer_prefix, .entity_id = writer_id}; + + // Resolve the writer's topic + where to send the ACKNACK from discovery, then the matched + // reliable local readers. + auto writer = discovery_.find_writer(remote_writer_guid); + if (!writer || writer->reliability != ReliabilityKind::RELIABLE) { + return; + } + const Locator writer_locator = writer->unicast_locator; + std::vector matched_reader_indices; + { + std::lock_guard lock(mutex_); + for (const auto &reader_config : readers_) { + if (reader_config.topic_name == writer->topic_name && + reader_config.reliability == ReliabilityKind::RELIABLE) { + matched_reader_indices.push_back(reader_config.entity_index); + } + } + } + if (matched_reader_indices.empty()) { + return; + } + std::string participant_address; + uint16_t participant_user_unicast = 0; + if (auto participant = discovery_.find_participant_by_prefix(writer_prefix)) { + participant_address = participant->address; + participant_user_unicast = participant->ports.user_unicast; + } + + // Prefer the writer's advertised unicast locator; fall back to the participant's user-unicast + // endpoint (the raw datagram source port is the writer's ephemeral send port, not its receiver). + std::string dest_address; + uint16_t dest_port = 0; + if (has_valid_locator(writer_locator)) { + dest_address = writer_locator.address_string(); + dest_port = static_cast(writer_locator.port); + } else if (!participant_address.empty() && participant_user_unicast != 0) { + dest_address = participant_address; + dest_port = participant_user_unicast; + } else { + return; + } + + for (uint32_t reader_entity_index : matched_reader_indices) { + auto reader_entity_id = reader_guid(reader_entity_index).entity_id; + SequenceNumberSet reader_sn_state; + uint32_t count = 0; + { + std::lock_guard lock(reliable_mutex_); + auto key = fmt::format("{}#{}", reader_entity_index, remote_writer_guid.to_string()); + auto &state = reader_reliable_states_[key]; + // Samples below firstSN are no longer available from the writer (history purged); treat the + // gap as permanently lost and skip past it so we do not NACK samples that no longer exist. + if (first_sn > state.highest_delivered + 1) { + logger_.warn("Reliable reader missed samples [{}, {}] from writer {} (writer history " + "advanced past them)", + state.highest_delivered + 1, first_sn - 1, remote_writer_guid.to_string()); + state.highest_delivered = first_sn - 1; + while (!state.reorder.empty() && state.reorder.begin()->first <= state.highest_delivered) { + state.reorder.erase(state.reorder.begin()); + } + } + // Ignore a stale heartbeat that does not require a response. + if (heartbeat_count <= state.last_heartbeat_count && heartbeat_final) { + continue; + } + state.last_heartbeat_count = std::max(state.last_heartbeat_count, heartbeat_count); + + int64_t base = state.highest_delivered + 1; + reader_sn_state.base = base; + bool any_missing = false; + for (int64_t sn = base; + sn <= last_sn && (sn - base) < static_cast(SequenceNumberSet::kMaxBits); sn++) { + if (state.reorder.find(sn) == state.reorder.end()) { + reader_sn_state.set(sn); + any_missing = true; + } + } + if (!any_missing) { + // Positive acknowledgement of everything up to lastSN (empty set, base = next expected). + reader_sn_state = SequenceNumberSet{}; + reader_sn_state.base = (last_sn >= base) ? last_sn + 1 : base; + } + count = ++state.acknack_count; + } + + Message message; + message.header.guid_prefix = guid_prefix_; + message.submessages.push_back(build_info_dst_submessage(writer_prefix)); + message.submessages.push_back(build_acknack_submessage(reader_entity_id, writer_id, + reader_sn_state, count, /*final=*/true)); + auto bytes = message.serialize(); + UdpSocket::SendConfig send_config{ + .ip_address = dest_address, + .port = dest_port, + .is_multicast_endpoint = false, + }; + user_unicast_receiver_->send(bytes, send_config); + } +} + int64_t RtpsParticipant::next_spdp_sequence_number() const { return spdp_sequence_number_.fetch_add(1, std::memory_order_relaxed); } @@ -1216,6 +1824,108 @@ RtpsParticipant::PortMapping RtpsParticipant::compute_port_mapping(uint16_t doma .user_unicast = static_cast(base + kUserUnicastOffset + participant_offset)}; } +void RtpsParticipant::record_builtin_sample(const Guid &writer_guid, int64_t sequence_number) { + std::lock_guard lock(reliable_mutex_); + auto &state = builtin_reader_states_[writer_guid.to_string()]; + if (sequence_number <= state.highest_delivered) { + return; // already accounted for + } + if (sequence_number == state.highest_delivered + 1) { + state.highest_delivered = sequence_number; + while (true) { + auto iterator = state.reorder.find(state.highest_delivered + 1); + if (iterator == state.reorder.end()) { + break; + } + state.reorder.erase(iterator); + state.highest_delivered++; + } + } else { + state.reorder[sequence_number]; // mark this out-of-order SEDP sample as received (key only) + } +} + +void RtpsParticipant::send_builtin_acknack(const GuidPrefix &writer_prefix, + const EntityId &writer_id, int64_t first_sn, + int64_t last_sn, uint32_t heartbeat_count, + bool /*heartbeat_final*/) { + if (!metatraffic_unicast_receiver_) { + return; + } + // Only the reliable builtin SEDP writers need an ACKNACK; map each to our matching builtin + // reader. + EntityId reader_id; + if (writer_id.value == kSedpPublicationsWriterEntityId) { + reader_id.value = kSedpPublicationsReaderEntityId; + } else if (writer_id.value == kSedpSubscriptionsWriterEntityId) { + reader_id.value = kSedpSubscriptionsReaderEntityId; + } else { + return; // SPDP is best-effort; other builtin writers are not tracked. + } + + // Resolve where the peer's builtin SEDP reader receives traffic: its metatraffic unicast + // endpoint. + std::string dest_address; + uint16_t dest_port = 0; + if (auto participant = discovery_.find_participant_by_prefix(writer_prefix)) { + if (has_valid_locator(participant->metatraffic_unicast_locator)) { + dest_address = participant->metatraffic_unicast_locator.address_string(); + dest_port = static_cast(participant->metatraffic_unicast_locator.port); + } else { + dest_address = participant->address; + dest_port = participant->ports.metatraffic_unicast; + } + } + if (dest_address.empty() || dest_port == 0) { + return; + } + + Guid writer_guid{.prefix = writer_prefix, .entity_id = writer_id}; + SequenceNumberSet reader_sn_state; + uint32_t count = 0; + bool any_missing = false; + { + std::lock_guard lock(reliable_mutex_); + auto &state = builtin_reader_states_[writer_guid.to_string()]; + if (first_sn > state.highest_delivered + 1) { + // Samples below firstSN are no longer available from the writer; skip past them. + state.highest_delivered = first_sn - 1; + while (!state.reorder.empty() && state.reorder.begin()->first <= state.highest_delivered) { + state.reorder.erase(state.reorder.begin()); + } + } + state.last_heartbeat_count = std::max(state.last_heartbeat_count, heartbeat_count); + int64_t base = state.highest_delivered + 1; + reader_sn_state.base = base; + for (int64_t sn = base; + sn <= last_sn && (sn - base) < static_cast(SequenceNumberSet::kMaxBits); sn++) { + if (state.reorder.find(sn) == state.reorder.end()) { + reader_sn_state.set(sn); + any_missing = true; + } + } + if (!any_missing) { + // Positive ack of everything announced (empty set, base = next expected). + reader_sn_state = SequenceNumberSet{}; + reader_sn_state.base = (last_sn >= base) ? last_sn + 1 : base; + } + count = ++state.acknack_count; + } + + Message ack_message; + ack_message.header.guid_prefix = guid_prefix_; + ack_message.submessages.push_back(build_info_dst_submessage(writer_prefix)); + // Leave the Final flag unset while samples are still missing so the writer responds promptly. + ack_message.submessages.push_back(build_acknack_submessage(reader_id, writer_id, reader_sn_state, + count, /*final=*/!any_missing)); + auto bytes = ack_message.serialize(); + UdpSocket::SendConfig send_config{.ip_address = dest_address, .port = dest_port}; + metatraffic_unicast_receiver_->send(bytes, send_config); + logger_.debug("Sent builtin ACKNACK to {}:{} for writer {} (base={}, missing={}, count={})", + dest_address, dest_port, writer_guid.to_string(), reader_sn_state.base, any_missing, + count); +} + bool RtpsParticipant::handle_metatraffic_message(std::vector &data, const Socket::Info &sender) { auto message = Message::parse(data); @@ -1224,6 +1934,30 @@ bool RtpsParticipant::handle_metatraffic_message(std::vector &data, } for (const auto &submessage : message->submessages) { + // Reliable peers (e.g. Fast DDS) heartbeat their builtin SEDP writers and will not (re)send + // discovery data until they get an ACKNACK. Reply so we receive their endpoint announcements. + if (submessage.kind == SubmessageKind::HEARTBEAT) { + if (message->header.guid_prefix != guid_prefix_) { + auto heartbeat = parse_heartbeat_submessage(submessage); + if (heartbeat.valid) { + send_builtin_acknack(message->header.guid_prefix, heartbeat.writer_id, heartbeat.first_sn, + heartbeat.last_sn, heartbeat.count, heartbeat.final); + } + } + continue; + } + // A reliable peer ACKNACKs our builtin SEDP writers; resend any NACKed discovery samples. + if (submessage.kind == SubmessageKind::ACKNACK) { + if (message->header.guid_prefix != guid_prefix_) { + auto acknack = parse_acknack_submessage(submessage); + if (acknack.valid) { + retransmit_sedp(message->header.guid_prefix, acknack.reader_id, acknack.writer_id, + requested_sequence_numbers(acknack.reader_sn_state)); + } + } + continue; + } + bool valid_data = false; auto data_view = parse_data_submessage(submessage, valid_data); if (!valid_data) { @@ -1246,83 +1980,103 @@ bool RtpsParticipant::handle_metatraffic_message(std::vector &data, continue; } - ParticipantProxy participant; - participant.participant_guid = *maybe_participant_guid; - participant.guid_prefix = maybe_participant_guid->prefix; - participant.address = sender.address; - - if (auto maybe_name_parameter = find_parameter(parameters, ParameterId::PID_ENTITY_NAME)) { - if (auto maybe_name = parse_cdr_string(maybe_name_parameter->value)) { - participant.name = *maybe_name; - } + // Parse only the fields actually present in this announcement; the upsert below merges them + // into the existing record, so a later (trimmed) announcement does not erase what we learned. + std::optional name; + if (auto parameter = find_parameter(parameters, ParameterId::PID_ENTITY_NAME)) { + name = parse_cdr_string(parameter->value); } - if (auto maybe_user_data_parameter = find_parameter(parameters, ParameterId::PID_USER_DATA)) { - if (auto maybe_user_data = parse_octet_sequence(maybe_user_data_parameter->value)) { - participant.enclave = extract_enclave(*maybe_user_data); + std::optional enclave; + if (auto parameter = find_parameter(parameters, ParameterId::PID_USER_DATA)) { + if (auto user_data = parse_octet_sequence(parameter->value)) { + enclave = extract_enclave(*user_data); } } - if (auto maybe_builtin_endpoint_parameter = - find_parameter(parameters, ParameterId::PID_BUILTIN_ENDPOINT_SET)) { - if (auto maybe_builtin_endpoints = parse_u32_le(maybe_builtin_endpoint_parameter->value)) { - participant.builtin_endpoints = *maybe_builtin_endpoints; - } + std::optional builtin_endpoints; + if (auto parameter = find_parameter(parameters, ParameterId::PID_BUILTIN_ENDPOINT_SET)) { + builtin_endpoints = parse_u32_le(parameter->value); } - if (auto maybe_meta_unicast_parameter = + std::optional meta_unicast; + if (auto parameter = find_parameter(parameters, ParameterId::PID_METATRAFFIC_UNICAST_LOCATOR)) { - if (auto maybe_locator = parse_locator(maybe_meta_unicast_parameter->value)) { - participant.ports.metatraffic_unicast = static_cast(maybe_locator->port); - } + meta_unicast = parse_locator(parameter->value); } - if (auto maybe_meta_multicast_parameter = + std::optional meta_multicast; + if (auto parameter = find_parameter(parameters, ParameterId::PID_METATRAFFIC_MULTICAST_LOCATOR)) { - if (auto maybe_locator = parse_locator(maybe_meta_multicast_parameter->value)) { - participant.ports.metatraffic_multicast = static_cast(maybe_locator->port); - } - } - if (auto maybe_default_unicast_parameter = - find_parameter(parameters, ParameterId::PID_DEFAULT_UNICAST_LOCATOR)) { - if (auto maybe_locator = parse_locator(maybe_default_unicast_parameter->value)) { - participant.ports.user_unicast = static_cast(maybe_locator->port); - const auto advertised_address = maybe_locator->address_string(); - if (advertised_address != "0.0.0.0") { - participant.address = advertised_address; - } - } + meta_multicast = parse_locator(parameter->value); } - if (auto maybe_default_multicast_parameter = - find_parameter(parameters, ParameterId::PID_DEFAULT_MULTICAST_LOCATOR)) { - if (auto maybe_locator = parse_locator(maybe_default_multicast_parameter->value)) { - participant.ports.user_multicast = static_cast(maybe_locator->port); - } + std::optional default_unicast; + if (auto parameter = find_parameter(parameters, ParameterId::PID_DEFAULT_UNICAST_LOCATOR)) { + default_unicast = parse_locator(parameter->value); } - - std::function callback; - bool is_new_participant = false; - { - std::lock_guard lock(mutex_); - auto iterator = - std::find_if(discovered_participants_.begin(), discovered_participants_.end(), - [&participant](const auto &candidate) { - return candidate.participant_guid == participant.participant_guid; - }); - if (iterator == discovered_participants_.end()) { - discovered_participants_.push_back(participant); - is_new_participant = true; - } else { - *iterator = participant; - } - callback = config_.on_participant_discovered; + std::optional default_multicast; + if (auto parameter = find_parameter(parameters, ParameterId::PID_DEFAULT_MULTICAST_LOCATOR)) { + default_multicast = parse_locator(parameter->value); } - - if (is_new_participant) { + const std::string sender_address = sender.address; + + auto result = discovery_.upsert_participant( + *maybe_participant_guid, [&](ParticipantProxy &participant) { + participant.participant_guid = *maybe_participant_guid; + participant.guid_prefix = maybe_participant_guid->prefix; + if (participant.address.empty()) { + participant.address = sender_address; + } + if (name) { + participant.name = *name; + } + if (enclave) { + participant.enclave = *enclave; + } + if (builtin_endpoints) { + participant.builtin_endpoints = *builtin_endpoints; + } + if (meta_unicast) { + participant.ports.metatraffic_unicast = static_cast(meta_unicast->port); + if (has_valid_locator(*meta_unicast)) { + participant.metatraffic_unicast_locator = *meta_unicast; + } + } + if (meta_multicast) { + participant.ports.metatraffic_multicast = static_cast(meta_multicast->port); + if (has_valid_locator(*meta_multicast)) { + participant.metatraffic_multicast_locator = *meta_multicast; + } + } + if (default_unicast) { + participant.ports.user_unicast = static_cast(default_unicast->port); + if (has_valid_locator(*default_unicast)) { + participant.default_unicast_locator = *default_unicast; + participant.address = default_unicast->address_string(); + } + } + if (default_multicast) { + participant.ports.user_multicast = static_cast(default_multicast->port); + if (has_valid_locator(*default_multicast)) { + participant.default_multicast_locator = *default_multicast; + } + } + }); + + logger_.debug("SPDP parsed participant {} from src {}: meta_uc={} meta_mc={} default_uc={} " + "default_mc={} -> stored(address={}, ports.meta_uc={}, ports.user_uc={})", + maybe_participant_guid->prefix.to_string(), sender_address, + locator_to_string(meta_unicast), locator_to_string(meta_multicast), + locator_to_string(default_unicast), locator_to_string(default_multicast), + result.value.address, result.value.ports.metatraffic_unicast, + result.value.ports.user_unicast); + + if (result.is_new) { + const auto &participant = result.value; logger_.info("SPDP discovered participant '{}' at {} (meta {}, user {})", participant.name.empty() ? participant.guid_prefix.to_string() : participant.name, participant.address, participant.ports.metatraffic_unicast, participant.ports.user_unicast); send_sedp_announcements_to(participant); - if (callback) { - callback(participant); + if (config_.on_participant_discovered) { + config_.on_participant_discovered(participant); } } continue; @@ -1337,6 +2091,10 @@ bool RtpsParticipant::handle_metatraffic_message(std::vector &data, continue; } + // Record the SEDP sample so we can correctly ACKNACK the peer's builtin SEDP heartbeats. + record_builtin_sample({.prefix = message->header.guid_prefix, .entity_id = data_view.writer_id}, + data_view.writer_sn); + auto maybe_endpoint_guid_parameter = find_parameter(parameters, ParameterId::PID_ENDPOINT_GUID); if (!maybe_endpoint_guid_parameter) { continue; @@ -1346,78 +2104,88 @@ bool RtpsParticipant::handle_metatraffic_message(std::vector &data, continue; } - EndpointProxy endpoint; - endpoint.guid = *maybe_endpoint_guid; - endpoint.is_reader = is_reader; - - if (auto maybe_participant_guid_parameter = - find_parameter(parameters, ParameterId::PID_PARTICIPANT_GUID)) { - if (auto maybe_participant_guid = parse_guid(maybe_participant_guid_parameter->value)) { - endpoint.participant_guid = *maybe_participant_guid; - } + // Parse only the present fields; the upsert below merges them into the existing endpoint. + const Guid endpoint_guid = *maybe_endpoint_guid; + std::optional endpoint_participant_guid; + if (auto parameter = find_parameter(parameters, ParameterId::PID_PARTICIPANT_GUID)) { + endpoint_participant_guid = parse_guid(parameter->value); } - if (endpoint.participant_guid.entity_id.value == std::array{}) { - endpoint.participant_guid = {.prefix = endpoint.guid.prefix, - .entity_id = {.value = kParticipantEntityId}}; + std::optional topic_name; + if (auto parameter = find_parameter(parameters, ParameterId::PID_TOPIC_NAME)) { + topic_name = parse_cdr_string(parameter->value); } - if (auto maybe_topic_name_parameter = find_parameter(parameters, ParameterId::PID_TOPIC_NAME)) { - if (auto maybe_topic_name = parse_cdr_string(maybe_topic_name_parameter->value)) { - endpoint.topic_name = *maybe_topic_name; - } + std::optional type_name; + if (auto parameter = find_parameter(parameters, ParameterId::PID_TYPE_NAME)) { + type_name = parse_cdr_string(parameter->value); } - if (auto maybe_type_name_parameter = find_parameter(parameters, ParameterId::PID_TYPE_NAME)) { - if (auto maybe_type_name = parse_cdr_string(maybe_type_name_parameter->value)) { - endpoint.type_name = *maybe_type_name; - } - } - if (auto maybe_locator_parameter = - find_parameter(parameters, ParameterId::PID_UNICAST_LOCATOR)) { - if (auto maybe_locator = parse_locator(maybe_locator_parameter->value)) { - endpoint.unicast_locator = *maybe_locator; - } + std::optional unicast_locator; + if (auto parameter = find_parameter(parameters, ParameterId::PID_UNICAST_LOCATOR)) { + unicast_locator = parse_locator(parameter->value); } + std::vector multicast_locators; for (const auto &locator_parameter : find_parameters(parameters, ParameterId::PID_MULTICAST_LOCATOR)) { - if (auto maybe_locator = parse_locator(locator_parameter.value)) { - endpoint.multicast_locators.push_back(*maybe_locator); + if (auto parsed = parse_locator(locator_parameter.value)) { + multicast_locators.push_back(*parsed); } } - if (auto maybe_reliability_parameter = - find_parameter(parameters, ParameterId::PID_RELIABILITY)) { - if (auto maybe_reliability = parse_reliability(maybe_reliability_parameter->value)) { - endpoint.reliability = *maybe_reliability; - } + std::optional reliability; + if (auto parameter = find_parameter(parameters, ParameterId::PID_RELIABILITY)) { + reliability = parse_reliability(parameter->value); } - if (auto maybe_inline_qos_parameter = - find_parameter(parameters, ParameterId::PID_EXPECTS_INLINE_QOS)) { - if (auto maybe_inline_qos = parse_bool(maybe_inline_qos_parameter->value)) { - endpoint.expects_inline_qos = *maybe_inline_qos; - } + std::optional expects_inline_qos; + if (auto parameter = find_parameter(parameters, ParameterId::PID_EXPECTS_INLINE_QOS)) { + expects_inline_qos = parse_bool(parameter->value); } - std::function endpoint_callback; - bool is_new_endpoint = false; - { - std::lock_guard lock(mutex_); - auto &endpoint_list = is_reader ? discovered_readers_ : discovered_writers_; - auto iterator = std::find_if( - endpoint_list.begin(), endpoint_list.end(), - [&endpoint](const auto &candidate) { return candidate.guid == endpoint.guid; }); - if (iterator == endpoint_list.end()) { - endpoint_list.push_back(endpoint); - is_new_endpoint = true; - } else { - *iterator = endpoint; - } - endpoint_callback = config_.on_endpoint_discovered; - } + auto result = + discovery_.upsert_endpoint(is_reader, endpoint_guid, [&](EndpointProxy &endpoint) { + endpoint.guid = endpoint_guid; + endpoint.is_reader = is_reader; + if (endpoint_participant_guid) { + endpoint.participant_guid = *endpoint_participant_guid; + } + if (endpoint.participant_guid.entity_id.value == std::array{}) { + // Fall back to the implicit participant entity within the endpoint's prefix. + endpoint.participant_guid = {.prefix = endpoint_guid.prefix, + .entity_id = {.value = kParticipantEntityId}}; + } + if (topic_name) { + endpoint.topic_name = *topic_name; + } + if (type_name) { + endpoint.type_name = *type_name; + } + if (unicast_locator && has_valid_locator(*unicast_locator)) { + endpoint.unicast_locator = *unicast_locator; + } + if (!multicast_locators.empty()) { + endpoint.multicast_locators = multicast_locators; + } + if (reliability) { + endpoint.reliability = *reliability; + } + if (expects_inline_qos) { + endpoint.expects_inline_qos = *expects_inline_qos; + } + }); - if (is_new_endpoint) { + logger_.debug( + "SEDP parsed {} {} topic '{}' [{}] reliability={} unicast={} multicast_count={} -> " + "participant {}", + is_reader ? "reader" : "writer", endpoint_guid.to_string(), result.value.topic_name, + result.value.type_name, + result.value.reliability == ReliabilityKind::RELIABLE ? "RELIABLE" : "BEST_EFFORT", + locator_to_string(result.value.unicast_locator), result.value.multicast_locators.size(), + result.value.participant_guid.to_string()); + + if (result.is_new) { + const auto &endpoint = result.value; logger_.info("SEDP discovered {} '{}' [{}] from participant {}", endpoint.is_reader ? "reader" : "writer", endpoint.topic_name, endpoint.type_name, endpoint.participant_guid.to_string()); - if (endpoint_callback) { - endpoint_callback(endpoint); + if (config_.on_endpoint_discovered) { + config_.on_endpoint_discovered(endpoint); } } } @@ -1434,6 +2202,40 @@ bool RtpsParticipant::handle_user_message(std::vector &data, const Sock } for (const auto &submessage : message->submessages) { + // Reliable-QoS submessages (Phase 0: parsed and logged; the writer/reader + // state machines that act on them land in later phases — see + // RELIABLE_RTPS_PLAN.md). + if (submessage.kind == SubmessageKind::HEARTBEAT) { + auto heartbeat = parse_heartbeat_submessage(submessage); + if (heartbeat.valid) { + logger_.debug("HEARTBEAT from {} (writer {}, reader {}): firstSN={} lastSN={} count={} " + "final={}", + message->header.guid_prefix.to_string(), heartbeat.writer_id.to_string(), + heartbeat.reader_id.to_string(), heartbeat.first_sn, heartbeat.last_sn, + heartbeat.count, heartbeat.final); + // A matched reliable reader replies with an ACKNACK of the missing sequence numbers in + // [firstSN, lastSN] so the writer can retransmit them. + send_acknack_for_heartbeat(message->header.guid_prefix, heartbeat.writer_id, + heartbeat.first_sn, heartbeat.last_sn, heartbeat.count, + heartbeat.final); + } + continue; + } + if (submessage.kind == SubmessageKind::ACKNACK) { + auto acknack = parse_acknack_submessage(submessage); + if (acknack.valid) { + logger_.debug( + "ACKNACK from {} (writer {}, reader {}): base={} numBits={} count={} final={}", + message->header.guid_prefix.to_string(), acknack.writer_id.to_string(), + acknack.reader_id.to_string(), acknack.reader_sn_state.base, + acknack.reader_sn_state.num_bits, acknack.count, acknack.final); + // A matched reliable writer resends the NACKed sequence numbers from its history. + retransmit_user_data(message->header.guid_prefix, acknack.reader_id, acknack.writer_id, + requested_sequence_numbers(acknack.reader_sn_state)); + } + continue; + } + bool valid_data = false; auto data_view = parse_data_submessage(submessage, valid_data); if (!valid_data) { @@ -1446,38 +2248,44 @@ bool RtpsParticipant::handle_user_message(std::vector &data, const Sock // local reader callbacks under a single lock. Guid remote_writer_guid{.prefix = message->header.guid_prefix, .entity_id = data_view.writer_id}; - std::string topic_name; - bool writer_is_reliable = false; - std::vector)>> callbacks; + struct MatchedReader { + uint32_t entity_index{0}; + bool reliable{false}; + std::function)> on_sample{}; + }; + auto writer = discovery_.find_writer(remote_writer_guid); + if (!writer) { + // Sample arrived before the writer was discovered via SEDP; drop it (best-effort). + logger_.debug("Received DATA for unknown writer {} from {} (not discovered via SEDP yet)", + remote_writer_guid.to_string(), sender.address); + continue; + } + const bool writer_is_reliable = writer->reliability == ReliabilityKind::RELIABLE; + std::vector matched_readers; { std::lock_guard lock(mutex_); - auto writer = std::find_if( - discovered_writers_.begin(), discovered_writers_.end(), - [&remote_writer_guid](const auto &w) { return w.guid == remote_writer_guid; }); - if (writer == discovered_writers_.end()) { - // Sample arrived before the writer was discovered via SEDP; drop it (best-effort). - continue; - } - topic_name = writer->topic_name; - writer_is_reliable = writer->reliability == ReliabilityKind::RELIABLE; for (const auto &reader_config : readers_) { - if (reader_config.topic_name == topic_name && reader_config.on_sample) { - callbacks.push_back(reader_config.on_sample); + if (reader_config.topic_name == writer->topic_name && reader_config.on_sample) { + matched_readers.push_back( + {.entity_index = reader_config.entity_index, + .reliable = reader_config.reliability == ReliabilityKind::RELIABLE, + .on_sample = reader_config.on_sample}); } } } - if (callbacks.empty()) { + if (matched_readers.empty()) { continue; } - if (writer_is_reliable) { - logger_.warn("Received sample on reliable topic '{}' from {}, but ACKNACK/HEARTBEAT is not " - "implemented " - "yet", - topic_name, sender); - } - for (const auto &callback : callbacks) { - callback(data_view.serialized_payload); + // The reliable handshake (dedup + in-order delivery) runs only between endpoints that both + // advertise RELIABLE; otherwise the sample is delivered best-effort (immediately, as received). + for (const auto &reader : matched_readers) { + if (writer_is_reliable && reader.reliable) { + deliver_reliable_sample(reader.entity_index, remote_writer_guid, data_view.writer_sn, + data_view.serialized_payload, reader.on_sample); + } else { + reader.on_sample(data_view.serialized_payload); + } } } return false; @@ -1530,6 +2338,7 @@ bool RtpsParticipant::ensure_user_multicast_receivers_started(const std::string .buffer_size = 4096, .is_multicast_endpoint = true, .multicast_group = group, + .multicast_interface = config_.bind_address, .on_receive_callback = [this](auto &data, const auto &sender) -> std::optional> { handle_user_message(data, sender); @@ -1548,47 +2357,47 @@ bool RtpsParticipant::ensure_user_multicast_receivers_started(const std::string return true; } -std::vector +std::vector RtpsParticipant::build_user_send_configs(std::string_view topic_name, const WriterConfig &writer_config) const { - std::vector send_configs; - auto add_send_config = [&send_configs](std::string ip_address, uint16_t port, bool is_multicast) { + std::vector destinations; + const std::string &multicast_interface = config_.bind_address; + auto add_destination = [&destinations, &multicast_interface](std::string ip_address, + uint16_t port, bool is_multicast, + const EntityId &reader_id) { if (ip_address.empty() || port == 0) { return; } - auto existing = - std::find_if(send_configs.begin(), send_configs.end(), [&](const auto &send_config) { - return send_config.ip_address == ip_address && send_config.port == port && - send_config.is_multicast_endpoint == is_multicast; + auto existing = std::find_if( + destinations.begin(), destinations.end(), [&](const UserDataDestination &destination) { + return destination.send_config.ip_address == ip_address && + destination.send_config.port == port && + destination.send_config.is_multicast_endpoint == is_multicast && + destination.reader_id == reader_id; }); - if (existing == send_configs.end()) { - send_configs.push_back({ - .ip_address = std::move(ip_address), - .port = port, - .is_multicast_endpoint = is_multicast, - }); + if (existing == destinations.end()) { + destinations.push_back( + {.send_config = {.ip_address = std::move(ip_address), + .port = port, + .is_multicast_endpoint = is_multicast, + .multicast_interface = + is_multicast ? multicast_interface : std::string{}}, + .reader_id = reader_id}); } }; + // Multicast sends target all matched readers, so the DATA readerId is left as ENTITYID_UNKNOWN. if (!writer_config.multicast_group.empty()) { - add_send_config(writer_config.multicast_group, ports().user_multicast, true); - return send_configs; + add_destination(writer_config.multicast_group, ports().user_multicast, true, EntityId{}); + return destinations; } if (config_.use_multicast_for_user_data) { - add_send_config(config_.user_multicast_group, ports().user_multicast, true); - return send_configs; - } - - std::vector remote_readers; - std::vector participants; - { - std::lock_guard lock(mutex_); - remote_readers = discovered_readers_; - participants = discovered_participants_; + add_destination(config_.user_multicast_group, ports().user_multicast, true, EntityId{}); + return destinations; } - for (const auto &reader : remote_readers) { + for (const auto &reader : discovery_.readers()) { if (!reader.is_reader || reader.topic_name != topic_name) { continue; } @@ -1598,29 +2407,30 @@ RtpsParticipant::build_user_send_configs(std::string_view topic_name, if (!has_valid_locator(locator)) { continue; } - add_send_config(locator.address_string(), static_cast(locator.port), true); + // Multicast to a reader's group: addressed to all readers in the group (readerId UNKNOWN). + add_destination(locator.address_string(), static_cast(locator.port), true, + EntityId{}); used_multicast = true; } if (used_multicast) { continue; } + // Unicast to a specific reader: address the DATA to that reader's entity id. if (has_valid_locator(reader.unicast_locator)) { - add_send_config(reader.unicast_locator.address_string(), - static_cast(reader.unicast_locator.port), false); + add_destination(reader.unicast_locator.address_string(), + static_cast(reader.unicast_locator.port), false, + reader.guid.entity_id); continue; } - auto participant = - std::find_if(participants.begin(), participants.end(), [&](const auto &proxy) { - return proxy.guid_prefix == reader.participant_guid.prefix; - }); - if (participant != participants.end()) { - add_send_config(participant->address, participant->ports.user_unicast, false); + if (auto participant = discovery_.find_participant_by_prefix(reader.participant_guid.prefix)) { + add_destination(participant->address, participant->ports.user_unicast, false, + reader.guid.entity_id); } } - return send_configs; + return destinations; } bool RtpsParticipant::send_spdp_announce_now() { @@ -1632,13 +2442,37 @@ bool RtpsParticipant::send_spdp_announce_now() { .ip_address = config_.metatraffic_multicast_group, .port = ports().metatraffic_multicast, .is_multicast_endpoint = true, + .multicast_interface = config_.bind_address, }; return metatraffic_unicast_receiver_->send(payload, send_config); } bool RtpsParticipant::send_sedp_announcements_to(const ParticipantProxy &participant) { - if (!metatraffic_unicast_receiver_ || participant.ports.metatraffic_unicast == 0 || - participant.address.empty()) { + // Prefer the peer's advertised metatraffic unicast locator (its full address + port); fall back + // to participant.address (the SPDP source / default-unicast address) + the derived metatraffic + // port. + std::string meta_address; + uint16_t meta_port = 0; + const char *address_source = "metatraffic_unicast_locator"; + if (has_valid_locator(participant.metatraffic_unicast_locator)) { + meta_address = participant.metatraffic_unicast_locator.address_string(); + meta_port = static_cast(participant.metatraffic_unicast_locator.port); + } else { + meta_address = participant.address; + meta_port = participant.ports.metatraffic_unicast; + address_source = "participant.address fallback"; + } + logger_.debug( + "send_sedp -> participant {} dest {}:{} (via {}) [meta_uc_loc={}, default_uc_loc={}, " + "participant.address={}, ports.meta_uc={}]", + participant.guid_prefix.to_string(), meta_address, meta_port, address_source, + locator_to_string(participant.metatraffic_unicast_locator), + locator_to_string(participant.default_unicast_locator), participant.address, + participant.ports.metatraffic_unicast); + if (!metatraffic_unicast_receiver_ || meta_port == 0 || meta_address.empty()) { + logger_.warn("send_sedp: no usable metatraffic destination for participant {} " + "(address '{}', port {})", + participant.guid_prefix.to_string(), meta_address, meta_port); return false; } @@ -1651,25 +2485,215 @@ bool RtpsParticipant::send_sedp_announcements_to(const ParticipantProxy &partici local_readers = readers_; } - for (const auto &writer_config : local_writers) { - auto payload = build_sedp_publication_message(writer_config); - auto send_config = UdpSocket::SendConfig{ - .ip_address = participant.address, - .port = participant.ports.metatraffic_unicast, - }; + // Each local endpoint is a stable SEDP sample: writer/reader at index i has sequence number i+1. + // Re-announcing resends the same sample under the same SN, so the SEDP HEARTBEAT range stays + // meaningful and a reliable peer can detect/recover a missed announcement. + const UdpSocket::SendConfig send_config{.ip_address = meta_address, .port = meta_port}; + for (size_t i = 0; i < local_writers.size(); i++) { + auto payload = + build_message(guid_prefix_, {.value = kSedpPublicationsReaderEntityId}, + {.value = kSedpPublicationsWriterEntityId}, static_cast(i + 1), + build_sedp_publication_payload(local_writers[i])) + .serialize(); sent = metatraffic_unicast_receiver_->send(payload, send_config) || sent; } - for (const auto &reader_config : local_readers) { - auto payload = build_sedp_subscription_message(reader_config); - auto send_config = UdpSocket::SendConfig{ - .ip_address = participant.address, - .port = participant.ports.metatraffic_unicast, - }; + for (size_t i = 0; i < local_readers.size(); i++) { + auto payload = + build_message(guid_prefix_, {.value = kSedpSubscriptionsReaderEntityId}, + {.value = kSedpSubscriptionsWriterEntityId}, static_cast(i + 1), + build_sedp_subscription_payload(local_readers[i])) + .serialize(); sent = metatraffic_unicast_receiver_->send(payload, send_config) || sent; } + // HEARTBEAT our builtin SEDP writers so the peer's reliable SEDP readers ACKNACK (which lets us + // detect and retransmit a missed announcement on a lossy link). + send_sedp_heartbeats_to(meta_address, meta_port, participant.guid_prefix, local_writers.size(), + local_readers.size()); return sent; } +void RtpsParticipant::send_sedp_heartbeats_to(const std::string &dest_address, uint16_t dest_port, + const GuidPrefix &dest_prefix, size_t writer_count, + size_t reader_count) { + if (!metatraffic_unicast_receiver_ || dest_address.empty() || dest_port == 0) { + return; + } + const UdpSocket::SendConfig send_config{.ip_address = dest_address, .port = dest_port}; + auto emit = [&](const std::array &reader_eid, + const std::array &writer_eid, size_t count, + std::atomic &heartbeat_count) { + if (count == 0) { + return; // nothing announced on this builtin writer yet + } + Message message; + message.header.guid_prefix = guid_prefix_; + message.submessages.push_back(build_info_dst_submessage(dest_prefix)); + message.submessages.push_back(build_heartbeat_submessage( + {.value = reader_eid}, {.value = writer_eid}, /*first_sn=*/1, + /*last_sn=*/static_cast(count), ++heartbeat_count, /*final=*/false)); + metatraffic_unicast_receiver_->send(message.serialize(), send_config); + }; + emit(kSedpPublicationsReaderEntityId, kSedpPublicationsWriterEntityId, writer_count, + sedp_pub_heartbeat_count_); + emit(kSedpSubscriptionsReaderEntityId, kSedpSubscriptionsWriterEntityId, reader_count, + sedp_sub_heartbeat_count_); +} + +std::vector +RtpsParticipant::build_directed_data_message(const GuidPrefix &dest_prefix, EntityId reader_id, + EntityId writer_id, int64_t sequence_number, + std::span serialized_payload) const { + Message message; + message.header.guid_prefix = guid_prefix_; + message.submessages.push_back(build_info_dst_submessage(dest_prefix)); + message.submessages.push_back( + {.kind = SubmessageKind::DATA, + .flags = static_cast(kSubmessageFlagLittleEndian | kSubmessageFlagData), + .payload = build_data_submessage_payload(reader_id, writer_id, sequence_number, + serialized_payload)}); + return message.serialize(); +} + +void RtpsParticipant::retransmit_user_data(const GuidPrefix &reader_prefix, EntityId reader_id, + EntityId writer_id, + const std::vector &requested_sequence_numbers) { + if (!user_unicast_receiver_ || requested_sequence_numbers.empty()) { + return; + } + // Find the local writer this ACKNACK targets (by its entity id) -> its reliable history slot. + std::optional writer_entity_index; + { + std::lock_guard lock(mutex_); + for (const auto &writer_config : writers_) { + if (writer_guid(writer_config.entity_index).entity_id == writer_id) { + writer_entity_index = writer_config.entity_index; + break; + } + } + } + if (!writer_entity_index) { + return; + } + + // Resolve where to send: the requesting reader's unicast locator, else its participant's user + // unicast endpoint. + const Guid requesting_reader_guid{.prefix = reader_prefix, .entity_id = reader_id}; + std::string dest_address; + uint16_t dest_port = 0; + for (const auto &reader : discovery_.readers()) { + if (reader.guid == requesting_reader_guid && has_valid_locator(reader.unicast_locator)) { + dest_address = reader.unicast_locator.address_string(); + dest_port = static_cast(reader.unicast_locator.port); + break; + } + } + if (dest_address.empty()) { + if (auto participant = discovery_.find_participant_by_prefix(reader_prefix)) { + dest_address = participant->address; + dest_port = participant->ports.user_unicast; + } + } + if (dest_address.empty() || dest_port == 0) { + return; + } + + // Collect the requested samples still in history (copied under the lock; sent after releasing + // it). + std::vector>> samples; + { + std::lock_guard lock(reliable_mutex_); + auto state = writer_reliable_states_.find(*writer_entity_index); + if (state == writer_reliable_states_.end()) { + return; + } + for (int64_t sequence_number : requested_sequence_numbers) { + auto entry = state->second.history.find(sequence_number); + if (entry != state->second.history.end()) { + samples.emplace_back(sequence_number, entry->second); + } + } + } + + const UdpSocket::SendConfig send_config{.ip_address = dest_address, .port = dest_port}; + for (const auto &[sequence_number, payload] : samples) { + auto bytes = + build_directed_data_message(reader_prefix, reader_id, writer_id, sequence_number, payload); + user_unicast_receiver_->send(bytes, send_config); + } + if (!samples.empty()) { + logger_.debug("Retransmitted {} user-data sample(s) to {}:{} (writer {}, reader {})", + samples.size(), dest_address, dest_port, writer_id.to_string(), + reader_id.to_string()); + } +} + +void RtpsParticipant::retransmit_sedp(const GuidPrefix &reader_prefix, EntityId reader_id, + EntityId writer_id, + const std::vector &requested_sequence_numbers) { + if (!metatraffic_unicast_receiver_ || requested_sequence_numbers.empty()) { + return; + } + const bool is_publications = writer_id.value == kSedpPublicationsWriterEntityId; + const bool is_subscriptions = writer_id.value == kSedpSubscriptionsWriterEntityId; + if (!is_publications && !is_subscriptions) { + return; // not one of our builtin SEDP writers + } + + std::string dest_address; + uint16_t dest_port = 0; + if (auto participant = discovery_.find_participant_by_prefix(reader_prefix)) { + if (has_valid_locator(participant->metatraffic_unicast_locator)) { + dest_address = participant->metatraffic_unicast_locator.address_string(); + dest_port = static_cast(participant->metatraffic_unicast_locator.port); + } else { + dest_address = participant->address; + dest_port = participant->ports.metatraffic_unicast; + } + } + if (dest_address.empty() || dest_port == 0) { + return; + } + + std::vector local_writers; + std::vector local_readers; + { + std::lock_guard lock(mutex_); + local_writers = writers_; + local_readers = readers_; + } + + // Each SEDP sample's sequence number is the 1-based index of the local endpoint, so it is rebuilt + // deterministically from writers_/readers_ (no separate SEDP history cache needed). + const UdpSocket::SendConfig send_config{.ip_address = dest_address, .port = dest_port}; + size_t retransmitted = 0; + for (int64_t sequence_number : requested_sequence_numbers) { + if (sequence_number < 1) { + continue; + } + const auto index = static_cast(sequence_number - 1); + std::vector payload; + if (is_publications) { + if (index >= local_writers.size()) { + continue; + } + payload = build_sedp_publication_payload(local_writers[index]); + } else { + if (index >= local_readers.size()) { + continue; + } + payload = build_sedp_subscription_payload(local_readers[index]); + } + auto bytes = + build_directed_data_message(reader_prefix, reader_id, writer_id, sequence_number, payload); + metatraffic_unicast_receiver_->send(bytes, send_config); + retransmitted++; + } + if (retransmitted > 0) { + logger_.debug("Retransmitted {} SEDP sample(s) to {}:{} (writer {})", retransmitted, + dest_address, dest_port, writer_id.to_string()); + } +} + bool RtpsParticipant::send_discovery_now() { auto participants = discovered_participants(); return std::accumulate(participants.begin(), participants.end(), send_spdp_announce_now(), diff --git a/pc/tests/rtps_subscriber.cpp b/pc/tests/rtps_subscriber.cpp index 501dddfec..44c91c764 100644 --- a/pc/tests/rtps_subscriber.cpp +++ b/pc/tests/rtps_subscriber.cpp @@ -23,12 +23,14 @@ int main(int argc, char **argv) { espp::RtpsParticipant participant({ .node_name = "espp_subscriber", .participant_id = 12, + .bind_address = address, .advertised_address = address, .announce_period = 500ms, .on_participant_discovered = [&logger](const auto &proxy) { logger.info("discovered participant '{}' at {}", proxy.name, proxy.address); }, + .log_level = espp::Logger::Verbosity::DEBUG, }); participant.add_reader({ .topic_name = topic, From 0a8b0f80f10139836d0e18c035711b103788ad1f Mon Sep 17 00:00:00 2001 From: William Emfinger Date: Mon, 29 Jun 2026 18:23:59 -0500 Subject: [PATCH 2/2] add support for GAP message; allow GUID to be randomized; fix static analysis --- components/rtps/RELIABLE_RTPS_PLAN.md | 50 +++-- components/rtps/include/rtps.hpp | 28 +++ components/rtps/src/rtps.cpp | 269 +++++++++++++++++++++++--- 3 files changed, 308 insertions(+), 39 deletions(-) diff --git a/components/rtps/RELIABLE_RTPS_PLAN.md b/components/rtps/RELIABLE_RTPS_PLAN.md index 490e61bc6..d4dcf48c1 100644 --- a/components/rtps/RELIABLE_RTPS_PLAN.md +++ b/components/rtps/RELIABLE_RTPS_PLAN.md @@ -1,14 +1,24 @@ # Design: Reliable RTPS (HEARTBEAT / ACKNACK) for user data -Status: **in progress.** Phase 0 (submessage codecs + dispatch seam), Phase 1 -(reliable writer: history cache + HEARTBEAT emission), and Phase 2 (reliable -reader: dedup + in-order delivery + ACKNACK generation) are implemented; Phase 3 -(writer-side retransmission on ACKNACK) follows as a separate reviewable PR. -Best-effort behavior is unchanged for endpoints that advertise `BEST_EFFORT`. +Status: **complete (pending final interop sign-off).** Phases 0–4 are +implemented and hardware-validated against Fast DDS/RTPS: + +- Phase 0 — submessage codecs (`SequenceNumberSet`, `HEARTBEAT`, `ACKNACK`, + `INFO_DST`, `GAP`) + dispatch seam. +- Phase 1 — reliable writer: history cache + HEARTBEAT emission. +- Phase 2 — reliable reader: dedup + in-order delivery + ACKNACK generation. +- Phase 3 — writer-side retransmission on ACKNACK, for **both** user-data + writers and the builtin **SEDP** writers, plus `GAP` for evicted/irrelevant + samples (emitted by the writer, honored by the reader). +- Phase 4 — hardening: builtin SEDP reliability (stable SEDP sequence numbers + + SEDP HEARTBEATs so a reliable peer recovers a missed announcement), per-reader + `DATA` `readerId` addressing, multi-homed multicast interface selection, and a + GUID-keyed `DiscoveryDb` for discovery state. -With Phases 1–2, an espp reliable reader already recovers lost samples from a -**DDS/ROS 2 reliable writer** (which retransmits on our ACKNACK). espp↔espp -recovery needs Phase 3 (our writer answering NACKs). +Best-effort behavior is unchanged for endpoints that advertise `BEST_EFFORT`. +Reliable recovery now works in both directions: an espp reliable reader recovers +lost samples from a DDS/ROS 2 writer, and an espp reliable writer answers a +peer's NACKs (and GAPs samples it can no longer provide). ## Goal @@ -150,12 +160,22 @@ state — matching the existing snapshot-then-act pattern in Handles writer-purged gaps (advance past lost SNs) and stale heartbeats. Reliable handshake runs only when both endpoints advertise RELIABLE; the downgrade warning is removed. -- **Phase 3 — writer retransmission**: resend NACKed SNs from history on ACKNACK; - per-reader acked watermark + purge. -- **Phase 4 — hardening**: initial heartbeat on reader match, stale-count - rejection, jittered timers, `stop()` cleanup, edge cases. -- **Phase 5 — interop validation**: espp↔espp loopback with induced drops, then - Fast DDS / Cyclone / ROS 2 reliable with Wireshark; explicit packet-loss tests. +- **Phase 3 — writer retransmission** *(done)*: on ACKNACK, a writer resends the + NACKed SNs still in history to the requesting reader. Shared + `build_directed_data_message` (INFO_DST + DATA addressed to the requesting + reader) drives both `retransmit_user_data` (from the per-writer history) and + `retransmit_sedp` (rebuilt deterministically from `writers_`/`readers_` via the + stable index-based SEDP sequence numbers — no separate SEDP cache). Samples no + longer in history are answered with a `GAP`; the reader's `apply_gap` advances + its frontier over the irrelevant SNs and releases anything buffered behind them. +- **Phase 4 — hardening** *(done)*: builtin SEDP reliability (stable SEDP SNs + + SEDP HEARTBEATs so a reliable peer ACKNACKs and we retransmit a missed + announcement), per-reader `DATA` `readerId` addressing, `stop()` cleanup of all + reliable state, and bounded reorder/irrelevant buffers. (Remaining nice-to-haves: + initial heartbeat on reader match, stale-count rejection, jittered timers.) +- **Phase 5 — interop validation** *(in progress)*: hardware-validated publishing + from an ESP32-P4 (Ethernet) to a Fast RTPS subscriber, including the SEDP + HEARTBEAT/ACKNACK exchange. Still TODO: induced packet-loss tests and Cyclone. ## Design decisions - **History model**: KEEP_LAST depth per writer (bounded memory on embedded), not @@ -167,6 +187,6 @@ state — matching the existing snapshot-then-act pattern in ## Out of scope (later) - Durability beyond VOLATILE (TRANSIENT_LOCAL replay to late joiners). -- `GAP` submessage (irrelevant/removed samples). - Fragmentation (`DATA_FRAG`) for samples larger than the MTU. - Full QoS-incompatibility reporting. +- Reliable-over-multicast (the reliable handshake is unicast-only). diff --git a/components/rtps/include/rtps.hpp b/components/rtps/include/rtps.hpp index 3df8eeca4..d869aef47 100644 --- a/components/rtps/include/rtps.hpp +++ b/components/rtps/include/rtps.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -126,6 +127,7 @@ class RtpsParticipant : public BaseComponent { PAD = 0x01, ///< Padding submessage. ACKNACK = 0x06, ///< Reliable-reader acknowledgement submessage. HEARTBEAT = 0x07, ///< Reliable-writer heartbeat submessage. + GAP = 0x08, ///< Writer notification that sequence numbers are irrelevant/unavailable. INFO_TS = 0x09, ///< Timestamp information submessage. INFO_DST = 0x0e, ///< Destination GUID-prefix information submessage. DATA = 0x15, ///< User or discovery data submessage. @@ -226,6 +228,13 @@ class RtpsParticipant : public BaseComponent { std::string node_name{"espp_rtps"}; ///< Local participant name advertised in discovery. uint16_t domain_id{0}; ///< RTPS domain ID used for port derivation and discovery scope. uint16_t participant_id{0}; ///< RTPS participant ID used for GUID and port derivation. + bool randomize_guid_prefix{true}; ///< When true (default), mix per-instance entropy into the + ///< participant GUID so a restarted participant is seen as a + ///< new participant — DDS/ROS 2 peers then accept its + ///< republished samples instead of dropping them as + ///< already-seen duplicates. Set false for a deterministic + ///< GUID derived only from node_name/domain/participant id + ///< (e.g. reproducible tests). std::string bind_address{"0.0.0.0"}; ///< Local IPv4 address to bind sockets to. std::string advertised_address{ "127.0.0.1"}; ///< IPv4 address advertised to peers for unicast traffic. @@ -419,6 +428,13 @@ class RtpsParticipant : public BaseComponent { const std::vector &requested_sequence_numbers); void retransmit_sedp(const GuidPrefix &reader_prefix, EntityId reader_id, EntityId writer_id, const std::vector &requested_sequence_numbers); + // Reader-side GAP handling: mark the irrelevant sequence numbers as skipped and advance. + // [gap_start, gap_list_base) is the contiguous irrelevant range; bitmap_irrelevant are individual + // irrelevant sequence numbers >= gap_list_base. + void handle_user_gap(const GuidPrefix &writer_prefix, EntityId writer_id, int64_t gap_start, + int64_t gap_list_base, const std::vector &bitmap_irrelevant); + void handle_builtin_gap(const GuidPrefix &writer_prefix, EntityId writer_id, int64_t gap_start, + int64_t gap_list_base, const std::vector &bitmap_irrelevant); /// @brief Per-writer reliable-QoS state: the history of recently-sent samples /// (keyed by sequence number) plus heartbeat bookkeeping. Guarded by @@ -437,10 +453,22 @@ class RtpsParticipant : public BaseComponent { int64_t highest_delivered{0}; ///< Highest sequence number delivered in order (0 = none). std::map> reorder{}; ///< Out-of-order samples awaiting their predecessors. + std::set irrelevant{}; ///< Out-of-order sequence numbers a GAP marked irrelevant + ///< (skipped, not delivered, when the frontier reaches them). uint32_t last_heartbeat_count{0}; ///< Highest HEARTBEAT count seen (stale-heartbeat detection). uint32_t acknack_count{0}; ///< Monotonic ACKNACK count emitted by this reader. }; + // Advance the reader's in-order frontier over any now-contiguous buffered samples (collected into + // `delivered`) and irrelevant (GAP-skipped) sequence numbers. Caller must hold reliable_mutex_. + static void drain_reader_frontier(ReaderReliableState &state, + std::vector> &delivered); + // Mark a GAP's irrelevant sequence numbers as skipped, then drain the frontier. Caller must hold + // reliable_mutex_. + static void apply_gap(ReaderReliableState &state, int64_t gap_start, int64_t gap_list_base, + const std::vector &bitmap_irrelevant, + std::vector> &delivered); + /// @brief Hash functor for using a Guid as an unordered_map key. struct GuidHash { size_t operator()(const Guid &guid) const; diff --git a/components/rtps/src/rtps.cpp b/components/rtps/src/rtps.cpp index d4c90dbc2..57696e539 100644 --- a/components/rtps/src/rtps.cpp +++ b/components/rtps/src/rtps.cpp @@ -2,17 +2,41 @@ #include #include +#include #include #include +#include #include #include #include +#if defined(ESP_PLATFORM) +#include "esp_random.h" +#else +#include +#endif + #include "cdr.hpp" namespace { constexpr std::array kRtpsMagic{'R', 'T', 'P', 'S'}; +// Per-instance entropy mixed into the participant GUID so a restarted participant presents a new +// GUID. Without this a restart reuses the same writer GUID and republishes sequence numbers from 1, +// which reliable DDS/ROS 2 peers treat as already-seen duplicates and drop. +uint64_t random_guid_entropy() { +#if defined(ESP_PLATFORM) + return (static_cast(esp_random()) << 32) ^ esp_random(); +#else + std::random_device device; + uint64_t value = (static_cast(device()) << 32) ^ device(); + // Mix in a high-resolution clock sample so rapid restarts diverge even if random_device repeats. + value ^= + static_cast(std::chrono::high_resolution_clock::now().time_since_epoch().count()); + return value; +#endif +} + constexpr uint16_t kPortBase = 7400; constexpr uint16_t kDomainGain = 250; constexpr uint16_t kParticipantGain = 2; @@ -959,6 +983,55 @@ std::vector requested_sequence_numbers(const SequenceNumberSet &set) { } return sequence_numbers; } + +// GAP submessage: readerId(4) writerId(4) gapStart:SN(8) gapList:SequenceNumberSet. The samples in +// [gapStart, gapList.base - 1] plus the set bits in gapList are irrelevant / no longer available. +std::vector build_gap_payload(const espp::RtpsParticipant::EntityId &reader_id, + const espp::RtpsParticipant::EntityId &writer_id, + int64_t gap_start, const SequenceNumberSet &gap_list) { + ByteWriter writer; + writer.append_bytes(reader_id.value); + writer.append_bytes(writer_id.value); + writer.append_sequence_number_le(gap_start); + append_sequence_number_set(writer, gap_list); + return writer.take(); +} + +espp::RtpsParticipant::Submessage +build_gap_submessage(const espp::RtpsParticipant::EntityId &reader_id, + const espp::RtpsParticipant::EntityId &writer_id, int64_t gap_start, + const SequenceNumberSet &gap_list) { + return {.kind = espp::RtpsParticipant::SubmessageKind::GAP, + .flags = kSubmessageFlagLittleEndian, + .payload = build_gap_payload(reader_id, writer_id, gap_start, gap_list)}; +} + +struct GapView { + espp::RtpsParticipant::EntityId reader_id{}; + espp::RtpsParticipant::EntityId writer_id{}; + int64_t gap_start{0}; + SequenceNumberSet gap_list{}; + bool valid{false}; +}; + +GapView parse_gap_submessage(const espp::RtpsParticipant::Submessage &submessage) { + GapView view; + if (submessage.kind != espp::RtpsParticipant::SubmessageKind::GAP) { + return view; + } + const bool little_endian = (submessage.flags & kSubmessageFlagLittleEndian) != 0; + ByteReader reader(std::span{submessage.payload.data(), submessage.payload.size()}); + if (!reader.read_bytes( + std::span{view.reader_id.value.data(), view.reader_id.value.size()}) || + !reader.read_bytes( + std::span{view.writer_id.value.data(), view.writer_id.value.size()}) || + !reader.read_sequence_number(view.gap_start, little_endian) || + !read_sequence_number_set(reader, little_endian, view.gap_list)) { + return view; + } + view.valid = true; + return view; +} } // namespace namespace espp { @@ -1055,8 +1128,13 @@ RtpsParticipant::RtpsParticipant(const Config &config) , config_(config) { // GUID prefix layout: bytes 0..1 = participant_id, 2..3 = domain_id, 4..11 = 64-bit node-name // hash. Uniqueness across participants on one host relies on distinct participant_ids; the - // node-name hash distinguishes different nodes/applications. + // node-name hash distinguishes different nodes/applications. By default we also mix per-instance + // entropy into the hash so a restarted participant presents a new GUID (otherwise reliable peers + // drop its republished samples as already-seen duplicates). uint64_t hash = fnv1a_64(config_.node_name); + if (config_.randomize_guid_prefix) { + hash ^= random_guid_entropy(); + } guid_prefix_.value[0] = config_.participant_id & 0xff; guid_prefix_.value[1] = (config_.participant_id >> 8) & 0xff; guid_prefix_.value[2] = config_.domain_id & 0xff; @@ -1631,11 +1709,9 @@ bool RtpsParticipant::send_heartbeats_now() { std::vector reliable_writers; { std::lock_guard lock(mutex_); - for (const auto &writer : writers_) { - if (writer.reliability == ReliabilityKind::RELIABLE) { - reliable_writers.push_back(writer); - } - } + std::copy_if( + writers_.begin(), writers_.end(), std::back_inserter(reliable_writers), + [](const WriterConfig &writer) { return writer.reliability == ReliabilityKind::RELIABLE; }); } bool sent = false; for (const auto &writer : reliable_writers) { @@ -1644,6 +1720,66 @@ bool RtpsParticipant::send_heartbeats_now() { return sent; } +void RtpsParticipant::drain_reader_frontier(ReaderReliableState &state, + std::vector> &delivered) { + while (true) { + const int64_t next = state.highest_delivered + 1; + auto buffered = state.reorder.find(next); + if (buffered != state.reorder.end()) { + delivered.push_back(std::move(buffered->second)); + state.reorder.erase(buffered); + state.highest_delivered = next; + continue; + } + auto skipped = state.irrelevant.find(next); + if (skipped != state.irrelevant.end()) { + state.irrelevant.erase(skipped); // irrelevant sample: advance past it without delivering + state.highest_delivered = next; + continue; + } + break; + } +} + +void RtpsParticipant::apply_gap(ReaderReliableState &state, int64_t gap_start, + int64_t gap_list_base, + const std::vector &bitmap_irrelevant, + std::vector> &delivered) { + constexpr int64_t kMaxGapRange = 4096; + // Contiguous irrelevant range [gap_start, gap_list_base - 1]. + const int64_t range_end = gap_list_base - 1; + if (range_end > state.highest_delivered) { + if (gap_start <= state.highest_delivered + 1) { + // Touches the frontier: advance directly past the whole irrelevant range. + state.highest_delivered = range_end; + } else if (range_end - gap_start <= kMaxGapRange) { + // A hole precedes the gap: remember the irrelevant SNs so the frontier skips them later. + for (int64_t sn = gap_start; sn <= range_end; sn++) { + if (sn > state.highest_delivered) { + state.irrelevant.insert(sn); + } + } + } else { + // Unreasonably large gap ahead of our frontier: we are hopelessly behind; jump past it. + state.highest_delivered = range_end; + } + } + // Individual irrelevant sequence numbers (the set bits) at/after gap_list_base. + for (int64_t sn : bitmap_irrelevant) { + if (sn > state.highest_delivered) { + state.irrelevant.insert(sn); + } + } + // Drop anything that is now at/below the frontier. + while (!state.reorder.empty() && state.reorder.begin()->first <= state.highest_delivered) { + state.reorder.erase(state.reorder.begin()); + } + while (!state.irrelevant.empty() && *state.irrelevant.begin() <= state.highest_delivered) { + state.irrelevant.erase(state.irrelevant.begin()); + } + drain_reader_frontier(state, delivered); +} + void RtpsParticipant::deliver_reliable_sample( uint32_t reader_entity_index, const Guid &writer_guid, int64_t sequence_number, std::span payload, @@ -1660,18 +1796,10 @@ void RtpsParticipant::deliver_reliable_sample( return; } if (sequence_number == state.highest_delivered + 1) { - // Next expected sample: deliver it, then drain any now-contiguous buffered samples. + // Next expected sample: deliver it, then drain any now-contiguous buffered / irrelevant SNs. to_deliver.emplace_back(payload.begin(), payload.end()); state.highest_delivered = sequence_number; - while (true) { - auto iterator = state.reorder.find(state.highest_delivered + 1); - if (iterator == state.reorder.end()) { - break; - } - to_deliver.push_back(std::move(iterator->second)); - state.reorder.erase(iterator); - state.highest_delivered++; - } + drain_reader_frontier(state, to_deliver); } else if (state.reorder.find(sequence_number) == state.reorder.end() && state.reorder.size() < config_.reliable_reorder_depth) { // Out of order: buffer for in-order delivery once the gap is filled. If the buffer is full @@ -1957,6 +2085,17 @@ bool RtpsParticipant::handle_metatraffic_message(std::vector &data, } continue; } + // A peer GAPs its builtin SEDP writer (samples it will never send); advance past them. + if (submessage.kind == SubmessageKind::GAP) { + if (message->header.guid_prefix != guid_prefix_) { + auto gap = parse_gap_submessage(submessage); + if (gap.valid) { + handle_builtin_gap(message->header.guid_prefix, gap.writer_id, gap.gap_start, + gap.gap_list.base, requested_sequence_numbers(gap.gap_list)); + } + } + continue; + } bool valid_data = false; auto data_view = parse_data_submessage(submessage, valid_data); @@ -2235,6 +2374,16 @@ bool RtpsParticipant::handle_user_message(std::vector &data, const Sock } continue; } + // A reliable writer GAPs samples it will never send (evicted/irrelevant); advance past them so + // we stop NACKing them and release any buffered samples blocked behind them. + if (submessage.kind == SubmessageKind::GAP) { + auto gap = parse_gap_submessage(submessage); + if (gap.valid) { + handle_user_gap(message->header.guid_prefix, gap.writer_id, gap.gap_start, + gap.gap_list.base, requested_sequence_numbers(gap.gap_list)); + } + continue; + } bool valid_data = false; auto data_view = parse_data_submessage(submessage, valid_data); @@ -2597,19 +2746,28 @@ void RtpsParticipant::retransmit_user_data(const GuidPrefix &reader_prefix, Enti return; } - // Collect the requested samples still in history (copied under the lock; sent after releasing - // it). + // Collect the requested samples still in history, plus the lowest requested sequence number that + // has already been evicted (below the history floor) so we can GAP it. Copied under the lock; + // sent after releasing it. std::vector>> samples; + int64_t first_available = 0; // lowest SN still in history (0 = history empty) + int64_t lowest_evicted = 0; // lowest requested SN below first_available (0 = none) { std::lock_guard lock(reliable_mutex_); auto state = writer_reliable_states_.find(*writer_entity_index); if (state == writer_reliable_states_.end()) { return; } + const auto &history = state->second.history; + first_available = + history.empty() ? state->second.last_sequence_number + 1 : history.begin()->first; for (int64_t sequence_number : requested_sequence_numbers) { - auto entry = state->second.history.find(sequence_number); - if (entry != state->second.history.end()) { + auto entry = history.find(sequence_number); + if (entry != history.end()) { samples.emplace_back(sequence_number, entry->second); + } else if (sequence_number < first_available && + (lowest_evicted == 0 || sequence_number < lowest_evicted)) { + lowest_evicted = sequence_number; } } } @@ -2620,10 +2778,22 @@ void RtpsParticipant::retransmit_user_data(const GuidPrefix &reader_prefix, Enti build_directed_data_message(reader_prefix, reader_id, writer_id, sequence_number, payload); user_unicast_receiver_->send(bytes, send_config); } - if (!samples.empty()) { - logger_.debug("Retransmitted {} user-data sample(s) to {}:{} (writer {}, reader {})", - samples.size(), dest_address, dest_port, writer_id.to_string(), - reader_id.to_string()); + // Tell the reader the evicted samples are gone (GAP [lowest_evicted, first_available - 1]) so it + // advances its frontier instead of NACKing them forever. + if (lowest_evicted != 0 && first_available > lowest_evicted) { + SequenceNumberSet gap_list; + gap_list.base = first_available; // next SN the reader should expect + Message message; + message.header.guid_prefix = guid_prefix_; + message.submessages.push_back(build_info_dst_submessage(reader_prefix)); + message.submessages.push_back( + build_gap_submessage(reader_id, writer_id, lowest_evicted, gap_list)); + user_unicast_receiver_->send(message.serialize(), send_config); + } + if (!samples.empty() || lowest_evicted != 0) { + logger_.debug("Retransmitted {} user-data sample(s){} to {}:{} (writer {}, reader {})", + samples.size(), lowest_evicted != 0 ? " + GAP" : "", dest_address, dest_port, + writer_id.to_string(), reader_id.to_string()); } } @@ -2694,6 +2864,57 @@ void RtpsParticipant::retransmit_sedp(const GuidPrefix &reader_prefix, EntityId } } +void RtpsParticipant::handle_user_gap(const GuidPrefix &writer_prefix, EntityId writer_id, + int64_t gap_start, int64_t gap_list_base, + const std::vector &bitmap_irrelevant) { + const Guid writer_guid{.prefix = writer_prefix, .entity_id = writer_id}; + auto writer = discovery_.find_writer(writer_guid); + if (!writer || writer->reliability != ReliabilityKind::RELIABLE) { + return; + } + struct MatchedReader { + uint32_t entity_index{0}; + std::function)> on_sample{}; + }; + std::vector matched_readers; + { + std::lock_guard lock(mutex_); + for (const auto &reader_config : readers_) { + if (reader_config.topic_name == writer->topic_name && + reader_config.reliability == ReliabilityKind::RELIABLE && reader_config.on_sample) { + matched_readers.push_back({reader_config.entity_index, reader_config.on_sample}); + } + } + } + for (const auto &reader : matched_readers) { + std::vector> delivered; + { + std::lock_guard lock(reliable_mutex_); + auto key = fmt::format("{}#{}", reader.entity_index, writer_guid.to_string()); + apply_gap(reader_reliable_states_[key], gap_start, gap_list_base, bitmap_irrelevant, + delivered); + } + for (const auto &sample : delivered) { + reader.on_sample(sample); + } + } +} + +void RtpsParticipant::handle_builtin_gap(const GuidPrefix &writer_prefix, EntityId writer_id, + int64_t gap_start, int64_t gap_list_base, + const std::vector &bitmap_irrelevant) { + // Only the reliable builtin SEDP writers are tracked for ACKNACK accounting. + if (writer_id.value != kSedpPublicationsWriterEntityId && + writer_id.value != kSedpSubscriptionsWriterEntityId) { + return; + } + const Guid writer_guid{.prefix = writer_prefix, .entity_id = writer_id}; + std::vector> delivered; // builtin reader tracks SNs only; nothing to deliver + std::lock_guard lock(reliable_mutex_); + apply_gap(builtin_reader_states_[writer_guid.to_string()], gap_start, gap_list_base, + bitmap_irrelevant, delivered); +} + bool RtpsParticipant::send_discovery_now() { auto participants = discovered_participants(); return std::accumulate(participants.begin(), participants.end(), send_spdp_announce_now(),