Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
467eda4
Add framework for UDPm (UDP multicast) transport implementation
tsaubergine Jan 26, 2026
51f40a8
Add identifier.h to capture middleware transporter identifer string c…
tsaubergine Jan 26, 2026
02d773e
Clean up identifier functions
tsaubergine Jan 26, 2026
cdc0e0a
Clean up redundant functions in interprocess portal implementations
tsaubergine Jan 26, 2026
d682f2d
Move _publish up a level in the interprocess portal hierarchy
tsaubergine Jan 26, 2026
60dbaab
Minor cleanup
tsaubergine Jan 26, 2026
756d2ac
Move IdentifierManager up a level in the hierarchy
tsaubergine Jan 27, 2026
874485f
Push most of subscription logic into shared middleware::InterProcessP…
tsaubergine Jan 27, 2026
218b528
Move publish_serialized into InterProcessPortalBase
tsaubergine Jan 27, 2026
d276400
Put implementation of portal and forwarder subscriptions into new bas…
tsaubergine Jan 27, 2026
27fe1d2
Basic working UDPm implementation - still needs to be able to split p…
tsaubergine Jan 27, 2026
6cf6a39
goby_test_middleware_interprocess_forwarder_udpm test working. Switch…
tsaubergine Jan 27, 2026
686eee7
Merge branch '3.0' into 3.0-udp-multicast
tsaubergine Mar 23, 2026
545bdc3
Fixes
tsaubergine Mar 23, 2026
5b86a14
Restore missing CMakeLists.txt
tsaubergine Mar 24, 2026
b5cc587
Add LargeMessage packetization test for UDPM and doc700_udpm.md (#367)
Copilot Mar 24, 2026
a358e9d
Add configurable send-rate limit to UDPM transport (#369)
Copilot Mar 24, 2026
b8ce6e1
Merge branch '3.0' into 3.0-udp-multicast
tsaubergine Mar 24, 2026
b18abd8
Add ability to access Protobuf config from Julia
tsaubergine Mar 24, 2026
cba411b
Use continuous series for copilot dependencies
tsaubergine Mar 24, 2026
eaf5150
Fix TSan heap-use-after-free in InterProcessForwarder during thread s…
Copilot Mar 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/copilot-setup-steps.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ jobs:

- name: Install dependencies
run: |
sudo ./DEPENDENCIES -a
sudo ./DEPENDENCIES -a -c
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ file(GLOB_RECURSE INCLUDE_FILES RELATIVE ${goby_SRC_DIR}
src/time/*.h
src/protobuf/*.h
src/zeromq/*.h
src/udpm/*.h
)

file(GLOB_RECURSE PROTOBUF_FILES RELATIVE ${goby_SRC_DIR}
Expand Down
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ include(acomms/src.cmake)
include(util/src.cmake)
include(time/src.cmake)
include(middleware/src.cmake)
include(udpm/src.cmake)

if(NOT enable_janus_acomms)
list(FILTER ACOMMS_SRC EXCLUDE REGEX ".*janus_driver.cpp")
Expand All @@ -237,6 +238,7 @@ add_library(goby
${UTIL_SRC}
${TIME_SRC}
${MIDDLEWARE_SRC}
${UDPM_SRC}
)


Expand Down
152 changes: 152 additions & 0 deletions src/doc/markdown/doc700_udpm.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# goby-udpm: Using UDP Multicast in Goby

UDP Multicast (UDPM) is a lightweight, broker-free transport layer that uses the standard IP multicast protocol for publish/subscribe messaging. Unlike the ZeroMQ-based transport (which requires a running `gobyd` broker), the UDPM transport is fully peer-to-peer: any process that joins the multicast group can directly publish and receive messages.

UDPM is suited for high-throughput applications with multiple subscribers. As UDP has no reliability (ack) implementation, **applications must be comfortable with potential data loss** and/or be willing to handle reliability at the application level.



## Interprocess Portal

The `goby::udpm::InterProcessPortal` implements the [Portal concept](doc210_transporter.md) using a UDP multicast socket. Each process that uses the UDPM portal opens a single UDP socket that is both used for sending (to the multicast group) and receiving (by joining the multicast group). There is no separate router or manager process.

The configuration is given as a `goby::udpm::protobuf::InterProcessPortalConfig` message.

Key configuration fields:

| Field | Default | Description |
|-------|---------|-------------|
| `platform` | `"default_goby_platform"` | Platform (vehicle/system) name |
| `listen_address` | `"0.0.0.0"` | Local address to bind the UDP socket |
| `multicast_address` | `"239.142.0.2"` | IPv4 multicast group address |
| `multicast_port` | `11144` | UDP port for all multicast traffic |
| `udp_payload_bytes` | `1472` | Max UDP payload per packet (MTU 1500 − 28 bytes IP/UDP headers). Messages greater than this packet size will be automatically packetized as needed. |
| `max_send_rate_bytes_per_second` | `0` (unlimited) | Maximum transmit rate in bytes per second. Set to `0` to disable rate limiting. For example, set to `12500000` to limit to 100 Mbps. |
| `client_name` | (app name) | Unique name for this portal instance |

## Wire Protocol

Each UDP datagram carries a single UDPM packet. The packet layout is:

```
[null-terminated identifier][9-byte fixed header][data payload]
```

### Identifier

The identifier is the same null-terminated `/`-delimited string used by the ZeroMQ transport:

```
/group/scheme/type/process/thread/\0
```

These parts are as follows:

* `group`: String representation of the `goby::middleware::Group`
* `scheme`: String representation of the marshalling scheme
* `type`: Type name returned by `goby::middleware::SerializerParserHelper::type_name()`
* `process`: String representation of the publishing process id (`std::to_string(getpid())`)
* `thread`: Hex representation of the std::hash of the publishing thread id

### Fixed Header (9 bytes)

After the null terminator, every packet carries a 9-byte header encoded in network byte order:

| Bytes | Field | Type | Description |
|-------|-------|------|-------------|
| 0–3 | `message_index` | uint32 | Monotonically increasing index per identifier, wraps at 2^32 |
| 4–5 | `num_packets` | uint16 | Total number of packets in this message (1 for messages that fit in a single datagram) |
| 6–7 | `packet_count` | uint16 | Zero-based index of this packet within the message |
| 8 | `status` | uint8 | Packet type: `NORMAL=0` (this isa placeholder for future protocol enhancements) |

### Data Payload

For `NORMAL` packets, the data payload is the serialized message bytes (the chunk corresponding to the packet's position within the full message).

## Packetization

When a serialized message (including header) is larger than `udp_payload_bytes`, it is automatically split into multiple UDP datagrams. Each datagram carries one "packet" with the same `message_index` and `num_packets`, distinguished by `packet_count = 0, 1, 2, …`.

The receiving side reassembles the message: once all `num_packets` fragments for a given `(identifier, message_index)` pair have arrived, the full serialized message is reconstructed and delivered to subscribers.

### Example: 100 KB Message

With the default `udp_payload_bytes = 1472` and a typical identifier of ~40 bytes:

```
max_data_per_packet = 1472 − (40 + 1 + 9) = 1422 bytes
num_packets = ceil(102400 / 1422) = 72 packets
```

All 72 datagrams carry the same `message_index` and `num_packets = 72`, with `packet_count` ranging from 0 to 71.

## Rate Limiting

The `max_send_rate_bytes_per_second` configuration field allows limiting the transmit rate of the UDPM portal. When set to a non-zero value, packets are queued internally and sent no faster than the configured rate.

Rate limiting uses an inter-packet gap approach: after each UDP datagram is sent, the next datagram in the queue is held until enough time has elapsed such that the average transmit rate does not exceed the configured limit.

The rate limit applies to the raw UDP payload bytes (including the UDPM header and identifier). It does not account for IP/UDP framing overhead at the network layer.

### Example: Limiting to 100 Mbps

```
max_send_rate_bytes_per_second: 12500000 # 100 Mbps = 100,000,000 bits/sec / 8 = 12,500,000 bytes/sec
```

When no rate limit is set (the default, `max_send_rate_bytes_per_second = 0`), packets are sent as fast as the UDP socket allows with no queuing overhead.

## No Hold State

Unlike the ZeroMQ transport, UDPM has no "hold" mechanism. The portal immediately begins publishing when started. Subscribers should be set up before publication begins, or they may miss early messages.

## Applications

The `goby::udpm::SingleThreadApplication` and `goby::udpm::MultiThreadApplication` provide the same interface as their ZeroMQ counterparts. They are drop-in replacements for use with the UDPM transport.

The minimal configuration Protobuf message passed to either of these base classes must be:

```protobuf
import "goby/middleware/protobuf/app_config.proto";
import "goby/udpm/protobuf/interprocess_config.proto";

message BasicApplicationConfig
{
// required parameters for ApplicationBase3 class
optional goby.middleware.protobuf.AppConfig app = 1;
// required parameters for connecting to the UDPM multicast group
optional goby.udpm.protobuf.InterProcessPortalConfig interprocess = 2;
}
```

`goby::middleware::Application` reads the `app` field, and `goby::udpm::InterProcessPortal` reads the `interprocess` field.

## Implementation Details

The `goby::udpm::InterProcessPortal` is a single-threaded portal: all sending and receiving are performed on the calling thread via a Boost.Asio `io_context`. During each call to `poll()`, the io_context is polled to:

1. Process any asynchronously received UDP datagrams.
2. Decode the identifier and packet header.
3. Reassemble multi-packet messages.

Asynchronous sends are used so that the `publish()` path does not block waiting for the socket write to complete. A shared pointer to the packet buffer is captured in the send completion handler, ensuring the data lives until the OS has consumed it.

### Class Hierarchy

```
InterProcessPortalImplementation<InnerTransporter, PortalBase>
└── PortalBase<Derived, InnerTransporter> (InterProcessPortalBase or InterProcessPortalCommon)
└── InterProcessPortalCommon<Derived, InnerTransporter>
└── StaticTransporterInterface<Derived, InnerTransporter>
```

The `InterProcessPortal<InnerTransporter>` alias uses `InterProcessPortalBase` as the `PortalBase`.

### Key Private Methods

| Method | Description |
|--------|-------------|
| `_init()` | Opens the UDP socket, joins the multicast group, starts the first async receive |
| `_do_publish(identifier, bytes)` | Packetizes and asynchronously sends all packets for a message |
| `_process_received_packet(lock, raw)` | Parses identifier and header; dispatches to reassembly |
| `_poll(lock)` | Drives the Boost.Asio io_context, processes received datagrams, checks partial messages |
4 changes: 3 additions & 1 deletion src/middleware/application/multi_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "goby/middleware/transport/interprocess.h"
#include "goby/middleware/transport/interthread.h"
#include "goby/middleware/transport/intervehicle.h"
#include "goby/time/steady_clock.h"

namespace goby
{
Expand Down Expand Up @@ -224,7 +225,8 @@ class MultiThreadApplicationBase : public goby::middleware::Application<Config>,
<< running_thread_count_
<< " threads." << std::endl;

MainThreadBase::transporter().poll();
MainThreadBase::transporter().template poll<goby::time::SteadyClock>(
std::chrono::milliseconds(100));
}

goby::glog.is(goby::util::logger::DEBUG1) && goby::glog << "All threads cleanly joined."
Expand Down
2 changes: 1 addition & 1 deletion src/middleware/application/thread.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2016-2023:
// Copyright 2016-2026:
// GobySoft, LLC (2013-)
// Community contributors (see AUTHORS file)
// File authors:
Expand Down
1 change: 0 additions & 1 deletion src/middleware/io/udp_one_to_many.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

#include <array> // for array
#include <boost/asio/buffer.hpp> // for buffer
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/ip/udp.hpp> // for udp, udp::endpoint
#include <boost/asio/socket_base.hpp> // for socket_base
#include <boost/system/error_code.hpp> // for error_code
Expand Down
12 changes: 10 additions & 2 deletions src/middleware/languages/julia/application.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2025:
// Copyright 2025-2026:
// GobySoft, LLC (2013-)
// Community contributors (see AUTHORS file)
// File authors:
Expand Down Expand Up @@ -115,6 +115,13 @@ template <typename App> class ApplicationWrapper

void set_loop_frequency_hertz(double freq) { app_ptr_->set_loop_frequency_hertz(freq); }

std::vector<std::uint8_t> cfg_serialized()
{
std::vector<std::uint8_t> v(app_ptr_->app_cfg().ByteSizeLong());
app_ptr_->app_cfg().SerializeToArray(&v[0], v.size());
Comment on lines +120 to +121
Copy link

Copilot AI Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cfg_serialized() allocates a vector sized by ByteSizeLong() and then calls SerializeToArray(&v[0], v.size()). If the config serializes to 0 bytes, &v[0] is undefined behavior. Prefer v.data() (and handle the empty-vector case) and consider checking the boolean return of SerializeToArray() for failure.

Suggested change
std::vector<std::uint8_t> v(app_ptr_->app_cfg().ByteSizeLong());
app_ptr_->app_cfg().SerializeToArray(&v[0], v.size());
std::vector<std::uint8_t> v;
const auto size = app_ptr_->app_cfg().ByteSizeLong();
if (size > 0)
{
v.resize(static_cast<std::size_t>(size));
if (!app_ptr_->app_cfg().SerializeToArray(v.data(),
static_cast<int>(v.size())))
{
v.clear();
}
}

Copilot uses AI. Check for mistakes.
return v;
}

private:
std::unique_ptr<App> app_ptr_;
std::map<std::string, goby::middleware::DynamicGroup> subscription_groups_;
Expand All @@ -139,7 +146,8 @@ inline void define_julia_module(jlcxx::Module& types, const std::string& app_nam
.method("cxx_run", &ApplicationWrapper<App>::run)
.method("cxx_publish", &ApplicationWrapper<App>::publish)
.method("cxx_subscribe", &ApplicationWrapper<App>::subscribe)
.method("cxx_set_loop_frequency_hertz", &ApplicationWrapper<App>::set_loop_frequency_hertz);
.method("cxx_set_loop_frequency_hertz", &ApplicationWrapper<App>::set_loop_frequency_hertz)
.method("cxx_cfg_serialized", &ApplicationWrapper<App>::cfg_serialized);
}

template <typename DataType, int scheme>
Expand Down
1 change: 1 addition & 0 deletions src/middleware/src.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ set(MIDDLEWARE_SRC
middleware/marshalling/interface.cpp
middleware/marshalling/detail/dccl_serializer_parser.cpp
middleware/transport/interthread.cpp
middleware/transport/identifier.cpp
middleware/transport/intervehicle/driver_thread.cpp
middleware/application/configuration_reader.cpp
middleware/application/tool.cpp
Expand Down
86 changes: 86 additions & 0 deletions src/middleware/transport/identifier.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#include "identifier.h"

const char goby::middleware::InterProcessIdentifierManager::delimiter{'/'};
const char goby::middleware::InterProcessIdentifierManager::delimiter_substitute{
0x1a}; // old ASCII substitute
const char goby::middleware::InterProcessIdentifierManager::end_delimiter{'\0'};

const std::string goby::middleware::InterProcessIdentifierManager::delimiter_str_{delimiter};

std::string goby::middleware::InterProcessIdentifierManager::make_identifier(
const std::string& type_name, int scheme, const std::string& group, IdentifierWildcard wildcard,
const std::string& process,
std::unordered_map<int, std::string>* schemes_buffer /* = nullptr */,
std::unordered_map<std::thread::id, std::string>* threads_buffer /*= nullptr*/)
{
// swap out delimiter with substitute
std::string sanitized_type_name = type_name;
std::replace(sanitized_type_name.begin(), sanitized_type_name.end(), delimiter,
delimiter_substitute);
std::string sanitized_group_name = group;
std::replace(sanitized_group_name.begin(), sanitized_group_name.end(), delimiter,
delimiter_substitute);
switch (wildcard)
{
default:
case IdentifierWildcard::NO_WILDCARDS:
{
auto thread = std::this_thread::get_id();
return (
delimiter_str_ + sanitized_group_name + delimiter_str_ +
(schemes_buffer ? id_component(scheme, *schemes_buffer)
: std::string(identifier_part_to_string(scheme) + delimiter_str_)) +
sanitized_type_name + delimiter_str_ + process + delimiter_str_ +
(threads_buffer ? id_component(thread, *threads_buffer)
: std::string(identifier_part_to_string(thread) + delimiter_str_)));
}
case IdentifierWildcard::THREAD_WILDCARD:
{
return (delimiter_str_ + sanitized_group_name + delimiter_str_ +
(schemes_buffer
? id_component(scheme, *schemes_buffer)
: std::string(identifier_part_to_string(scheme) + delimiter_str_)) +
sanitized_type_name + delimiter_str_ + process + delimiter_str_);
}
case IdentifierWildcard::PROCESS_THREAD_WILDCARD:
{
return (delimiter_str_ + sanitized_group_name + delimiter_str_ +
(schemes_buffer
? id_component(scheme, *schemes_buffer)
: std::string(identifier_part_to_string(scheme) + delimiter_str_)) +
sanitized_type_name + delimiter_str_);
}
}
}
std::tuple<std::string, int, std::string, int, std::size_t>
goby::middleware::InterProcessIdentifierManager::parse_identifier(const std::string& identifier)
{
enum
{
POS_GROUP = 0,
POS_SCHEME = 1,
POS_TYPE = 2,
POS_PROCESS = 3,
POS_THREAD = 4,
POS_MAX = POS_THREAD
};

const int number_elements = POS_MAX + 1;
std::string::size_type previous_delimiter = 0;
std::vector<std::string> elem;
for (auto i = 0; i < number_elements; ++i)
{
auto delimiter_pos = identifier.find(delimiter, previous_delimiter + 1);
elem.push_back(
identifier.substr(previous_delimiter + 1, delimiter_pos - (previous_delimiter + 1)));
previous_delimiter = delimiter_pos;
}

auto& group = elem[POS_GROUP];
auto& type = elem[POS_TYPE];
std::replace(type.begin(), type.end(), delimiter_substitute, delimiter);
std::replace(group.begin(), group.end(), delimiter_substitute, delimiter);
return std::make_tuple(
elem[POS_GROUP], middleware::MarshallingScheme::from_string(elem[POS_SCHEME]),
elem[POS_TYPE], std::stoi(elem[POS_PROCESS]), std::stoull(elem[POS_THREAD], nullptr, 16));
}
Loading
Loading