Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions apps/axon_recorder/axon_recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ void print_usage(const char* program_name) {
<< " --path PATH Output directory path (default: .)\n"
<< " --profile PROFILE ROS profile: ros1 or ros2 (default: ros2)\n"
<< " --compression ALG Compression: none, zstd, lz4 (default: zstd)\n"
<< " --level LEVEL Compression preset 0-4 (default: 3)\n"
<< " 0=Default, 1=Fastest, 2=Fast, 3=Default, 4=Slow/Slowest\n"
<< " (applies to both zstd and lz4; values >4 are clamped)\n"
<< " --level LEVEL Compression preset (default: 3)\n"
<< " 0=Default, 1=Fastest, 2=Fast, 3=Default, 4=Slow, >=5=Slowest\n"
<< " (applies to both zstd and lz4)\n"
<< " --queue-size SIZE Message queue capacity (default: 1024)\n"
<< " --ws-url URL WebSocket server URL for --ws-client mode\n"
<< " --ws-auth-token TOKEN Authentication token for WebSocket handshake\n"
Expand Down Expand Up @@ -116,7 +116,7 @@ void print_usage(const char* program_name) {
<< " recording:\n"
<< " profile: ros2\n"
<< " compression: zstd\n"
<< " compression_level: 3 # preset 0-4 (see config/README.md)\n"
<< " compression_level: 3 # preset: 3=Default (see config/README.md)\n"
<< " disk_usage:\n"
<< " warn_usage_gb: 80\n"
<< " hard_limit_gb: 100\n"
Expand Down
9 changes: 5 additions & 4 deletions apps/axon_recorder/config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ List of topics to record with batching settings:
- `message_type`: ROS message type
- `batch_size`: Number of messages to batch before writing
- `flush_interval_ms`: Maximum time to wait before flushing (ms)
- `qos_depth`: ROS2 subscription history depth (default `10`). This controls
the middleware QoS queue, not the recorder's `dataset.queue_size` worker queue.

### Recording
- `profile`: ROS profile (`ros1` or `ros2`)
- `compression`: MCAP compression (`none`, `zstd`, `lz4`)
- `compression_level`: Compression level preset, range **0-4** (applies to both zstd and lz4).
Maps to MCAP's `CompressionLevel` enum: `0`=Default, `1`=Fastest, `2`=Fast, `3`=Default, `4`=Slow/Slowest.
Values greater than 4 are silently clamped. This is *not* the native zstd (1-19) or lz4 (1-12)
range — the underlying MCAP C++ library only exposes 5 presets.
- `compression_level`: Compression level preset (applies to both zstd and lz4).
Maps recorder/CLI values to MCAP presets: `0`=Default, `1`=Fastest, `2`=Fast, `3`=Default, `4`=Slow, `>=5`=Slowest.
This is *not* the native zstd (1-19) or lz4 (1-12) range; the underlying MCAP C++ library only exposes 5 presets.
- `max_disk_usage_gb`: Backward-compatible alias for `disk_usage.hard_limit_gb`
- `disk_usage`: Recorder-managed disk budget
- `enabled`: Enable warn/hard disk guard checks
Expand Down
12 changes: 5 additions & 7 deletions apps/axon_recorder/config/default_config_ros1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,11 @@ recording:
# - lz4: Faster compression, larger files
# - none: No compression, fastest, largest files
compression: zstd
# Compression level preset (applies to both zstd and lz4, range: 0-4).
# Internally maps to MCAP's CompressionLevel enum:
# 0 = use library default (Default)
# 1 = Fastest 2 = Fast 3 = Default 4 = Slow/Slowest
# Values > 4 are silently clamped to 4. This is NOT the native
# zstd (1-19) / lz4 (1-12) range -- the MCAP C++ library only exposes
# 5 presets. See core/axon_mcap/mcap_writer_wrapper.hpp.
# Compression level preset (applies to both zstd and lz4).
# Internally maps recorder/CLI values to MCAP's CompressionLevel enum:
# 0 = Default 1 = Fastest 2 = Fast 3 = Default 4 = Slow >=5 = Slowest
# This is NOT the native zstd (1-19) / lz4 (1-12) range.
# See core/axon_mcap/mcap_writer_wrapper.hpp.
compression_level: 3

# Mixed-topic writer batching. Topic workers remain per-topic serial, while
Expand Down
14 changes: 7 additions & 7 deletions apps/axon_recorder/config/default_config_ros2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ dataset:
# High-frequency subscriptions (IMU, camera): Use larger batches (100-5000)
# Low-frequency subscriptions (status, diagnostics): Use small batches (1-10)
# Control signals: Use batch_size=1 for immediate persistence
# Optional qos_depth controls ROS2 subscription history depth. It is separate
# from dataset.queue_size, which controls Axon's per-topic worker queue.
subscriptions:
# IMU data - high frequency (5 seconds of data per batch)
- name: /imu/data
Expand Down Expand Up @@ -108,13 +110,11 @@ recording:
# - lz4: Faster compression, larger files
# - none: No compression, fastest, largest files
compression: zstd
# Compression level preset (applies to both zstd and lz4, range: 0-4).
# Internally maps to MCAP's CompressionLevel enum:
# 0 = use library default (Default)
# 1 = Fastest 2 = Fast 3 = Default 4 = Slow/Slowest
# Values > 4 are silently clamped to 4. This is NOT the native
# zstd (1-19) / lz4 (1-12) range -- the MCAP C++ library only exposes
# 5 presets. See core/axon_mcap/mcap_writer_wrapper.hpp.
# Compression level preset (applies to both zstd and lz4).
# Internally maps recorder/CLI values to MCAP's CompressionLevel enum:
# 0 = Default 1 = Fastest 2 = Fast 3 = Default 4 = Slow >=5 = Slowest
# This is NOT the native zstd (1-19) / lz4 (1-12) range.
# See core/axon_mcap/mcap_writer_wrapper.hpp.
compression_level: 3

# Mixed-topic writer batching. Topic workers remain per-topic serial, while
Expand Down
12 changes: 5 additions & 7 deletions apps/axon_recorder/config/default_config_udp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,11 @@ recording:
# - lz4: Faster compression, larger files
# - none: No compression, fastest, largest files
compression: zstd
# Compression level preset (applies to both zstd and lz4, range: 0-4).
# Internally maps to MCAP's CompressionLevel enum:
# 0 = use library default (Default)
# 1 = Fastest 2 = Fast 3 = Default 4 = Slow/Slowest
# Values > 4 are silently clamped to 4. This is NOT the native
# zstd (1-19) / lz4 (1-12) range -- the MCAP C++ library only exposes
# 5 presets. See core/axon_mcap/mcap_writer_wrapper.hpp.
# Compression level preset (applies to both zstd and lz4).
# Internally maps recorder/CLI values to MCAP's CompressionLevel enum:
# 0 = Default 1 = Fastest 2 = Fast 3 = Default 4 = Slow >=5 = Slowest
# This is NOT the native zstd (1-19) / lz4 (1-12) range.
# See core/axon_mcap/mcap_writer_wrapper.hpp.
compression_level: 3

# Disk usage limits for recorder-managed local storage.
Expand Down
9 changes: 9 additions & 0 deletions apps/axon_recorder/src/config/config_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ bool ConfigParser::save_to_file(const std::string& path, const RecorderConfig& c
subscription_node["message_type"] = subscription.message_type;
subscription_node["batch_size"] = subscription.batch_size;
subscription_node["flush_interval_ms"] = subscription.flush_interval_ms;
subscription_node["qos_depth"] = subscription.qos_depth;
node["subscriptions"].push_back(subscription_node);
}

Expand Down Expand Up @@ -359,6 +360,10 @@ bool ConfigParser::parse_subscriptions(
if (subscription_node["flush_interval_ms"]) {
subscription.flush_interval_ms = subscription_node["flush_interval_ms"].as<int>();
}
if (subscription_node["qos_depth"]) {
const auto qos_depth = subscription_node["qos_depth"].as<long long>();
subscription.qos_depth = qos_depth > 0 ? static_cast<size_t>(qos_depth) : 0;
}
// Parse depth_compression section (can be boolean or object)
if (subscription_node["depth_compression"]) {
const auto& dc_node = subscription_node["depth_compression"];
Expand Down Expand Up @@ -785,6 +790,10 @@ bool ConfigParser::validate(const RecorderConfig& config, std::string& error_msg
error_msg = "Subscription batch_size must be > 0";
return false;
}
if (subscription.qos_depth == 0) {
error_msg = "Subscription qos_depth must be > 0";
return false;
}
}

const auto& writer_batch = config.recording.writer_batch;
Expand Down
3 changes: 2 additions & 1 deletion apps/axon_recorder/src/core/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2256,8 +2256,9 @@ bool AxonRecorder::setup_subscriptions() {
}

std::string options_json;
if (sub.depth_compression.enabled) {
{
nlohmann::json opts;
opts["qos_depth"] = sub.qos_depth;
opts["depth_compression"]["enabled"] = sub.depth_compression.enabled;
opts["depth_compression"]["level"] = sub.depth_compression.level;
options_json = opts.dump();
Expand Down
6 changes: 3 additions & 3 deletions apps/axon_recorder/src/core/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ struct SubscriptionConfig {
// Batch writing configuration
size_t batch_size = 1; // Number of messages to batch before writing (1 = immediate)
int flush_interval_ms = 100; // Maximum time to wait before flushing (ms)
size_t qos_depth = 10; // ROS middleware subscription history depth

// Depth compression configuration
struct DepthCompression {
Expand Down Expand Up @@ -129,9 +130,8 @@ struct RecordingConfig {
std::string profile = "ros2";
std::string compression = "zstd";
WriterBatchConfig writer_batch;
// Compression preset, range 0-4 (applies to both zstd and lz4).
// Maps to axon::mcap_wrapper::CompressionLevel: 0=Default, 1=Fastest,
// 2=Fast, 3=Default, 4=Slow/Slowest. Values >4 are clamped to 4.
// Compression preset used by recorder YAML/CLI (applies to both zstd and lz4).
// 0=Default, 1=Fastest, 2=Fast, 3=Default, 4=Slow, >=5=Slowest.
// NOT the native zstd/lz4 1-19 / 1-12 range.
int compression_level = 3;

Expand Down
58 changes: 58 additions & 0 deletions apps/axon_recorder/test/unit/test_config_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ TEST_F(ConfigParserTest, ParseValidFullConfig) {
message_type: sensor_msgs/Image
batch_size: 50
flush_interval_ms: 500
qos_depth: 64
- name: /imu/data
message_type: sensor_msgs/Imu
batch_size: 200
Expand Down Expand Up @@ -170,8 +171,10 @@ TEST_F(ConfigParserTest, ParseValidFullConfig) {
EXPECT_EQ(config.subscriptions[0].topic_name, "/camera/image");
EXPECT_EQ(config.subscriptions[0].batch_size, 50);
EXPECT_EQ(config.subscriptions[0].flush_interval_ms, 500);
EXPECT_EQ(config.subscriptions[0].qos_depth, 64u);
EXPECT_EQ(config.subscriptions[1].topic_name, "/imu/data");
EXPECT_EQ(config.subscriptions[1].batch_size, 200);
EXPECT_EQ(config.subscriptions[1].qos_depth, 10u);

EXPECT_DOUBLE_EQ(config.recording.max_disk_usage_gb, 50.0);
EXPECT_DOUBLE_EQ(config.recording.disk_usage.warn_usage_gb, 40.0);
Expand Down Expand Up @@ -293,6 +296,7 @@ TEST_F(ConfigParserTest, DefaultValuesForOptionalFields) {
// Subscription defaults
EXPECT_EQ(config.subscriptions[0].batch_size, 1); // Default batch size
EXPECT_EQ(config.subscriptions[0].flush_interval_ms, 100); // Default flush interval
EXPECT_EQ(config.subscriptions[0].qos_depth, 10u);

// Recording defaults
EXPECT_DOUBLE_EQ(config.recording.max_disk_usage_gb, 100.0);
Expand Down Expand Up @@ -433,6 +437,54 @@ TEST_F(ConfigParserTest, ValidateFailsWithZeroBatchSize) {
EXPECT_EQ(error_msg, "Subscription batch_size must be > 0");
}

TEST_F(ConfigParserTest, ValidateFailsWithZeroQosDepth) {
RecorderConfig config;
config.dataset.path = "/data";
config.dataset.mode = "create";
config.subscriptions.push_back({"/test", "std_msgs/String", 100, 100, 0});

std::string error_msg;
EXPECT_FALSE(ConfigParser::validate(config, error_msg));
EXPECT_EQ(error_msg, "Subscription qos_depth must be > 0");
}

TEST_F(ConfigParserTest, NegativeQosDepthValidatesAsInvalid) {
const std::string yaml = R"(
dataset:
path: /data
subscriptions:
- name: /test
message_type: std_msgs/String
qos_depth: -1
)";

ConfigParser parser;
RecorderConfig config;
ASSERT_TRUE(parser.load_from_string(yaml, config));
ASSERT_EQ(config.subscriptions.size(), 1);
EXPECT_EQ(config.subscriptions[0].qos_depth, 0u);

std::string error_msg;
EXPECT_FALSE(ConfigParser::validate(config, error_msg));
EXPECT_EQ(error_msg, "Subscription qos_depth must be > 0");
}

TEST_F(ConfigParserTest, OversizedQosDepthFailsToParse) {
const std::string yaml = R"(
dataset:
path: /data
subscriptions:
- name: /test
message_type: std_msgs/String
qos_depth: 9223372036854775808
)";

ConfigParser parser;
RecorderConfig config;
EXPECT_FALSE(parser.load_from_string(yaml, config));
EXPECT_FALSE(parser.get_last_error().empty());
}

TEST_F(ConfigParserTest, ValidateFailsWithInvalidMode) {
RecorderConfig config;
config.dataset.path = "/data";
Expand Down Expand Up @@ -501,6 +553,7 @@ TEST_F(ConfigParserTest, SaveToFile) {
config.dataset.mode = "create";

config.subscriptions.push_back({"/test_topic", "std_msgs/String", 50, 500});
config.subscriptions[0].qos_depth = 32;
config.recording.max_disk_usage_gb = 75.0;

auto path = (test_dir_ / "saved_config.yaml").string();
Expand All @@ -510,6 +563,11 @@ TEST_F(ConfigParserTest, SaveToFile) {

// Verify file was created
EXPECT_TRUE(fs::exists(path));

RecorderConfig loaded;
EXPECT_TRUE(parser.load_from_file(path, loaded));
ASSERT_EQ(loaded.subscriptions.size(), 1);
EXPECT_EQ(loaded.subscriptions[0].qos_depth, 32u);
}

// ============================================================================
Expand Down
36 changes: 22 additions & 14 deletions core/axon_mcap/mcap_writer_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,30 @@ static mcap::Compression to_mcap_compression(Compression compression) {
}
}

// Convert our compression level enum to MCAP's CompressionLevel.
//
// Legacy path: if options.compression_level is non-zero, clamp it into the
// enum range (0..4) and use that. This keeps backward compatibility with
// callers that historically passed values like `3` meaning "Slow-ish".
static mcap::CompressionLevel resolve_compression_level(const McapWriterOptions& options) {
int level = options.compression_level;
if (level == 0) {
level = static_cast<int>(options.compression_preset);
CompressionLevel compression_level_from_legacy_int(int level) {
if (level <= 0) {
return CompressionLevel::Default;
}
if (level < 0) {
level = 0;
}
if (level > 4) {
level = 4;

switch (level) {
case 1:
return CompressionLevel::Fastest;
case 2:
return CompressionLevel::Fast;
case 3:
return CompressionLevel::Default;
case 4:
return CompressionLevel::Slow;
default:
return CompressionLevel::Slowest;
}
}

// Convert our compression level enum to MCAP's CompressionLevel.
static mcap::CompressionLevel resolve_compression_level(const McapWriterOptions& options) {
const auto level = options.compression_level == 0
? options.compression_preset
: compression_level_from_legacy_int(options.compression_level);
return static_cast<mcap::CompressionLevel>(level);
}

Expand Down
14 changes: 11 additions & 3 deletions core/axon_mcap/mcap_writer_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ enum class Compression { None, Zstd, Lz4 };
*/
enum class CompressionLevel { Fastest = 0, Fast = 1, Default = 2, Slow = 3, Slowest = 4 };

/**
* Convert recorder/CLI legacy compression_level values to MCAP presets.
*
* The recorder historically documents `compression_level: 3` as its default
* balanced setting. MCAP's enum value 3 is Slow, so direct enum casts turn the
* documented default into an unexpectedly expensive compression path.
*/
CompressionLevel compression_level_from_legacy_int(int level);

/**
* Configuration options for MCAP writer
*/
Expand All @@ -58,9 +67,8 @@ struct McapWriterOptions {
/// compatibility and overrides this when non-zero (see `.cpp`).
CompressionLevel compression_preset = CompressionLevel::Default;

/// Legacy compression level. 0 means "use compression_preset". Non-zero
/// values are clamped into the CompressionLevel enum range (0..4) for
/// backward compatibility with code that passes raw integers like 3.
/// Legacy recorder/CLI compression level. 0 means "use compression_preset".
/// Non-zero values are translated by compression_level_from_legacy_int().
int compression_level = 0;

/// Chunk size in bytes (default 4MB)
Expand Down
11 changes: 11 additions & 0 deletions core/axon_mcap/test/test_mcap_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,17 @@ TEST_F(McapWriterTest, CompressionZstd) {
EXPECT_GT(fs::file_size(test_file_), 0);
}

TEST_F(McapWriterTest, LegacyCompressionLevelMappingMatchesRecorderDocs) {
EXPECT_EQ(compression_level_from_legacy_int(-1), CompressionLevel::Default);
EXPECT_EQ(compression_level_from_legacy_int(0), CompressionLevel::Default);
EXPECT_EQ(compression_level_from_legacy_int(1), CompressionLevel::Fastest);
EXPECT_EQ(compression_level_from_legacy_int(2), CompressionLevel::Fast);
EXPECT_EQ(compression_level_from_legacy_int(3), CompressionLevel::Default);
EXPECT_EQ(compression_level_from_legacy_int(4), CompressionLevel::Slow);
EXPECT_EQ(compression_level_from_legacy_int(5), CompressionLevel::Slowest);
EXPECT_EQ(compression_level_from_legacy_int(99), CompressionLevel::Slowest);
}

TEST_F(McapWriterTest, ThreadSafety) {
McapWriterWrapper writer;

Expand Down
10 changes: 6 additions & 4 deletions middlewares/ros2/include/ros2_subscription_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#ifndef ROS2_PLUGIN_SUBSCRIPTION_WRAPPER_HPP
#define ROS2_PLUGIN_SUBSCRIPTION_WRAPPER_HPP

#include <nlohmann/json.hpp>
#include <rclcpp/rclcpp.hpp>

#include <functional>
Expand Down Expand Up @@ -69,6 +70,8 @@ struct SubscribeOptions {
: qos(10) {}
};

void apply_subscribe_qos_options(SubscribeOptions& options, const nlohmann::json& opts);

class SubscriptionManager {
public:
explicit SubscriptionManager(rclcpp::Node::SharedPtr node);
Expand All @@ -83,10 +86,9 @@ class SubscriptionManager {
/**
* Subscribe with a zero-copy (ABI v1.2) callback.
*
* Fast path (no depth compression): retains the rclcpp::SerializedMessage
* shared_ptr and hands the recorder a pointer into its `buffer` along with
* a release function that drops the shared_ptr. No payload copy happens
* between rcl and the recorder's worker queue.
* Fast path (no depth compression): copies the rclcpp::SerializedMessage
* payload into plugin-owned storage before returning from the ROS callback,
* then hands the recorder that owned buffer plus a release function.
*
* Compression path (depth_compression.enabled): transparently falls back
* to the v1.x copy semantics internally (the compressor allocates a new
Expand Down
Loading
Loading