Skip to content

Commit 383fd14

Browse files
duncanistaclaude
andauthored
feat(dogstatsd): add configurable SO_RCVBUF for UDP socket (#73)
* feat(dogstatsd): add configurable SO_RCVBUF for UDP socket Use the `socket2` crate to create the UDP socket with a configurable receive buffer size (`SO_RCVBUF`). On constrained environments like AWS Lambda, the kernel caps `SO_RCVBUF` at ~416 KiB (2x rmem_max), which can cause silent packet loss under burst traffic. This change lets operators tune the buffer via `DD_DOGSTATSD_SO_RCVBUF`, matching the Go agent's configuration option. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * remove comment about which env var it would represent in other crates * update comments * fmt & tests * socket2@0.6 * fixes * remove io-util already gated --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 086c937 commit 383fd14

5 files changed

Lines changed: 90 additions & 3 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/datadog-serverless-compat/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ async fn start_dogstatsd(
232232
metric_namespace,
233233
#[cfg(all(windows, feature = "windows-pipes"))]
234234
windows_pipe_name,
235+
so_rcvbuf: None,
235236
};
236237
let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new();
237238

crates/dogstatsd/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ serde = { version = "1.0.197", default-features = false, features = ["derive"] }
2020
serde_json = { version = "1.0.116", default-features = false, features = ["alloc"] }
2121
thiserror = { version = "1.0.58", default-features = false }
2222
tokio = { version = "1.37.0", default-features = false, features = ["macros", "rt-multi-thread", "net"] }
23+
socket2 = { version = "0.6", default-features = false }
2324
tokio-util = { version = "0.7.11", default-features = false }
2425
tracing = { version = "0.1.40", default-features = false }
2526
regex = { version = "1.10.6", default-features = false }

crates/dogstatsd/src/dogstatsd.rs

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use std::str::Split;
1313
use crate::aggregator_service::AggregatorHandle;
1414
use crate::errors::ParseError::UnsupportedType;
1515
use crate::metric::{id, parse, Metric};
16+
use socket2::{Domain, Protocol, Socket, Type};
1617
use tracing::{debug, error, trace};
1718

1819
// Windows-specific imports
@@ -36,6 +37,9 @@ pub struct DogStatsDConfig {
3637
/// Optional Windows named pipe name. (e.g., "\\\\.\\pipe\\my_pipe").
3738
#[cfg(all(windows, feature = "windows-pipes"))]
3839
pub windows_pipe_name: Option<String>,
40+
/// Optional socket receive buffer size (SO_RCVBUF) in bytes.
41+
/// If None, uses the OS default. Increase this to reduce packet loss under high load.
42+
pub so_rcvbuf: Option<usize>,
3943
}
4044

4145
/// Represents the source of a DogStatsD message. Varies by transport method.
@@ -174,9 +178,9 @@ impl DogStatsD {
174178
let addr = format!("{}:{}", config.host, config.port);
175179
// TODO (UDS socket)
176180
#[allow(clippy::expect_used)]
177-
let socket = tokio::net::UdpSocket::bind(addr)
181+
let socket = create_udp_socket(&addr, config.so_rcvbuf)
178182
.await
179-
.expect("couldn't bind to address");
183+
.expect("couldn't create UDP socket");
180184
BufferReader::UdpSocket(socket)
181185
};
182186

@@ -267,6 +271,53 @@ impl DogStatsD {
267271
}
268272
}
269273

274+
async fn create_udp_socket(
275+
addr: &str,
276+
so_rcvbuf: Option<usize>,
277+
) -> std::io::Result<tokio::net::UdpSocket> {
278+
// Resolve via lookup_host to support hostnames (e.g. "localhost:8125"),
279+
// matching the previous behavior of tokio::net::UdpSocket::bind().
280+
let socket_addr = tokio::net::lookup_host(addr).await?.next().ok_or_else(|| {
281+
std::io::Error::new(
282+
std::io::ErrorKind::InvalidInput,
283+
format!("Could not resolve address '{}'", addr),
284+
)
285+
})?;
286+
287+
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
288+
289+
// Log the kernel's rmem_max cap so operators can tell
290+
// whether the requested SO_RCVBUF was capped by the OS.
291+
#[cfg(target_os = "linux")]
292+
if let Ok(rmem_max) = std::fs::read_to_string("/proc/sys/net/core/rmem_max") {
293+
debug!("DogStatsD Kernel rmem_max={} bytes", rmem_max.trim());
294+
}
295+
296+
if let Some(buf_size) = so_rcvbuf {
297+
socket.set_recv_buffer_size(buf_size)?;
298+
299+
// The kernel may cap the value; log what we actually got.
300+
let actual = socket.recv_buffer_size().unwrap_or(0);
301+
debug!(
302+
"DogStatsD SO_RCVBUF: requested={} bytes, actual={} bytes",
303+
buf_size, actual
304+
);
305+
} else {
306+
debug!(
307+
"DogStatsD using default SO_RCVBUF: {} bytes",
308+
socket.recv_buffer_size().unwrap_or(0)
309+
);
310+
}
311+
312+
// Required for tokio compatibility
313+
socket.set_nonblocking(true)?;
314+
315+
socket.bind(&socket_addr.into())?;
316+
317+
let std_socket: std::net::UdpSocket = socket.into();
318+
tokio::net::UdpSocket::from_std(std_socket)
319+
}
320+
270321
/// Named Pipe server - accepts client connections and forwards metrics.
271322
///
272323
/// Uses a multi-instance approach (like winio in the main agent):
@@ -474,6 +525,34 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
474525
.starts_with("custom.namespace.my.metric"));
475526
}
476527

528+
#[tokio::test]
529+
async fn test_create_udp_socket_default_so_rcvbuf() {
530+
let socket = super::create_udp_socket("127.0.0.1:0", None).await.unwrap();
531+
let std_socket = socket.into_std().unwrap();
532+
let s2 = socket2::Socket::from(std_socket);
533+
let buf_size = s2.recv_buffer_size().unwrap();
534+
assert!(buf_size > 0, "default SO_RCVBUF should be non-zero");
535+
}
536+
537+
#[tokio::test]
538+
async fn test_create_udp_socket_custom_so_rcvbuf() {
539+
let requested: usize = 262_144;
540+
let socket = super::create_udp_socket("127.0.0.1:0", Some(requested))
541+
.await
542+
.unwrap();
543+
let std_socket = socket.into_std().unwrap();
544+
let s2 = socket2::Socket::from(std_socket);
545+
let actual = s2.recv_buffer_size().unwrap();
546+
// The kernel may double the value (Linux) or cap it, but it should
547+
// be at least as large as the requested size.
548+
assert!(
549+
actual >= requested,
550+
"SO_RCVBUF actual ({}) should be >= requested ({})",
551+
actual,
552+
requested
553+
);
554+
}
555+
477556
async fn setup_and_consume_dogstatsd(
478557
statsd_string: &str,
479558
metric_namespace: Option<String>,

crates/dogstatsd/tests/integration_test.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ async fn start_dogstatsd(aggregator_handle: AggregatorHandle) -> CancellationTok
102102
metric_namespace: None,
103103
#[cfg(all(windows, feature = "windows-pipes"))]
104104
windows_pipe_name: None,
105+
so_rcvbuf: None,
105106
};
106107
let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new();
107108
let dogstatsd_client = DogStatsD::new(
@@ -308,6 +309,7 @@ async fn test_named_pipe_basic_communication() {
308309
port: 0,
309310
metric_namespace: None,
310311
windows_pipe_name: Some(pipe_name.to_string()),
312+
so_rcvbuf: None,
311313
},
312314
handle,
313315
cancel_token,
@@ -362,6 +364,7 @@ async fn test_named_pipe_disconnect_reconnect() {
362364
port: 0,
363365
metric_namespace: None,
364366
windows_pipe_name: Some(pipe_name.to_string()),
367+
so_rcvbuf: None,
365368
},
366369
handle,
367370
cancel_token_clone,
@@ -431,6 +434,7 @@ async fn test_named_pipe_cancellation() {
431434
port: 0,
432435
metric_namespace: None,
433436
windows_pipe_name: Some(pipe_name.to_string()),
437+
so_rcvbuf: None,
434438
},
435439
handle,
436440
cancel_token_clone,
@@ -474,6 +478,7 @@ async fn test_buffer_split_message() {
474478
port: 0,
475479
metric_namespace: None,
476480
windows_pipe_name: Some(pipe_name.to_string()),
481+
so_rcvbuf: None,
477482
},
478483
handle,
479484
cancel_token_clone,

0 commit comments

Comments
 (0)