Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d2bd0d1
Add tracing subscriber hooks into tests
Notgnoshi Mar 14, 2026
3cc6f53
Add CanFrame convenience utilities
Notgnoshi Mar 14, 2026
1ca52f0
recv: Add regular blocking reads in dedicated threads
Notgnoshi Mar 14, 2026
3cb2721
recv: Add epoll backend
Notgnoshi Mar 14, 2026
68b2bc2
recv: Add recvmmsg backend
Notgnoshi Mar 14, 2026
b573eef
recv: Add single shot io_uring backend
Notgnoshi Mar 14, 2026
8777d4d
recv: io_uring multishot backend
Notgnoshi Mar 14, 2026
4fb7551
Fix incorrect Fedora version number
Notgnoshi Mar 17, 2026
6f42956
Add benchmarks to candumpr-architecture design document
Notgnoshi Mar 15, 2026
e4cf320
Add shareable benchmark utilities to vcan-fixture
Notgnoshi Mar 17, 2026
c9ea3de
Enable adding benchmark hooks to the dedicated recv threads
Notgnoshi Mar 17, 2026
1ecbe2a
Add benchmark infrastructure and system impact bench
Notgnoshi Mar 17, 2026
1aa888f
Add system contention benchmark
Notgnoshi Mar 17, 2026
529a1a7
Add callgrind instruction count et al. benchmark
Notgnoshi Mar 17, 2026
c992aa9
Use lost frames rather than loss%
Notgnoshi Mar 22, 2026
1fd5853
Improve benchmark table generation
Notgnoshi Mar 22, 2026
0c64267
Make bench logging consistent
Notgnoshi Mar 22, 2026
26211f8
Actually measure the SKB overhead
Notgnoshi Mar 22, 2026
6e4af88
Performance tweaks from benchmarks
Notgnoshi Mar 22, 2026
b2d5a7c
Add support for HW timestamps and dropcounts to uring multishot
Notgnoshi Mar 22, 2026
4c18d12
Fix use-after-free bug
Notgnoshi Mar 22, 2026
26d3ab2
Add benchmark results
Notgnoshi Mar 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,10 @@ description = "Opinionated CAN utils written in Rust"
[workspace.dependencies]
ctor = "0.6"
eyre = "0.6"
gungraun = "0.17"
io-uring = "0.7"
libc = "0.2"
neli = "0.7"
tabled = "0.18"
tracing = "0.1"
tracing-subscriber = "0.3"
20 changes: 19 additions & 1 deletion candumpr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,26 @@ description = "Log CAN traffic from multiple networks"
ci = []

[dependencies]
eyre.workspace = true
io-uring.workspace = true
libc.workspace = true

[dev-dependencies]
ctor.workspace = true
libc.workspace = true
gungraun.workspace = true
tabled.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
vcan-fixture = { path = "../vcan-fixture" }

[[bench]]
name = "recv_cost"
harness = false

[[bench]]
name = "recv_impact"
harness = false

[[bench]]
name = "recv_contention"
harness = false
308 changes: 308 additions & 0 deletions candumpr/benches/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,308 @@
use std::os::unix::io::{AsFd, BorrowedFd, OwnedFd};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, Instant};

use candumpr::can::{self, CanFrame};
use candumpr::recv::dedicated::DedicatedRecv;
use candumpr::recv::epoll::EpollRecv;
use candumpr::recv::recvmmsg::RecvmmsgRecv;
use candumpr::recv::uring::UringRecv;
use candumpr::recv::uring_multi::UringMultiRecv;
use tabled::Tabled;
use vcan_fixture::VcanHarness;
use vcan_fixture::bench::{Rusage, getrusage_thread};

// --- Backend dispatch ---

// Signature shared by every backend runner: consumes the RX sockets, watches the
// stop flag, bumps the counter once per received frame, and returns the backend's
// own received total plus the thread rusage delta measured around the run.
type BackendRunFn = fn(Vec<OwnedFd>, Arc<AtomicBool>, &AtomicU64) -> (u64, Rusage);

/// A receive backend under benchmark, with the metadata needed to drive it.
pub struct BackendDef {
    /// Name shown in result tables and log output.
    pub name: &'static str,
    /// Whether this backend expects blocking RX sockets (dedicated threads do;
    /// the readiness-based backends use non-blocking sockets).
    pub blocking: bool,
    /// Entry point that runs the backend until the stop flag is set.
    pub run: BackendRunFn,
}

/// Every receive backend measured by the benchmarks.
pub const BACKENDS: &[BackendDef] = &[
    // One blocking read loop per socket, each on its own thread.
    BackendDef {
        name: "dedicated",
        blocking: true,
        run: run_dedicated,
    },
    // Single thread multiplexing all sockets with epoll readiness.
    BackendDef {
        name: "epoll",
        blocking: false,
        run: run_epoll,
    },
    // Batched reads via recvmmsg(2).
    BackendDef {
        name: "recvmmsg",
        blocking: false,
        run: run_recvmmsg,
    },
    // io_uring with one submission per completed read.
    BackendDef {
        name: "uring",
        blocking: false,
        run: run_uring,
    },
    // io_uring multishot: one standing submission yields many completions.
    BackendDef {
        name: "uring_multi",
        blocking: false,
        run: run_uring_multi,
    },
];

// --- Backend run functions ---
//
// Single-threaded backends: wrap the backend's run() with getrusage_thread() before/after.
// Dedicated backend: uses run_instrumented() to collect per-thread RUSAGE_THREAD and
// aggregate the deltas.

/// Run the dedicated-thread backend, aggregating per-thread rusage deltas.
///
/// Each recv thread's inner loop is bracketed by `getrusage_thread()` snapshots
/// via the instrumentation hook; the deltas from all threads are folded into a
/// single mutex-protected accumulator.
fn run_dedicated(sockets: Vec<OwnedFd>, stop: Arc<AtomicBool>, count: &AtomicU64) -> (u64, Rusage) {
    let backend = DedicatedRecv::new(sockets);
    // Shared accumulator for the rusage deltas of every recv thread.
    let usage_acc = std::sync::Mutex::new(Rusage::default());
    let received = backend
        .run_instrumented(
            stop,
            &|_idx, _frame, _meta| {
                count.fetch_add(1, Ordering::Relaxed);
            },
            &|_idx, inner| {
                // Snapshot RUSAGE_THREAD around this thread's whole loop.
                let snapshot = getrusage_thread();
                let frames = inner()?;
                *usage_acc.lock().unwrap() += getrusage_thread().delta(&snapshot);
                Ok(frames)
            },
        )
        .unwrap();
    let usage = usage_acc.into_inner().unwrap();
    (received, usage)
}

/// Run the epoll backend on the calling thread, measuring this thread's
/// rusage delta across the run.
fn run_epoll(sockets: Vec<OwnedFd>, stop: Arc<AtomicBool>, count: &AtomicU64) -> (u64, Rusage) {
    let mut backend = EpollRecv::new(sockets).unwrap();
    let baseline = getrusage_thread();
    let received = backend
        .run(stop, &mut |_idx, _frame, _meta| {
            count.fetch_add(1, Ordering::Relaxed);
        })
        .unwrap();
    (received, getrusage_thread().delta(&baseline))
}

/// Run the recvmmsg backend on the calling thread, measuring this thread's
/// rusage delta across the run.
fn run_recvmmsg(sockets: Vec<OwnedFd>, stop: Arc<AtomicBool>, count: &AtomicU64) -> (u64, Rusage) {
    let mut backend = RecvmmsgRecv::new(sockets).unwrap();
    let baseline = getrusage_thread();
    let received = backend
        .run(stop, &mut |_idx, _frame, _meta| {
            count.fetch_add(1, Ordering::Relaxed);
        })
        .unwrap();
    (received, getrusage_thread().delta(&baseline))
}

/// Run the single-shot io_uring backend on the calling thread, measuring this
/// thread's rusage delta across the run.
fn run_uring(sockets: Vec<OwnedFd>, stop: Arc<AtomicBool>, count: &AtomicU64) -> (u64, Rusage) {
    let mut backend = UringRecv::new(sockets).unwrap();
    let baseline = getrusage_thread();
    let received = backend
        .run(stop, &mut |_idx, _frame, _meta| {
            count.fetch_add(1, Ordering::Relaxed);
        })
        .unwrap();
    (received, getrusage_thread().delta(&baseline))
}

/// Run the multishot io_uring backend on the calling thread, measuring this
/// thread's rusage delta across the run.
fn run_uring_multi(
    sockets: Vec<OwnedFd>,
    stop: Arc<AtomicBool>,
    count: &AtomicU64,
) -> (u64, Rusage) {
    let mut backend = UringMultiRecv::new(sockets).unwrap();
    let baseline = getrusage_thread();
    let received = backend
        .run(stop, &mut |_idx, _frame, _meta| {
            count.fetch_add(1, Ordering::Relaxed);
        })
        .unwrap();
    (received, getrusage_thread().delta(&baseline))
}

// --- Sender ---

/// Pace frames onto `tx` at a fixed `interval` until `duration` elapses, then
/// raise the `stop` flag to signal the receiver under test.
///
/// Successful sends are tallied in `count`; a failed send is skipped but the
/// sequence number still advances, so frame IDs stay unique.
fn sender_loop(
    tx: BorrowedFd<'_>,
    iface_idx: usize,
    interval: Duration,
    duration: Duration,
    stop: &AtomicBool,
    count: &AtomicU64,
) {
    let started = Instant::now();
    let mut deadline = started;
    let mut seq: u32 = 0;
    while started.elapsed() < duration {
        // Absolute-deadline pacing: advance the deadline first, then sleep off
        // only the remaining time so scheduling drift does not accumulate.
        deadline += interval;
        let now = Instant::now();
        if deadline > now {
            std::thread::sleep(deadline - now);
        }
        let frame = make_frame(iface_idx, seq);
        if can::send_frame(tx, &frame).is_ok() {
            count.fetch_add(1, Ordering::Relaxed);
        }
        seq += 1;
    }
    stop.store(true, Ordering::Relaxed);
}

/// Build an extended-ID frame whose ID and first two payload bytes encode the
/// interface index and sequence number, so frames are attributable on receive.
fn make_frame(iface_idx: usize, frame_idx: u32) -> CanFrame {
    // ID layout: interface index in bits 8.., low byte of the sequence number
    // in bits 0..8, with CAN_EFF_FLAG marking the frame as extended.
    let id = ((iface_idx as u32) << 8) | (frame_idx & 0xFF) | libc::CAN_EFF_FLAG;
    let payload = [
        iface_idx as u8,
        frame_idx as u8,
        0xDE,
        0xAD,
        0xBE,
        0xEF,
        0xCA,
        0xFE,
    ];
    CanFrame::new(id, &payload)
}

// --- Result ---

/// Raw measurements from a single benchmark run, before averaging.
struct RawRun {
    /// Frames successfully written by all sender threads.
    sent: u64,
    /// Frames observed by the receive backend.
    recv: u64,
    /// User-mode CPU time of the receiver, in microseconds.
    user_us: i64,
    /// Kernel-mode CPU time of the receiver, in microseconds.
    sys_us: i64,
    /// Voluntary context switches during the run.
    vol_csw: i64,
    /// Involuntary context switches during the run.
    invol_csw: i64,
}

/// Averaged result of several repetitions, shaped for markdown-table output.
#[derive(Tabled)]
pub struct RunResult {
    /// Backend name, copied from the [`BackendDef`].
    pub backend: &'static str,
    /// Number of vcan interfaces used in the run.
    pub ifaces: usize,
    /// Target send rate per interface, frames/sec.
    pub rate: u32,
    /// Mean frames sent across repetitions.
    pub sent: u64,
    /// Mean frames received across repetitions.
    pub recv: u64,
    /// Mean frames lost (sent minus received, floored at zero).
    pub lost: u64,
    // CPU times are pre-formatted as strings so the table shows one decimal.
    pub user_ms: String,
    pub sys_ms: String,
    /// Mean voluntary context switches.
    pub vol_csw: i64,
    /// Mean involuntary context switches.
    pub invol_csw: i64,
}

/// Render the results as a markdown table on stderr (stdout is left free for
/// harness output).
pub fn print_results(results: &[RunResult]) {
    use tabled::settings::Style;
    let mut table = tabled::Table::new(results);
    table.with(Style::markdown());
    eprintln!("{table}");
}

// --- Run config ---

/// Execute one benchmark run: `ifaces` fresh vcan interfaces, each driven by a
/// paced sender thread at `rate` frames/sec for `duration`, with `backend`
/// receiving on the calling thread. Returns the raw counts plus the
/// receiver's rusage delta.
fn run_config(backend: &BackendDef, ifaces: usize, rate: u32, duration: Duration) -> RawRun {
    tracing::info!(
        backend = backend.name,
        ifaces,
        rate,
        ?duration,
        "starting run"
    );
    // Fresh interfaces per run; presumably torn down when the harness drops —
    // TODO confirm against VcanHarness.
    let vcans = VcanHarness::new(ifaces).unwrap();

    let mut tx_sockets = Vec::with_capacity(ifaces);
    let mut rx_sockets = Vec::with_capacity(ifaces);
    for name in vcans.names() {
        // Dedicated-thread backends want blocking RX sockets; the
        // readiness-based backends want non-blocking ones.
        let rx = if backend.blocking {
            can::open_can_raw_blocking(name).unwrap()
        } else {
            can::open_can_raw(name).unwrap()
        };
        rx_sockets.push(rx);
        tx_sockets.push(can::open_can_raw_blocking(name).unwrap());
    }

    let stop = Arc::new(AtomicBool::new(false));
    let send_count = AtomicU64::new(0);
    let recv_count = AtomicU64::new(0);
    let interval = Duration::from_secs_f64(1.0 / rate as f64);

    let (recv, rusage) = std::thread::scope(|scope| {
        // Senders: one per interface, paced by sleeping between frames.
        // Each sender runs for `duration` then sets the stop flag.
        // NOTE(review): the FIRST sender to finish raises `stop`, so frames
        // still in flight from slower senders can go unreceived; all senders
        // share the same duration so the window is small — confirm this is
        // acceptable for the loss numbers.
        let stop_ref: &AtomicBool = &stop;
        let send_count_ref = &send_count;
        for (idx, tx) in tx_sockets.iter().enumerate() {
            scope.spawn(move || {
                tracing::debug!(iface = idx, "sender started");
                sender_loop(
                    tx.as_fd(),
                    idx,
                    interval,
                    duration,
                    stop_ref,
                    send_count_ref,
                );
                tracing::debug!(iface = idx, "sender done");
            });
        }

        // Receiver: backend under test on the calling thread.
        tracing::debug!("receiver started");
        let result = (backend.run)(rx_sockets, stop.clone(), &recv_count);
        tracing::debug!("receiver done");
        result
    });

    let sent = send_count.load(Ordering::Relaxed);
    tracing::info!(sent, recv, "run complete");

    RawRun {
        sent,
        recv,
        user_us: rusage.user_us,
        sys_us: rusage.sys_us,
        vol_csw: rusage.vol_csw,
        invol_csw: rusage.invol_csw,
    }
}

/// Run `reps` repetitions of the given configuration and return the averaged
/// result, shaped for table output.
///
/// Averages are integer means; sub-frame / sub-microsecond truncation is
/// irrelevant at benchmark scale.
///
/// # Panics
///
/// Panics with a clear message if `reps == 0`; previously this surfaced as an
/// opaque divide-by-zero panic on the averaging lines.
pub fn run_repetitions(
    backend: &BackendDef,
    ifaces: usize,
    rate: u32,
    duration: Duration,
    reps: usize,
) -> RunResult {
    assert!(reps > 0, "run_repetitions requires at least one repetition");
    let runs: Vec<RawRun> = (0..reps)
        .map(|_| run_config(backend, ifaces, rate, duration))
        .collect();
    let n = reps as u64;
    let ni = reps as i64;
    let sent = runs.iter().map(|r| r.sent).sum::<u64>() / n;
    let recv = runs.iter().map(|r| r.recv).sum::<u64>() / n;
    let user_us = runs.iter().map(|r| r.user_us).sum::<i64>() / ni;
    let sys_us = runs.iter().map(|r| r.sys_us).sum::<i64>() / ni;
    RunResult {
        backend: backend.name,
        ifaces,
        rate,
        sent,
        recv,
        // saturating_sub guards against a mean recv that rounds above sent.
        lost: sent.saturating_sub(recv),
        user_ms: format!("{:.1}", user_us as f64 / 1000.0),
        sys_ms: format!("{:.1}", sys_us as f64 / 1000.0),
        vol_csw: runs.iter().map(|r| r.vol_csw).sum::<i64>() / ni,
        invol_csw: runs.iter().map(|r| r.invol_csw).sum::<i64>() / ni,
    }
}
34 changes: 34 additions & 0 deletions candumpr/benches/recv_contention.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! Benchmark C: backend behavior under CPU contention.
//!
//! Runs each backend with 4 interfaces at 4000 fps while CPU burner threads consume
//! 75% or 90% of available cycles. Measures whether backends degrade gracefully or
//! drop frames when starved of CPU.
//!
//! Requires: vcan kernel module.

mod common;

use std::time::Duration;

/// Length of each individual measured run.
const RUN_DURATION: Duration = Duration::from_secs(5);
/// Runs averaged per (backend, load) combination.
const REPETITIONS: usize = 4;

/// Benchmark entry point: measure every backend with 4 interfaces at 4000 fps
/// under 75% and then 90% artificial CPU load, printing one table per load.
fn main() {
    tracing_subscriber::fmt()
        .with_writer(std::io::stderr)
        .init();
    vcan_fixture::enter_namespace();
    vcan_fixture::bench::pin_to_cores(4);

    for load_pct in [75, 90] {
        eprintln!("\n--- CPU load: {load_pct}% ---\n");
        // CPU burners run only while a backend is being measured, so the load
        // applies to the run itself rather than to table generation.
        let results: Vec<_> = common::BACKENDS
            .iter()
            .map(|backend| {
                let (burn_stop, burn_handles) = vcan_fixture::bench::start_cpu_load(4, load_pct);
                let mean = common::run_repetitions(backend, 4, 4000, RUN_DURATION, REPETITIONS);
                vcan_fixture::bench::stop_cpu_load(burn_stop, burn_handles);
                mean
            })
            .collect();
        common::print_results(&results);
    }
}
Loading