From 545d6dd3112ff6d93a02b9086b4b671b7450bf4d Mon Sep 17 00:00:00 2001 From: stz Date: Tue, 9 Jun 2026 15:31:14 +0800 Subject: [PATCH 1/5] feat(recorder): support topic QoS overrides - Add ROS QoS fields to task and subscription configs - Parse nested YAML and RPC QoS objects while preserving blank defaults - Apply task-scoped QoS overrides to session subscriptions - Echo QoS settings through state, event, and config update payloads --- .../src/config/config_parser.cpp | 188 +++++++++++- apps/axon_recorder/src/config/task_config.hpp | 22 ++ apps/axon_recorder/src/core/recorder.cpp | 280 +++++++++++++++++- apps/axon_recorder/src/core/recorder.hpp | 16 + .../src/http/event_broadcaster.cpp | 39 ++- apps/axon_recorder/src/http/rpc_handlers.cpp | 235 ++++++++++++++- apps/axon_recorder/src/http/ws_rpc_client.cpp | 36 +++ .../test/unit/test_config_parser.cpp | 89 +++++- .../test/unit/test_rpc_handlers.cpp | 50 ++++ 9 files changed, 935 insertions(+), 20 deletions(-) diff --git a/apps/axon_recorder/src/config/config_parser.cpp b/apps/axon_recorder/src/config/config_parser.cpp index 046c496c..d1e7c4b1 100644 --- a/apps/axon_recorder/src/config/config_parser.cpp +++ b/apps/axon_recorder/src/config/config_parser.cpp @@ -6,7 +6,10 @@ #include +#include +#include #include +#include #include #include "../core/recorder.hpp" @@ -41,6 +44,150 @@ void apply_sidecar_generation_mode(RecordingConfig& recording, std::string mode) recording.sidecar_json_enabled = mode != "none"; } +std::string trim_ascii(std::string value) { + auto not_space = [](unsigned char c) { + return !std::isspace(c); + }; + value.erase(value.begin(), std::find_if(value.begin(), value.end(), not_space)); + value.erase(std::find_if(value.rbegin(), value.rend(), not_space).base(), value.end()); + return value; +} + +std::string normalize_qos_policy_token(std::string value) { + value = trim_ascii(std::move(value)); + std::transform(value.begin(), value.end(), value.begin(), [](unsigned char c) { + if (c == '-' || std::isspace(c)) { + return '_'; + } + return static_cast(std::tolower(c)); + }); + return value; +} + +bool yaml_node_has_value(const YAML::Node& node) { + if (!node || node.IsNull()) { + return false; + } + if (!node.IsScalar()) { + return true; + } + return !trim_ascii(node.as()).empty(); +} + +bool is_auto_qos_token(const YAML::Node& node) { + return yaml_node_has_value(node) && node.IsScalar() && + normalize_qos_policy_token(node.as()) == "auto"; +} + +std::optional parse_optional_qos_depth(const YAML::Node& node) { + if (!yaml_node_has_value(node)) { + return std::nullopt; + } + const auto depth = node.as(); + return depth > 0 ? std::optional(static_cast(depth)) : std::optional(0); +} + +void parse_optional_nested_qos_depth(const YAML::Node& node, RosQosConfig& qos, size_t& qos_depth) { + if (!yaml_node_has_value(node)) { + return; + } + if (is_auto_qos_token(node)) { + qos.depth_auto = true; + return; + } + const auto depth = node.as(); + qos.depth = + depth > 0 ? std::optional(static_cast(depth)) : std::optional(0); + qos_depth = *qos.depth; +} + +std::optional parse_optional_qos_policy(const YAML::Node& node) { + if (!yaml_node_has_value(node)) { + return std::nullopt; + } + return normalize_qos_policy_token(node.as()); +} + +std::optional parse_optional_qos_reliability(const YAML::Node& node) { + if (!yaml_node_has_value(node)) { + return std::nullopt; + } + try { + return node.as() ? std::optional("reliable") + : std::optional("best_effort"); + } catch (...) { + } + return normalize_qos_policy_token(node.as()); +} + +void parse_ros_qos_node(const YAML::Node& node, RosQosConfig& qos, size_t& qos_depth) { + if (!node || !node.IsMap()) { + return; + } + if (auto mode = parse_optional_qos_policy(node["mode"])) { + qos.mode = mode; + qos.auto_mode = *mode == "auto"; + } + if (node["depth"]) { + parse_optional_nested_qos_depth(node["depth"], qos, qos_depth); + } + if (node["qos_depth"]) { + parse_optional_nested_qos_depth(node["qos_depth"], qos, qos_depth); + } + if (auto reliability = parse_optional_qos_reliability(node["reliability"])) { + qos.reliability = reliability; + } + if (auto reliability = parse_optional_qos_reliability(node["reliable"])) { + qos.reliability = reliability; + } + if (auto durability = parse_optional_qos_policy(node["durability"])) { + qos.durability = durability; + } + if (auto history = parse_optional_qos_policy(node["history"])) { + qos.history = history; + } +} + +bool valid_qos_reliability(const std::string& value) { + return value == "reliable" || value == "best_effort" || value == "auto"; +} + +bool valid_qos_durability(const std::string& value) { + return value == "volatile" || value == "transient_local" || value == "auto"; +} + +bool valid_qos_history(const std::string& value) { + return value == "keep_last" || value == "keep_all" || value == "auto"; +} + +bool validate_ros_qos_config(const RosQosConfig& qos, std::string& error_msg) { + if (qos.mode.has_value() && *qos.mode != "auto") { + error_msg = "Subscription qos.mode must be 'auto'"; + return false; + } + if (qos.depth.has_value() && *qos.depth == 0) { + error_msg = "Subscription qos.depth must be > 0"; + return false; + } + if (qos.depth_auto && qos.depth.has_value()) { + error_msg = "Subscription qos.depth cannot be both auto and numeric"; + return false; + } + if (qos.reliability.has_value() && !valid_qos_reliability(*qos.reliability)) { + error_msg = "Subscription qos.reliability must be 'reliable', 'best_effort', or 'auto'"; + return false; + } + if (qos.durability.has_value() && !valid_qos_durability(*qos.durability)) { + error_msg = "Subscription qos.durability must be 'volatile', 'transient_local', or 'auto'"; + return false; + } + if (qos.history.has_value() && !valid_qos_history(*qos.history)) { + error_msg = "Subscription qos.history must be 'keep_last', 'keep_all', or 'auto'"; + return false; + } + return true; +} + } // namespace // ============================================================================ @@ -279,6 +426,30 @@ bool ConfigParser::save_to_file(const std::string& path, const RecorderConfig& c subscription_node["batch_size"] = subscription.batch_size; subscription_node["flush_interval_ms"] = subscription.flush_interval_ms; subscription_node["qos_depth"] = subscription.qos_depth; + if (subscription.qos.has_overrides()) { + YAML::Node qos_node; + if (subscription.qos.mode.has_value()) { + qos_node["mode"] = *subscription.qos.mode; + } else if (subscription.qos.auto_mode) { + qos_node["mode"] = "auto"; + } + if (subscription.qos.depth_auto) { + qos_node["depth"] = "auto"; + } + if (subscription.qos.depth.has_value()) { + qos_node["depth"] = *subscription.qos.depth; + } + if (subscription.qos.reliability.has_value()) { + qos_node["reliability"] = *subscription.qos.reliability; + } + if (subscription.qos.durability.has_value()) { + qos_node["durability"] = *subscription.qos.durability; + } + if (subscription.qos.history.has_value()) { + qos_node["history"] = *subscription.qos.history; + } + subscription_node["qos"] = qos_node; + } node["subscriptions"].push_back(subscription_node); } @@ -360,9 +531,17 @@ bool ConfigParser::parse_subscriptions( if (subscription_node["flush_interval_ms"]) { subscription.flush_interval_ms = subscription_node["flush_interval_ms"].as(); } - if (subscription_node["qos_depth"]) { - const auto qos_depth = subscription_node["qos_depth"].as(); - subscription.qos_depth = qos_depth > 0 ? static_cast(qos_depth) : 0; + if (auto qos_depth = parse_optional_qos_depth(subscription_node["qos_depth"])) { + subscription.qos_depth = *qos_depth; + } + if (auto reliability = parse_optional_qos_reliability(subscription_node["qos_reliable"])) { + subscription.qos.reliability = reliability; + } + if (auto reliability = parse_optional_qos_reliability(subscription_node["reliable"])) { + subscription.qos.reliability = reliability; + } + if (subscription_node["qos"]) { + parse_ros_qos_node(subscription_node["qos"], subscription.qos, subscription.qos_depth); } // Parse depth_compression section (can be boolean or object) if (subscription_node["depth_compression"]) { @@ -794,6 +973,9 @@ bool ConfigParser::validate(const RecorderConfig& config, std::string& error_msg error_msg = "Subscription qos_depth must be > 0"; return false; } + if (!validate_ros_qos_config(subscription.qos, error_msg)) { + return false; + } } const auto& writer_batch = config.recording.writer_batch; diff --git a/apps/axon_recorder/src/config/task_config.hpp b/apps/axon_recorder/src/config/task_config.hpp index 07e3bc84..7a99b50a 100644 --- a/apps/axon_recorder/src/config/task_config.hpp +++ b/apps/axon_recorder/src/config/task_config.hpp @@ -6,6 +6,7 @@ #define AXON_RECORDER_TASK_CONFIG_HPP #include +#include #include #include #include @@ -14,6 +15,26 @@ namespace axon { namespace recorder { +struct RosQosConfig { + std::optional mode; + bool auto_mode = false; + bool depth_auto = false; + std::optional depth; + std::optional reliability; + std::optional durability; + std::optional history; + + bool has_overrides() const { + return mode.has_value() || auto_mode || depth_auto || depth.has_value() || + reliability.has_value() || durability.has_value() || history.has_value(); + } +}; + +struct TopicQosConfig { + std::string topic_name; + RosQosConfig qos; +}; + /** * TaskConfig holds all metadata for a recording task pushed from the server. * This is cached when CachedRecordingConfig service is called and used @@ -35,6 +56,7 @@ struct TaskConfig { // Topic Selection std::vector topics; + std::vector topic_qos; // Server Callback Integration std::string start_callback_url; diff --git a/apps/axon_recorder/src/core/recorder.cpp b/apps/axon_recorder/src/core/recorder.cpp index cf68e633..c86f46f4 100644 --- a/apps/axon_recorder/src/core/recorder.cpp +++ b/apps/axon_recorder/src/core/recorder.cpp @@ -102,6 +102,195 @@ std::string to_lower_ascii(std::string value) { return value; } +std::string trim_ascii(std::string value) { + auto not_space = [](unsigned char c) { + return !std::isspace(c); + }; + value.erase(value.begin(), std::find_if(value.begin(), value.end(), not_space)); + value.erase(std::find_if(value.rbegin(), value.rend(), not_space).base(), value.end()); + return value; +} + +std::string normalize_qos_policy_token(std::string value) { + value = trim_ascii(std::move(value)); + std::transform(value.begin(), value.end(), value.begin(), [](unsigned char c) { + if (c == '-' || std::isspace(c)) { + return '_'; + } + return static_cast(std::tolower(c)); + }); + return value; +} + +bool json_has_value(const nlohmann::json& object, const char* key) { + if (!object.is_object() || !object.contains(key) || object[key].is_null()) { + return false; + } + if (object[key].is_string()) { + return !trim_ascii(object[key].get()).empty(); + } + return true; +} + +bool json_field_is_auto(const nlohmann::json& object, const char* key) { + return json_has_value(object, key) && object[key].is_string() && + normalize_qos_policy_token(object[key].get()) == "auto"; +} + +std::optional parse_optional_qos_depth_json( + const nlohmann::json& object, const char* key, bool* depth_auto = nullptr +) { + if (!json_has_value(object, key)) { + return std::nullopt; + } + if (json_field_is_auto(object, key)) { + if (depth_auto != nullptr) { + *depth_auto = true; + } + return std::nullopt; + } + long long depth = 0; + if (object[key].is_string()) { + depth = std::stoll(trim_ascii(object[key].get())); + } else { + depth = object[key].get(); + } + return depth > 0 ? std::optional(static_cast(depth)) : std::optional(0); +} + +std::optional parse_optional_qos_policy_json( + const nlohmann::json& object, const char* key +) { + if (!json_has_value(object, key)) { + return std::nullopt; + } + return normalize_qos_policy_token(object[key].get()); +} + +std::optional parse_optional_qos_reliability_json( + const nlohmann::json& object, const char* key +) { + if (!json_has_value(object, key)) { + return std::nullopt; + } + if (object[key].is_boolean()) { + return object[key].get() ? std::optional("reliable") + : std::optional("best_effort"); + } + return normalize_qos_policy_token(object[key].get()); +} + +RosQosConfig qos_config_from_json_object(const nlohmann::json& object) { + RosQosConfig qos; + if (!object.is_object()) { + return qos; + } + + const nlohmann::json* qos_source = &object; + if (object.contains("qos") && object["qos"].is_object()) { + qos_source = &object["qos"]; + } + + if (auto mode = parse_optional_qos_policy_json(*qos_source, "mode")) { + qos.mode = mode; + qos.auto_mode = *mode == "auto"; + } + if (auto depth = parse_optional_qos_depth_json(*qos_source, "depth", &qos.depth_auto)) { + qos.depth = depth; + } + if (auto depth = parse_optional_qos_depth_json(*qos_source, "qos_depth", &qos.depth_auto)) { + qos.depth = depth; + } + if (auto reliability = parse_optional_qos_reliability_json(*qos_source, "reliability")) { + qos.reliability = reliability; + } + if (auto reliability = parse_optional_qos_reliability_json(*qos_source, "reliable")) { + qos.reliability = reliability; + } + if (auto reliability = parse_optional_qos_reliability_json(object, "qos_reliable")) { + qos.reliability = reliability; + } + if (auto durability = parse_optional_qos_policy_json(*qos_source, "durability")) { + qos.durability = durability; + } + if (auto history = parse_optional_qos_policy_json(*qos_source, "history")) { + qos.history = history; + } + + return qos; +} + +nlohmann::json qos_config_to_json(const RosQosConfig& qos) { + nlohmann::json out = nlohmann::json::object(); + if (qos.mode.has_value()) { + out["mode"] = *qos.mode; + } else if (qos.auto_mode) { + out["mode"] = "auto"; + } + if (qos.depth_auto) { + out["depth"] = "auto"; + } + if (qos.depth.has_value()) { + out["depth"] = *qos.depth; + } + if (qos.reliability.has_value()) { + out["reliability"] = *qos.reliability; + } + if (qos.durability.has_value()) { + out["durability"] = *qos.durability; + } + if (qos.history.has_value()) { + out["history"] = *qos.history; + } + return out; +} + +void apply_qos_override(SubscriptionConfig& subscription, const RosQosConfig& qos) { + if (qos.auto_mode) { + subscription.qos.mode = qos.mode.value_or("auto"); + subscription.qos.auto_mode = true; + } else if (qos.mode.has_value()) { + subscription.qos.mode = qos.mode; + } + if (qos.depth_auto) { + subscription.qos.depth_auto = true; + } + if (qos.depth.has_value()) { + subscription.qos_depth = *qos.depth; + subscription.qos.depth = qos.depth; + } + if (qos.reliability.has_value()) { + subscription.qos.reliability = qos.reliability; + } + if (qos.durability.has_value()) { + subscription.qos.durability = qos.durability; + } + if (qos.history.has_value()) { + subscription.qos.history = qos.history; + } +} + +nlohmann::json subscription_options_json(const SubscriptionConfig& subscription) { + nlohmann::json opts; + opts["qos_depth"] = subscription.qos_depth; + + if (subscription.qos.has_overrides()) { + opts["qos"] = qos_config_to_json(subscription.qos); + if (!subscription.qos.auto_mode && !opts["qos"].contains("depth")) { + opts["qos"]["depth"] = subscription.qos_depth; + } + if (subscription.qos.reliability == "reliable") { + opts["qos_reliable"] = true; + } else if (subscription.qos.reliability == "best_effort") { + opts["qos_reliable"] = false; + } + } + + opts["depth_compression"]["enabled"] = subscription.depth_compression.enabled; + opts["depth_compression"]["level"] = subscription.depth_compression.level; + return opts; +} + std::string severity_to_level_string(axon::logging::severity_level level) { switch (level) { case axon::logging::severity_level::debug: @@ -156,7 +345,36 @@ TaskConfig task_config_from_json(const nlohmann::json& config_json) { } if (config_json.contains("topics")) { for (const auto& topic : config_json["topics"]) { - config.topics.push_back(topic.get()); + if (topic.is_string()) { + config.topics.push_back(topic.get()); + } else if (topic.is_object()) { + const std::string topic_name = topic.contains("name") ? topic["name"].get() + : topic.value("topic", std::string{}); + if (!topic_name.empty()) { + config.topics.push_back(topic_name); + RosQosConfig qos = qos_config_from_json_object(topic); + if (qos.has_overrides()) { + config.topic_qos.push_back({topic_name, std::move(qos)}); + } + } + } + } + } + if (config_json.contains("topic_qos")) { + for (const auto& topic_qos_json : config_json["topic_qos"]) { + if (!topic_qos_json.is_object()) { + continue; + } + const std::string topic_name = topic_qos_json.contains("name") + ? topic_qos_json["name"].get() + : topic_qos_json.value("topic", std::string{}); + if (topic_name.empty()) { + continue; + } + RosQosConfig qos = qos_config_from_json_object(topic_qos_json); + if (qos.has_overrides()) { + config.topic_qos.push_back({topic_name, std::move(qos)}); + } } } return config; @@ -892,10 +1110,13 @@ bool AxonRecorder::start() { recording_session_->set_task_config(task_config_.value()); } + current_session_subscriptions_ = build_session_subscriptions(); + // Register topics with MCAP and create topic workers if (!register_topics()) { set_error_helper("Failed to register topics"); state_manager_.transition_to(RecorderState::IDLE, error_msg); + current_session_subscriptions_.clear(); recording_session_.reset(); return false; } @@ -904,6 +1125,7 @@ bool AxonRecorder::start() { if (!setup_subscriptions()) { set_error_helper("Failed to setup subscriptions: " + get_last_error()); state_manager_.transition_to(RecorderState::IDLE, error_msg); + current_session_subscriptions_.clear(); recording_session_.reset(); return false; } @@ -912,6 +1134,7 @@ bool AxonRecorder::start() { if (!state_manager_.transition(RecorderState::READY, RecorderState::RECORDING, error_msg)) { set_error_helper("State transition to RECORDING failed: " + error_msg); state_manager_.transition_to(RecorderState::IDLE, error_msg); + current_session_subscriptions_.clear(); recording_session_.reset(); return false; } @@ -935,6 +1158,7 @@ bool AxonRecorder::start() { if (st != AXON_SUCCESS) { set_error_helper(std::string("Plugin start failed (") + name + "): " + status_to_string(st)); state_manager_.transition_to(RecorderState::IDLE, error_msg); + current_session_subscriptions_.clear(); recording_session_.reset(); return false; } @@ -1064,6 +1288,7 @@ void AxonRecorder::stop() { recording_session_.reset(); } + current_session_subscriptions_.clear(); // 5. Keep plugins loaded until AxonRecorder destruction. @@ -2091,6 +2316,40 @@ size_t AxonRecorder::latency_batch_message_handler( return written; } +std::vector AxonRecorder::build_session_subscriptions() const { + std::vector subscriptions = config_.subscriptions; + if (!task_config_.has_value() || task_config_->topic_qos.empty()) { + return subscriptions; + } + + for (const auto& topic_qos : task_config_->topic_qos) { + if (topic_qos.topic_name.empty() || !topic_qos.qos.has_overrides()) { + continue; + } + + bool matched = false; + for (auto& subscription : subscriptions) { + if (subscription.topic_name == topic_qos.topic_name) { + apply_qos_override(subscription, topic_qos.qos); + matched = true; + } + } + if (!matched) { + AXON_LOG_WARN( + "Task QoS override does not match a configured subscription" + << axon::logging::kv("topic", topic_qos.topic_name) + ); + } + } + + return subscriptions; +} + +const std::vector& AxonRecorder::active_subscriptions() const { + return current_session_subscriptions_.empty() ? config_.subscriptions + : current_session_subscriptions_; +} + bool AxonRecorder::register_topics() { if (!recording_session_) { set_error_helper("No active recording session"); @@ -2103,7 +2362,7 @@ bool AxonRecorder::register_topics() { std::unordered_set seen_subscription_keys; // Register schemas and channels for all subscriptions - for (const auto& sub : config_.subscriptions) { + for (const auto& sub : active_subscriptions()) { std::string sub_key = sub.topic_name; sub_key.push_back('\0'); sub_key += sub.message_type; @@ -2225,7 +2484,7 @@ bool AxonRecorder::setup_subscriptions() { bool need_ros = false; bool need_udp = false; - for (const auto& sub : config_.subscriptions) { + for (const auto& sub : active_subscriptions()) { if (sub.message_type == "axon_udp/json") { need_udp = true; } else { @@ -2250,19 +2509,12 @@ bool AxonRecorder::setup_subscriptions() { ); } - for (const auto& sub : config_.subscriptions) { + for (const auto& sub : active_subscriptions()) { if (sub.message_type == "axon_udp/json") { continue; } - std::string options_json; - { - nlohmann::json opts; - opts["qos_depth"] = sub.qos_depth; - opts["depth_compression"]["enabled"] = sub.depth_compression.enabled; - opts["depth_compression"]["level"] = sub.depth_compression.level; - options_json = opts.dump(); - } + std::string options_json = subscription_options_json(sub).dump(); AxonStatus status = AXON_ERROR_INTERNAL; if (ros_subscribe_v2 != nullptr) { @@ -2303,7 +2555,7 @@ bool AxonRecorder::setup_subscriptions() { } const SubscriptionConfig* first_udp = nullptr; - for (const auto& sub : config_.subscriptions) { + for (const auto& sub : active_subscriptions()) { if (sub.message_type == "axon_udp/json") { first_udp = ⊂ break; @@ -2333,7 +2585,7 @@ bool AxonRecorder::setup_subscriptions() { const SubscriptionConfig* AxonRecorder::get_subscription_config( const std::string& topic_name ) const { - for (const auto& sub : config_.subscriptions) { + for (const auto& sub : active_subscriptions()) { if (sub.topic_name == topic_name) { return ⊂ } diff --git a/apps/axon_recorder/src/core/recorder.hpp b/apps/axon_recorder/src/core/recorder.hpp index 1acf793c..8fe22c07 100644 --- a/apps/axon_recorder/src/core/recorder.hpp +++ b/apps/axon_recorder/src/core/recorder.hpp @@ -77,6 +77,9 @@ struct SubscriptionConfig { bool enabled = false; std::string level = "medium"; // fast, medium, max } depth_compression; + + // Optional ROS QoS overrides. Empty fields keep the middleware default. + RosQosConfig qos; }; /** @@ -549,6 +552,16 @@ class AxonRecorder { */ bool setup_subscriptions(); + /** + * Build per-session subscriptions by applying task-scoped overrides. + */ + std::vector build_session_subscriptions() const; + + /** + * Subscriptions active for the current recording session. + */ + const std::vector& active_subscriptions() const; + /** * Load and initialize middleware plugin for the recorder process lifetime. */ @@ -657,6 +670,9 @@ class AxonRecorder { // Task configuration for metadata injection std::optional task_config_; + // Subscriptions resolved for the active session, including task QoS overrides. + std::vector current_session_subscriptions_; + // Worker thread pool manages per-topic workers // Use unique_ptr with placement new for late initialization with custom config std::unique_ptr worker_pool_; diff --git a/apps/axon_recorder/src/http/event_broadcaster.cpp b/apps/axon_recorder/src/http/event_broadcaster.cpp index 3b9306dc..271b45b4 100644 --- a/apps/axon_recorder/src/http/event_broadcaster.cpp +++ b/apps/axon_recorder/src/http/event_broadcaster.cpp @@ -11,6 +11,35 @@ namespace axon { namespace recorder { +namespace { + +nlohmann::json qos_config_to_json(const RosQosConfig& qos) { + nlohmann::json out = nlohmann::json::object(); + if (qos.mode.has_value()) { + out["mode"] = *qos.mode; + } else if (qos.auto_mode) { + out["mode"] = "auto"; + } + if (qos.depth_auto) { + out["depth"] = "auto"; + } + if (qos.depth.has_value()) { + out["depth"] = *qos.depth; + } + if (qos.reliability.has_value()) { + out["reliability"] = *qos.reliability; + } + if (qos.durability.has_value()) { + out["durability"] = *qos.durability; + } + if (qos.history.has_value()) { + out["history"] = *qos.history; + } + return out; +} + +} // namespace + EventBroadcaster::EventBroadcaster(WebSocketServer& ws_server) : ws_server_(ws_server) , stats_running_(false) {} @@ -68,12 +97,20 @@ void EventBroadcaster::broadcast_config_change(const TaskConfig* config) { if (config != nullptr) { data["action"] = "set"; - data["task_config"] = { + nlohmann::json task_config = { {"task_id", config->task_id}, {"device_id", config->device_id}, {"scene", config->scene}, {"topics", config->topics} }; + if (!config->topic_qos.empty()) { + nlohmann::json topic_qos = nlohmann::json::array(); + for (const auto& item : config->topic_qos) { + topic_qos.push_back({{"name", item.topic_name}, {"qos", qos_config_to_json(item.qos)}}); + } + task_config["topic_qos"] = topic_qos; + } + data["task_config"] = task_config; } else { data["action"] = "clear"; } diff --git a/apps/axon_recorder/src/http/rpc_handlers.cpp b/apps/axon_recorder/src/http/rpc_handlers.cpp index ded76f8b..5ef69a51 100644 --- a/apps/axon_recorder/src/http/rpc_handlers.cpp +++ b/apps/axon_recorder/src/http/rpc_handlers.cpp @@ -4,7 +4,10 @@ #include "rpc_handlers.hpp" +#include #include +#include +#include #include #include "../config/task_config.hpp" @@ -73,6 +76,201 @@ void validate_string_array_field( } } +std::string trim_ascii(std::string value) { + auto not_space = [](unsigned char c) { + return !std::isspace(c); + }; + value.erase(value.begin(), std::find_if(value.begin(), value.end(), not_space)); + value.erase(std::find_if(value.rbegin(), value.rend(), not_space).base(), value.end()); + return value; +} + +std::string normalize_qos_policy_token(std::string value) { + value = trim_ascii(std::move(value)); + std::transform(value.begin(), value.end(), value.begin(), [](unsigned char c) { + if (c == '-' || std::isspace(c)) { + return '_'; + } + return static_cast(std::tolower(c)); + }); + return value; +} + +bool json_field_has_value(const nlohmann::json& object, const std::string& field) { + if (!object.is_object() || !object.contains(field) || object[field].is_null()) { + return false; + } + if (object[field].is_string()) { + return !trim_ascii(object[field].get()).empty(); + } + return true; +} + +bool parse_positive_depth(const nlohmann::json& value, long long& depth) { + if (value.is_number_integer() || value.is_number_unsigned()) { + depth = value.get(); + return true; + } + if (!value.is_string()) { + return false; + } + const std::string text = trim_ascii(value.get()); + if (text.empty()) { + depth = 0; + return true; + } + char* end = nullptr; + depth = std::strtoll(text.c_str(), &end, 10); + return end != nullptr && *end == '\0'; +} + +bool json_value_is_auto(const nlohmann::json& value) { + return value.is_string() && normalize_qos_policy_token(value.get()) == "auto"; +} + +void validate_qos_depth_field( + const nlohmann::json& object, const std::string& field, const std::string& prefix, + std::vector& errors +) { + if (!json_field_has_value(object, field)) { + return; + } + if (json_value_is_auto(object[field])) { + return; + } + long long depth = 0; + if (!parse_positive_depth(object[field], depth) || depth <= 0) { + errors.push_back({prefix + field, "invalid_value", prefix + field + " must be > 0 or auto"}); + } +} + +void validate_qos_policy_field( + const nlohmann::json& object, const std::string& field, const std::string& prefix, + const std::vector& allowed, std::vector& errors +) { + if (!json_field_has_value(object, field)) { + return; + } + if (!object[field].is_string()) { + errors.push_back({prefix + field, "invalid_type", prefix + field + " must be a string"}); + return; + } + const std::string normalized = normalize_qos_policy_token(object[field].get()); + if (std::find(allowed.begin(), allowed.end(), normalized) == allowed.end()) { + errors.push_back({prefix + field, "invalid_value", prefix + field + " has unsupported value"}); + } +} + +void validate_qos_reliability_field( + const nlohmann::json& object, const std::string& field, const std::string& prefix, + std::vector& errors +) { + if (!json_field_has_value(object, field)) { + return; + } + if (object[field].is_boolean()) { + return; + } + validate_qos_policy_field(object, field, prefix, {"reliable", "best_effort", "auto"}, errors); +} + +void validate_qos_object( + const nlohmann::json& object, const std::string& prefix, std::vector& errors +) { + if (!object.is_object()) { + errors.push_back({prefix, "invalid_type", prefix + " must be an object"}); + return; + } + + const nlohmann::json* qos_source = &object; + std::string qos_prefix = prefix; + if (object.contains("qos")) { + if (object["qos"].is_null()) { + return; + } + if (!object["qos"].is_object()) { + errors.push_back({prefix + "qos", "invalid_type", prefix + "qos must be an object"}); + return; + } + qos_source = &object["qos"]; + qos_prefix = prefix + "qos."; + } + + validate_qos_policy_field(*qos_source, "mode", qos_prefix, {"auto"}, errors); + validate_qos_depth_field(*qos_source, "depth", qos_prefix, errors); + validate_qos_depth_field(*qos_source, "qos_depth", qos_prefix, errors); + validate_qos_reliability_field(*qos_source, "reliability", qos_prefix, errors); + validate_qos_reliability_field(*qos_source, "reliable", qos_prefix, errors); + validate_qos_reliability_field(object, "qos_reliable", prefix, errors); + validate_qos_policy_field( + *qos_source, "durability", qos_prefix, {"volatile", "transient_local", "auto"}, errors + ); + validate_qos_policy_field( + *qos_source, "history", qos_prefix, {"keep_last", "keep_all", "auto"}, errors + ); +} + +void validate_topics_field(const nlohmann::json& object, std::vector& errors) { + if (!object.contains("topics")) { + return; + } + if (!object["topics"].is_array()) { + errors.push_back({"topics", "invalid_type", "topics must be an array"}); + return; + } + for (size_t i = 0; i < object["topics"].size(); ++i) { + const auto& item = object["topics"][i]; + const std::string prefix = "topics[" + std::to_string(i) + "]."; + if (item.is_string()) { + continue; + } + if (!item.is_object()) { + errors.push_back( + {"topics[" + std::to_string(i) + "]", + "invalid_type", + "topics entries must be strings or objects"} + ); + continue; + } + if (!json_field_has_value(item, "name") && !json_field_has_value(item, "topic")) { + errors.push_back({prefix + "name", "missing_required", prefix + "name is required"}); + } else if ((item.contains("name") && !item["name"].is_string()) || + (item.contains("topic") && !item["topic"].is_string())) { + errors.push_back({prefix + "name", "invalid_type", prefix + "name must be a string"}); + } + validate_qos_object(item, prefix, errors); + } +} + +void validate_topic_qos_field(const nlohmann::json& object, std::vector& errors) { + if (!object.contains("topic_qos")) { + return; + } + if (!object["topic_qos"].is_array()) { + errors.push_back({"topic_qos", "invalid_type", "topic_qos must be an array"}); + return; + } + for (size_t i = 0; i < object["topic_qos"].size(); ++i) { + const auto& item = object["topic_qos"][i]; + const std::string prefix = "topic_qos[" + std::to_string(i) + "]."; + if (!item.is_object()) { + errors.push_back( + {"topic_qos[" + std::to_string(i) + "]", + "invalid_type", + "topic_qos entries must be objects"} + ); + continue; + } + if (!json_field_has_value(item, "name") && !json_field_has_value(item, "topic")) { + errors.push_back({prefix + "name", "missing_required", prefix + "name is required"}); + } else if ((item.contains("name") && !item["name"].is_string()) || + (item.contains("topic") && !item["topic"].is_string())) { + errors.push_back({prefix + "name", "invalid_type", prefix + "name must be a string"}); + } + validate_qos_object(item, prefix, errors); + } +} + std::vector validate_task_config_json(const nlohmann::json& config_json) { std::vector errors; if (!config_json.is_object()) { @@ -98,11 +296,37 @@ std::vector validate_task_config_json(const nlohmann::json& con require_string_field(config_json, field, errors, false); } validate_string_array_field(config_json, "skills", errors); - validate_string_array_field(config_json, "topics", errors); + validate_topics_field(config_json, errors); + validate_topic_qos_field(config_json, errors); return errors; } +nlohmann::json qos_config_to_public_json(const RosQosConfig& qos) { + nlohmann::json out = nlohmann::json::object(); + if (qos.mode.has_value()) { + out["mode"] = *qos.mode; + } else if (qos.auto_mode) { + out["mode"] = "auto"; + } + if (qos.depth_auto) { + out["depth"] = "auto"; + } + if (qos.depth.has_value()) { + out["depth"] = *qos.depth; + } + if (qos.reliability.has_value()) { + out["reliability"] = *qos.reliability; + } + if (qos.durability.has_value()) { + out["durability"] = *qos.durability; + } + if (qos.history.has_value()) { + out["history"] = *qos.history; + } + return out; +} + nlohmann::json task_config_to_public_json(const TaskConfig& task_config) { nlohmann::json config_json; config_json["task_id"] = task_config.task_id; @@ -115,6 +339,15 @@ nlohmann::json task_config_to_public_json(const TaskConfig& task_config) { config_json["factory"] = task_config.factory; config_json["operator_name"] = task_config.operator_name; config_json["topics"] = task_config.topics; + if (!task_config.topic_qos.empty()) { + nlohmann::json topic_qos = nlohmann::json::array(); + for (const auto& item : task_config.topic_qos) { + topic_qos.push_back( + {{"name", item.topic_name}, {"qos", qos_config_to_public_json(item.qos)}} + ); + } + config_json["topic_qos"] = topic_qos; + } return config_json; } diff --git a/apps/axon_recorder/src/http/ws_rpc_client.cpp b/apps/axon_recorder/src/http/ws_rpc_client.cpp index d959ba84..19ae6fa5 100644 --- a/apps/axon_recorder/src/http/ws_rpc_client.cpp +++ b/apps/axon_recorder/src/http/ws_rpc_client.cpp @@ -26,6 +26,35 @@ using axon::logging::kv; namespace axon { namespace recorder { +namespace { + +nlohmann::json qos_config_to_json(const RosQosConfig& qos) { + nlohmann::json out = nlohmann::json::object(); + if (qos.mode.has_value()) { + out["mode"] = *qos.mode; + } else if (qos.auto_mode) { + out["mode"] = "auto"; + } + if (qos.depth_auto) { + out["depth"] = "auto"; + } + if (qos.depth.has_value()) { + out["depth"] = *qos.depth; + } + if (qos.reliability.has_value()) { + out["reliability"] = *qos.reliability; + } + if (qos.durability.has_value()) { + out["durability"] = *qos.durability; + } + if (qos.history.has_value()) { + out["history"] = *qos.history; + } + return out; +} + +} // namespace + WsRpcClient::WsRpcClient(net::io_context& ioc, const WsClientConfig& config) : config_(config) , strand_(net::make_strand(ioc)) @@ -237,6 +266,13 @@ void WsRpcClient::send_config_update(const TaskConfig& config) { if (!config.topics.empty()) { data["topics"] = config.topics; } + if (!config.topic_qos.empty()) { + nlohmann::json topic_qos = nlohmann::json::array(); + for (const auto& item : config.topic_qos) { + topic_qos.push_back({{"name", item.topic_name}, {"qos", qos_config_to_json(item.qos)}}); + } + data["topic_qos"] = topic_qos; + } msg["data"] = data; send_message(msg); diff --git a/apps/axon_recorder/test/unit/test_config_parser.cpp b/apps/axon_recorder/test/unit/test_config_parser.cpp index 4b500cd6..42a6eacb 100644 --- a/apps/axon_recorder/test/unit/test_config_parser.cpp +++ b/apps/axon_recorder/test/unit/test_config_parser.cpp @@ -99,6 +99,12 @@ TEST_F(ConfigParserTest, ParseValidFullConfig) { message_type: sensor_msgs/Imu batch_size: 200 flush_interval_ms: 100 + qos: + mode: auto + depth: 24 + reliability: best-effort + durability: transient_local + history: keep_last recording: max_disk_usage_gb: 50.0 @@ -172,9 +178,16 @@ TEST_F(ConfigParserTest, ParseValidFullConfig) { EXPECT_EQ(config.subscriptions[0].batch_size, 50); EXPECT_EQ(config.subscriptions[0].flush_interval_ms, 500); EXPECT_EQ(config.subscriptions[0].qos_depth, 64u); + EXPECT_FALSE(config.subscriptions[0].qos.has_overrides()); EXPECT_EQ(config.subscriptions[1].topic_name, "/imu/data"); EXPECT_EQ(config.subscriptions[1].batch_size, 200); - EXPECT_EQ(config.subscriptions[1].qos_depth, 10u); + EXPECT_TRUE(config.subscriptions[1].qos.auto_mode); + EXPECT_EQ(config.subscriptions[1].qos.mode.value(), "auto"); + EXPECT_EQ(config.subscriptions[1].qos_depth, 24u); + EXPECT_EQ(config.subscriptions[1].qos.depth.value(), 24u); + EXPECT_EQ(config.subscriptions[1].qos.reliability.value(), "best_effort"); + EXPECT_EQ(config.subscriptions[1].qos.durability.value(), "transient_local"); + EXPECT_EQ(config.subscriptions[1].qos.history.value(), "keep_last"); EXPECT_DOUBLE_EQ(config.recording.max_disk_usage_gb, 50.0); EXPECT_DOUBLE_EQ(config.recording.disk_usage.warn_usage_gb, 40.0); @@ -325,6 +338,60 @@ TEST_F(ConfigParserTest, DefaultValuesForOptionalFields) { EXPECT_TRUE(config.upload.delete_after_upload); } +TEST_F(ConfigParserTest, BlankQosFieldsFallBackToDefaults) { + const std::string yaml = R"( +dataset: + path: /data +subscriptions: + - name: /test + message_type: std_msgs/String + qos_depth: + qos: + depth: + reliability: + durability: + history: +)"; + + ConfigParser parser; + RecorderConfig config; + ASSERT_TRUE(parser.load_from_string(yaml, config)); + ASSERT_EQ(config.subscriptions.size(), 1); + EXPECT_EQ(config.subscriptions[0].qos_depth, 10u); + EXPECT_FALSE(config.subscriptions[0].qos.has_overrides()); + + std::string error_msg; + EXPECT_TRUE(ConfigParser::validate(config, error_msg)); +} + +TEST_F(ConfigParserTest, AutoQosFieldsArePreserved) { + const std::string yaml = R"( +dataset: + path: /data +subscriptions: + - name: /test + message_type: sensor_msgs/msg/Image + qos: + depth: auto + reliability: auto + durability: auto + history: auto +)"; + + ConfigParser parser; + RecorderConfig config; + ASSERT_TRUE(parser.load_from_string(yaml, config)); + ASSERT_EQ(config.subscriptions.size(), 1); + EXPECT_TRUE(config.subscriptions[0].qos.has_overrides()); + EXPECT_TRUE(config.subscriptions[0].qos.depth_auto); + EXPECT_EQ(config.subscriptions[0].qos.reliability.value(), "auto"); + EXPECT_EQ(config.subscriptions[0].qos.durability.value(), "auto"); + EXPECT_EQ(config.subscriptions[0].qos.history.value(), "auto"); + + std::string error_msg; + EXPECT_TRUE(ConfigParser::validate(config, error_msg)); +} + // ============================================================================ // Validation Tests // ============================================================================ @@ -485,6 +552,26 @@ TEST_F(ConfigParserTest, OversizedQosDepthFailsToParse) { EXPECT_FALSE(parser.get_last_error().empty()); } +TEST_F(ConfigParserTest, InvalidQosReliabilityFailsValidation) { + const std::string yaml = R"( +dataset: + path: /data +subscriptions: + - name: /test + message_type: std_msgs/String + qos: + reliability: maybe +)"; + + ConfigParser parser; + RecorderConfig config; + ASSERT_TRUE(parser.load_from_string(yaml, config)); + + std::string error_msg; + EXPECT_FALSE(ConfigParser::validate(config, error_msg)); + EXPECT_EQ(error_msg, "Subscription qos.reliability must be 'reliable', 'best_effort', or 'auto'"); +} + TEST_F(ConfigParserTest, ValidateFailsWithInvalidMode) { RecorderConfig config; config.dataset.path = "/data"; diff --git a/apps/axon_recorder/test/unit/test_rpc_handlers.cpp b/apps/axon_recorder/test/unit/test_rpc_handlers.cpp index d48dcab0..d90796be 100644 --- a/apps/axon_recorder/test/unit/test_rpc_handlers.cpp +++ b/apps/axon_recorder/test/unit/test_rpc_handlers.cpp @@ -410,6 +410,46 @@ TEST_F(RpcHandlersTest, ConfigSuccess) { EXPECT_EQ("new_task_123", response.data["task_id"]); } +TEST_F(RpcHandlersTest, ConfigAcceptsTopicQos) { + current_state_ = "idle"; + RpcCallbacks callbacks = create_mock_callbacks(); + callbacks.set_config = [&](const std::string& task_id, const nlohmann::json& config) -> bool { + EXPECT_EQ(task_id, "new_task_123"); + if (!config["topics"][0].is_object()) { + return false; + } + EXPECT_EQ(config["topics"][0]["name"], "/camera/image"); + EXPECT_EQ(config["topics"][0]["qos"]["depth"], 32); + EXPECT_EQ(config["topic_qos"][0]["name"], "/lidar/scan"); + EXPECT_EQ(config["topic_qos"][0]["qos"]["mode"], "auto"); + EXPECT_EQ(config["topic_qos"][0]["qos"]["reliability"], "auto"); + current_state_ = "ready"; + return true; + }; + + nlohmann::json params; + nlohmann::json topics = nlohmann::json::array(); + topics.push_back( + {{"name", "/camera/image"}, + {"qos", {{"depth", 32}, {"reliability", ""}, {"durability", ""}, {"history", ""}}}} + ); + nlohmann::json topic_qos = nlohmann::json::array(); + topic_qos.push_back( + {{"name", "/lidar/scan"}, {"qos", {{"mode", "auto"}, {"reliability", "auto"}}}} + ); + params["task_config"] = { + {"task_id", "new_task_123"}, + {"device_id", "robot_01"}, + {"topics", topics}, + {"topic_qos", topic_qos} + }; + + RpcResponse response = handle_rpc_config(callbacks, params); + + EXPECT_TRUE(response.success); + EXPECT_EQ("Task configuration set successfully", response.message); +} + TEST_F(RpcHandlersTest, ConfigMissingTaskConfig) { RpcCallbacks callbacks = create_mock_callbacks(); callbacks.set_config = [&](const std::string&, const nlohmann::json&) -> bool { @@ -540,6 +580,13 @@ TEST_F(RpcHandlersTest, GetStateSuccess) { current_config_->device_id = "robot_01"; current_config_->scene = "warehouse"; current_config_->topics = {"/camera/image", "/lidar/scan"}; + TopicQosConfig topic_qos; + topic_qos.topic_name = "/lidar/scan"; + topic_qos.qos.mode = "auto"; + topic_qos.qos.auto_mode = true; + topic_qos.qos.depth = 64; + topic_qos.qos.reliability = "best_effort"; + current_config_->topic_qos.push_back(topic_qos); RpcCallbacks callbacks = create_mock_callbacks(); @@ -553,6 +600,9 @@ TEST_F(RpcHandlersTest, GetStateSuccess) { EXPECT_EQ("test_task_123", response.data["task_config"]["task_id"]); EXPECT_EQ("robot_01", response.data["task_config"]["device_id"]); EXPECT_EQ("warehouse", response.data["task_config"]["scene"]); + EXPECT_EQ("/lidar/scan", response.data["task_config"]["topic_qos"][0]["name"]); + EXPECT_EQ("auto", response.data["task_config"]["topic_qos"][0]["qos"]["mode"]); + EXPECT_EQ(64, response.data["task_config"]["topic_qos"][0]["qos"]["depth"]); EXPECT_TRUE(response.data.contains("version")); } From c4922a33378f903dd805ebe8c101edf93e045b56 Mon Sep 17 00:00:00 2001 From: stz Date: Tue, 9 Jun 2026 15:31:49 +0800 Subject: [PATCH 2/5] feat(ros2): auto-resolve subscription QoS - Parse mode:auto and per-field auto QoS subscription options - Resolve auto QoS from publisher endpoint info with topic fallbacks - Enable auto QoS in recorder ROS2 templates - Document auto QoS fields in config and RPC docs --- apps/axon_recorder/config/README.md | 12 +- .../config/default_config_hybrid.yaml | 2 + .../config/default_config_ros2.yaml | 23 +- docs/designs/rpc-api-design.md | 12 +- .../include/ros2_subscription_wrapper.hpp | 9 + middlewares/ros2/src/ros2_plugin_export.cpp | 151 ++++++++++++- .../ros2/src/ros2_subscription_wrapper.cpp | 211 +++++++++++++++++- middlewares/ros2/test/test_ros2_plugin.cpp | 110 +++++++++ 8 files changed, 515 insertions(+), 15 deletions(-) diff --git a/apps/axon_recorder/config/README.md b/apps/axon_recorder/config/README.md index 6c04b40d..5e73451c 100644 --- a/apps/axon_recorder/config/README.md +++ b/apps/axon_recorder/config/README.md @@ -62,8 +62,16 @@ List of topics to record with batching settings: - `message_type`: ROS message type - `batch_size`: Number of messages to batch before writing - `flush_interval_ms`: Maximum time to wait before flushing (ms) -- `qos_depth`: ROS2 subscription history depth (default `10`). This controls - the middleware QoS queue, not the recorder's `dataset.queue_size` worker queue. +- `qos_depth`: Backward-compatible ROS2 subscription history depth (default `10`). +- `qos`: Optional ROS2 QoS object. Any empty field falls back to the default. + - `mode`: `auto` to detect publisher QoS when available + - `depth`: History depth, or `auto` (default `10`) + - `reliability`: `reliable`, `best_effort`, or `auto` (default `reliable`) + - `durability`: `volatile`, `transient_local`, or `auto` (default `volatile`) + - `history`: `keep_last`, `keep_all`, or `auto` (default `keep_last`) + +QoS depth controls the ROS2 middleware queue, not the recorder's +`dataset.queue_size` worker queue. ### Recording - `profile`: ROS profile (`ros1` or `ros2`) diff --git a/apps/axon_recorder/config/default_config_hybrid.yaml b/apps/axon_recorder/config/default_config_hybrid.yaml index 140d7784..699e6fcb 100644 --- a/apps/axon_recorder/config/default_config_hybrid.yaml +++ b/apps/axon_recorder/config/default_config_hybrid.yaml @@ -57,6 +57,8 @@ subscriptions: message_type: sensor_msgs/msg/Imu batch_size: 500 flush_interval_ms: 1000 + qos: + mode: auto - name: /udp/gps message_type: axon_udp/json batch_size: 1 diff --git a/apps/axon_recorder/config/default_config_ros2.yaml b/apps/axon_recorder/config/default_config_ros2.yaml index af1a793e..d0f4b1e5 100644 --- a/apps/axon_recorder/config/default_config_ros2.yaml +++ b/apps/axon_recorder/config/default_config_ros2.yaml @@ -49,53 +49,72 @@ dataset: # High-frequency subscriptions (IMU, camera): Use larger batches (100-5000) # Low-frequency subscriptions (status, diagnostics): Use small batches (1-10) # Control signals: Use batch_size=1 for immediate persistence -# Optional qos_depth controls ROS2 subscription history depth. It is separate -# from dataset.queue_size, which controls Axon's per-topic worker queue. +# Optional ROS2 QoS overrides can be set per topic. Use mode: auto to follow +# publisher QoS when available. Leave any field empty to keep the middleware +# default (depth=10, reliability=reliable, durability=volatile, +# history=keep_last). QoS depth is separate from dataset.queue_size, which +# controls Axon's per-topic worker queue. subscriptions: # IMU data - high frequency (5 seconds of data per batch) - name: /imu/data message_type: sensor_msgs/msg/Imu batch_size: 5000 flush_interval_ms: 5000 + qos: + mode: auto # Camera 0 - RGB and Depth (10 seconds of data per batch at 30fps) - name: /camera0/rgb message_type: sensor_msgs/msg/Image batch_size: 300 flush_interval_ms: 10000 + qos: + mode: auto - name: /camera0/depth message_type: sensor_msgs/msg/Image batch_size: 300 flush_interval_ms: 10000 + qos: + mode: auto # Camera 1 - RGB and Depth - name: /camera1/rgb message_type: sensor_msgs/msg/Image batch_size: 300 flush_interval_ms: 10000 + qos: + mode: auto - name: /camera1/depth message_type: sensor_msgs/msg/Image batch_size: 300 flush_interval_ms: 10000 + qos: + mode: auto # Camera 2 - RGB and Depth - name: /camera2/rgb message_type: sensor_msgs/msg/Image batch_size: 300 flush_interval_ms: 10000 + qos: + mode: auto - name: /camera2/depth message_type: sensor_msgs/msg/Image batch_size: 300 flush_interval_ms: 10000 + qos: + mode: auto # Depth image with compression (16UC1 -> CompressedImage) - name: /camera/camera/depth/image_rect_raw message_type: sensor_msgs/msg/Image batch_size: 300 flush_interval_ms: 10000 + qos: + mode: auto depth_compression: true # ============================================================================ diff --git a/docs/designs/rpc-api-design.md b/docs/designs/rpc-api-design.md index 8852e5a4..72db9cb1 100644 --- a/docs/designs/rpc-api-design.md +++ b/docs/designs/rpc-api-design.md @@ -93,6 +93,15 @@ Content-Type: application/json "factory": "factory_shenzhen", "operator_name": "operator_001", "topics": ["/camera/image", "/lidar/scan", "/odom"], + "topic_qos": [ + { + "name": "/lidar/scan", + "qos": { + "mode": "auto", + "depth": 64 + } + } + ], "start_callback_url": "http://server.example.com/api/v1/tasks/123/start", "finish_callback_url": "http://server.example.com/api/v1/tasks/123/finish", "user_token": "eyJhbGciOiJIUzI1NiIs..." @@ -141,7 +150,8 @@ Content-Type: application/json | `skills` | array | No | List of associated skills | | `factory` | string | No | Factory identifier (identifies the factory producing the data) | | `operator_name` | string | No | Operator identifier | -| `topics` | array | No | List of topics to record | +| `topics` | array | No | List of topics to record. Entries may be strings or objects with `name` and optional `qos`. | +| `topic_qos` | array | No | Per-topic ROS2 QoS overrides. Empty QoS fields fall back to defaults; `mode: auto` or per-field `auto` enables ROS2 publisher QoS detection. | | `start_callback_url` | string | No | Start recording callback URL | | `finish_callback_url` | string | No | Finish recording callback URL | | `user_token` | string | No | JWT token for callback authentication | diff --git a/middlewares/ros2/include/ros2_subscription_wrapper.hpp b/middlewares/ros2/include/ros2_subscription_wrapper.hpp index a2d794e8..9d27a14b 100644 --- a/middlewares/ros2/include/ros2_subscription_wrapper.hpp +++ b/middlewares/ros2/include/ros2_subscription_wrapper.hpp @@ -64,6 +64,10 @@ using MessageCallbackV2 = std::function depth_compression; // Depth compression config (optional) + bool qos_depth_auto = false; + bool qos_reliability_auto = false; + bool qos_durability_auto = false; + bool qos_history_auto = false; // Default constructor SubscribeOptions() @@ -72,6 +76,11 @@ struct SubscribeOptions { void apply_subscribe_qos_options(SubscribeOptions& options, const nlohmann::json& opts); +SubscribeOptions resolve_auto_qos_options( + const rclcpp::Node::SharedPtr& node, const std::string& topic_name, + const std::string& message_type, const SubscribeOptions& options +); + class SubscriptionManager { public: explicit SubscriptionManager(rclcpp::Node::SharedPtr node); diff --git a/middlewares/ros2/src/ros2_plugin_export.cpp b/middlewares/ros2/src/ros2_plugin_export.cpp index f3de7378..6332a0c7 100644 --- a/middlewares/ros2/src/ros2_plugin_export.cpp +++ b/middlewares/ros2/src/ros2_plugin_export.cpp @@ -5,12 +5,16 @@ // ROS2 Plugin C ABI Export // Direct C interface without shared ABI header dependency +#include #include +#include #include #include #include #include #include +#include +#include #include // JSON library @@ -32,26 +36,157 @@ using namespace ros2_plugin; namespace ros2_plugin { +std::string trim_ascii(std::string value) { + auto not_space = [](unsigned char c) { + return !std::isspace(c); + }; + value.erase(value.begin(), std::find_if(value.begin(), value.end(), not_space)); + value.erase(std::find_if(value.rbegin(), value.rend(), not_space).base(), value.end()); + return value; +} + +std::string normalize_qos_policy_token(std::string value) { + value = trim_ascii(std::move(value)); + std::transform(value.begin(), value.end(), value.begin(), [](unsigned char c) { + if (c == '-' || std::isspace(c)) { + return '_'; + } + return static_cast(std::tolower(c)); + }); + return value; +} + +bool json_has_value(const nlohmann::json& object, const char* key) { + if (!object.is_object() || !object.contains(key) || object[key].is_null()) { + return false; + } + if (object[key].is_string()) { + return !trim_ascii(object[key].get()).empty(); + } + return true; +} + +bool json_field_is_auto(const nlohmann::json& object, const char* key) { + return json_has_value(object, key) && object[key].is_string() && + normalize_qos_policy_token(object[key].get()) == "auto"; +} + +std::optional read_qos_depth(const nlohmann::json& object, const char* key) { + if (!json_has_value(object, key)) { + return std::nullopt; + } + if (json_field_is_auto(object, key)) { + return std::nullopt; + } + long long depth = 0; + if (object[key].is_string()) { + depth = std::stoll(trim_ascii(object[key].get())); + } else { + depth = object[key].get(); + } + return depth > 0 ? std::optional(static_cast(depth)) : std::optional(0); +} + +std::optional read_qos_policy(const nlohmann::json& object, const char* key) { + if (!json_has_value(object, key)) { + return std::nullopt; + } + return normalize_qos_policy_token(object[key].get()); +} + +std::optional read_qos_reliable(const nlohmann::json& object, const char* key) { + if (!json_has_value(object, key)) { + return std::nullopt; + } + if (object[key].is_boolean()) { + return object[key].get(); + } + const std::string reliability = normalize_qos_policy_token(object[key].get()); + if (reliability == "reliable") { + return true; + } + if (reliability == "best_effort") { + return false; + } + return std::nullopt; +} + void apply_subscribe_qos_options(SubscribeOptions& options, const nlohmann::json& opts) { size_t qos_depth = 10; - if (opts.contains("qos_depth")) { - qos_depth = opts["qos_depth"].get(); - } else if (opts.contains("queue_size")) { - qos_depth = opts["queue_size"].get(); + const nlohmann::json* qos_opts = &opts; + if (opts.contains("qos") && opts["qos"].is_object()) { + qos_opts = &opts["qos"]; + } + + const bool auto_mode = read_qos_policy(*qos_opts, "mode").value_or(std::string{}) == "auto"; + if (auto_mode) { + options.qos_depth_auto = true; + options.qos_reliability_auto = true; + options.qos_durability_auto = true; + options.qos_history_auto = true; + } + + if (json_field_is_auto(*qos_opts, "depth") || json_field_is_auto(*qos_opts, "qos_depth")) { + options.qos_depth_auto = true; + } + if (auto depth = read_qos_depth(*qos_opts, "depth")) { + options.qos_depth_auto = false; + qos_depth = *depth; + } else if (auto depth = read_qos_depth(*qos_opts, "qos_depth")) { + options.qos_depth_auto = false; + qos_depth = *depth; + } else if (auto depth = read_qos_depth(opts, "qos_depth")) { + qos_depth = *depth; + } else if (auto depth = read_qos_depth(opts, "queue_size")) { + qos_depth = *depth; } if (qos_depth == 0) { qos_depth = 10; } - options.qos = rclcpp::QoS(rclcpp::KeepLast(qos_depth)); - const bool reliable = - opts.contains("qos_reliable") ? opts["qos_reliable"].get() : opts.value("reliable", true); + const std::string history = read_qos_policy(*qos_opts, "history").value_or("keep_last"); + if (history == "auto") { + options.qos_history_auto = true; + } else if (json_has_value(*qos_opts, "history")) { + options.qos_history_auto = false; + } + if (history == "keep_all") { + options.qos = rclcpp::QoS(rclcpp::KeepAll()); + } else { + options.qos = rclcpp::QoS(rclcpp::KeepLast(qos_depth)); + } + + bool reliable = true; + if (json_field_is_auto(*qos_opts, "reliability") || json_field_is_auto(*qos_opts, "reliable")) { + options.qos_reliability_auto = true; + } else if (auto qos_reliable = read_qos_reliable(*qos_opts, "reliability")) { + options.qos_reliability_auto = false; + reliable = *qos_reliable; + } else if (auto qos_reliable = read_qos_reliable(*qos_opts, "reliable")) { + options.qos_reliability_auto = false; + reliable = *qos_reliable; + } else if (auto qos_reliable = read_qos_reliable(opts, "qos_reliable")) { + reliable = *qos_reliable; + } else if (auto legacy_reliable = read_qos_reliable(opts, "reliable")) { + reliable = *legacy_reliable; + } if (reliable) { options.qos.reliable(); } else { options.qos.best_effort(); } - options.qos.durability_volatile(); + + const std::string durability = read_qos_policy(*qos_opts, "durability").value_or("volatile"); + if (durability == "auto") { + options.qos_durability_auto = true; + } else if (json_has_value(*qos_opts, "durability")) { + options.qos_durability_auto = false; + } + if (durability == "transient_local") { + options.qos.transient_local(); + } else { + options.qos.durability_volatile(); + } } } // namespace ros2_plugin diff --git a/middlewares/ros2/src/ros2_subscription_wrapper.cpp b/middlewares/ros2/src/ros2_subscription_wrapper.cpp index 4b7c531c..0d1efcf6 100644 --- a/middlewares/ros2/src/ros2_subscription_wrapper.cpp +++ b/middlewares/ros2/src/ros2_subscription_wrapper.cpp @@ -8,6 +8,10 @@ #include "depth_compression_filter.hpp" #endif +#include +#include +#include + // Define component name for logging #define AXON_LOG_COMPONENT "ros2_subscription" #include @@ -16,6 +20,205 @@ using axon::logging::kv; namespace ros2_plugin { +namespace { + +struct QosPolicySelection { + size_t depth = 10; + rmw_qos_reliability_policy_t reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + rmw_qos_durability_policy_t durability = RMW_QOS_POLICY_DURABILITY_VOLATILE; + rmw_qos_history_policy_t history = RMW_QOS_POLICY_HISTORY_KEEP_LAST; +}; + +bool has_auto_qos_fields(const SubscribeOptions& options) { + return options.qos_depth_auto || options.qos_reliability_auto || options.qos_durability_auto || + options.qos_history_auto; +} + +std::string normalize_type_name(std::string value) { + std::replace(value.begin(), value.end(), ':', '/'); + value.erase( + std::unique( + value.begin(), + value.end(), + [](char lhs, char rhs) { + return lhs == '/' && rhs == '/'; + } + ), + value.end() + ); + return value; +} + +bool topic_type_matches(const rclcpp::TopicEndpointInfo& info, const std::string& message_type) { + return message_type.empty() || + normalize_type_name(info.topic_type()) == normalize_type_name(message_type); +} + +bool is_sensor_data_topic(const std::string& topic_name, const std::string& message_type) { + const std::string type = normalize_type_name(message_type); + if (type == "sensor_msgs/msg/Image" || type == "sensor_msgs/msg/CompressedImage" || + type == "sensor_msgs/msg/PointCloud2" || type == "sensor_msgs/msg/LaserScan" || + type == "sensor_msgs/msg/Imu") { + return true; + } + return topic_name.find("/camera") != std::string::npos || + topic_name.find("/lidar") != std::string::npos || + topic_name.find("/scan") != std::string::npos || + topic_name.find("/imu") != std::string::npos; +} + +QosPolicySelection selection_from_qos(const rclcpp::QoS& qos) { + const auto& profile = qos.get_rmw_qos_profile(); + QosPolicySelection selection; + selection.depth = profile.depth > 0 ? profile.depth : 10; + selection.reliability = profile.reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT + ? RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT + : RMW_QOS_POLICY_RELIABILITY_RELIABLE; + selection.durability = profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL + ? RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL + : RMW_QOS_POLICY_DURABILITY_VOLATILE; + selection.history = profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL + ? RMW_QOS_POLICY_HISTORY_KEEP_ALL + : RMW_QOS_POLICY_HISTORY_KEEP_LAST; + return selection; +} + +rclcpp::QoS qos_from_selection(const QosPolicySelection& selection) { + rclcpp::QoS qos = selection.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL + ? rclcpp::QoS(rclcpp::KeepAll()) + : rclcpp::QoS(rclcpp::KeepLast(selection.depth > 0 ? selection.depth : 10)); + if (selection.reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT) { + qos.best_effort(); + } else { + qos.reliable(); + } + if (selection.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { + qos.transient_local(); + } else { + qos.durability_volatile(); + } + return qos; +} + +std::optional selection_from_publishers( + const rclcpp::Node::SharedPtr& node, const std::string& topic_name, + const std::string& message_type +) { + if (!node) { + return std::nullopt; + } + + const auto publishers = node->get_publishers_info_by_topic(topic_name); + bool matched = false; + bool any_best_effort = false; + bool any_volatile = false; + bool any_transient_local = false; + bool any_keep_all = false; + size_t max_depth = 0; + + for (const auto& publisher : publishers) { + if (!topic_type_matches(publisher, message_type)) { + continue; + } + matched = true; + const auto& profile = publisher.qos_profile().get_rmw_qos_profile(); + any_best_effort = + any_best_effort || profile.reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + any_volatile = any_volatile || profile.durability == RMW_QOS_POLICY_DURABILITY_VOLATILE; + any_transient_local = + any_transient_local || profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; + any_keep_all = any_keep_all || profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL; + max_depth = std::max(max_depth, profile.depth); + } + + if (!matched) { + return std::nullopt; + } + + QosPolicySelection selection; + selection.depth = max_depth > 0 ? max_depth : 10; + selection.reliability = + any_best_effort ? RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT : RMW_QOS_POLICY_RELIABILITY_RELIABLE; + selection.durability = any_volatile || !any_transient_local + ? RMW_QOS_POLICY_DURABILITY_VOLATILE + : RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; + selection.history = + any_keep_all ? RMW_QOS_POLICY_HISTORY_KEEP_ALL : RMW_QOS_POLICY_HISTORY_KEEP_LAST; + return selection; +} + +QosPolicySelection fallback_selection_for_topic( + const std::string& topic_name, const std::string& message_type +) { + QosPolicySelection selection; + if (topic_name == "/tf_static" || topic_name.rfind("/tf_static", 0) == 0) { + selection.depth = 1; + selection.reliability = RMW_QOS_POLICY_RELIABILITY_RELIABLE; + selection.durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; + selection.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST; + return selection; + } + if (is_sensor_data_topic(topic_name, message_type)) { + selection.depth = 10; + selection.reliability = RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT; + selection.durability = RMW_QOS_POLICY_DURABILITY_VOLATILE; + selection.history = RMW_QOS_POLICY_HISTORY_KEEP_LAST; + } + return selection; +} + +} // namespace + +SubscribeOptions resolve_auto_qos_options( + const rclcpp::Node::SharedPtr& node, const std::string& topic_name, + const std::string& message_type, const SubscribeOptions& options +) { + if (!has_auto_qos_fields(options)) { + return options; + } + + SubscribeOptions resolved = options; + QosPolicySelection selection = selection_from_qos(options.qos); + const auto detected = selection_from_publishers(node, topic_name, message_type); + const QosPolicySelection automatic = + detected.value_or(fallback_selection_for_topic(topic_name, message_type)); + + if (options.qos_depth_auto) { + selection.depth = automatic.depth; + } + if (options.qos_reliability_auto) { + selection.reliability = automatic.reliability; + } + if (options.qos_durability_auto) { + selection.durability = automatic.durability; + } + if (options.qos_history_auto) { + selection.history = automatic.history; + } + resolved.qos = qos_from_selection(selection); + + AXON_LOG_INFO( + "Resolved auto QoS for topic " + << kv("topic", topic_name) << kv("type", message_type) + << kv("source", detected.has_value() ? "publisher" : "fallback") + << kv( + "reliability", + selection.reliability == RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT ? "best_effort" + : "reliable" + ) + << kv( + "durability", + selection.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL ? "transient_local" + : "volatile" + ) + << kv( + "history", selection.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL ? "keep_all" : "keep_last" + ) + << kv("depth", selection.depth) + ); + return resolved; +} + // ============================================================================= // SubscriptionManager Implementation // ============================================================================= @@ -33,6 +236,8 @@ bool SubscriptionManager::subscribe( MessageCallback callback ) { std::lock_guard lock(mutex_); + const SubscribeOptions effective_options = + resolve_auto_qos_options(node_, topic_name, message_type, options); // Check if already subscribed if (subscriptions_.find(topic_name) != subscriptions_.end()) { @@ -79,7 +284,7 @@ bool SubscriptionManager::subscribe( auto subscription = node_->create_generic_subscription( topic_name, message_type, - options.qos, + effective_options.qos, [this, topic_name, message_type, callback, captured_filter]( std::shared_ptr msg ) { @@ -171,6 +376,8 @@ bool SubscriptionManager::subscribe_v2( MessageCallbackV2 callback ) { std::lock_guard lock(mutex_); + const SubscribeOptions effective_options = + resolve_auto_qos_options(node_, topic_name, message_type, options); if (subscriptions_.find(topic_name) != subscriptions_.end()) { AXON_LOG_WARN("Already subscribed to topic: " << kv("topic", topic_name)); @@ -206,7 +413,7 @@ bool SubscriptionManager::subscribe_v2( auto subscription = node_->create_generic_subscription( topic_name, message_type, - options.qos, + effective_options.qos, [topic_name, message_type, callback, captured_filter]( std::shared_ptr msg ) { diff --git a/middlewares/ros2/test/test_ros2_plugin.cpp b/middlewares/ros2/test/test_ros2_plugin.cpp index 1732c2ae..87640817 100644 --- a/middlewares/ros2/test/test_ros2_plugin.cpp +++ b/middlewares/ros2/test/test_ros2_plugin.cpp @@ -294,6 +294,116 @@ TEST(Ros2PluginExportTest, AppliesBestEffortReliability) { EXPECT_EQ(qos.reliability, RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT); } +TEST(Ros2PluginExportTest, AppliesNestedQosOptions) { + ros2_plugin::SubscribeOptions options; + apply_subscribe_qos_options( + options, + nlohmann::json::parse( + R"({"qos":{"depth":64,"reliability":"best_effort","durability":"transient_local"}})" + ) + ); + + const auto& qos = options.qos.get_rmw_qos_profile(); + EXPECT_EQ(qos.depth, 64u); + EXPECT_EQ(qos.reliability, RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT); + EXPECT_EQ(qos.durability, RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL); +} + +TEST(Ros2PluginExportTest, BlankNestedQosFallsBackToDefaults) { + ros2_plugin::SubscribeOptions options; + apply_subscribe_qos_options( + options, + nlohmann::json::parse(R"({"qos":{"depth":"","reliability":"","durability":"","history":""}})") + ); + + const auto& qos = options.qos.get_rmw_qos_profile(); + EXPECT_EQ(qos.depth, 10u); + EXPECT_EQ(qos.reliability, RMW_QOS_POLICY_RELIABILITY_RELIABLE); + EXPECT_EQ(qos.durability, RMW_QOS_POLICY_DURABILITY_VOLATILE); +} + +TEST(Ros2PluginExportTest, AppliesKeepAllHistory) { + ros2_plugin::SubscribeOptions options; + apply_subscribe_qos_options(options, nlohmann::json::parse(R"({"qos":{"history":"keep_all"}})")); + + const auto& qos = options.qos.get_rmw_qos_profile(); + EXPECT_EQ(qos.history, RMW_QOS_POLICY_HISTORY_KEEP_ALL); +} + +TEST(Ros2PluginExportTest, ParsesAutoQosMode) { + ros2_plugin::SubscribeOptions options; + apply_subscribe_qos_options(options, nlohmann::json::parse(R"({"qos":{"mode":"auto"}})")); + + EXPECT_TRUE(options.qos_depth_auto); + EXPECT_TRUE(options.qos_reliability_auto); + EXPECT_TRUE(options.qos_durability_auto); + EXPECT_TRUE(options.qos_history_auto); +} + +TEST(Ros2PluginExportTest, ExplicitQosFieldsOverrideAutoMode) { + ros2_plugin::SubscribeOptions options; + apply_subscribe_qos_options( + options, + nlohmann::json::parse( + R"({"qos":{"mode":"auto","depth":64,"reliability":"reliable","durability":"auto"}})" + ) + ); + + const auto& qos = options.qos.get_rmw_qos_profile(); + EXPECT_FALSE(options.qos_depth_auto); + EXPECT_FALSE(options.qos_reliability_auto); + EXPECT_TRUE(options.qos_durability_auto); + EXPECT_TRUE(options.qos_history_auto); + EXPECT_EQ(qos.depth, 64u); + EXPECT_EQ(qos.reliability, RMW_QOS_POLICY_RELIABILITY_RELIABLE); +} + +TEST_F(Ros2PluginTest, AutoQosFallsBackByTopicTypeWhenNoPublisherExists) { + auto node = std::make_shared("auto_qos_fallback_node"); + ros2_plugin::SubscribeOptions options; + apply_subscribe_qos_options(options, nlohmann::json::parse(R"({"qos":{"mode":"auto"}})")); + + const auto resolved = + resolve_auto_qos_options(node, "/camera/image", "sensor_msgs/msg/Image", options); + const auto& qos = resolved.qos.get_rmw_qos_profile(); + EXPECT_EQ(qos.depth, 10u); + EXPECT_EQ(qos.reliability, RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT); + EXPECT_EQ(qos.durability, RMW_QOS_POLICY_DURABILITY_VOLATILE); + EXPECT_EQ(qos.history, RMW_QOS_POLICY_HISTORY_KEEP_LAST); +} + +TEST_F(Ros2PluginTest, AutoQosUsesPublisherEndpointQos) { + const std::string topic = "/axon_auto_qos_test"; + auto pub_node = std::make_shared("auto_qos_pub_node"); + auto sub_node = std::make_shared("auto_qos_sub_node"); + + rclcpp::QoS publisher_qos(rclcpp::KeepLast(7)); + publisher_qos.best_effort(); + publisher_qos.transient_local(); + auto publisher = pub_node->create_generic_publisher(topic, "std_msgs/msg/String", publisher_qos); + ASSERT_NE(publisher, nullptr); + + for (int i = 0; i < 50; ++i) { + if (!sub_node->get_publishers_info_by_topic(topic).empty()) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + const auto publisher_infos = sub_node->get_publishers_info_by_topic(topic); + ASSERT_FALSE(publisher_infos.empty()); + const auto& endpoint_qos = publisher_infos.front().qos_profile().get_rmw_qos_profile(); + + ros2_plugin::SubscribeOptions options; + apply_subscribe_qos_options(options, nlohmann::json::parse(R"({"qos":{"mode":"auto"}})")); + + const auto resolved = resolve_auto_qos_options(sub_node, topic, "std_msgs/msg/String", options); + const auto& qos = resolved.qos.get_rmw_qos_profile(); + EXPECT_EQ(qos.depth, endpoint_qos.depth > 0 ? endpoint_qos.depth : 10u); + EXPECT_EQ(qos.reliability, RMW_QOS_POLICY_RELIABILITY_BEST_EFFORT); + EXPECT_EQ(qos.durability, RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL); + EXPECT_EQ(qos.history, RMW_QOS_POLICY_HISTORY_KEEP_LAST); +} + /** * @brief Test stop without explicit start (cleanup in destructor) */ From 3f6e8c9d6565aedf963f41358f74dfd2857e7717 Mon Sep 17 00:00:00 2001 From: stz Date: Tue, 9 Jun 2026 17:57:21 +0800 Subject: [PATCH 3/5] ci(docker): mirror apt sources in ROS test images --- docker/Dockerfile.ros1 | 8 +++++++- docker/Dockerfile.ros2.humble | 8 +++++++- docker/Dockerfile.ros2.jazzy | 8 +++++++- docker/Dockerfile.ros2.rolling | 8 +++++++- docker/scripts/setup-apt-mirror.sh | 5 +++++ 5 files changed, 33 insertions(+), 4 deletions(-) diff --git a/docker/Dockerfile.ros1 b/docker/Dockerfile.ros1 index d9639526..461dd487 100644 --- a/docker/Dockerfile.ros1 +++ b/docker/Dockerfile.ros1 @@ -1,5 +1,10 @@ FROM ros:noetic-ros-base +ARG AXON_APT_MIRROR=default +COPY docker/scripts/setup-apt-mirror.sh /usr/local/bin/setup-apt-mirror.sh +RUN chmod +x /usr/local/bin/setup-apt-mirror.sh && \ + /usr/local/bin/setup-apt-mirror.sh "${AXON_APT_MIRROR}" + # ============================================================================= # IMPORTANT: When adding new dependencies here, also update: # .github/workflows/ci.yml -> ADDITIONAL_DEBS @@ -29,9 +34,10 @@ RUN apt-get update && apt-get install -y \ # Install clang-format-21 from LLVM repository RUN . /etc/os-release && \ + LLVM_APT_MIRROR="$(cat /etc/apt/axon-llvm-mirror-url 2>/dev/null || echo https://apt.llvm.org)" && \ wget -qO- https://apt.llvm.org/llvm-snapshot.gpg.key | \ gpg --dearmor -o /usr/share/keyrings/apt.llvm.org.gpg && \ - echo "deb [signed-by=/usr/share/keyrings/apt.llvm.org.gpg] https://apt.llvm.org/${VERSION_CODENAME}/ llvm-toolchain-${VERSION_CODENAME}-21 main" > \ + echo "deb [signed-by=/usr/share/keyrings/apt.llvm.org.gpg] ${LLVM_APT_MIRROR}/${VERSION_CODENAME}/ llvm-toolchain-${VERSION_CODENAME}-21 main" > \ /etc/apt/sources.list.d/llvm-21.list && \ apt-get update && \ apt-get install -y clang-format-21 && \ diff --git a/docker/Dockerfile.ros2.humble b/docker/Dockerfile.ros2.humble index 2769221c..f891657f 100644 --- a/docker/Dockerfile.ros2.humble +++ b/docker/Dockerfile.ros2.humble @@ -1,5 +1,10 @@ FROM ros:humble-ros-base +ARG AXON_APT_MIRROR=default +COPY docker/scripts/setup-apt-mirror.sh /usr/local/bin/setup-apt-mirror.sh +RUN chmod +x /usr/local/bin/setup-apt-mirror.sh && \ + /usr/local/bin/setup-apt-mirror.sh "${AXON_APT_MIRROR}" + # ============================================================================= # IMPORTANT: When adding new dependencies here, also update: # .github/workflows/ci.yml -> ADDITIONAL_DEBS @@ -26,9 +31,10 @@ RUN apt-get update && apt-get install -y \ # Install clang-format-21 from LLVM repository RUN . /etc/os-release && \ + LLVM_APT_MIRROR="$(cat /etc/apt/axon-llvm-mirror-url 2>/dev/null || echo https://apt.llvm.org)" && \ wget -qO- https://apt.llvm.org/llvm-snapshot.gpg.key | \ gpg --dearmor -o /usr/share/keyrings/apt.llvm.org.gpg && \ - echo "deb [signed-by=/usr/share/keyrings/apt.llvm.org.gpg] https://apt.llvm.org/${VERSION_CODENAME}/ llvm-toolchain-${VERSION_CODENAME}-21 main" > \ + echo "deb [signed-by=/usr/share/keyrings/apt.llvm.org.gpg] ${LLVM_APT_MIRROR}/${VERSION_CODENAME}/ llvm-toolchain-${VERSION_CODENAME}-21 main" > \ /etc/apt/sources.list.d/llvm-21.list && \ apt-get update && \ apt-get install -y clang-format-21 && \ diff --git a/docker/Dockerfile.ros2.jazzy b/docker/Dockerfile.ros2.jazzy index e580b4f3..df6e2127 100644 --- a/docker/Dockerfile.ros2.jazzy +++ b/docker/Dockerfile.ros2.jazzy @@ -1,5 +1,10 @@ FROM ros:jazzy-ros-base +ARG AXON_APT_MIRROR=default +COPY docker/scripts/setup-apt-mirror.sh /usr/local/bin/setup-apt-mirror.sh +RUN chmod +x /usr/local/bin/setup-apt-mirror.sh && \ + /usr/local/bin/setup-apt-mirror.sh "${AXON_APT_MIRROR}" + # ============================================================================= # IMPORTANT: When adding new dependencies here, also update: # .github/workflows/ci.yml -> ADDITIONAL_DEBS @@ -26,9 +31,10 @@ RUN apt-get update && apt-get install -y \ # Install clang-format-21 from LLVM repository RUN . /etc/os-release && \ + LLVM_APT_MIRROR="$(cat /etc/apt/axon-llvm-mirror-url 2>/dev/null || echo https://apt.llvm.org)" && \ wget -qO- https://apt.llvm.org/llvm-snapshot.gpg.key | \ gpg --dearmor -o /usr/share/keyrings/apt.llvm.org.gpg && \ - echo "deb [signed-by=/usr/share/keyrings/apt.llvm.org.gpg] https://apt.llvm.org/${VERSION_CODENAME}/ llvm-toolchain-${VERSION_CODENAME}-21 main" > \ + echo "deb [signed-by=/usr/share/keyrings/apt.llvm.org.gpg] ${LLVM_APT_MIRROR}/${VERSION_CODENAME}/ llvm-toolchain-${VERSION_CODENAME}-21 main" > \ /etc/apt/sources.list.d/llvm-21.list && \ apt-get update && \ apt-get install -y clang-format-21 && \ diff --git a/docker/Dockerfile.ros2.rolling b/docker/Dockerfile.ros2.rolling index 5ca06fea..300ca74d 100644 --- a/docker/Dockerfile.ros2.rolling +++ b/docker/Dockerfile.ros2.rolling @@ -1,5 +1,10 @@ FROM ros:rolling-ros-base +ARG AXON_APT_MIRROR=default +COPY docker/scripts/setup-apt-mirror.sh /usr/local/bin/setup-apt-mirror.sh +RUN chmod +x /usr/local/bin/setup-apt-mirror.sh && \ + /usr/local/bin/setup-apt-mirror.sh "${AXON_APT_MIRROR}" + # ============================================================================= # IMPORTANT: When adding new dependencies here, also update: # .github/workflows/ci.yml -> ADDITIONAL_DEBS @@ -28,9 +33,10 @@ RUN apt-get update && apt-get install -y \ # Install clang-format-21 from LLVM repository RUN . /etc/os-release && \ + LLVM_APT_MIRROR="$(cat /etc/apt/axon-llvm-mirror-url 2>/dev/null || echo https://apt.llvm.org)" && \ wget -qO- https://apt.llvm.org/llvm-snapshot.gpg.key | \ gpg --dearmor -o /usr/share/keyrings/apt.llvm.org.gpg && \ - echo "deb [signed-by=/usr/share/keyrings/apt.llvm.org.gpg] https://apt.llvm.org/${VERSION_CODENAME}/ llvm-toolchain-${VERSION_CODENAME}-21 main" > \ + echo "deb [signed-by=/usr/share/keyrings/apt.llvm.org.gpg] ${LLVM_APT_MIRROR}/${VERSION_CODENAME}/ llvm-toolchain-${VERSION_CODENAME}-21 main" > \ /etc/apt/sources.list.d/llvm-21.list && \ apt-get update && \ apt-get install -y clang-format-21 && \ diff --git a/docker/scripts/setup-apt-mirror.sh b/docker/scripts/setup-apt-mirror.sh index ec5b4c70..d90f9971 100755 --- a/docker/scripts/setup-apt-mirror.sh +++ b/docker/scripts/setup-apt-mirror.sh @@ -10,6 +10,7 @@ ubuntu_mirror="$default_mirror" ubuntu_ports_mirror="http://ports.ubuntu.com/ubuntu-ports" ros1_mirror="http://packages.ros.org/ros/ubuntu" ros2_mirror="http://packages.ros.org/ros2/ubuntu" +llvm_mirror="https://apt.llvm.org" strip_ros_sources=0 case "$mirror" in @@ -19,6 +20,7 @@ case "$mirror" in ubuntu_mirror="http://mirrors.tuna.tsinghua.edu.cn/ubuntu" ros1_mirror="http://mirrors.tuna.tsinghua.edu.cn/ros/ubuntu" ros2_mirror="http://mirrors.tuna.tsinghua.edu.cn/ros2/ubuntu" + llvm_mirror="https://mirrors.tuna.tsinghua.edu.cn/llvm-apt" strip_ros_sources=1 ;; http://*|https://*) @@ -41,6 +43,7 @@ fi mkdir -p /etc/apt printf "%s\n" "$ubuntu_mirror" > /etc/apt/axon-ubuntu-mirror-url +printf "%s\n" "$llvm_mirror" > /etc/apt/axon-llvm-mirror-url mkdir -p /etc/apt/apt.conf.d cat > /etc/apt/apt.conf.d/80axon-retries < Date: Tue, 9 Jun 2026 18:18:23 +0800 Subject: [PATCH 4/5] ci(docker): mirror apt sources in local test images --- docker/Dockerfile.cpp-test | 8 ++++- docker/Dockerfile.zenoh | 5 +++ scripts/ci-docker-zenoh.sh | 13 +++++++- scripts/docker-apt-mirror-common.sh | 49 +++++++++++++++++++++++++++++ scripts/docker-test-cpp.sh | 7 +++++ scripts/docker-test-e2e.sh | 7 +++++ scripts/docker-test-ros.sh | 7 +++++ 7 files changed, 94 insertions(+), 2 deletions(-) create mode 100644 scripts/docker-apt-mirror-common.sh diff --git a/docker/Dockerfile.cpp-test b/docker/Dockerfile.cpp-test index b732b166..e0f1b53b 100644 --- a/docker/Dockerfile.cpp-test +++ b/docker/Dockerfile.cpp-test @@ -12,6 +12,11 @@ FROM ubuntu:22.04 +ARG AXON_APT_MIRROR=default +COPY docker/scripts/setup-apt-mirror.sh /usr/local/bin/setup-apt-mirror.sh +RUN chmod +x /usr/local/bin/setup-apt-mirror.sh && \ + /usr/local/bin/setup-apt-mirror.sh "${AXON_APT_MIRROR}" + # Prevent interactive prompts during package installation ENV DEBIAN_FRONTEND=noninteractive @@ -107,9 +112,10 @@ RUN apt-get update && apt-get install -y \ lsb-release wget software-properties-common gnupg \ && rm -rf /var/lib/apt/lists/* RUN . /etc/os-release && \ + LLVM_APT_MIRROR="$(cat /etc/apt/axon-llvm-mirror-url 2>/dev/null || echo https://apt.llvm.org)" && \ wget -qO- https://apt.llvm.org/llvm-snapshot.gpg.key | \ gpg --dearmor -o /usr/share/keyrings/apt.llvm.org.gpg && \ - echo "deb [signed-by=/usr/share/keyrings/apt.llvm.org.gpg] https://apt.llvm.org/${VERSION_CODENAME}/ llvm-toolchain-${VERSION_CODENAME}-21 main" > \ + echo "deb [signed-by=/usr/share/keyrings/apt.llvm.org.gpg] ${LLVM_APT_MIRROR}/${VERSION_CODENAME}/ llvm-toolchain-${VERSION_CODENAME}-21 main" > \ /etc/apt/sources.list.d/llvm-21.list && \ apt-get update && apt-get install -y \ clang-format-21 \ diff --git a/docker/Dockerfile.zenoh b/docker/Dockerfile.zenoh index 413b2349..12d8e7cb 100644 --- a/docker/Dockerfile.zenoh +++ b/docker/Dockerfile.zenoh @@ -4,6 +4,11 @@ FROM ubuntu:22.04 +ARG AXON_APT_MIRROR=default +COPY docker/scripts/setup-apt-mirror.sh /usr/local/bin/setup-apt-mirror.sh +RUN chmod +x /usr/local/bin/setup-apt-mirror.sh && \ + /usr/local/bin/setup-apt-mirror.sh "${AXON_APT_MIRROR}" + # Prevent interactive prompts during package installation ENV DEBIAN_FRONTEND=noninteractive diff --git a/scripts/ci-docker-zenoh.sh b/scripts/ci-docker-zenoh.sh index eee4f9c9..889a859b 100755 --- a/scripts/ci-docker-zenoh.sh +++ b/scripts/ci-docker-zenoh.sh @@ -29,6 +29,8 @@ DOCKER_IMAGE="axon:zenoh" RUN_COVERAGE=false RUN_INTEGRATION=false +source "${SCRIPT_DIR}/docker-apt-mirror-common.sh" + # Parse arguments while [[ $# -gt 0 ]]; do case $1 in @@ -62,6 +64,9 @@ echo -e "${BLUE}=============================================${NC}" echo -e "${BLUE}Axon Zenoh Plugin Local CI${NC}" echo -e "${BLUE}=============================================${NC}" echo "" +configure_axon_docker_apt_mirror +echo -e "Docker apt mirror: ${YELLOW}${AXON_DOCKER_APT_MIRROR}${NC}" +echo "" # ============================================================================= # Step 1: Build Docker Image @@ -69,7 +74,11 @@ echo "" echo -e "${YELLOW}Step 1: Building Zenoh Docker image...${NC}" if ! docker image inspect ${DOCKER_IMAGE} &>/dev/null; then echo -e "${YELLOW}Building new image...${NC}" - docker build -f "${PROJECT_ROOT}/docker/Dockerfile.zenoh" -t ${DOCKER_IMAGE} "${PROJECT_ROOT}" + docker build --network=host \ + "${DOCKER_APT_MIRROR_BUILD_ARGS[@]}" \ + -f "${PROJECT_ROOT}/docker/Dockerfile.zenoh" \ + -t ${DOCKER_IMAGE} \ + "${PROJECT_ROOT}" else echo -e "${GREEN}Docker image already exists. Use 'docker rmi ${DOCKER_IMAGE}' to rebuild.${NC}" fi @@ -83,6 +92,7 @@ echo -e "${YELLOW}Step 2: Running Zenoh plugin unit tests...${NC}" if [ "$RUN_COVERAGE" = true ]; then echo -e "${BLUE}Running with coverage instrumentation...${NC}" docker run --rm \ + --network host \ -v "${PROJECT_ROOT}:/workspace/axon" \ ${DOCKER_IMAGE} \ bash -c " @@ -120,6 +130,7 @@ if [ "$RUN_COVERAGE" = true ]; then else echo -e "${BLUE}Running without coverage...${NC}" docker run --rm \ + --network host \ -v "${PROJECT_ROOT}:/workspace/axon" \ ${DOCKER_IMAGE} \ bash -c " diff --git a/scripts/docker-apt-mirror-common.sh b/scripts/docker-apt-mirror-common.sh new file mode 100644 index 00000000..bd7b3d18 --- /dev/null +++ b/scripts/docker-apt-mirror-common.sh @@ -0,0 +1,49 @@ +#!/bin/bash +# SPDX-FileCopyrightText: 2026 ArcheBase +# SPDX-License-Identifier: MulanPSL-2.0 + +detect_axon_docker_country_code() { + local country="${AXON_DOCKER_COUNTRY:-${AXON_PACKAGE_COUNTRY:-}}" + + if [ -z "$country" ] && command -v curl &>/dev/null; then + country="$(curl -fsSL --max-time 8 https://ipinfo.io/country 2>/dev/null || true)" + fi + if [ -z "$country" ] && command -v curl &>/dev/null; then + country="$(curl -fsSL --max-time 8 https://ifconfig.co/country-iso 2>/dev/null || true)" + fi + + country="$(printf "%s" "$country" | tr -d '[:space:]' | tr '[:lower:]' '[:upper:]')" + printf "%.2s" "$country" +} + +resolve_axon_docker_apt_mirror() { + local requested="${AXON_APT_MIRROR:-${AXON_PACKAGE_APT_MIRROR:-auto}}" + local country="" + + case "$requested" in + ""|auto) + country="$(detect_axon_docker_country_code)" + if [ "$country" = "CN" ]; then + printf "%s" "tsinghua" + else + printf "%s" "default" + fi + ;; + cn|CN|china|China|tsinghua|tuna) + printf "%s" "tsinghua" + ;; + default|none|off|http://*|https://*) + printf "%s" "$requested" + ;; + *) + echo "Unsupported AXON_APT_MIRROR='${requested}', using default Ubuntu apt mirror" >&2 + printf "%s" "default" + ;; + esac +} + +configure_axon_docker_apt_mirror() { + AXON_DOCKER_APT_MIRROR="$(resolve_axon_docker_apt_mirror)" + export AXON_DOCKER_APT_MIRROR + DOCKER_APT_MIRROR_BUILD_ARGS=(--build-arg "AXON_APT_MIRROR=${AXON_DOCKER_APT_MIRROR}") +} diff --git a/scripts/docker-test-cpp.sh b/scripts/docker-test-cpp.sh index 73035c92..5c902fc7 100755 --- a/scripts/docker-test-cpp.sh +++ b/scripts/docker-test-cpp.sh @@ -27,6 +27,8 @@ PROJECT_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" RUN_COVERAGE=false IMAGE_NAME="axon:cpp-test" +source "${SCRIPT_DIR}/docker-apt-mirror-common.sh" + # Parse arguments while [[ $# -gt 0 ]]; do case $1 in @@ -55,14 +57,19 @@ echo -e "${BLUE}=============================================${NC}" echo -e "${BLUE}Docker C++ Library Tests${NC}" echo -e "${BLUE}=============================================${NC}" echo "" +configure_axon_docker_apt_mirror +echo -e "Docker apt mirror: ${YELLOW}${AXON_DOCKER_APT_MIRROR}${NC}" +echo "" # Build Docker image if not exists if ! docker image inspect ${IMAGE_NAME} &>/dev/null; then echo -e "${YELLOW}Building C++ test Docker image...${NC}" DOCKER_BUILDKIT=1 docker build \ + --network=host \ -f "${PROJECT_ROOT}/docker/Dockerfile.cpp-test" \ -t ${IMAGE_NAME} \ --build-arg BUILDKIT_INLINE_CACHE=1 \ + "${DOCKER_APT_MIRROR_BUILD_ARGS[@]}" \ "${PROJECT_ROOT}" echo -e "${GREEN}✓ C++ test image built${NC}" else diff --git a/scripts/docker-test-e2e.sh b/scripts/docker-test-e2e.sh index fdbf580b..7b02924c 100755 --- a/scripts/docker-test-e2e.sh +++ b/scripts/docker-test-e2e.sh @@ -30,6 +30,8 @@ PROJECT_ROOT="$(cd "${SCRIPT_DIR}/.." && pwd)" BUILD_IMAGE=false DISTRO="" +source "${SCRIPT_DIR}/docker-apt-mirror-common.sh" + # Parse arguments while [[ $# -gt 0 ]]; do case $1 in @@ -85,13 +87,18 @@ echo -e "${BLUE}=============================================${NC}" echo -e "${BLUE}Docker E2E Tests - ${DISTRO}${NC}" echo -e "${BLUE}=============================================${NC}" echo "" +configure_axon_docker_apt_mirror +echo -e "Docker apt mirror: ${YELLOW}${AXON_DOCKER_APT_MIRROR}${NC}" +echo "" # Build Docker image if requested or not exists if [ "$BUILD_IMAGE" = true ] || ! docker image inspect ${IMAGE_NAME} &>/dev/null; then echo -e "${YELLOW}Building Docker image for ${DISTRO}...${NC}" DOCKER_BUILDKIT=1 docker build \ + --network=host \ -f "${PROJECT_ROOT}/${DOCKERFILE}" \ -t ${IMAGE_NAME} \ + "${DOCKER_APT_MIRROR_BUILD_ARGS[@]}" \ "${PROJECT_ROOT}" || { echo -e "${RED}Build failed. See docker/TROUBLESHOOTING.md${NC}" exit 1 diff --git a/scripts/docker-test-ros.sh b/scripts/docker-test-ros.sh index bb49682a..af60e724 100755 --- a/scripts/docker-test-ros.sh +++ b/scripts/docker-test-ros.sh @@ -32,6 +32,8 @@ BUILD_IMAGE=false CLEAN_BUILD=false DISTRO="" +source "${SCRIPT_DIR}/docker-apt-mirror-common.sh" + # Parse arguments while [[ $# -gt 0 ]]; do case $1 in @@ -93,13 +95,18 @@ echo -e "${BLUE}=============================================${NC}" echo -e "${BLUE}Docker ROS Tests - ${DISTRO}${NC}" echo -e "${BLUE}=============================================${NC}" echo "" +configure_axon_docker_apt_mirror +echo -e "Docker apt mirror: ${YELLOW}${AXON_DOCKER_APT_MIRROR}${NC}" +echo "" # Build Docker image if requested or not exists if [ "$BUILD_IMAGE" = true ] || ! docker image inspect ${IMAGE_NAME} &>/dev/null; then echo -e "${YELLOW}Building Docker image for ${DISTRO}...${NC}" DOCKER_BUILDKIT=1 docker build \ + --network=host \ -f "${PROJECT_ROOT}/${DOCKERFILE}" \ -t ${IMAGE_NAME} \ + "${DOCKER_APT_MIRROR_BUILD_ARGS[@]}" \ "${PROJECT_ROOT}" || { echo -e "${RED}Build failed. See docker/TROUBLESHOOTING.md${NC}" exit 1 From d9276d81719fc1176da673f94ca3bdf4baf2386f Mon Sep 17 00:00:00 2001 From: stz Date: Tue, 9 Jun 2026 18:29:42 +0800 Subject: [PATCH 5/5] ci(zenoh): isolate local Docker build directory --- scripts/ci-docker-zenoh.sh | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/scripts/ci-docker-zenoh.sh b/scripts/ci-docker-zenoh.sh index 889a859b..b199612a 100755 --- a/scripts/ci-docker-zenoh.sh +++ b/scripts/ci-docker-zenoh.sh @@ -97,9 +97,9 @@ if [ "$RUN_COVERAGE" = true ]; then ${DOCKER_IMAGE} \ bash -c " cd /workspace/axon && \ - rm -rf build && \ - mkdir -p build && cd build && \ - cmake .. \ + rm -rf build/zenoh && \ + mkdir -p build/zenoh && cd build/zenoh && \ + cmake ../.. \ -DCMAKE_BUILD_TYPE=Debug \ -DAXON_ENABLE_COVERAGE=ON \ -DAXON_BUILD_ZENOH_PLUGIN=ON \ @@ -135,9 +135,9 @@ else ${DOCKER_IMAGE} \ bash -c " cd /workspace/axon && \ - rm -rf build && \ - mkdir -p build && cd build && \ - cmake .. \ + rm -rf build/zenoh && \ + mkdir -p build/zenoh && cd build/zenoh && \ + cmake ../.. \ -DCMAKE_BUILD_TYPE=Release \ -DAXON_BUILD_ZENOH_PLUGIN=ON \ -DAXON_BUILD_ROS1_PLUGIN=OFF \