From 303846a3e0401bbe4bed91cc1d9a2a5d37dc30d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Thor=C3=A9n?= Date: Mon, 20 Apr 2026 13:39:16 +0200 Subject: [PATCH 1/2] Add support for publishing position data from MQTT --- atos/launch/launch_with_mqtt.py | 41 + atos/modules/MQTTBridge/CMakeLists.txt | 26 +- atos/modules/MQTTBridge/inc/Imqtt2ros.hpp | 26 +- .../modules/MQTTBridge/inc/mqtt2ros_utils.hpp | 90 ++ atos/modules/MQTTBridge/inc/mqttbridge.hpp | 265 +++--- atos/modules/MQTTBridge/src/mqttbridge.cpp | 789 +++++++++--------- .../MQTTBridge/tests/mqtt2ros_tests.cpp | 114 +++ conf/conf/params.yaml | 1 + 8 files changed, 795 insertions(+), 557 deletions(-) create mode 100644 atos/launch/launch_with_mqtt.py create mode 100644 atos/modules/MQTTBridge/inc/mqtt2ros_utils.hpp create mode 100644 atos/modules/MQTTBridge/tests/mqtt2ros_tests.cpp diff --git a/atos/launch/launch_with_mqtt.py b/atos/launch/launch_with_mqtt.py new file mode 100644 index 000000000..66b4eeebb --- /dev/null +++ b/atos/launch/launch_with_mqtt.py @@ -0,0 +1,41 @@ +import os +import sys + +from ament_index_python.packages import get_package_prefix + +sys.path.insert( + 0, + os.path.join( # Need to modify the sys.path since we launch from the ros2 installed path + get_package_prefix("atos"), "share", "atos", "launch" + ), +) + +import launch_utils.launch_base as launch_base +from launch import LaunchDescription +from launch_ros.actions import Node + + +def get_mqtt_nodes(): + files = launch_base.get_files() + return [ + Node( + package="atos", + namespace="atos", + executable="mqtt_bridge", + name="mqtt_bridge", + # prefix=['gdbserver localhost:3000'], ## To use with VSC debugger + parameters=[files["params"]], + # arguments=["--ros-args", "--log-level", "debug"], # To get RCL_DEBUG prints + ), + ] + + +def generate_launch_description(): + base_nodes = launch_base.get_base_nodes() + + mqtt_nodes = get_mqtt_nodes() + + for node in mqtt_nodes: + base_nodes.append(node) + + return LaunchDescription(base_nodes) diff --git a/atos/modules/MQTTBridge/CMakeLists.txt b/atos/modules/MQTTBridge/CMakeLists.txt index 82ef49e7b..59d87764c 100644 --- a/atos/modules/MQTTBridge/CMakeLists.txt +++ b/atos/modules/MQTTBridge/CMakeLists.txt @@ -49,6 +49,30 @@ ament_target_dependencies(${MQTT_BRIDGE_TARGET} atos_interfaces ) +if(BUILD_TESTING) + find_package(ament_cmake_gtest REQUIRED) + set(TESTFILES "tests/mqtt2ros_tests.cpp") + + ament_add_gtest(mqtt_bridge_test ${TESTFILES}) + if(TARGET mqtt_bridge_test) + target_link_libraries(mqtt_bridge_test + ${ATOS_COMMON_LIBRARY} + ${nlohmann_json} + paho-mqttpp3 + paho-mqtt3as + fmt + ) + target_include_directories(mqtt_bridge_test PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR}/inc + ${COMMON_HEADERS} + ) + ament_target_dependencies(mqtt_bridge_test + rclcpp + atos_interfaces + ) + endif() +endif() + # Installation rules install(CODE "MESSAGE(STATUS \"Installing target ${MQTT_BRIDGE_TARGET}\")") install(TARGETS ${MQTT_BRIDGE_TARGET} @@ -56,4 +80,4 @@ install(TARGETS ${MQTT_BRIDGE_TARGET} LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} -) \ No newline at end of file +) diff --git a/atos/modules/MQTTBridge/inc/Imqtt2ros.hpp b/atos/modules/MQTTBridge/inc/Imqtt2ros.hpp index 17927c8b2..52daa363f 100644 --- a/atos/modules/MQTTBridge/inc/Imqtt2ros.hpp +++ b/atos/modules/MQTTBridge/inc/Imqtt2ros.hpp @@ -1,3 +1,5 @@ +#pragma once + #include #include @@ -7,16 +9,14 @@ * is the MQTT topic name. */ struct Mqtt2RosInterface { - struct { - int qos = 0; ///< MQTT QoS value - } mqtt; ///< MQTT-related variables - struct { - std::string topic; ///< ROS topic - std::string msg_type; ///< message type of publisher - rclcpp::Publisher::SharedPtr - publisher; ///< ROS publisher - int queue_size = 1; ///< ROS publisher queue size - bool is_stale = - false; ///< whether a new generic publisher/subscriber is required - } ros; ///< ROS-related variables -}; \ No newline at end of file + struct { + int qos = 0; ///< MQTT QoS value + } mqtt; ///< MQTT-related variables + struct { + std::string topic; ///< ROS topic + std::string msg_type; ///< message type of publisher + rclcpp::GenericPublisher::SharedPtr publisher; + int queue_size = 1; ///< ROS publisher queue size + bool is_stale = false; ///< whether a new generic publisher/subscriber is required + } ros; ///< ROS-related variables +}; diff --git a/atos/modules/MQTTBridge/inc/mqtt2ros_utils.hpp b/atos/modules/MQTTBridge/inc/mqtt2ros_utils.hpp new file mode 100644 index 000000000..a0ae60694 --- /dev/null +++ b/atos/modules/MQTTBridge/inc/mqtt2ros_utils.hpp @@ -0,0 +1,90 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +class Mqtt2RosUtils { + +public: + Mqtt2RosUtils() = delete; + + static sensor_msgs::msg::NavSatFix mqtt2navsatfix(const std::string& payload) { + std::string payload_lowercase = payload; + std::transform(payload_lowercase.begin(), payload_lowercase.end(), payload_lowercase.begin(), ::tolower); + + auto msg = sensor_msgs::msg::NavSatFix(); + msg.header.stamp = rclcpp::Clock(RCL_ROS_TIME).now(); + msg.latitude = extract_value_from_payload(payload_lowercase, {"lat", "latitude"}); + msg.longitude = extract_value_from_payload(payload_lowercase, {"lon", "lng", "long", "longitude"}); + msg.altitude = extract_value_from_payload(payload_lowercase, {"alt", "altitude"}); + + return msg; + } + + static rclcpp::SerializedMessage get_serialized_msg(const std::string& msg_type, const std::string& payload) { + if (msg_type == "std_msgs/msg/Empty") { + auto msg = std_msgs::msg::Empty(); + auto serialized = rclcpp::SerializedMessage(); + rclcpp::Serialization serializer; + serializer.serialize_message(&msg, &serialized); + return serialized; + + } else if (msg_type == "sensor_msgs/msg/NavSatFix") { + auto msg = mqtt2navsatfix(payload); + auto serialized = rclcpp::SerializedMessage(); + rclcpp::Serialization serializer; + serializer.serialize_message(&msg, &serialized); + return serialized; + } + + return rclcpp::SerializedMessage(); + } + +private: + static double extract_value_from_payload(const std::string& input, std::vector keys) { + // Lowercase input + std::string str = input; + std::transform(str.begin(), str.end(), str.begin(), ::tolower); + + // Sort keys by length to avoid wrong match + std::sort( + keys.begin(), keys.end(), [](const std::string& a, const std::string& b) { return a.size() > b.size(); }); + + // Build regex so we can match any of the keys (key1|key2|...) + std::string key_pattern; + for (size_t i{0}; i < keys.size(); ++i) { + key_pattern += keys.at(i); + if (i != keys.size() - 1) + key_pattern += "|"; + } + + // Extract numeric value for any matching key (supports JSON or key: value formats) + std::regex re{"(?:^|[\\s,{])\"?(" + key_pattern + ")\"?\\s*[:=]\\s*([+-]?\\d+(\\.\\d+)?([eE][+-]?\\d+)?)"}; + + std::smatch match; + double value{0.0}; + + // Iterate through all matches, keep the last one in case inputs are duplicated + auto begin{str.cbegin()}; + auto end{str.cend()}; + while (std::regex_search(begin, end, match, re)) { + try { + value = std::stod(match[2].str()); + } catch (...) { + value = 0.0; + } + begin = match.suffix().first; + } + + return value; + } +}; diff --git a/atos/modules/MQTTBridge/inc/mqttbridge.hpp b/atos/modules/MQTTBridge/inc/mqttbridge.hpp index b03d001e2..802d384fc 100644 --- a/atos/modules/MQTTBridge/inc/mqttbridge.hpp +++ b/atos/modules/MQTTBridge/inc/mqttbridge.hpp @@ -24,152 +24,135 @@ using json = nlohmann::json; * publisher */ -class MqttBridge : public Module, - public virtual mqtt::callback, - public virtual mqtt::iaction_listener { +class MqttBridge + : public Module + , public virtual mqtt::callback + , public virtual mqtt::iaction_listener { public: - MqttBridge(); - void initialize(); + MqttBridge(); + void initialize(); protected: - /** - * @brief Loads ROS parameters from parameter server. - */ - void loadParameters(); + /** + * @brief Loads ROS parameters from parameter server. + */ + void loadParameters(); - void connect(); + void connect(); - void newMqtt2RosBridge( - const std::shared_ptr - request, - std::shared_ptr - response); + void newMqtt2RosBridge(const std::shared_ptr request, + std::shared_ptr response); - void newRos2MqttBridge( - const std::shared_ptr - request, - std::shared_ptr - response); + void newRos2MqttBridge(const std::shared_ptr request, + std::shared_ptr response); private: - /** - * @brief MQTT2ROS connection variables sorted by MQTT topic - */ - std::map mqtt2ros_; - - /** - * @brief ROS2MQTT connection variables sorted by ROS topic - */ - std::map ros2mqtt_; - - /** @brief ROS Service server for providing dynamic MQTT to ROS mappings. - */ - rclcpp::Service::SharedPtr - new_mqtt2ros_bridge_service_; - - /** @brief ROS Service server for providing dynamic ROS to MQTT mappings. - */ - rclcpp::Service::SharedPtr - new_ros2mqtt_bridge_service_; - - /** - * @brief MQTT client variable - */ - std::shared_ptr client_; - - /** - * @brief MQTT client connection options - */ - mqtt::connect_options connect_options_; - - /** - * @brief Callback for when the client receives a MQTT message from the - * broker. - * - * Overrides mqtt::callback::message_arrived(mqtt::const_message_ptr). - * Publishes any empty ROS message to the corresponding ROS topic. - * - * @param mqtt_msg MQTT message - */ - void message_arrived(mqtt::const_message_ptr mqtt_msg) override; - - /** - * @brief Checks all active ROS topics in order to set up generic subscribers. - */ - void setupSubscriptions(); - - /** - * @brief Publishes a ROS message received via MQTT to ROS. - * - * @param mqtt_msg MQTT message - * @param arrival_stamp arrival timestamp used for latency computation - */ - void mqtt2ros(mqtt::const_message_ptr mqtt_msg, - const rclcpp::Time &arrival_stamp); - - /** - * @brief Publishes a MQTT message received via ROS to the MQTT broker. - * - * @param serialized_msg generic serialized ROS message - * @param ros_topic ROS topic where the message was published - */ - void - ros2mqtt(const std::shared_ptr &serialized_msg, - const std::string &ros_topic); - - /** - * @brief Callback for when a MQTT action succeeds. - * - * Overrides mqtt::iaction_listener::on_success(const mqtt::token&). - * Does nothing. - * - * @param token token tracking the action - */ - void on_success(const mqtt::token &token) override; - - /** - * @brief Callback for when a MQTT action fails. - * - * Overrides mqtt::iaction_listener::on_failure(const mqtt::token&). - * Logs error. - * - * @param token token tracking the action - */ - void on_failure(const mqtt::token &token) override; - - /** - * @brief Status variable keeping track of connection status to broker - */ - bool is_connected_ = false; - - static inline std::string const moduleName = "mqtt_bridge"; - constexpr static std::chrono::milliseconds SEND_INTERVAL = - std::chrono::milliseconds(5000); - std::string brokerIP; - int port; - std::string clientId; - std::string username; - std::string password; - std::string topic_prefix; - - ROSChannels::CustomCommandAction::Sub - customCommandActionMsgSub; //!< Subscriber to v2x messages requests - ROSChannels::StateChange::Sub - obcStateChangeSub; //!< Subscriber to object state change requests - - void setupClient(); - void setupMqtt2RosBridge(); - void setupRos2MqttBridge(); - void onCustomCommandActionMsg( - const ROSChannels::CustomCommandAction::message_type::SharedPtr); - void - onObcStateChangeMsg(const ROSChannels::StateChange::message_type::SharedPtr); - - template - void onMessage(T msg, std::string mqtt_topic, - std::function convertFunc); - - static json v2xToJson(const std::string v2x_msg); - static json obcStateChangeToJson( - const ROSChannels::StateChange::message_type::SharedPtr obc_msg); -}; \ No newline at end of file + /** + * @brief MQTT2ROS connection variables sorted by MQTT topic + */ + std::map mqtt2ros_; + + /** + * @brief ROS2MQTT connection variables sorted by ROS topic + */ + std::map ros2mqtt_; + + /** @brief ROS Service server for providing dynamic MQTT to ROS mappings. + */ + rclcpp::Service::SharedPtr new_mqtt2ros_bridge_service_; + + /** @brief ROS Service server for providing dynamic ROS to MQTT mappings. + */ + rclcpp::Service::SharedPtr new_ros2mqtt_bridge_service_; + + /** + * @brief MQTT client variable + */ + std::shared_ptr client_; + + /** + * @brief MQTT client connection options + */ + mqtt::connect_options connect_options_; + + /** + * @brief Callback for when the client receives a MQTT message from the + * broker. + * + * Overrides mqtt::callback::message_arrived(mqtt::const_message_ptr). + * Publishes any empty ROS message to the corresponding ROS topic. + * + * @param mqtt_msg MQTT message + */ + void message_arrived(mqtt::const_message_ptr mqtt_msg) override; + + /** + * @brief Checks all active ROS topics in order to set up generic subscribers. + */ + void setupSubscriptions(); + + /** + * @brief Publishes a ROS message received via MQTT to ROS. + * + * @param mqtt_msg MQTT message + * @param arrival_stamp arrival timestamp used for latency computation + */ + void mqtt2ros(mqtt::const_message_ptr mqtt_msg, const rclcpp::Time& arrival_stamp); + + /** + * @brief Publishes a MQTT message received via ROS to the MQTT broker. + * + * @param serialized_msg generic serialized ROS message + * @param ros_topic ROS topic where the message was published + */ + void ros2mqtt(const std::shared_ptr& serialized_msg, const std::string& ros_topic); + + /** + * @brief Callback for when a MQTT action succeeds. + * + * Overrides mqtt::iaction_listener::on_success(const mqtt::token&). + * Does nothing. + * + * @param token token tracking the action + */ + void on_success(const mqtt::token& token) override; + + /** + * @brief Callback for when a MQTT action fails. + * + * Overrides mqtt::iaction_listener::on_failure(const mqtt::token&). + * Logs error. + * + * @param token token tracking the action + */ + void on_failure(const mqtt::token& token) override; + + /** + * @brief Status variable keeping track of connection status to broker + */ + bool is_connected_ = false; + + static inline std::string const moduleName = "mqtt_bridge"; + constexpr static std::chrono::milliseconds SEND_INTERVAL = std::chrono::milliseconds(5000); + std::string brokerIP; + int port; + std::string clientId; + std::string username; + std::string password; + std::string topic_prefix; + + ROSChannels::CustomCommandAction::Sub customCommandActionMsgSub; //!< Subscriber to v2x messages requests + ROSChannels::StateChange::Sub obcStateChangeSub; //!< Subscriber to object state change requests + + void setupClient(); + void setupMqtt2RosBridge(); + void setupRos2MqttBridge(); + void onCustomCommandActionMsg(const ROSChannels::CustomCommandAction::message_type::SharedPtr); + void onObcStateChangeMsg(const ROSChannels::StateChange::message_type::SharedPtr); + + template + void onMessage(T msg, std::string mqtt_topic, std::function convertFunc); + + static json v2xToJson(const std::string v2x_msg); + static json obcStateChangeToJson(const ROSChannels::StateChange::message_type::SharedPtr obc_msg); +}; diff --git a/atos/modules/MQTTBridge/src/mqttbridge.cpp b/atos/modules/MQTTBridge/src/mqttbridge.cpp index 782a0a71a..4d1fecd2a 100644 --- a/atos/modules/MQTTBridge/src/mqttbridge.cpp +++ b/atos/modules/MQTTBridge/src/mqttbridge.cpp @@ -4,146 +4,158 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ #include "mqttbridge.hpp" + #include +#include +#include +#include + +#include "mqtt2ros_utils.hpp" + using namespace ROSChannels; using std::placeholders::_1; -MqttBridge::MqttBridge() - : Module(MqttBridge::moduleName), - customCommandActionMsgSub( - *this, std::bind(&MqttBridge::onCustomCommandActionMsg, this, _1)), - obcStateChangeSub(*this, - std::bind(&MqttBridge::onObcStateChangeMsg, this, _1)) { - this->loadParameters(); - this->initialize(); +MqttBridge::MqttBridge() : + Module(MqttBridge::moduleName), + customCommandActionMsgSub(*this, std::bind(&MqttBridge::onCustomCommandActionMsg, this, _1)), + obcStateChangeSub(*this, std::bind(&MqttBridge::onObcStateChangeMsg, this, _1)) { + this->loadParameters(); + this->initialize(); } void MqttBridge::loadParameters() { - declare_parameter("broker.host", ""); - declare_parameter("broker.port", 1883); - declare_parameter("client.username", ""); - declare_parameter("client.password", ""); - declare_parameter("client.id", ""); - declare_parameter("topic_prefix", "atos"); - - get_parameter("broker.host", brokerIP); - get_parameter("broker.port", port); - get_parameter("client.username", username); - get_parameter("client.password", password); - get_parameter("client.id", clientId); - get_parameter("topic_prefix", topic_prefix); - - rcl_interfaces::msg::ParameterDescriptor param_desc; - param_desc.description = "The list of topics to bridge from MQTT to ROS"; - const auto mqtt2ros_mqtt_topics = declare_parameter>( - "mqtt2ros.mqtt_topics", std::vector(), param_desc); - for (const auto &mqtt_topic : mqtt2ros_mqtt_topics) { - param_desc.description = - "ROS topic on which corresponding MQTT messages are published"; - declare_parameter(fmt::format("mqtt2ros.{}.ros_topic", mqtt_topic), - rclcpp::ParameterType::PARAMETER_STRING, param_desc); - param_desc.description = "MQTT QoS value"; - declare_parameter(fmt::format("mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), - rclcpp::ParameterType::PARAMETER_INTEGER, param_desc); - param_desc.description = "ROS publisher queue size"; - declare_parameter( - fmt::format("mqtt2ros.{}.advanced.ros.queue_size", mqtt_topic), - rclcpp::ParameterType::PARAMETER_INTEGER, param_desc); - } - - param_desc.description = "The list of topics to bridge from ROS to MQTT"; - const auto ros2mqtt_ros_topics = declare_parameter>( - "ros2mqtt.ros_topics", std::vector(), param_desc); - for (const auto &ros_topic : ros2mqtt_ros_topics) { - param_desc.description = "MQTT topic on which the corresponding ROS " - "messages are sent to the broker"; - declare_parameter(fmt::format("ros2mqtt.{}.mqtt_topic", ros_topic), - rclcpp::ParameterType::PARAMETER_STRING, param_desc); - param_desc.description = "ROS subscriber queue size"; - declare_parameter( - fmt::format("ros2mqtt.{}.advanced.ros.queue_size", ros_topic), - rclcpp::ParameterType::PARAMETER_INTEGER, param_desc); - param_desc.description = "MQTT QoS value"; - declare_parameter(fmt::format("ros2mqtt.{}.advanced.mqtt.qos", ros_topic), - rclcpp::ParameterType::PARAMETER_INTEGER, param_desc); - } - - // mqtt2ros - for (const auto &mqtt_topic : mqtt2ros_mqtt_topics) { - - rclcpp::Parameter ros_topic_param; - if (get_parameter(fmt::format("mqtt2ros.{}.ros_topic", mqtt_topic), - ros_topic_param)) { - - // mqtt2ros[k]/mqtt_topic and mqtt2ros[k]/ros_topic - const std::string ros_topic = ros_topic_param.as_string(); - Mqtt2RosInterface &mqtt2ros = mqtt2ros_[mqtt_topic]; - mqtt2ros.ros.topic = ros_topic; - - // mqtt2ros[k]/advanced/mqtt/qos - rclcpp::Parameter qos_param; - if (get_parameter( - fmt::format("mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), - qos_param)) { - mqtt2ros.mqtt.qos = qos_param.as_int(); - } - - // mqtt2ros[k]/advanced/ros/queue_size - rclcpp::Parameter queue_size_param; - if (get_parameter( - fmt::format("mqtt2ros.{}.advanced.ros.queue_size", mqtt_topic), - queue_size_param)) { - mqtt2ros.ros.queue_size = queue_size_param.as_int(); - } - - RCLCPP_INFO(get_logger(), "Bridging MQTT topic '%s' to ROS topic '%s'", - mqtt_topic.c_str(), mqtt2ros.ros.topic.c_str()); - } else { - RCLCPP_WARN(get_logger(), - fmt::format("Parameter 'ros2mqtt.{}' is missing subparameter " - "'ros_topic', will be ignored", - mqtt_topic) - .c_str()); - } - } - - // ros2mqtt - for (const auto &ros_topic : ros2mqtt_ros_topics) { - - rclcpp::Parameter mqtt_topic_param; - if (get_parameter(fmt::format("ros2mqtt.{}.mqtt_topic", ros_topic), - mqtt_topic_param)) { - - // ros2mqtt[k]/ros_topic and ros2mqtt[k]/mqtt_topic - const std::string mqtt_topic = mqtt_topic_param.as_string(); - Ros2MqttInterface &ros2mqtt = ros2mqtt_[ros_topic]; - ros2mqtt.mqtt.topic = mqtt_topic; - - // ros2mqtt[k]/advanced/ros/queue_size - rclcpp::Parameter queue_size_param; - if (get_parameter( - fmt::format("ros2mqtt.{}.advanced.ros.queue_size", ros_topic), - queue_size_param)) - ros2mqtt.ros.queue_size = queue_size_param.as_int(); - - // ros2mqtt[k]/advanced/mqtt/qos - rclcpp::Parameter qos_param; - if (get_parameter(fmt::format("ros2mqtt.{}.advanced.mqtt.qos", ros_topic), - qos_param)) - ros2mqtt.mqtt.qos = qos_param.as_int(); - - RCLCPP_INFO(get_logger(), "Bridging ROS topic '%s' to MQTT topic '%s'", - ros_topic.c_str(), ros2mqtt.mqtt.topic.c_str()); - } else { - RCLCPP_WARN(get_logger(), - fmt::format("Parameter 'ros2mqtt.{}' is missing subparameter " - "'mqtt_topic', will be ignored", - ros_topic) - .c_str()); - } - } + declare_parameter("broker.host", ""); + declare_parameter("broker.port", 1883); + declare_parameter("client.username", ""); + declare_parameter("client.password", ""); + declare_parameter("client.id", ""); + declare_parameter("topic_prefix", "atos"); + + get_parameter("broker.host", brokerIP); + get_parameter("broker.port", port); + get_parameter("client.username", username); + get_parameter("client.password", password); + get_parameter("client.id", clientId); + get_parameter("topic_prefix", topic_prefix); + + rcl_interfaces::msg::ParameterDescriptor param_desc; + param_desc.description = "The list of topics to bridge from MQTT to ROS"; + const auto mqtt2ros_mqtt_topics = + declare_parameter>("mqtt2ros.mqtt_topics", std::vector(), param_desc); + for (const auto& mqtt_topic : mqtt2ros_mqtt_topics) { + param_desc.description = "ROS topic on which corresponding MQTT messages are published"; + declare_parameter( + fmt::format("mqtt2ros.{}.ros_topic", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); + param_desc.description = "ROS message type for this bridge"; + declare_parameter( + fmt::format("mqtt2ros.{}.msg_type", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); + param_desc.description = "MQTT QoS value"; + declare_parameter(fmt::format("mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), + rclcpp::ParameterType::PARAMETER_INTEGER, + param_desc); + param_desc.description = "ROS publisher queue size"; + declare_parameter(fmt::format("mqtt2ros.{}.advanced.ros.queue_size", mqtt_topic), + rclcpp::ParameterType::PARAMETER_INTEGER, + param_desc); + } + + param_desc.description = "The list of topics to bridge from ROS to MQTT"; + const auto ros2mqtt_ros_topics = + declare_parameter>("ros2mqtt.ros_topics", std::vector(), param_desc); + for (const auto& ros_topic : ros2mqtt_ros_topics) { + param_desc.description = "MQTT topic on which the corresponding ROS " + "messages are sent to the broker"; + declare_parameter( + fmt::format("ros2mqtt.{}.mqtt_topic", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); + param_desc.description = "ROS subscriber queue size"; + declare_parameter(fmt::format("ros2mqtt.{}.advanced.ros.queue_size", ros_topic), + rclcpp::ParameterType::PARAMETER_INTEGER, + param_desc); + param_desc.description = "MQTT QoS value"; + declare_parameter(fmt::format("ros2mqtt.{}.advanced.mqtt.qos", ros_topic), + rclcpp::ParameterType::PARAMETER_INTEGER, + param_desc); + } + + // mqtt2ros + for (const auto& mqtt_topic : mqtt2ros_mqtt_topics) { + + rclcpp::Parameter ros_topic_param; + if (get_parameter(fmt::format("mqtt2ros.{}.ros_topic", mqtt_topic), ros_topic_param)) { + + // mqtt2ros[k]/mqtt_topic and mqtt2ros[k]/ros_topic + const std::string ros_topic = ros_topic_param.as_string(); + Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; + mqtt2ros.ros.topic = ros_topic; + + rclcpp::Parameter msg_type_param; + if (get_parameter(fmt::format("mqtt2ros.{}.msg_type", mqtt_topic), msg_type_param)) { + mqtt2ros.ros.msg_type = msg_type_param.as_string(); + } else { + mqtt2ros.ros.msg_type = "std_msgs/msg/Empty"; // default if no parameter is set + } + + // mqtt2ros[k]/advanced/mqtt/qos + rclcpp::Parameter qos_param; + if (get_parameter(fmt::format("mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), qos_param)) { + mqtt2ros.mqtt.qos = qos_param.as_int(); + } + + // mqtt2ros[k]/advanced/ros/queue_size + rclcpp::Parameter queue_size_param; + if (get_parameter(fmt::format("mqtt2ros.{}.advanced.ros.queue_size", mqtt_topic), queue_size_param)) { + mqtt2ros.ros.queue_size = queue_size_param.as_int(); + } + + RCLCPP_INFO(get_logger(), + "Bridging MQTT topic '%s' to ROS topic '%s' with message type '%s'", + mqtt_topic.c_str(), + mqtt2ros.ros.topic.c_str(), + mqtt2ros.ros.msg_type.c_str()); + } else { + RCLCPP_WARN(get_logger(), + fmt::format("Parameter 'ros2mqtt.{}' is missing subparameter " + "'ros_topic', will be ignored", + mqtt_topic) + .c_str()); + } + } + + // ros2mqtt + for (const auto& ros_topic : ros2mqtt_ros_topics) { + + rclcpp::Parameter mqtt_topic_param; + if (get_parameter(fmt::format("ros2mqtt.{}.mqtt_topic", ros_topic), mqtt_topic_param)) { + + // ros2mqtt[k]/ros_topic and ros2mqtt[k]/mqtt_topic + const std::string mqtt_topic = mqtt_topic_param.as_string(); + Ros2MqttInterface& ros2mqtt = ros2mqtt_[ros_topic]; + ros2mqtt.mqtt.topic = mqtt_topic; + + // ros2mqtt[k]/advanced/ros/queue_size + rclcpp::Parameter queue_size_param; + if (get_parameter(fmt::format("ros2mqtt.{}.advanced.ros.queue_size", ros_topic), queue_size_param)) + ros2mqtt.ros.queue_size = queue_size_param.as_int(); + + // ros2mqtt[k]/advanced/mqtt/qos + rclcpp::Parameter qos_param; + if (get_parameter(fmt::format("ros2mqtt.{}.advanced.mqtt.qos", ros_topic), qos_param)) + ros2mqtt.mqtt.qos = qos_param.as_int(); + + RCLCPP_INFO(get_logger(), + "Bridging ROS topic '%s' to MQTT topic '%s'", + ros_topic.c_str(), + ros2mqtt.mqtt.topic.c_str()); + } else { + RCLCPP_WARN(get_logger(), + fmt::format("Parameter 'ros2mqtt.{}' is missing subparameter " + "'mqtt_topic', will be ignored", + ros_topic) + .c_str()); + } + } } /*! @@ -151,322 +163,295 @@ void MqttBridge::loadParameters() { * with ros parameter settings. */ void MqttBridge::initialize() { - if (this->brokerIP.empty()) { - RCLCPP_INFO(this->get_logger(), - "No Broker IP provided in configuration. Shutting down..."); - rclcpp::shutdown(); - } else { - this->setupClient(); - this->connect(); - // Wait for all ros topics to be announced - rclcpp::sleep_for(std::chrono::seconds(1)); - this->setupMqtt2RosBridge(); - this->setupRos2MqttBridge(); - } + if (this->brokerIP.empty()) { + RCLCPP_INFO(this->get_logger(), "No Broker IP provided in configuration. Shutting down..."); + rclcpp::shutdown(); + } else { + this->setupClient(); + this->connect(); + // Wait for all ros topics to be announced + rclcpp::sleep_for(std::chrono::seconds(1)); + this->setupMqtt2RosBridge(); + this->setupRos2MqttBridge(); + } } void MqttBridge::setupMqtt2RosBridge() { - new_mqtt2ros_bridge_service_ = - create_service( - "~/new_mqtt2ros_bridge", - std::bind(&MqttBridge::newMqtt2RosBridge, this, std::placeholders::_1, - std::placeholders::_2)); - - // Create a request and response for each mqtt2ros bridge and put into a - // map - auto serviceCallMap = std::map< - std::shared_ptr, - std::shared_ptr>(); - for (const auto &mqtt2ros : mqtt2ros_) { - atos_interfaces::srv::NewMqtt2RosBridge::Request::SharedPtr request = - std::make_shared(); - request->mqtt_topic = mqtt2ros.first; - request->ros_topic = mqtt2ros.second.ros.topic; - request->mqtt_qos = mqtt2ros.second.mqtt.qos; - request->ros_queue_size = mqtt2ros.second.ros.queue_size; - atos_interfaces::srv::NewMqtt2RosBridge::Response::SharedPtr response = - std::make_shared(); - serviceCallMap[response] = request; - } - - // Loop through the map until all response objects are true - while (std::any_of(serviceCallMap.begin(), serviceCallMap.end(), - [](const auto &pair) { return !pair.first->success; })) { - // Get all unsuccesful responses - auto failedResponses = std::vector< - std::shared_ptr>(); - for (const auto &pair : serviceCallMap) { - if (!pair.first->success) { - failedResponses.push_back(pair.first); - } - } - // Retry all failed responses - for (const auto &response : failedResponses) { - newMqtt2RosBridge(serviceCallMap[response], response); - } - // Wait for a second before retrying - rclcpp::sleep_for(std::chrono::seconds(1)); - } + new_mqtt2ros_bridge_service_ = create_service( + "~/new_mqtt2ros_bridge", + std::bind(&MqttBridge::newMqtt2RosBridge, this, std::placeholders::_1, std::placeholders::_2)); + + // Create a request and response for each mqtt2ros bridge and put into a + // map + auto serviceCallMap = std::map, + std::shared_ptr>(); + for (const auto& mqtt2ros : mqtt2ros_) { + atos_interfaces::srv::NewMqtt2RosBridge::Request::SharedPtr request = + std::make_shared(); + request->mqtt_topic = mqtt2ros.first; + request->ros_topic = mqtt2ros.second.ros.topic; + request->mqtt_qos = mqtt2ros.second.mqtt.qos; + request->ros_queue_size = mqtt2ros.second.ros.queue_size; + atos_interfaces::srv::NewMqtt2RosBridge::Response::SharedPtr response = + std::make_shared(); + serviceCallMap[response] = request; + } + + // Loop through the map until all response objects are true + while (std::any_of( + serviceCallMap.begin(), serviceCallMap.end(), [](const auto& pair) { return !pair.first->success; })) { + // Get all unsuccesful responses + auto failedResponses = std::vector>(); + for (const auto& pair : serviceCallMap) { + if (!pair.first->success) { + failedResponses.push_back(pair.first); + } + } + // Retry all failed responses + for (const auto& response : failedResponses) { + newMqtt2RosBridge(serviceCallMap[response], response); + } + // Wait for a second before retrying + rclcpp::sleep_for(std::chrono::seconds(1)); + } } void MqttBridge::setupRos2MqttBridge() { - new_ros2mqtt_bridge_service_ = - create_service( - "~/new_ros2mqtt_bridge", - std::bind(&MqttBridge::newRos2MqttBridge, this, std::placeholders::_1, - std::placeholders::_2)); - - // setup subscribers - this->setupSubscriptions(); + new_ros2mqtt_bridge_service_ = create_service( + "~/new_ros2mqtt_bridge", + std::bind(&MqttBridge::newRos2MqttBridge, this, std::placeholders::_1, std::placeholders::_2)); + + // setup subscribers + this->setupSubscriptions(); } void MqttBridge::setupSubscriptions() { - // get info of all topics - const auto all_topics_and_types = get_topic_names_and_types(); - // check for ros2mqtt topics - for (auto &[ros_topic, ros2mqtt] : ros2mqtt_) { - if (all_topics_and_types.count(ros_topic)) { - // check if message type has changed or if mapping is stale - const std::string &msg_type = all_topics_and_types.at(ros_topic)[0]; - if (msg_type == ros2mqtt.ros.msg_type && !ros2mqtt.ros.is_stale) - continue; - ros2mqtt.ros.is_stale = false; - ros2mqtt.ros.msg_type = msg_type; - - // create new generic subscription, if message type has changed - std::function msg)> - bound_callback_func = std::bind(&MqttBridge::ros2mqtt, this, - std::placeholders::_1, ros_topic); - try { - ros2mqtt.ros.subscriber = create_generic_subscription( - ros_topic, msg_type, ros2mqtt.ros.queue_size, bound_callback_func); - } catch (rclcpp::exceptions::RCLError &e) { - RCLCPP_ERROR(get_logger(), "Failed to create generic subscriber: %s", - e.what()); - return; - } - RCLCPP_INFO(get_logger(), "Subscribed to ROS topic '%s' of type '%s'", - ros_topic.c_str(), msg_type.c_str()); - } - } + // get info of all topics + const auto all_topics_and_types = get_topic_names_and_types(); + // check for ros2mqtt topics + for (auto& [ros_topic, ros2mqtt] : ros2mqtt_) { + if (all_topics_and_types.count(ros_topic)) { + // check if message type has changed or if mapping is stale + const std::string& msg_type = all_topics_and_types.at(ros_topic)[0]; + if (msg_type == ros2mqtt.ros.msg_type && !ros2mqtt.ros.is_stale) + continue; + ros2mqtt.ros.is_stale = false; + ros2mqtt.ros.msg_type = msg_type; + + // create new generic subscription, if message type has changed + std::function msg)> bound_callback_func = + std::bind(&MqttBridge::ros2mqtt, this, std::placeholders::_1, ros_topic); + try { + ros2mqtt.ros.subscriber = + create_generic_subscription(ros_topic, msg_type, ros2mqtt.ros.queue_size, bound_callback_func); + } catch (rclcpp::exceptions::RCLError& e) { + RCLCPP_ERROR(get_logger(), "Failed to create generic subscriber: %s", e.what()); + return; + } + RCLCPP_INFO(get_logger(), "Subscribed to ROS topic '%s' of type '%s'", ros_topic.c_str(), msg_type.c_str()); + } + } } -void MqttBridge::ros2mqtt( - const std::shared_ptr &serialized_msg, - const std::string &ros_topic) { - - Ros2MqttInterface &ros2mqtt = ros2mqtt_[ros_topic]; - std::string mqtt_topic = ros2mqtt.mqtt.topic; - std::vector payload_buffer; - - RCLCPP_INFO(get_logger(), "Received ROS message on topic '%s'", - ros_topic.c_str()); - - try { - mqtt::message_ptr mqtt_msg = - mqtt::make_message(mqtt_topic, payload_buffer.data(), - payload_buffer.size(), ros2mqtt.mqtt.qos, true); - client_->publish(mqtt_msg); - RCLCPP_INFO(get_logger(), "Published ROS message to MQTT topic '%s'", - mqtt_topic.c_str()); - } catch (const mqtt::exception &e) { - RCLCPP_WARN( - get_logger(), - "Publishing ROS message type information to MQTT topic '%s' failed: %s", - mqtt_topic.c_str(), e.what()); - } +void MqttBridge::ros2mqtt(const std::shared_ptr& serialized_msg, + const std::string& ros_topic) { + + Ros2MqttInterface& ros2mqtt = ros2mqtt_[ros_topic]; + std::string mqtt_topic = ros2mqtt.mqtt.topic; + std::vector payload_buffer; + + RCLCPP_INFO(get_logger(), "Received ROS message on topic '%s'", ros_topic.c_str()); + + try { + mqtt::message_ptr mqtt_msg = + mqtt::make_message(mqtt_topic, payload_buffer.data(), payload_buffer.size(), ros2mqtt.mqtt.qos, true); + client_->publish(mqtt_msg); + RCLCPP_INFO(get_logger(), "Published ROS message to MQTT topic '%s'", mqtt_topic.c_str()); + } catch (const mqtt::exception& e) { + RCLCPP_WARN(get_logger(), + "Publishing ROS message type information to MQTT topic '%s' failed: %s", + mqtt_topic.c_str(), + e.what()); + } } void MqttBridge::setupClient() { - RCLCPP_INFO(this->get_logger(), - "Setting up connection with clientID: %s, and broker IP: %s", - clientId.c_str(), brokerIP.c_str()); - - // basic client connection options - connect_options_.set_automatic_reconnect(true); - connect_options_.set_clean_session(true); - connect_options_.set_keep_alive_interval(5); - - // user authentication - if (!username.empty()) { - connect_options_.set_user_name(username); - connect_options_.set_password(password); - } - - auto timeNanoseconds = - std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - auto id = clientId + "_" + std::to_string(timeNanoseconds); - - const std::string uri = fmt::format("{}://{}:{}", "tcp", brokerIP, port); - try { - client_ = - std::shared_ptr(new mqtt::async_client(uri, id)); - } catch (const mqtt::exception &e) { - RCLCPP_ERROR(get_logger(), "Client could not be initialized: %s", e.what()); - exit(EXIT_FAILURE); - } - - // setup MQTT callbacks - client_->set_callback(*this); + RCLCPP_INFO(this->get_logger(), + "Setting up connection with clientID: %s, and broker IP: %s", + clientId.c_str(), + brokerIP.c_str()); + + // basic client connection options + connect_options_.set_automatic_reconnect(true); + connect_options_.set_clean_session(true); + connect_options_.set_keep_alive_interval(5); + + // user authentication + if (!username.empty()) { + connect_options_.set_user_name(username); + connect_options_.set_password(password); + } + + auto timeNanoseconds = + std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + auto id = clientId + "_" + std::to_string(timeNanoseconds); + + const std::string uri = fmt::format("{}://{}:{}", "tcp", brokerIP, port); + try { + client_ = std::shared_ptr(new mqtt::async_client(uri, id)); + } catch (const mqtt::exception& e) { + RCLCPP_ERROR(get_logger(), "Client could not be initialized: %s", e.what()); + exit(EXIT_FAILURE); + } + + // setup MQTT callbacks + client_->set_callback(*this); } void MqttBridge::connect() { - std::string as_client = - clientId.empty() ? "" - : std::string(" as '") + clientId + std::string("'"); - RCLCPP_INFO(get_logger(), "Connecting to broker at '%s'%s ...", - client_->get_server_uri().c_str(), as_client.c_str()); - - try { - client_->connect(connect_options_, nullptr, *this); - } catch (const mqtt::exception &e) { - RCLCPP_ERROR(get_logger(), "Connection to broker failed: %s", e.what()); - exit(EXIT_FAILURE); - } + std::string as_client = clientId.empty() ? "" : std::string(" as '") + clientId + std::string("'"); + RCLCPP_INFO( + get_logger(), "Connecting to broker at '%s'%s ...", client_->get_server_uri().c_str(), as_client.c_str()); + + try { + client_->connect(connect_options_, nullptr, *this); + } catch (const mqtt::exception& e) { + RCLCPP_ERROR(get_logger(), "Connection to broker failed: %s", e.what()); + exit(EXIT_FAILURE); + } } -void MqttBridge::newMqtt2RosBridge( - atos_interfaces::srv::NewMqtt2RosBridge::Request::SharedPtr request, - atos_interfaces::srv::NewMqtt2RosBridge::Response::SharedPtr response) { - - // add mapping definition to mqtt2ros_ - Mqtt2RosInterface &mqtt2ros = mqtt2ros_[request->mqtt_topic]; - mqtt2ros.ros.is_stale = true; - mqtt2ros.ros.topic = request->ros_topic; - mqtt2ros.mqtt.qos = request->mqtt_qos; - mqtt2ros.ros.publisher = this->create_publisher( - mqtt2ros.ros.topic, request->ros_queue_size); - mqtt2ros.ros.queue_size = request->ros_queue_size; - - RCLCPP_DEBUG(get_logger(), "Bridging MQTT topic '%s' to ROS topic '%s'", - request->mqtt_topic.c_str(), mqtt2ros.ros.topic.c_str()); - - // subscribe to the MQTT topic - std::string mqtt_topic_to_subscribe = request->mqtt_topic; - try { - client_->subscribe(mqtt_topic_to_subscribe, mqtt2ros.mqtt.qos); - RCLCPP_INFO(get_logger(), "Subscribed to MQTT topic '%s'", - mqtt_topic_to_subscribe.c_str()); - response->success = true; - } - // Catch exception if the topic is already subscribed - catch (const mqtt::exception &e) { - RCLCPP_WARN(get_logger(), "Failed to subscribe MQTT topic '%s': %s", - mqtt_topic_to_subscribe.c_str(), e.what()); - response->success = false; - } +void MqttBridge::newMqtt2RosBridge(atos_interfaces::srv::NewMqtt2RosBridge::Request::SharedPtr request, + atos_interfaces::srv::NewMqtt2RosBridge::Response::SharedPtr response) { + + // add mapping definition to mqtt2ros_ + Mqtt2RosInterface& mqtt2ros = mqtt2ros_[request->mqtt_topic]; + mqtt2ros.ros.is_stale = true; + mqtt2ros.ros.topic = request->ros_topic; + mqtt2ros.mqtt.qos = request->mqtt_qos; + mqtt2ros.ros.publisher = + this->create_generic_publisher(mqtt2ros.ros.topic, mqtt2ros.ros.msg_type, mqtt2ros.ros.queue_size); + mqtt2ros.ros.queue_size = request->ros_queue_size; + + RCLCPP_DEBUG(get_logger(), + "Bridging MQTT topic '%s' to ROS topic '%s'", + request->mqtt_topic.c_str(), + mqtt2ros.ros.topic.c_str()); + + // subscribe to the MQTT topic + std::string mqtt_topic_to_subscribe = request->mqtt_topic; + try { + client_->subscribe(mqtt_topic_to_subscribe, mqtt2ros.mqtt.qos); + RCLCPP_INFO(get_logger(), "Subscribed to MQTT topic '%s'", mqtt_topic_to_subscribe.c_str()); + response->success = true; + } + // Catch exception if the topic is already subscribed + catch (const mqtt::exception& e) { + RCLCPP_WARN(get_logger(), "Failed to subscribe MQTT topic '%s': %s", mqtt_topic_to_subscribe.c_str(), e.what()); + response->success = false; + } } -void MqttBridge::newRos2MqttBridge( - atos_interfaces::srv::NewRos2MqttBridge::Request::SharedPtr request, - atos_interfaces::srv::NewRos2MqttBridge::Response::SharedPtr response) { +void MqttBridge::newRos2MqttBridge(atos_interfaces::srv::NewRos2MqttBridge::Request::SharedPtr request, + atos_interfaces::srv::NewRos2MqttBridge::Response::SharedPtr response) { - // add mapping definition to ros2mqtt_ - Ros2MqttInterface &ros2mqtt = ros2mqtt_[request->ros_topic]; - ros2mqtt.ros.is_stale = true; - ros2mqtt.ros.topic = request->ros_topic; - ros2mqtt.mqtt.topic = request->mqtt_topic; - ros2mqtt.mqtt.qos = request->mqtt_qos; + // add mapping definition to ros2mqtt_ + Ros2MqttInterface& ros2mqtt = ros2mqtt_[request->ros_topic]; + ros2mqtt.ros.is_stale = true; + ros2mqtt.ros.topic = request->ros_topic; + ros2mqtt.mqtt.topic = request->mqtt_topic; + ros2mqtt.mqtt.qos = request->mqtt_qos; - RCLCPP_DEBUG(get_logger(), "Bridging ROS topic '%s' to MQTT topic '%s'", - ros2mqtt.ros.topic.c_str(), ros2mqtt.mqtt.topic.c_str()); + RCLCPP_DEBUG(get_logger(), + "Bridging ROS topic '%s' to MQTT topic '%s'", + ros2mqtt.ros.topic.c_str(), + ros2mqtt.mqtt.topic.c_str()); - // setup ROS subscriptions - setupSubscriptions(); + // setup ROS subscriptions + setupSubscriptions(); - response->success = true; + response->success = true; } void MqttBridge::onCustomCommandActionMsg( - const CustomCommandAction::message_type::SharedPtr - custom_command_action_msg) { - if (custom_command_action_msg->type == "v2x") { - RCLCPP_INFO(this->get_logger(), "Received V2X message on %s topic", - CustomCommandAction::topicName.c_str()); - this->onMessage(custom_command_action_msg->content, "v2x", - v2xToJson); - } + const CustomCommandAction::message_type::SharedPtr custom_command_action_msg) { + if (custom_command_action_msg->type == "v2x") { + RCLCPP_INFO(this->get_logger(), "Received V2X message on %s topic", CustomCommandAction::topicName.c_str()); + this->onMessage(custom_command_action_msg->content, "v2x", v2xToJson); + } } -void MqttBridge::onObcStateChangeMsg( - const StateChange::message_type::SharedPtr obc_msg) { - this->onMessage(obc_msg, "state", - obcStateChangeToJson); +void MqttBridge::onObcStateChangeMsg(const StateChange::message_type::SharedPtr obc_msg) { + this->onMessage(obc_msg, "state", obcStateChangeToJson); } void MqttBridge::message_arrived(mqtt::const_message_ptr mqtt_msg) { - std::string mqtt_topic = mqtt_msg->get_topic(); - RCLCPP_DEBUG(get_logger(), "Received MQTT message on topic '%s'", - mqtt_topic.c_str()); - Mqtt2RosInterface &mqtt2ros = mqtt2ros_[mqtt_topic]; - - // Publish empty message to ROS topic - mqtt2ros.ros.publisher->publish(std_msgs::msg::Empty()); - RCLCPP_DEBUG(get_logger(), "Published empty message to ROS topic '%s'", - mqtt2ros.ros.topic.c_str()); + std::string mqtt_topic = mqtt_msg->get_topic(); + RCLCPP_DEBUG(get_logger(), "Received MQTT message on topic '%s'", mqtt_topic.c_str()); + Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; + + // Publish empty message to ROS topic + const auto msg = Mqtt2RosUtils::get_serialized_msg(mqtt2ros.ros.msg_type, mqtt_msg->get_payload_str()); + mqtt2ros.ros.publisher->publish(msg); + RCLCPP_DEBUG(get_logger(), "Published empty message to ROS topic '%s'", mqtt2ros.ros.topic.c_str()); } -template -void MqttBridge::onMessage(T msg, std::string mqtt_topic, - std::function convertFunc) { - json payload = convertFunc(msg); - try { - RCLCPP_DEBUG(this->get_logger(), "Publishing MQTT msg to broker %s", - payload.dump().c_str()); - client_->publish(mqtt_topic, payload.dump().c_str(), payload.dump().size()); - } catch (std::runtime_error &) { - RCLCPP_ERROR(this->get_logger(), "Failed to publish MQTT message"); - } +template +void MqttBridge::onMessage(T msg, std::string mqtt_topic, std::function convertFunc) { + json payload = convertFunc(msg); + try { + RCLCPP_DEBUG(this->get_logger(), "Publishing MQTT msg to broker %s", payload.dump().c_str()); + client_->publish(mqtt_topic, payload.dump().c_str(), payload.dump().size()); + } catch (std::runtime_error&) { + RCLCPP_ERROR(this->get_logger(), "Failed to publish MQTT message"); + } } json MqttBridge::v2xToJson(std::string msg_content) { - std::replace(msg_content.begin(), msg_content.end(), '\'', - '"'); // Replace ROS single quotes string with double quotes to - // make it a valid JSON string - json j; - // Parse the string to a json object - try { - j = json::parse(msg_content); - } catch (json::parse_error &e) { - std::cerr << "Failed to parse JSON object: " << e.what() << std::endl; - } - - if (j.find("message_type") == j.end()) { - std::cerr << "No message_type field in JSON object" << std::endl; - return j; - } - if (j["message_type"] == "DENM") { - j["detection_time"] = - std::chrono::system_clock::now().time_since_epoch().count(); - } - return j; + std::replace(msg_content.begin(), + msg_content.end(), + '\'', + '"'); // Replace ROS single quotes string with double quotes to + // make it a valid JSON string + json j; + // Parse the string to a json object + try { + j = json::parse(msg_content); + } catch (json::parse_error& e) { + std::cerr << "Failed to parse JSON object: " << e.what() << std::endl; + } + + if (j.find("message_type") == j.end()) { + std::cerr << "No message_type field in JSON object" << std::endl; + return j; + } + if (j["message_type"] == "DENM") { + j["detection_time"] = std::chrono::system_clock::now().time_since_epoch().count(); + } + return j; } -json MqttBridge::obcStateChangeToJson( - const StateChange::message_type::SharedPtr obc_msg) { - json j; - j["current_state"] = obc_msg->current_state; - j["prev_state"] = obc_msg->prev_state; - return j; +json MqttBridge::obcStateChangeToJson(const StateChange::message_type::SharedPtr obc_msg) { + json j; + j["current_state"] = obc_msg->current_state; + j["prev_state"] = obc_msg->prev_state; + return j; } -void MqttBridge::on_success(const mqtt::token &token) { +void MqttBridge::on_success(const mqtt::token& token) { - (void)token; // Avoid compiler warning for unused parameter. - RCLCPP_INFO(get_logger(), "Connected to broker successfully"); - is_connected_ = true; + (void)token; // Avoid compiler warning for unused parameter. + RCLCPP_INFO(get_logger(), "Connected to broker successfully"); + is_connected_ = true; } -void MqttBridge::on_failure(const mqtt::token &token) { +void MqttBridge::on_failure(const mqtt::token& token) { - RCLCPP_ERROR( - get_logger(), - "Connection to broker failed (return code %d), will automatically " - "retry...", - token.get_return_code()); - is_connected_ = false; -} \ No newline at end of file + RCLCPP_ERROR(get_logger(), + "Connection to broker failed (return code %d), will automatically " + "retry...", + token.get_return_code()); + is_connected_ = false; +} diff --git a/atos/modules/MQTTBridge/tests/mqtt2ros_tests.cpp b/atos/modules/MQTTBridge/tests/mqtt2ros_tests.cpp new file mode 100644 index 000000000..ef354b089 --- /dev/null +++ b/atos/modules/MQTTBridge/tests/mqtt2ros_tests.cpp @@ -0,0 +1,114 @@ +#include + +#include + +#include "mqtt2ros_utils.hpp" + +TEST(Mqtt2RosNavSatParser, PayloadStringFullKeys) { + std::string payload{"latitude: 1.0, longitude: 2.0, altitude: 3.0"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadStringShortKeys) { + std::string payload{"lat: 1.0, lon: 2.0, alt: 3.0"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadStringMixedKeys) { + std::string payload{"latitude: 1.0, longitude: 2.0, alt: 3.0"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadJsonFullKeys) { + std::string payload{"{\"latitude\": 1.0, \"longitude\": 2.0, \"altitude\": 3.0}"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadJsonShortKeys) { + std::string payload{"{\"lat\": 1.0, \"lon\": 2.0, \"alt\": 3.0}"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadJsonMixedKeys) { + std::string payload{"{\"latitude\": 1.0, \"longitude\": 2.0, \"alt\": 3.0}"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadValuesIsInt) { + std::string payload{"latitude: 1, longitude: 2, altitude: 3"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadHasSpaceSeparator) { + std::string payload{"latitude: 1 longitude: 2 altitude: 3 "}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadContainsNegativeValues) { + std::string payload{"lat: 1.0, lon: 2.0, alt: -3.0"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, -3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadMissingSomeKeys) { + std::string payload{"lat: 1.0"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 0.0); + EXPECT_EQ(msg.altitude, 0.0); +} + +TEST(Mqtt2RosNavSatParser, MixedJsonAndKeyValue) { + std::string payload{"latitude: 1.0, \"longitude\": 2.0, alt: 3.0"}; + + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, OrderIndependence) { + std::string payload{"alt: 3.0, lat: 1.0, lon: 2.0"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} diff --git a/conf/conf/params.yaml b/conf/conf/params.yaml index 97f5356e8..0d99a5bc2 100644 --- a/conf/conf/params.yaml +++ b/conf/conf/params.yaml @@ -34,6 +34,7 @@ atos: - placeholder1 # The MQTT topic that should be bridged, corresponds to the sub-object in the YAML placeholder1: ros_topic: placeholder1 # ROS topic on which corresponding MQTT messages are published + msg_type: std_msgs/msg/Empty # ROS message type for this bridge advanced: # Optional settings for each topic mqtt: qos: 1 # [0] MQTT QoS value From 4c7e18aefa89f12dd9be6310e4b61c499cdc1512 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Samuel=20Thor=C3=A9n?= Date: Mon, 27 Apr 2026 10:33:56 +0200 Subject: [PATCH 2/2] Fix review comments --- atos/modules/MQTTBridge/inc/mqtt2ros_utils.hpp | 5 ++++- atos/modules/MQTTBridge/src/mqttbridge.cpp | 3 +-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/atos/modules/MQTTBridge/inc/mqtt2ros_utils.hpp b/atos/modules/MQTTBridge/inc/mqtt2ros_utils.hpp index a0ae60694..a455a1db7 100644 --- a/atos/modules/MQTTBridge/inc/mqtt2ros_utils.hpp +++ b/atos/modules/MQTTBridge/inc/mqtt2ros_utils.hpp @@ -6,7 +6,7 @@ #include #include -#include +#include #include #include #include @@ -46,6 +46,9 @@ class Mqtt2RosUtils { return serialized; } + RCLCPP_WARN(rclcpp::get_logger("mqtt2ros_utils"), + "Message type %s not supported. Returning empty serialized message", + msg_type.c_str()); return rclcpp::SerializedMessage(); } diff --git a/atos/modules/MQTTBridge/src/mqttbridge.cpp b/atos/modules/MQTTBridge/src/mqttbridge.cpp index 4d1fecd2a..09896c6c3 100644 --- a/atos/modules/MQTTBridge/src/mqttbridge.cpp +++ b/atos/modules/MQTTBridge/src/mqttbridge.cpp @@ -392,10 +392,9 @@ void MqttBridge::message_arrived(mqtt::const_message_ptr mqtt_msg) { RCLCPP_DEBUG(get_logger(), "Received MQTT message on topic '%s'", mqtt_topic.c_str()); Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; - // Publish empty message to ROS topic const auto msg = Mqtt2RosUtils::get_serialized_msg(mqtt2ros.ros.msg_type, mqtt_msg->get_payload_str()); mqtt2ros.ros.publisher->publish(msg); - RCLCPP_DEBUG(get_logger(), "Published empty message to ROS topic '%s'", mqtt2ros.ros.topic.c_str()); + RCLCPP_DEBUG(get_logger(), "Published MQTT message to ROS topic '%s'", mqtt2ros.ros.topic.c_str()); } template