Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions atos/launch/launch_with_mqtt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import os
import sys

from ament_index_python.packages import get_package_prefix

sys.path.insert(
0,
os.path.join( # Need to modify the sys.path since we launch from the ros2 installed path
get_package_prefix("atos"), "share", "atos", "launch"
),
)

import launch_utils.launch_base as launch_base
from launch import LaunchDescription
from launch_ros.actions import Node


def get_mqtt_nodes():
files = launch_base.get_files()
return [
Node(
package="atos",
namespace="atos",
executable="mqtt_bridge",
name="mqtt_bridge",
# prefix=['gdbserver localhost:3000'], ## To use with VSC debugger
parameters=[files["params"]],
# arguments=["--ros-args", "--log-level", "debug"], # To get RCL_DEBUG prints
),
]


def generate_launch_description():
base_nodes = launch_base.get_base_nodes()

mqtt_nodes = get_mqtt_nodes()

for node in mqtt_nodes:
base_nodes.append(node)

return LaunchDescription(base_nodes)
26 changes: 25 additions & 1 deletion atos/modules/MQTTBridge/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,35 @@ ament_target_dependencies(${MQTT_BRIDGE_TARGET}
atos_interfaces
)

if(BUILD_TESTING)
find_package(ament_cmake_gtest REQUIRED)
set(TESTFILES "tests/mqtt2ros_tests.cpp")

ament_add_gtest(mqtt_bridge_test ${TESTFILES})
if(TARGET mqtt_bridge_test)
target_link_libraries(mqtt_bridge_test
${ATOS_COMMON_LIBRARY}
${nlohmann_json}
paho-mqttpp3
paho-mqtt3as
fmt
)
target_include_directories(mqtt_bridge_test PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/inc
${COMMON_HEADERS}
)
ament_target_dependencies(mqtt_bridge_test
rclcpp
atos_interfaces
)
endif()
endif()

# Installation rules
install(CODE "MESSAGE(STATUS \"Installing target ${MQTT_BRIDGE_TARGET}\")")
install(TARGETS ${MQTT_BRIDGE_TARGET}
RUNTIME DESTINATION "${CMAKE_INSTALL_LIBDIR}/atos"
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}
)
)
10 changes: 6 additions & 4 deletions atos/modules/MQTTBridge/inc/Imqtt2ros.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#pragma once

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/empty.hpp>

Expand All @@ -11,10 +13,10 @@ struct Mqtt2RosInterface {
int qos = 0; ///< MQTT QoS value
} mqtt; ///< MQTT-related variables
struct {
std::string topic; ///< ROS topic
std::string msg_type; ///< message type of publisher
rclcpp::Publisher<std_msgs::msg::Empty>::SharedPtr publisher; ///< ROS publisher
int queue_size = 1; ///< ROS publisher queue size
std::string topic; ///< ROS topic
std::string msg_type; ///< message type of publisher
rclcpp::GenericPublisher::SharedPtr publisher;
int queue_size = 1; ///< ROS publisher queue size
bool is_stale = false; ///< whether a new generic publisher/subscriber is required
} ros; ///< ROS-related variables
};
93 changes: 93 additions & 0 deletions atos/modules/MQTTBridge/inc/mqtt2ros_utils.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#pragma once

#include <algorithm>
#include <memory>
#include <regex>
#include <string>

#include <rclcpp/clock.hpp>
#include <rclcpp/logging.hpp>
#include <rclcpp/serialization.hpp>
#include <rclcpp/serialized_message.hpp>
#include <sensor_msgs/msg/nav_sat_fix.hpp>
#include <std_msgs/msg/empty.hpp>

class Mqtt2RosUtils {

public:
Mqtt2RosUtils() = delete;

static sensor_msgs::msg::NavSatFix mqtt2navsatfix(const std::string& payload) {
std::string payload_lowercase = payload;
std::transform(payload_lowercase.begin(), payload_lowercase.end(), payload_lowercase.begin(), ::tolower);

auto msg = sensor_msgs::msg::NavSatFix();
msg.header.stamp = rclcpp::Clock(RCL_ROS_TIME).now();
msg.latitude = extract_value_from_payload(payload_lowercase, {"lat", "latitude"});
msg.longitude = extract_value_from_payload(payload_lowercase, {"lon", "lng", "long", "longitude"});
msg.altitude = extract_value_from_payload(payload_lowercase, {"alt", "altitude"});

return msg;
}

static rclcpp::SerializedMessage get_serialized_msg(const std::string& msg_type, const std::string& payload) {
if (msg_type == "std_msgs/msg/Empty") {
auto msg = std_msgs::msg::Empty();
auto serialized = rclcpp::SerializedMessage();
rclcpp::Serialization<std_msgs::msg::Empty> serializer;
serializer.serialize_message(&msg, &serialized);
return serialized;

} else if (msg_type == "sensor_msgs/msg/NavSatFix") {
auto msg = mqtt2navsatfix(payload);
auto serialized = rclcpp::SerializedMessage();
rclcpp::Serialization<sensor_msgs::msg::NavSatFix> serializer;
serializer.serialize_message(&msg, &serialized);
return serialized;
}

RCLCPP_WARN(rclcpp::get_logger("mqtt2ros_utils"),
"Message type %s not supported. Returning empty serialized message",
msg_type.c_str());
return rclcpp::SerializedMessage();
Comment thread
samuelthoren marked this conversation as resolved.
}

private:
static double extract_value_from_payload(const std::string& input, std::vector<std::string> keys) {
// Lowercase input
std::string str = input;
std::transform(str.begin(), str.end(), str.begin(), ::tolower);

// Sort keys by length to avoid wrong match
std::sort(
keys.begin(), keys.end(), [](const std::string& a, const std::string& b) { return a.size() > b.size(); });

// Build regex so we can match any of the keys (key1|key2|...)
std::string key_pattern;
for (size_t i{0}; i < keys.size(); ++i) {
key_pattern += keys.at(i);
if (i != keys.size() - 1)
key_pattern += "|";
}

// Extract numeric value for any matching key (supports JSON or key: value formats)
std::regex re{"(?:^|[\\s,{])\"?(" + key_pattern + ")\"?\\s*[:=]\\s*([+-]?\\d+(\\.\\d+)?([eE][+-]?\\d+)?)"};

std::smatch match;
double value{0.0};

// Iterate through all matches, keep the last one in case inputs are duplicated
auto begin{str.cbegin()};
auto end{str.cend()};
while (std::regex_search(begin, end, match, re)) {
try {
value = std::stod(match[2].str());
} catch (...) {
value = 0.0;
}
begin = match.suffix().first;
}

return value;
}
};
31 changes: 25 additions & 6 deletions atos/modules/MQTTBridge/src/mqttbridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,15 @@
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include "mqttbridge.hpp"

#include <random>

#include <rclcpp/serialization.hpp>
#include <sensor_msgs/msg/nav_sat_fix.hpp>
#include <std_msgs/msg/empty.hpp>

#include "mqtt2ros_utils.hpp"

using namespace ROSChannels;

using std::placeholders::_1;
Expand Down Expand Up @@ -41,6 +48,9 @@ void MqttBridge::loadParameters() {
param_desc.description = "ROS topic on which corresponding MQTT messages are published";
declare_parameter(
fmt::format("mqtt2ros.{}.ros_topic", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc);
param_desc.description = "ROS message type for this bridge";
declare_parameter(
fmt::format("mqtt2ros.{}.msg_type", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc);
param_desc.description = "MQTT QoS value";
declare_parameter(fmt::format("mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic),
rclcpp::ParameterType::PARAMETER_INTEGER,
Expand Down Expand Up @@ -80,6 +90,13 @@ void MqttBridge::loadParameters() {
Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
mqtt2ros.ros.topic = ros_topic;

rclcpp::Parameter msg_type_param;
if (get_parameter(fmt::format("mqtt2ros.{}.msg_type", mqtt_topic), msg_type_param)) {
mqtt2ros.ros.msg_type = msg_type_param.as_string();
} else {
mqtt2ros.ros.msg_type = "std_msgs/msg/Empty"; // default if no parameter is set
}

// mqtt2ros[k]/advanced/mqtt/qos
rclcpp::Parameter qos_param;
if (get_parameter(fmt::format("mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), qos_param)) {
Expand All @@ -93,9 +110,10 @@ void MqttBridge::loadParameters() {
}

RCLCPP_INFO(get_logger(),
"Bridging MQTT topic '%s' to ROS topic '%s'",
"Bridging MQTT topic '%s' to ROS topic '%s' with message type '%s'",
mqtt_topic.c_str(),
mqtt2ros.ros.topic.c_str());
mqtt2ros.ros.topic.c_str(),
mqtt2ros.ros.msg_type.c_str());
} else {
RCLCPP_WARN(get_logger(),
fmt::format("Parameter 'ros2mqtt.{}' is missing subparameter "
Expand Down Expand Up @@ -313,7 +331,8 @@ void MqttBridge::newMqtt2RosBridge(atos_interfaces::srv::NewMqtt2RosBridge::Requ
mqtt2ros.ros.is_stale = true;
mqtt2ros.ros.topic = request->ros_topic;
mqtt2ros.mqtt.qos = request->mqtt_qos;
mqtt2ros.ros.publisher = this->create_publisher<std_msgs::msg::Empty>(mqtt2ros.ros.topic, request->ros_queue_size);
mqtt2ros.ros.publisher =
this->create_generic_publisher(mqtt2ros.ros.topic, mqtt2ros.ros.msg_type, mqtt2ros.ros.queue_size);
mqtt2ros.ros.queue_size = request->ros_queue_size;

RCLCPP_DEBUG(get_logger(),
Expand Down Expand Up @@ -373,9 +392,9 @@ void MqttBridge::message_arrived(mqtt::const_message_ptr mqtt_msg) {
RCLCPP_DEBUG(get_logger(), "Received MQTT message on topic '%s'", mqtt_topic.c_str());
Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];

// Publish empty message to ROS topic
mqtt2ros.ros.publisher->publish(std_msgs::msg::Empty());
RCLCPP_DEBUG(get_logger(), "Published empty message to ROS topic '%s'", mqtt2ros.ros.topic.c_str());
const auto msg = Mqtt2RosUtils::get_serialized_msg(mqtt2ros.ros.msg_type, mqtt_msg->get_payload_str());
mqtt2ros.ros.publisher->publish(msg);
RCLCPP_DEBUG(get_logger(), "Published MQTT message to ROS topic '%s'", mqtt2ros.ros.topic.c_str());
}

template<typename T>
Expand Down
114 changes: 114 additions & 0 deletions atos/modules/MQTTBridge/tests/mqtt2ros_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
#include <gtest/gtest.h>

#include <string>

#include "mqtt2ros_utils.hpp"

TEST(Mqtt2RosNavSatParser, PayloadStringFullKeys) {
std::string payload{"latitude: 1.0, longitude: 2.0, altitude: 3.0"};
const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)};

EXPECT_EQ(msg.latitude, 1.0);
EXPECT_EQ(msg.longitude, 2.0);
EXPECT_EQ(msg.altitude, 3.0);
}

TEST(Mqtt2RosNavSatParser, PayloadStringShortKeys) {
std::string payload{"lat: 1.0, lon: 2.0, alt: 3.0"};
const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)};

EXPECT_EQ(msg.latitude, 1.0);
EXPECT_EQ(msg.longitude, 2.0);
EXPECT_EQ(msg.altitude, 3.0);
}

TEST(Mqtt2RosNavSatParser, PayloadStringMixedKeys) {
std::string payload{"latitude: 1.0, longitude: 2.0, alt: 3.0"};
const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)};

EXPECT_EQ(msg.latitude, 1.0);
EXPECT_EQ(msg.longitude, 2.0);
EXPECT_EQ(msg.altitude, 3.0);
}

TEST(Mqtt2RosNavSatParser, PayloadJsonFullKeys) {
std::string payload{"{\"latitude\": 1.0, \"longitude\": 2.0, \"altitude\": 3.0}"};
const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)};

EXPECT_EQ(msg.latitude, 1.0);
EXPECT_EQ(msg.longitude, 2.0);
EXPECT_EQ(msg.altitude, 3.0);
}

TEST(Mqtt2RosNavSatParser, PayloadJsonShortKeys) {
std::string payload{"{\"lat\": 1.0, \"lon\": 2.0, \"alt\": 3.0}"};
const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)};

EXPECT_EQ(msg.latitude, 1.0);
EXPECT_EQ(msg.longitude, 2.0);
EXPECT_EQ(msg.altitude, 3.0);
}

TEST(Mqtt2RosNavSatParser, PayloadJsonMixedKeys) {
std::string payload{"{\"latitude\": 1.0, \"longitude\": 2.0, \"alt\": 3.0}"};
const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)};

EXPECT_EQ(msg.latitude, 1.0);
EXPECT_EQ(msg.longitude, 2.0);
EXPECT_EQ(msg.altitude, 3.0);
}

TEST(Mqtt2RosNavSatParser, PayloadValuesIsInt) {
std::string payload{"latitude: 1, longitude: 2, altitude: 3"};
const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)};

EXPECT_EQ(msg.latitude, 1.0);
EXPECT_EQ(msg.longitude, 2.0);
EXPECT_EQ(msg.altitude, 3.0);
}

TEST(Mqtt2RosNavSatParser, PayloadHasSpaceSeparator) {
std::string payload{"latitude: 1 longitude: 2 altitude: 3 "};
const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)};

EXPECT_EQ(msg.latitude, 1.0);
EXPECT_EQ(msg.longitude, 2.0);
EXPECT_EQ(msg.altitude, 3.0);
}

TEST(Mqtt2RosNavSatParser, PayloadContainsNegativeValues) {
std::string payload{"lat: 1.0, lon: 2.0, alt: -3.0"};
const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)};

EXPECT_EQ(msg.latitude, 1.0);
EXPECT_EQ(msg.longitude, 2.0);
EXPECT_EQ(msg.altitude, -3.0);
}

TEST(Mqtt2RosNavSatParser, PayloadMissingSomeKeys) {
std::string payload{"lat: 1.0"};
const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)};

EXPECT_EQ(msg.latitude, 1.0);
EXPECT_EQ(msg.longitude, 0.0);
EXPECT_EQ(msg.altitude, 0.0);
}

TEST(Mqtt2RosNavSatParser, MixedJsonAndKeyValue) {
std::string payload{"latitude: 1.0, \"longitude\": 2.0, alt: 3.0"};

const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)};

EXPECT_EQ(msg.latitude, 1.0);
EXPECT_EQ(msg.longitude, 2.0);
EXPECT_EQ(msg.altitude, 3.0);
}

TEST(Mqtt2RosNavSatParser, OrderIndependence) {
std::string payload{"alt: 3.0, lat: 1.0, lon: 2.0"};
const auto msg{Mqtt2RosUtils::mqtt2navsatfix(payload)};

EXPECT_EQ(msg.latitude, 1.0);
EXPECT_EQ(msg.longitude, 2.0);
EXPECT_EQ(msg.altitude, 3.0);
}
1 change: 1 addition & 0 deletions conf/conf/params.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ atos:
- placeholder1 # The MQTT topic that should be bridged, corresponds to the sub-object in the YAML
placeholder1:
ros_topic: placeholder1 # ROS topic on which corresponding MQTT messages are published
msg_type: std_msgs/msg/Empty # ROS message type for this bridge
advanced: # Optional settings for each topic
mqtt:
qos: 1 # [0] MQTT QoS value
Expand Down
Loading