diff --git a/atos/launch/launch_with_mqtt.py b/atos/launch/launch_with_mqtt.py new file mode 100644 index 000000000..66b4eeebb --- /dev/null +++ b/atos/launch/launch_with_mqtt.py @@ -0,0 +1,41 @@ +import os +import sys + +from ament_index_python.packages import get_package_prefix + +sys.path.insert( + 0, + os.path.join( # Need to modify the sys.path since we launch from the ros2 installed path + get_package_prefix("atos"), "share", "atos", "launch" + ), +) + +import launch_utils.launch_base as launch_base +from launch import LaunchDescription +from launch_ros.actions import Node + + +def get_mqtt_nodes(): + files = launch_base.get_files() + return [ + Node( + package="atos", + namespace="atos", + executable="mqtt_bridge", + name="mqtt_bridge", + # prefix=['gdbserver localhost:3000'], ## To use with VSC debugger + parameters=[files["params"]], + # arguments=["--ros-args", "--log-level", "debug"], # To get RCL_DEBUG prints + ), + ] + + +def generate_launch_description(): + base_nodes = launch_base.get_base_nodes() + + mqtt_nodes = get_mqtt_nodes() + + for node in mqtt_nodes: + base_nodes.append(node) + + return LaunchDescription(base_nodes) diff --git a/atos/modules/MQTTBridge/CMakeLists.txt b/atos/modules/MQTTBridge/CMakeLists.txt index 82ef49e7b..59d87764c 100644 --- a/atos/modules/MQTTBridge/CMakeLists.txt +++ b/atos/modules/MQTTBridge/CMakeLists.txt @@ -49,6 +49,30 @@ ament_target_dependencies(${MQTT_BRIDGE_TARGET} atos_interfaces ) +if(BUILD_TESTING) + find_package(ament_cmake_gtest REQUIRED) + set(TESTFILES "tests/mqtt2ros_tests.cpp") + + ament_add_gtest(mqtt_bridge_test ${TESTFILES}) + if(TARGET mqtt_bridge_test) + target_link_libraries(mqtt_bridge_test + ${ATOS_COMMON_LIBRARY} + ${nlohmann_json} + paho-mqttpp3 + paho-mqtt3as + fmt + ) + target_include_directories(mqtt_bridge_test PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR}/inc + ${COMMON_HEADERS} + ) + ament_target_dependencies(mqtt_bridge_test + rclcpp + atos_interfaces + ) + endif() +endif() + # Installation rules install(CODE "MESSAGE(STATUS \"Installing target ${MQTT_BRIDGE_TARGET}\")") install(TARGETS ${MQTT_BRIDGE_TARGET} @@ -56,4 +80,4 @@ install(TARGETS ${MQTT_BRIDGE_TARGET} LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR} -) \ No newline at end of file +) diff --git a/atos/modules/MQTTBridge/inc/Imqtt2ros.hpp b/atos/modules/MQTTBridge/inc/Imqtt2ros.hpp index 8a52e9f7b..52daa363f 100644 --- a/atos/modules/MQTTBridge/inc/Imqtt2ros.hpp +++ b/atos/modules/MQTTBridge/inc/Imqtt2ros.hpp @@ -1,3 +1,5 @@ +#pragma once + #include #include @@ -11,10 +13,10 @@ struct Mqtt2RosInterface { int qos = 0; ///< MQTT QoS value } mqtt; ///< MQTT-related variables struct { - std::string topic; ///< ROS topic - std::string msg_type; ///< message type of publisher - rclcpp::Publisher::SharedPtr publisher; ///< ROS publisher - int queue_size = 1; ///< ROS publisher queue size + std::string topic; ///< ROS topic + std::string msg_type; ///< message type of publisher + rclcpp::GenericPublisher::SharedPtr publisher; + int queue_size = 1; ///< ROS publisher queue size bool is_stale = false; ///< whether a new generic publisher/subscriber is required } ros; ///< ROS-related variables }; diff --git a/atos/modules/MQTTBridge/inc/mqtt2ros_utils.hpp b/atos/modules/MQTTBridge/inc/mqtt2ros_utils.hpp new file mode 100644 index 000000000..a455a1db7 --- /dev/null +++ b/atos/modules/MQTTBridge/inc/mqtt2ros_utils.hpp @@ -0,0 +1,93 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +class Mqtt2RosUtils { + +public: + Mqtt2RosUtils() = delete; + + static sensor_msgs::msg::NavSatFix mqtt2navsatfix(const std::string& payload) { + std::string payload_lowercase = payload; + std::transform(payload_lowercase.begin(), payload_lowercase.end(), payload_lowercase.begin(), ::tolower); + + auto msg = sensor_msgs::msg::NavSatFix(); + msg.header.stamp = rclcpp::Clock(RCL_ROS_TIME).now(); + msg.latitude = extract_value_from_payload(payload_lowercase, {"lat", "latitude"}); + msg.longitude = extract_value_from_payload(payload_lowercase, {"lon", "lng", "long", "longitude"}); + msg.altitude = extract_value_from_payload(payload_lowercase, {"alt", "altitude"}); + + return msg; + } + + static rclcpp::SerializedMessage get_serialized_msg(const std::string& msg_type, const std::string& payload) { + if (msg_type == "std_msgs/msg/Empty") { + auto msg = std_msgs::msg::Empty(); + auto serialized = rclcpp::SerializedMessage(); + rclcpp::Serialization serializer; + serializer.serialize_message(&msg, &serialized); + return serialized; + + } else if (msg_type == "sensor_msgs/msg/NavSatFix") { + auto msg = mqtt2navsatfix(payload); + auto serialized = rclcpp::SerializedMessage(); + rclcpp::Serialization serializer; + serializer.serialize_message(&msg, &serialized); + return serialized; + } + + RCLCPP_WARN(rclcpp::get_logger("mqtt2ros_utils"), + "Message type %s not supported. Returning empty serialized message", + msg_type.c_str()); + return rclcpp::SerializedMessage(); + } + +private: + static double extract_value_from_payload(const std::string& input, std::vector keys) { + // Lowercase input + std::string str = input; + std::transform(str.begin(), str.end(), str.begin(), ::tolower); + + // Sort keys by length to avoid wrong match + std::sort( + keys.begin(), keys.end(), [](const std::string& a, const std::string& b) { return a.size() > b.size(); }); + + // Build regex so we can match any of the keys (key1|key2|...) + std::string key_pattern; + for (size_t i{0}; i < keys.size(); ++i) { + key_pattern += keys.at(i); + if (i != keys.size() - 1) + key_pattern += "|"; + } + + // Extract numeric value for any matching key (supports JSON or key: value formats) + std::regex re{"(?:^|[\\s,{])\"?(" + key_pattern + ")\"?\\s*[:=]\\s*([+-]?\\d+(\\.\\d+)?([eE][+-]?\\d+)?)"}; + + std::smatch match; + double value{0.0}; + + // Iterate through all matches, keep the last one in case inputs are duplicated + auto begin{str.cbegin()}; + auto end{str.cend()}; + while (std::regex_search(begin, end, match, re)) { + try { + value = std::stod(match[2].str()); + } catch (...) { + value = 0.0; + } + begin = match.suffix().first; + } + + return value; + } +}; diff --git a/atos/modules/MQTTBridge/src/mqttbridge.cpp b/atos/modules/MQTTBridge/src/mqttbridge.cpp index 7edd331d5..09896c6c3 100644 --- a/atos/modules/MQTTBridge/src/mqttbridge.cpp +++ b/atos/modules/MQTTBridge/src/mqttbridge.cpp @@ -4,8 +4,15 @@ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ #include "mqttbridge.hpp" + #include +#include +#include +#include + +#include "mqtt2ros_utils.hpp" + using namespace ROSChannels; using std::placeholders::_1; @@ -41,6 +48,9 @@ void MqttBridge::loadParameters() { param_desc.description = "ROS topic on which corresponding MQTT messages are published"; declare_parameter( fmt::format("mqtt2ros.{}.ros_topic", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); + param_desc.description = "ROS message type for this bridge"; + declare_parameter( + fmt::format("mqtt2ros.{}.msg_type", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); param_desc.description = "MQTT QoS value"; declare_parameter(fmt::format("mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), rclcpp::ParameterType::PARAMETER_INTEGER, @@ -80,6 +90,13 @@ void MqttBridge::loadParameters() { Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; mqtt2ros.ros.topic = ros_topic; + rclcpp::Parameter msg_type_param; + if (get_parameter(fmt::format("mqtt2ros.{}.msg_type", mqtt_topic), msg_type_param)) { + mqtt2ros.ros.msg_type = msg_type_param.as_string(); + } else { + mqtt2ros.ros.msg_type = "std_msgs/msg/Empty"; // default if no parameter is set + } + // mqtt2ros[k]/advanced/mqtt/qos rclcpp::Parameter qos_param; if (get_parameter(fmt::format("mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), qos_param)) { @@ -93,9 +110,10 @@ void MqttBridge::loadParameters() { } RCLCPP_INFO(get_logger(), - "Bridging MQTT topic '%s' to ROS topic '%s'", + "Bridging MQTT topic '%s' to ROS topic '%s' with message type '%s'", mqtt_topic.c_str(), - mqtt2ros.ros.topic.c_str()); + mqtt2ros.ros.topic.c_str(), + mqtt2ros.ros.msg_type.c_str()); } else { RCLCPP_WARN(get_logger(), fmt::format("Parameter 'ros2mqtt.{}' is missing subparameter " @@ -313,7 +331,8 @@ void MqttBridge::newMqtt2RosBridge(atos_interfaces::srv::NewMqtt2RosBridge::Requ mqtt2ros.ros.is_stale = true; mqtt2ros.ros.topic = request->ros_topic; mqtt2ros.mqtt.qos = request->mqtt_qos; - mqtt2ros.ros.publisher = this->create_publisher(mqtt2ros.ros.topic, request->ros_queue_size); + mqtt2ros.ros.publisher = + this->create_generic_publisher(mqtt2ros.ros.topic, mqtt2ros.ros.msg_type, mqtt2ros.ros.queue_size); mqtt2ros.ros.queue_size = request->ros_queue_size; RCLCPP_DEBUG(get_logger(), @@ -373,9 +392,9 @@ void MqttBridge::message_arrived(mqtt::const_message_ptr mqtt_msg) { RCLCPP_DEBUG(get_logger(), "Received MQTT message on topic '%s'", mqtt_topic.c_str()); Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; - // Publish empty message to ROS topic - mqtt2ros.ros.publisher->publish(std_msgs::msg::Empty()); - RCLCPP_DEBUG(get_logger(), "Published empty message to ROS topic '%s'", mqtt2ros.ros.topic.c_str()); + const auto msg = Mqtt2RosUtils::get_serialized_msg(mqtt2ros.ros.msg_type, mqtt_msg->get_payload_str()); + mqtt2ros.ros.publisher->publish(msg); + RCLCPP_DEBUG(get_logger(), "Published MQTT message to ROS topic '%s'", mqtt2ros.ros.topic.c_str()); } template diff --git a/atos/modules/MQTTBridge/tests/mqtt2ros_tests.cpp b/atos/modules/MQTTBridge/tests/mqtt2ros_tests.cpp new file mode 100644 index 000000000..ef354b089 --- /dev/null +++ b/atos/modules/MQTTBridge/tests/mqtt2ros_tests.cpp @@ -0,0 +1,114 @@ +#include + +#include + +#include "mqtt2ros_utils.hpp" + +TEST(Mqtt2RosNavSatParser, PayloadStringFullKeys) { + std::string payload{"latitude: 1.0, longitude: 2.0, altitude: 3.0"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadStringShortKeys) { + std::string payload{"lat: 1.0, lon: 2.0, alt: 3.0"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadStringMixedKeys) { + std::string payload{"latitude: 1.0, longitude: 2.0, alt: 3.0"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadJsonFullKeys) { + std::string payload{"{\"latitude\": 1.0, \"longitude\": 2.0, \"altitude\": 3.0}"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadJsonShortKeys) { + std::string payload{"{\"lat\": 1.0, \"lon\": 2.0, \"alt\": 3.0}"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadJsonMixedKeys) { + std::string payload{"{\"latitude\": 1.0, \"longitude\": 2.0, \"alt\": 3.0}"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadValuesIsInt) { + std::string payload{"latitude: 1, longitude: 2, altitude: 3"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadHasSpaceSeparator) { + std::string payload{"latitude: 1 longitude: 2 altitude: 3 "}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadContainsNegativeValues) { + std::string payload{"lat: 1.0, lon: 2.0, alt: -3.0"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, -3.0); +} + +TEST(Mqtt2RosNavSatParser, PayloadMissingSomeKeys) { + std::string payload{"lat: 1.0"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 0.0); + EXPECT_EQ(msg.altitude, 0.0); +} + +TEST(Mqtt2RosNavSatParser, MixedJsonAndKeyValue) { + std::string payload{"latitude: 1.0, \"longitude\": 2.0, alt: 3.0"}; + + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} + +TEST(Mqtt2RosNavSatParser, OrderIndependence) { + std::string payload{"alt: 3.0, lat: 1.0, lon: 2.0"}; + const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)}; + + EXPECT_EQ(msg.latitude, 1.0); + EXPECT_EQ(msg.longitude, 2.0); + EXPECT_EQ(msg.altitude, 3.0); +} diff --git a/conf/conf/params.yaml b/conf/conf/params.yaml index 97f5356e8..0d99a5bc2 100644 --- a/conf/conf/params.yaml +++ b/conf/conf/params.yaml @@ -34,6 +34,7 @@ atos: - placeholder1 # The MQTT topic that should be bridged, corresponds to the sub-object in the YAML placeholder1: ros_topic: placeholder1 # ROS topic on which corresponding MQTT messages are published + msg_type: std_msgs/msg/Empty # ROS message type for this bridge advanced: # Optional settings for each topic mqtt: qos: 1 # [0] MQTT QoS value