diff --git a/apps/axon_recorder/axon_recorder.cpp b/apps/axon_recorder/axon_recorder.cpp index 08362959..a5781392 100644 --- a/apps/axon_recorder/axon_recorder.cpp +++ b/apps/axon_recorder/axon_recorder.cpp @@ -49,9 +49,9 @@ void print_usage(const char* program_name) { << " --path PATH Output directory path (default: .)\n" << " --profile PROFILE ROS profile: ros1 or ros2 (default: ros2)\n" << " --compression ALG Compression: none, zstd, lz4 (default: zstd)\n" - << " --level LEVEL Compression preset 0-4 (default: 3)\n" - << " 0=Default, 1=Fastest, 2=Fast, 3=Default, 4=Slow/Slowest\n" - << " (applies to both zstd and lz4; values >4 are clamped)\n" + << " --level LEVEL Compression preset (default: 3)\n" + << " 0=Default, 1=Fastest, 2=Fast, 3=Default, 4=Slow, >=5=Slowest\n" + << " (applies to both zstd and lz4)\n" << " --queue-size SIZE Message queue capacity (default: 1024)\n" << " --ws-url URL WebSocket server URL for --ws-client mode\n" << " --ws-auth-token TOKEN Authentication token for WebSocket handshake\n" @@ -116,7 +116,7 @@ void print_usage(const char* program_name) { << " recording:\n" << " profile: ros2\n" << " compression: zstd\n" - << " compression_level: 3 # preset 0-4 (see config/README.md)\n" + << " compression_level: 3 # preset: 3=Default (see config/README.md)\n" << " disk_usage:\n" << " warn_usage_gb: 80\n" << " hard_limit_gb: 100\n" diff --git a/apps/axon_recorder/config/README.md b/apps/axon_recorder/config/README.md index 0cd9b418..6c04b40d 100644 --- a/apps/axon_recorder/config/README.md +++ b/apps/axon_recorder/config/README.md @@ -62,14 +62,15 @@ List of topics to record with batching settings: - `message_type`: ROS message type - `batch_size`: Number of messages to batch before writing - `flush_interval_ms`: Maximum time to wait before flushing (ms) +- `qos_depth`: ROS2 subscription history depth (default `10`). This controls + the middleware QoS queue, not the recorder's `dataset.queue_size` worker queue. ### Recording - `profile`: ROS profile (`ros1` or `ros2`) - `compression`: MCAP compression (`none`, `zstd`, `lz4`) -- `compression_level`: Compression level preset, range **0-4** (applies to both zstd and lz4). - Maps to MCAP's `CompressionLevel` enum: `0`=Default, `1`=Fastest, `2`=Fast, `3`=Default, `4`=Slow/Slowest. - Values greater than 4 are silently clamped. This is *not* the native zstd (1-19) or lz4 (1-12) - range — the underlying MCAP C++ library only exposes 5 presets. +- `compression_level`: Compression level preset (applies to both zstd and lz4). + Maps recorder/CLI values to MCAP presets: `0`=Default, `1`=Fastest, `2`=Fast, `3`=Default, `4`=Slow, `>=5`=Slowest. + This is *not* the native zstd (1-19) or lz4 (1-12) range; the underlying MCAP C++ library only exposes 5 presets. - `max_disk_usage_gb`: Backward-compatible alias for `disk_usage.hard_limit_gb` - `disk_usage`: Recorder-managed disk budget - `enabled`: Enable warn/hard disk guard checks diff --git a/apps/axon_recorder/config/default_config_ros1.yaml b/apps/axon_recorder/config/default_config_ros1.yaml index 8f09abdf..3a252d41 100644 --- a/apps/axon_recorder/config/default_config_ros1.yaml +++ b/apps/axon_recorder/config/default_config_ros1.yaml @@ -99,13 +99,11 @@ recording: # - lz4: Faster compression, larger files # - none: No compression, fastest, largest files compression: zstd - # Compression level preset (applies to both zstd and lz4, range: 0-4). - # Internally maps to MCAP's CompressionLevel enum: - # 0 = use library default (Default) - # 1 = Fastest 2 = Fast 3 = Default 4 = Slow/Slowest - # Values > 4 are silently clamped to 4. This is NOT the native - # zstd (1-19) / lz4 (1-12) range -- the MCAP C++ library only exposes - # 5 presets. See core/axon_mcap/mcap_writer_wrapper.hpp. + # Compression level preset (applies to both zstd and lz4). + # Internally maps recorder/CLI values to MCAP's CompressionLevel enum: + # 0 = Default 1 = Fastest 2 = Fast 3 = Default 4 = Slow >=5 = Slowest + # This is NOT the native zstd (1-19) / lz4 (1-12) range. + # See core/axon_mcap/mcap_writer_wrapper.hpp. compression_level: 3 # Mixed-topic writer batching. Topic workers remain per-topic serial, while diff --git a/apps/axon_recorder/config/default_config_ros2.yaml b/apps/axon_recorder/config/default_config_ros2.yaml index f67e007e..af1a793e 100644 --- a/apps/axon_recorder/config/default_config_ros2.yaml +++ b/apps/axon_recorder/config/default_config_ros2.yaml @@ -49,6 +49,8 @@ dataset: # High-frequency subscriptions (IMU, camera): Use larger batches (100-5000) # Low-frequency subscriptions (status, diagnostics): Use small batches (1-10) # Control signals: Use batch_size=1 for immediate persistence +# Optional qos_depth controls ROS2 subscription history depth. It is separate +# from dataset.queue_size, which controls Axon's per-topic worker queue. subscriptions: # IMU data - high frequency (5 seconds of data per batch) - name: /imu/data @@ -108,13 +110,11 @@ recording: # - lz4: Faster compression, larger files # - none: No compression, fastest, largest files compression: zstd - # Compression level preset (applies to both zstd and lz4, range: 0-4). - # Internally maps to MCAP's CompressionLevel enum: - # 0 = use library default (Default) - # 1 = Fastest 2 = Fast 3 = Default 4 = Slow/Slowest - # Values > 4 are silently clamped to 4. This is NOT the native - # zstd (1-19) / lz4 (1-12) range -- the MCAP C++ library only exposes - # 5 presets. See core/axon_mcap/mcap_writer_wrapper.hpp. + # Compression level preset (applies to both zstd and lz4). + # Internally maps recorder/CLI values to MCAP's CompressionLevel enum: + # 0 = Default 1 = Fastest 2 = Fast 3 = Default 4 = Slow >=5 = Slowest + # This is NOT the native zstd (1-19) / lz4 (1-12) range. + # See core/axon_mcap/mcap_writer_wrapper.hpp. compression_level: 3 # Mixed-topic writer batching. Topic workers remain per-topic serial, while diff --git a/apps/axon_recorder/config/default_config_udp.yaml b/apps/axon_recorder/config/default_config_udp.yaml index 3569a843..98135999 100644 --- a/apps/axon_recorder/config/default_config_udp.yaml +++ b/apps/axon_recorder/config/default_config_udp.yaml @@ -123,13 +123,11 @@ recording: # - lz4: Faster compression, larger files # - none: No compression, fastest, largest files compression: zstd - # Compression level preset (applies to both zstd and lz4, range: 0-4). - # Internally maps to MCAP's CompressionLevel enum: - # 0 = use library default (Default) - # 1 = Fastest 2 = Fast 3 = Default 4 = Slow/Slowest - # Values > 4 are silently clamped to 4. This is NOT the native - # zstd (1-19) / lz4 (1-12) range -- the MCAP C++ library only exposes - # 5 presets. See core/axon_mcap/mcap_writer_wrapper.hpp. + # Compression level preset (applies to both zstd and lz4). + # Internally maps recorder/CLI values to MCAP's CompressionLevel enum: + # 0 = Default 1 = Fastest 2 = Fast 3 = Default 4 = Slow >=5 = Slowest + # This is NOT the native zstd (1-19) / lz4 (1-12) range. + # See core/axon_mcap/mcap_writer_wrapper.hpp. compression_level: 3 # Disk usage limits for recorder-managed local storage. diff --git a/apps/axon_recorder/src/config/config_parser.cpp b/apps/axon_recorder/src/config/config_parser.cpp index 04ac914c..046c496c 100644 --- a/apps/axon_recorder/src/config/config_parser.cpp +++ b/apps/axon_recorder/src/config/config_parser.cpp @@ -278,6 +278,7 @@ bool ConfigParser::save_to_file(const std::string& path, const RecorderConfig& c subscription_node["message_type"] = subscription.message_type; subscription_node["batch_size"] = subscription.batch_size; subscription_node["flush_interval_ms"] = subscription.flush_interval_ms; + subscription_node["qos_depth"] = subscription.qos_depth; node["subscriptions"].push_back(subscription_node); } @@ -359,6 +360,10 @@ bool ConfigParser::parse_subscriptions( if (subscription_node["flush_interval_ms"]) { subscription.flush_interval_ms = subscription_node["flush_interval_ms"].as(); } + if (subscription_node["qos_depth"]) { + const auto qos_depth = subscription_node["qos_depth"].as(); + subscription.qos_depth = qos_depth > 0 ? static_cast(qos_depth) : 0; + } // Parse depth_compression section (can be boolean or object) if (subscription_node["depth_compression"]) { const auto& dc_node = subscription_node["depth_compression"]; @@ -785,6 +790,10 @@ bool ConfigParser::validate(const RecorderConfig& config, std::string& error_msg error_msg = "Subscription batch_size must be > 0"; return false; } + if (subscription.qos_depth == 0) { + error_msg = "Subscription qos_depth must be > 0"; + return false; + } } const auto& writer_batch = config.recording.writer_batch; diff --git a/apps/axon_recorder/src/core/recorder.cpp b/apps/axon_recorder/src/core/recorder.cpp index 248482fc..cf68e633 100644 --- a/apps/axon_recorder/src/core/recorder.cpp +++ b/apps/axon_recorder/src/core/recorder.cpp @@ -2256,8 +2256,9 @@ bool AxonRecorder::setup_subscriptions() { } std::string options_json; - if (sub.depth_compression.enabled) { + { nlohmann::json opts; + opts["qos_depth"] = sub.qos_depth; opts["depth_compression"]["enabled"] = sub.depth_compression.enabled; opts["depth_compression"]["level"] = sub.depth_compression.level; options_json = opts.dump(); diff --git a/apps/axon_recorder/src/core/recorder.hpp b/apps/axon_recorder/src/core/recorder.hpp index a657bac3..1acf793c 100644 --- a/apps/axon_recorder/src/core/recorder.hpp +++ b/apps/axon_recorder/src/core/recorder.hpp @@ -70,6 +70,7 @@ struct SubscriptionConfig { // Batch writing configuration size_t batch_size = 1; // Number of messages to batch before writing (1 = immediate) int flush_interval_ms = 100; // Maximum time to wait before flushing (ms) + size_t qos_depth = 10; // ROS middleware subscription history depth // Depth compression configuration struct DepthCompression { @@ -129,9 +130,8 @@ struct RecordingConfig { std::string profile = "ros2"; std::string compression = "zstd"; WriterBatchConfig writer_batch; - // Compression preset, range 0-4 (applies to both zstd and lz4). - // Maps to axon::mcap_wrapper::CompressionLevel: 0=Default, 1=Fastest, - // 2=Fast, 3=Default, 4=Slow/Slowest. Values >4 are clamped to 4. + // Compression preset used by recorder YAML/CLI (applies to both zstd and lz4). + // 0=Default, 1=Fastest, 2=Fast, 3=Default, 4=Slow, >=5=Slowest. // NOT the native zstd/lz4 1-19 / 1-12 range. int compression_level = 3; diff --git a/apps/axon_recorder/test/unit/test_config_parser.cpp b/apps/axon_recorder/test/unit/test_config_parser.cpp index 595bcc37..4b500cd6 100644 --- a/apps/axon_recorder/test/unit/test_config_parser.cpp +++ b/apps/axon_recorder/test/unit/test_config_parser.cpp @@ -94,6 +94,7 @@ TEST_F(ConfigParserTest, ParseValidFullConfig) { message_type: sensor_msgs/Image batch_size: 50 flush_interval_ms: 500 + qos_depth: 64 - name: /imu/data message_type: sensor_msgs/Imu batch_size: 200 @@ -170,8 +171,10 @@ TEST_F(ConfigParserTest, ParseValidFullConfig) { EXPECT_EQ(config.subscriptions[0].topic_name, "/camera/image"); EXPECT_EQ(config.subscriptions[0].batch_size, 50); EXPECT_EQ(config.subscriptions[0].flush_interval_ms, 500); + EXPECT_EQ(config.subscriptions[0].qos_depth, 64u); EXPECT_EQ(config.subscriptions[1].topic_name, "/imu/data"); EXPECT_EQ(config.subscriptions[1].batch_size, 200); + EXPECT_EQ(config.subscriptions[1].qos_depth, 10u); EXPECT_DOUBLE_EQ(config.recording.max_disk_usage_gb, 50.0); EXPECT_DOUBLE_EQ(config.recording.disk_usage.warn_usage_gb, 40.0); @@ -293,6 +296,7 @@ TEST_F(ConfigParserTest, DefaultValuesForOptionalFields) { // Subscription defaults EXPECT_EQ(config.subscriptions[0].batch_size, 1); // Default batch size EXPECT_EQ(config.subscriptions[0].flush_interval_ms, 100); // Default flush interval + EXPECT_EQ(config.subscriptions[0].qos_depth, 10u); // Recording defaults EXPECT_DOUBLE_EQ(config.recording.max_disk_usage_gb, 100.0); @@ -433,6 +437,54 @@ TEST_F(ConfigParserTest, ValidateFailsWithZeroBatchSize) { EXPECT_EQ(error_msg, "Subscription batch_size must be > 0"); } +TEST_F(ConfigParserTest, ValidateFailsWithZeroQosDepth) { + RecorderConfig config; + config.dataset.path = "/data"; + config.dataset.mode = "create"; + config.subscriptions.push_back({"/test", "std_msgs/String", 100, 100, 0}); + + std::string error_msg; + EXPECT_FALSE(ConfigParser::validate(config, error_msg)); + EXPECT_EQ(error_msg, "Subscription qos_depth must be > 0"); +} + +TEST_F(ConfigParserTest, NegativeQosDepthValidatesAsInvalid) { + const std::string yaml = R"( +dataset: + path: /data +subscriptions: + - name: /test + message_type: std_msgs/String + qos_depth: -1 +)"; + + ConfigParser parser; + RecorderConfig config; + ASSERT_TRUE(parser.load_from_string(yaml, config)); + ASSERT_EQ(config.subscriptions.size(), 1); + EXPECT_EQ(config.subscriptions[0].qos_depth, 0u); + + std::string error_msg; + EXPECT_FALSE(ConfigParser::validate(config, error_msg)); + EXPECT_EQ(error_msg, "Subscription qos_depth must be > 0"); +} + +TEST_F(ConfigParserTest, OversizedQosDepthFailsToParse) { + const std::string yaml = R"( +dataset: + path: /data +subscriptions: + - name: /test + message_type: std_msgs/String + qos_depth: 9223372036854775808 +)"; + + ConfigParser parser; + RecorderConfig config; + EXPECT_FALSE(parser.load_from_string(yaml, config)); + EXPECT_FALSE(parser.get_last_error().empty()); +} + TEST_F(ConfigParserTest, ValidateFailsWithInvalidMode) { RecorderConfig config; config.dataset.path = "/data"; @@ -501,6 +553,7 @@ TEST_F(ConfigParserTest, SaveToFile) { config.dataset.mode = "create"; config.subscriptions.push_back({"/test_topic", "std_msgs/String", 50, 500}); + config.subscriptions[0].qos_depth = 32; config.recording.max_disk_usage_gb = 75.0; auto path = (test_dir_ / "saved_config.yaml").string(); @@ -510,6 +563,11 @@ TEST_F(ConfigParserTest, SaveToFile) { // Verify file was created EXPECT_TRUE(fs::exists(path)); + + RecorderConfig loaded; + EXPECT_TRUE(parser.load_from_file(path, loaded)); + ASSERT_EQ(loaded.subscriptions.size(), 1); + EXPECT_EQ(loaded.subscriptions[0].qos_depth, 32u); } // ============================================================================ diff --git a/core/axon_mcap/mcap_writer_wrapper.cpp b/core/axon_mcap/mcap_writer_wrapper.cpp index 88ffb681..0e240996 100644 --- a/core/axon_mcap/mcap_writer_wrapper.cpp +++ b/core/axon_mcap/mcap_writer_wrapper.cpp @@ -65,22 +65,30 @@ static mcap::Compression to_mcap_compression(Compression compression) { } } -// Convert our compression level enum to MCAP's CompressionLevel. -// -// Legacy path: if options.compression_level is non-zero, clamp it into the -// enum range (0..4) and use that. This keeps backward compatibility with -// callers that historically passed values like `3` meaning "Slow-ish". -static mcap::CompressionLevel resolve_compression_level(const McapWriterOptions& options) { - int level = options.compression_level; - if (level == 0) { - level = static_cast(options.compression_preset); +CompressionLevel compression_level_from_legacy_int(int level) { + if (level <= 0) { + return CompressionLevel::Default; } - if (level < 0) { - level = 0; - } - if (level > 4) { - level = 4; + + switch (level) { + case 1: + return CompressionLevel::Fastest; + case 2: + return CompressionLevel::Fast; + case 3: + return CompressionLevel::Default; + case 4: + return CompressionLevel::Slow; + default: + return CompressionLevel::Slowest; } +} + +// Convert our compression level enum to MCAP's CompressionLevel. +static mcap::CompressionLevel resolve_compression_level(const McapWriterOptions& options) { + const auto level = options.compression_level == 0 + ? options.compression_preset + : compression_level_from_legacy_int(options.compression_level); return static_cast(level); } diff --git a/core/axon_mcap/mcap_writer_wrapper.hpp b/core/axon_mcap/mcap_writer_wrapper.hpp index b0cfd1a1..24d15ee8 100644 --- a/core/axon_mcap/mcap_writer_wrapper.hpp +++ b/core/axon_mcap/mcap_writer_wrapper.hpp @@ -43,6 +43,15 @@ enum class Compression { None, Zstd, Lz4 }; */ enum class CompressionLevel { Fastest = 0, Fast = 1, Default = 2, Slow = 3, Slowest = 4 }; +/** + * Convert recorder/CLI legacy compression_level values to MCAP presets. + * + * The recorder historically documents `compression_level: 3` as its default + * balanced setting. MCAP's enum value 3 is Slow, so direct enum casts turn the + * documented default into an unexpectedly expensive compression path. + */ +CompressionLevel compression_level_from_legacy_int(int level); + /** * Configuration options for MCAP writer */ @@ -58,9 +67,8 @@ struct McapWriterOptions { /// compatibility and overrides this when non-zero (see `.cpp`). CompressionLevel compression_preset = CompressionLevel::Default; - /// Legacy compression level. 0 means "use compression_preset". Non-zero - /// values are clamped into the CompressionLevel enum range (0..4) for - /// backward compatibility with code that passes raw integers like 3. + /// Legacy recorder/CLI compression level. 0 means "use compression_preset". + /// Non-zero values are translated by compression_level_from_legacy_int(). int compression_level = 0; /// Chunk size in bytes (default 4MB) diff --git a/core/axon_mcap/test/test_mcap_writer.cpp b/core/axon_mcap/test/test_mcap_writer.cpp index 75c4b7bf..d5ae7c1c 100644 --- a/core/axon_mcap/test/test_mcap_writer.cpp +++ b/core/axon_mcap/test/test_mcap_writer.cpp @@ -223,6 +223,17 @@ TEST_F(McapWriterTest, CompressionZstd) { EXPECT_GT(fs::file_size(test_file_), 0); } +TEST_F(McapWriterTest, LegacyCompressionLevelMappingMatchesRecorderDocs) { + EXPECT_EQ(compression_level_from_legacy_int(-1), CompressionLevel::Default); + EXPECT_EQ(compression_level_from_legacy_int(0), CompressionLevel::Default); + EXPECT_EQ(compression_level_from_legacy_int(1), CompressionLevel::Fastest); + EXPECT_EQ(compression_level_from_legacy_int(2), CompressionLevel::Fast); + EXPECT_EQ(compression_level_from_legacy_int(3), CompressionLevel::Default); + EXPECT_EQ(compression_level_from_legacy_int(4), CompressionLevel::Slow); + EXPECT_EQ(compression_level_from_legacy_int(5), CompressionLevel::Slowest); + EXPECT_EQ(compression_level_from_legacy_int(99), CompressionLevel::Slowest); +} + TEST_F(McapWriterTest, ThreadSafety) { McapWriterWrapper writer; diff --git a/middlewares/ros2/include/ros2_subscription_wrapper.hpp b/middlewares/ros2/include/ros2_subscription_wrapper.hpp index 2a3144b1..a2d794e8 100644 --- a/middlewares/ros2/include/ros2_subscription_wrapper.hpp +++ b/middlewares/ros2/include/ros2_subscription_wrapper.hpp @@ -5,6 +5,7 @@ #ifndef ROS2_PLUGIN_SUBSCRIPTION_WRAPPER_HPP #define ROS2_PLUGIN_SUBSCRIPTION_WRAPPER_HPP +#include #include #include @@ -69,6 +70,8 @@ struct SubscribeOptions { : qos(10) {} }; +void apply_subscribe_qos_options(SubscribeOptions& options, const nlohmann::json& opts); + class SubscriptionManager { public: explicit SubscriptionManager(rclcpp::Node::SharedPtr node); @@ -83,10 +86,9 @@ class SubscriptionManager { /** * Subscribe with a zero-copy (ABI v1.2) callback. * - * Fast path (no depth compression): retains the rclcpp::SerializedMessage - * shared_ptr and hands the recorder a pointer into its `buffer` along with - * a release function that drops the shared_ptr. No payload copy happens - * between rcl and the recorder's worker queue. + * Fast path (no depth compression): copies the rclcpp::SerializedMessage + * payload into plugin-owned storage before returning from the ROS callback, + * then hands the recorder that owned buffer plus a release function. * * Compression path (depth_compression.enabled): transparently falls back * to the v1.x copy semantics internally (the compressor allocates a new diff --git a/middlewares/ros2/src/ros2_plugin_export.cpp b/middlewares/ros2/src/ros2_plugin_export.cpp index 64c5599b..f3de7378 100644 --- a/middlewares/ros2/src/ros2_plugin_export.cpp +++ b/middlewares/ros2/src/ros2_plugin_export.cpp @@ -30,6 +30,32 @@ using axon::logging::kv; using namespace ros2_plugin; +namespace ros2_plugin { + +void apply_subscribe_qos_options(SubscribeOptions& options, const nlohmann::json& opts) { + size_t qos_depth = 10; + if (opts.contains("qos_depth")) { + qos_depth = opts["qos_depth"].get(); + } else if (opts.contains("queue_size")) { + qos_depth = opts["queue_size"].get(); + } + if (qos_depth == 0) { + qos_depth = 10; + } + + options.qos = rclcpp::QoS(rclcpp::KeepLast(qos_depth)); + const bool reliable = + opts.contains("qos_reliable") ? opts["qos_reliable"].get() : opts.value("reliable", true); + if (reliable) { + options.qos.reliable(); + } else { + options.qos.best_effort(); + } + options.qos.durability_volatile(); +} + +} // namespace ros2_plugin + // ============================================================================= // Error codes (matching what the loader expects) // ============================================================================= @@ -178,6 +204,7 @@ static int32_t axon_subscribe( if (options_json && strlen(options_json) > 0) { try { nlohmann::json opts = nlohmann::json::parse(options_json); + apply_subscribe_qos_options(options, opts); #ifdef AXON_ENABLE_DEPTH_COMPRESSION if (opts.contains("depth_compression")) { auto dc = opts["depth_compression"]; @@ -192,7 +219,6 @@ static int32_t axon_subscribe( ); } #else - (void)options; // Suppress unused warning when depth compression is disabled if (opts.contains("depth_compression") && opts["depth_compression"].value("enabled", false)) { AXON_LOG_WARN( "Depth compression requested for " @@ -258,6 +284,7 @@ static int32_t axon_subscribe_v2( if (options_json && strlen(options_json) > 0) { try { nlohmann::json opts = nlohmann::json::parse(options_json); + apply_subscribe_qos_options(options, opts); #ifdef AXON_ENABLE_DEPTH_COMPRESSION if (opts.contains("depth_compression")) { auto dc = opts["depth_compression"]; @@ -267,7 +294,14 @@ static int32_t axon_subscribe_v2( options.depth_compression = dc_config; } #else - (void)options; + if (opts.contains("depth_compression") && opts["depth_compression"].value("enabled", false)) { + AXON_LOG_WARN( + "Depth compression requested (v2) for " + << kv("topic", topic_name) + << " but not enabled at build time. " + "Rebuild with -DAXON_ENABLE_DEPTH_COMPRESSION=ON to enable." + ); + } #endif } catch (const std::exception& e) { AXON_LOG_WARN( diff --git a/middlewares/ros2/src/ros2_subscription_wrapper.cpp b/middlewares/ros2/src/ros2_subscription_wrapper.cpp index b2a1ee99..4b7c531c 100644 --- a/middlewares/ros2/src/ros2_subscription_wrapper.cpp +++ b/middlewares/ros2/src/ros2_subscription_wrapper.cpp @@ -259,21 +259,27 @@ bool SubscriptionManager::subscribe_v2( } #endif - // Zero-copy pass-through: hand the recorder a pointer into the rcl - // buffer and a holder that keeps the SerializedMessage alive. The - // release function deletes the heap-allocated shared_ptr holder, - // which in turn drops the last refcount (or whatever the refcount - // is at that point) on the SerializedMessage. - auto* holder = new std::shared_ptr(msg); + // Copy the serialized payload into plugin-owned storage before + // returning from the ROS callback. Holding the rclcpp + // SerializedMessage in Axon's queues can exhaust DDS/rclcpp receive + // buffers when recorder batching or writer backlog retains many + // messages, throttling high-rate topics before they reach Axon. const auto& raw = msg->get_rcl_serialized_message(); + auto* holder = new std::vector(); + if (raw.buffer != nullptr && raw.buffer_length > 0) { + holder->assign(raw.buffer, raw.buffer + raw.buffer_length); + } + if (holder->empty()) { + holder->resize(1); + } callback( topic_name, message_type, - raw.buffer, + holder->data(), raw.buffer_length, timestamp, +[](void* p) { - delete static_cast*>(p); + delete static_cast*>(p); }, holder ); diff --git a/middlewares/ros2/test/test_ros2_plugin.cpp b/middlewares/ros2/test/test_ros2_plugin.cpp index 8d29113a..1732c2ae 100644 --- a/middlewares/ros2/test/test_ros2_plugin.cpp +++ b/middlewares/ros2/test/test_ros2_plugin.cpp @@ -3,6 +3,7 @@ // SPDX-License-Identifier: MulanPSL-2.0 #include +#include #include #include @@ -259,6 +260,40 @@ TEST(Ros2PluginExportTest, DescriptorAdvertisesAbi13) { EXPECT_NE(descriptor->vtable->reserved[1], nullptr); } +TEST(Ros2PluginExportTest, AppliesQosDepthFromOptionsJson) { + ros2_plugin::SubscribeOptions options; + apply_subscribe_qos_options(options, nlohmann::json::parse(R"({"qos_depth": 42})")); + + const auto& qos = options.qos.get_rmw_qos_profile(); + EXPECT_EQ(qos.depth, 42u); + EXPECT_EQ(qos.reliability, RMW_QOS_POLICY_RELIABILITY_RELIABLE); +} + +TEST(Ros2PluginExportTest, AppliesLegacyQueueSizeAsQosDepth) { + ros2_plugin::SubscribeOptions options; + apply_subscribe_qos_options(options, nlohmann::json::parse(R"({"queue_size": 24})")); + + const auto& qos = options.qos.get_rmw_qos_profile(); + EXPECT_EQ(qos.depth, 24u); +} + +TEST(Ros2PluginExportTest, ZeroQosDepthFallsBackToDefault) { + ros2_plugin::SubscribeOptions options; + apply_subscribe_qos_options(options, nlohmann::json::parse(R"({"qos_depth": 0})")); + + const auto& qos = options.qos.get_rmw_qos_profile(); + EXPECT_EQ(qos.depth, 10u); +} + +TEST(Ros2PluginExportTest, AppliesBestEffortReliability) { + ros2_plugin::SubscribeOptions options; + apply_subscribe_qos_options(options, nlohmann::json::parse(R"({"qos_reliable": false})")); + + const auto& qos = options.qos.get_rmw_qos_profile(); + EXPECT_EQ(qos.depth, 10u); + EXPECT_EQ(qos.reliability, RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT); +} + /** * @brief Test stop without explicit start (cleanup in destructor) */