Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions apps/axon_recorder/config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,16 @@ List of topics to record with batching settings:
- `message_type`: ROS message type
- `batch_size`: Number of messages to batch before writing
- `flush_interval_ms`: Maximum time to wait before flushing (ms)
- `qos_depth`: ROS2 subscription history depth (default `10`). This controls
the middleware QoS queue, not the recorder's `dataset.queue_size` worker queue.
- `qos_depth`: Backward-compatible ROS2 subscription history depth (default `10`).
- `qos`: Optional ROS2 QoS object. Any empty field falls back to the default.
- `mode`: `auto` to detect publisher QoS when available
- `depth`: History depth, or `auto` (default `10`)
- `reliability`: `reliable`, `best_effort`, or `auto` (default `reliable`)
- `durability`: `volatile`, `transient_local`, or `auto` (default `volatile`)
- `history`: `keep_last`, `keep_all`, or `auto` (default `keep_last`)

QoS depth controls the ROS2 middleware queue, not the recorder's
`dataset.queue_size` worker queue.

### Recording
- `profile`: ROS profile (`ros1` or `ros2`)
Expand Down
2 changes: 2 additions & 0 deletions apps/axon_recorder/config/default_config_hybrid.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ subscriptions:
message_type: sensor_msgs/msg/Imu
batch_size: 500
flush_interval_ms: 1000
qos:
mode: auto
- name: /udp/gps
message_type: axon_udp/json
batch_size: 1
Expand Down
23 changes: 21 additions & 2 deletions apps/axon_recorder/config/default_config_ros2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,53 +49,72 @@ dataset:
# High-frequency subscriptions (IMU, camera): Use larger batches (100-5000)
# Low-frequency subscriptions (status, diagnostics): Use small batches (1-10)
# Control signals: Use batch_size=1 for immediate persistence
# Optional qos_depth controls ROS2 subscription history depth. It is separate
# from dataset.queue_size, which controls Axon's per-topic worker queue.
# Optional ROS2 QoS overrides can be set per topic. Use mode: auto to follow
# publisher QoS when available. Leave any field empty to keep the middleware
# default (depth=10, reliability=reliable, durability=volatile,
# history=keep_last). QoS depth is separate from dataset.queue_size, which
# controls Axon's per-topic worker queue.
subscriptions:
# IMU data - high frequency (5 seconds of data per batch)
- name: /imu/data
message_type: sensor_msgs/msg/Imu
batch_size: 5000
flush_interval_ms: 5000
qos:
mode: auto

# Camera 0 - RGB and Depth (10 seconds of data per batch at 30fps)
- name: /camera0/rgb
message_type: sensor_msgs/msg/Image
batch_size: 300
flush_interval_ms: 10000
qos:
mode: auto

- name: /camera0/depth
message_type: sensor_msgs/msg/Image
batch_size: 300
flush_interval_ms: 10000
qos:
mode: auto

# Camera 1 - RGB and Depth
- name: /camera1/rgb
message_type: sensor_msgs/msg/Image
batch_size: 300
flush_interval_ms: 10000
qos:
mode: auto

- name: /camera1/depth
message_type: sensor_msgs/msg/Image
batch_size: 300
flush_interval_ms: 10000
qos:
mode: auto

# Camera 2 - RGB and Depth
- name: /camera2/rgb
message_type: sensor_msgs/msg/Image
batch_size: 300
flush_interval_ms: 10000
qos:
mode: auto

- name: /camera2/depth
message_type: sensor_msgs/msg/Image
batch_size: 300
flush_interval_ms: 10000
qos:
mode: auto

# Depth image with compression (16UC1 -> CompressedImage)
- name: /camera/camera/depth/image_rect_raw
message_type: sensor_msgs/msg/Image
batch_size: 300
flush_interval_ms: 10000
qos:
mode: auto
depth_compression: true

# ============================================================================
Expand Down
188 changes: 185 additions & 3 deletions apps/axon_recorder/src/config/config_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@

#include <nlohmann/json.hpp>

#include <algorithm>
#include <cctype>
#include <fstream>
#include <optional>
#include <sstream>

#include "../core/recorder.hpp"
Expand Down Expand Up @@ -41,6 +44,150 @@ void apply_sidecar_generation_mode(RecordingConfig& recording, std::string mode)
recording.sidecar_json_enabled = mode != "none";
}

std::string trim_ascii(std::string value) {
auto not_space = [](unsigned char c) {
return !std::isspace(c);
};
value.erase(value.begin(), std::find_if(value.begin(), value.end(), not_space));
value.erase(std::find_if(value.rbegin(), value.rend(), not_space).base(), value.end());
return value;
}

std::string normalize_qos_policy_token(std::string value) {
value = trim_ascii(std::move(value));
std::transform(value.begin(), value.end(), value.begin(), [](unsigned char c) {
if (c == '-' || std::isspace(c)) {
return '_';
}
return static_cast<char>(std::tolower(c));
});
return value;
}

bool yaml_node_has_value(const YAML::Node& node) {
if (!node || node.IsNull()) {
return false;
}
if (!node.IsScalar()) {
return true;
}
return !trim_ascii(node.as<std::string>()).empty();
}

bool is_auto_qos_token(const YAML::Node& node) {
return yaml_node_has_value(node) && node.IsScalar() &&
normalize_qos_policy_token(node.as<std::string>()) == "auto";
}

std::optional<size_t> parse_optional_qos_depth(const YAML::Node& node) {
if (!yaml_node_has_value(node)) {
return std::nullopt;
}
const auto depth = node.as<long long>();
return depth > 0 ? std::optional<size_t>(static_cast<size_t>(depth)) : std::optional<size_t>(0);
}

void parse_optional_nested_qos_depth(const YAML::Node& node, RosQosConfig& qos, size_t& qos_depth) {
if (!yaml_node_has_value(node)) {
return;
}
if (is_auto_qos_token(node)) {
qos.depth_auto = true;
return;
}
const auto depth = node.as<long long>();
qos.depth =
depth > 0 ? std::optional<size_t>(static_cast<size_t>(depth)) : std::optional<size_t>(0);
qos_depth = *qos.depth;
}

std::optional<std::string> parse_optional_qos_policy(const YAML::Node& node) {
if (!yaml_node_has_value(node)) {
return std::nullopt;
}
return normalize_qos_policy_token(node.as<std::string>());
}

std::optional<std::string> parse_optional_qos_reliability(const YAML::Node& node) {
if (!yaml_node_has_value(node)) {
return std::nullopt;
}
try {
return node.as<bool>() ? std::optional<std::string>("reliable")
: std::optional<std::string>("best_effort");
} catch (...) {
}
return normalize_qos_policy_token(node.as<std::string>());
}

void parse_ros_qos_node(const YAML::Node& node, RosQosConfig& qos, size_t& qos_depth) {
if (!node || !node.IsMap()) {
return;
}
if (auto mode = parse_optional_qos_policy(node["mode"])) {
qos.mode = mode;
qos.auto_mode = *mode == "auto";
}
if (node["depth"]) {
parse_optional_nested_qos_depth(node["depth"], qos, qos_depth);
}
if (node["qos_depth"]) {
parse_optional_nested_qos_depth(node["qos_depth"], qos, qos_depth);
}
if (auto reliability = parse_optional_qos_reliability(node["reliability"])) {
qos.reliability = reliability;
}
if (auto reliability = parse_optional_qos_reliability(node["reliable"])) {
qos.reliability = reliability;
}
if (auto durability = parse_optional_qos_policy(node["durability"])) {
qos.durability = durability;
}
if (auto history = parse_optional_qos_policy(node["history"])) {
qos.history = history;
}
}

bool valid_qos_reliability(const std::string& value) {
return value == "reliable" || value == "best_effort" || value == "auto";
}

bool valid_qos_durability(const std::string& value) {
return value == "volatile" || value == "transient_local" || value == "auto";
}

bool valid_qos_history(const std::string& value) {
return value == "keep_last" || value == "keep_all" || value == "auto";
}

bool validate_ros_qos_config(const RosQosConfig& qos, std::string& error_msg) {
if (qos.mode.has_value() && *qos.mode != "auto") {
error_msg = "Subscription qos.mode must be 'auto'";
return false;
}
if (qos.depth.has_value() && *qos.depth == 0) {
error_msg = "Subscription qos.depth must be > 0";
return false;
}
if (qos.depth_auto && qos.depth.has_value()) {
error_msg = "Subscription qos.depth cannot be both auto and numeric";
return false;
}
if (qos.reliability.has_value() && !valid_qos_reliability(*qos.reliability)) {
error_msg = "Subscription qos.reliability must be 'reliable', 'best_effort', or 'auto'";
return false;
}
if (qos.durability.has_value() && !valid_qos_durability(*qos.durability)) {
error_msg = "Subscription qos.durability must be 'volatile', 'transient_local', or 'auto'";
return false;
}
if (qos.history.has_value() && !valid_qos_history(*qos.history)) {
error_msg = "Subscription qos.history must be 'keep_last', 'keep_all', or 'auto'";
return false;
}
return true;
}

} // namespace

// ============================================================================
Expand Down Expand Up @@ -279,6 +426,30 @@ bool ConfigParser::save_to_file(const std::string& path, const RecorderConfig& c
subscription_node["batch_size"] = subscription.batch_size;
subscription_node["flush_interval_ms"] = subscription.flush_interval_ms;
subscription_node["qos_depth"] = subscription.qos_depth;
if (subscription.qos.has_overrides()) {
YAML::Node qos_node;
if (subscription.qos.mode.has_value()) {
qos_node["mode"] = *subscription.qos.mode;
} else if (subscription.qos.auto_mode) {
qos_node["mode"] = "auto";
}
if (subscription.qos.depth_auto) {
qos_node["depth"] = "auto";
}
if (subscription.qos.depth.has_value()) {
qos_node["depth"] = *subscription.qos.depth;
}
if (subscription.qos.reliability.has_value()) {
qos_node["reliability"] = *subscription.qos.reliability;
}
if (subscription.qos.durability.has_value()) {
qos_node["durability"] = *subscription.qos.durability;
}
if (subscription.qos.history.has_value()) {
qos_node["history"] = *subscription.qos.history;
}
subscription_node["qos"] = qos_node;
}
node["subscriptions"].push_back(subscription_node);
}

Expand Down Expand Up @@ -360,9 +531,17 @@ bool ConfigParser::parse_subscriptions(
if (subscription_node["flush_interval_ms"]) {
subscription.flush_interval_ms = subscription_node["flush_interval_ms"].as<int>();
}
if (subscription_node["qos_depth"]) {
const auto qos_depth = subscription_node["qos_depth"].as<long long>();
subscription.qos_depth = qos_depth > 0 ? static_cast<size_t>(qos_depth) : 0;
if (auto qos_depth = parse_optional_qos_depth(subscription_node["qos_depth"])) {
subscription.qos_depth = *qos_depth;
}
if (auto reliability = parse_optional_qos_reliability(subscription_node["qos_reliable"])) {
subscription.qos.reliability = reliability;
}
if (auto reliability = parse_optional_qos_reliability(subscription_node["reliable"])) {
subscription.qos.reliability = reliability;
}
if (subscription_node["qos"]) {
parse_ros_qos_node(subscription_node["qos"], subscription.qos, subscription.qos_depth);
}
// Parse depth_compression section (can be boolean or object)
if (subscription_node["depth_compression"]) {
Expand Down Expand Up @@ -794,6 +973,9 @@ bool ConfigParser::validate(const RecorderConfig& config, std::string& error_msg
error_msg = "Subscription qos_depth must be > 0";
return false;
}
if (!validate_ros_qos_config(subscription.qos, error_msg)) {
return false;
}
}

const auto& writer_batch = config.recording.writer_batch;
Expand Down
22 changes: 22 additions & 0 deletions apps/axon_recorder/src/config/task_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define AXON_RECORDER_TASK_CONFIG_HPP

#include <chrono>
#include <cstddef>
#include <mutex>
#include <optional>
#include <string>
Expand All @@ -14,6 +15,26 @@
namespace axon {
namespace recorder {

struct RosQosConfig {
std::optional<std::string> mode;
bool auto_mode = false;
bool depth_auto = false;
std::optional<size_t> depth;
std::optional<std::string> reliability;
std::optional<std::string> durability;
std::optional<std::string> history;

bool has_overrides() const {
return mode.has_value() || auto_mode || depth_auto || depth.has_value() ||
reliability.has_value() || durability.has_value() || history.has_value();
}
};

struct TopicQosConfig {
std::string topic_name;
RosQosConfig qos;
};

/**
* TaskConfig holds all metadata for a recording task pushed from the server.
* This is cached when CachedRecordingConfig service is called and used
Expand All @@ -35,6 +56,7 @@ struct TaskConfig {

// Topic Selection
std::vector<std::string> topics;
std::vector<TopicQosConfig> topic_qos;

// Server Callback Integration
std::string start_callback_url;
Expand Down
Loading
Loading