Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions dpdk-stdlib-net/src/backend_dpdk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,21 +169,26 @@ impl PacketBackend for DpdkBackend {
let mut mbuf = self.mempool.alloc()
.map_err(|e| io::Error::new(io::ErrorKind::Other, format!("mbuf alloc failed: {}", e)))?;

// Copy frame data into mbuf
let data = mbuf.data_mut()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Failed to get mbuf data"))?;

if data.len() < frame.len() {
// A freshly-allocated mbuf has data_len == 0, so data_mut() would return
// a zero-length slice. Compute capacity from the buffer and set the data
// length BEFORE data_mut() so it returns a correctly-sized slice. Getting
// this order wrong makes the capacity check read 0 and silently drops
// EVERY frame — this matches the proven UDP TX path in dpdk-udp.
let capacity = mbuf.buf_len() as usize - mbuf.data_offset() as usize;
if capacity < frame.len() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("Frame too large: {} bytes, mbuf capacity: {}", frame.len(), data.len()),
format!("Frame too large: {} bytes, mbuf capacity: {}", frame.len(), capacity),
));
}

data[..frame.len()].copy_from_slice(frame);
mbuf.set_data_len(frame.len() as u16);
mbuf.set_packet_len(frame.len() as u32);

let data = mbuf.data_mut()
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Failed to get mbuf data"))?;
data[..frame.len()].copy_from_slice(frame);

// Transmit
let port = self.port.lock().unwrap();
let mut packets = vec![mbuf];
Expand Down Expand Up @@ -298,10 +303,18 @@ mod tests {
// EtherType (IPv4)
frame[12..14].copy_from_slice(&[0x08, 0x00]);

// With stubs, tx_burst returns 0, so this will get WouldBlock
// The frame must pass the capacity check and reach tx_burst. With stubs
// tx_burst returns 0 → WouldBlock; on real hardware it returns Ok(len).
// It must NEVER fail the capacity check — that was the regression that
// silently dropped every TX frame (data_mut() before set_data_len()).
let result = backend.send_frame(&frame);
// Stubs return 0 for tx_burst, so we expect WouldBlock
assert!(result.is_err() || result.unwrap() == 64);
if let Err(e) = &result {
assert_eq!(
e.kind(),
std::io::ErrorKind::WouldBlock,
"send_frame must reach tx_burst, not fail the capacity check: {e}"
);
}
}

#[test]
Expand Down
21 changes: 18 additions & 3 deletions dpdk-stdlib-tcp/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,21 @@ pub fn init_dpdk_tcp_context(cfg: DpdkTcpRuntimeConfig) -> io::Result<()> {
///
/// Split out from [`run_engine_driver`] so it can be unit-tested with a mock
/// backend.
/// Send a frame, warning ONCE if the backend rejects it for a non-transient
/// reason. A persistent TX failure would otherwise be silent — silently
/// dropping every SYN-ACK once cost a full EC2 perf run to diagnose. Transient
/// WouldBlock (tx ring full) is normal backpressure and ignored.
fn send_or_warn(backend: &Arc<dyn PacketBackend>, out: &[u8]) {
if let Err(e) = backend.send_frame(out) {
if e.kind() != std::io::ErrorKind::WouldBlock {
static TX_ERR_ONCE: std::sync::Once = std::sync::Once::new();
TX_ERR_ONCE.call_once(|| {
eprintln!("tcp-engine: backend.send_frame failed — frames are being dropped: {e}");
});
}
}
}

fn drive_once(
backend: &Arc<dyn PacketBackend>,
engine: &mut TcpEngine,
Expand All @@ -140,7 +155,7 @@ fn drive_once(
src_mac.copy_from_slice(&frame[6..12]);
if let Ok(seg) = parse_tcp_packet(frame) {
for out in engine.on_segment_with_macs(&seg, src_mac, dst_mac) {
let _ = backend.send_frame(&out);
send_or_warn(backend, &out);
}
}
}
Expand All @@ -149,14 +164,14 @@ fn drive_once(
// Commands (Connect/Listen/Accept/Shutdown/SetOption/Close).
while let Ok(cmd) = cmd_rx.try_recv() {
for out in engine.on_command(cmd) {
let _ = backend.send_frame(&out);
send_or_warn(backend, &out);
}
}

// Timers + tx-ring drain → segments.
let now = engine.clock().now();
for out in engine.on_tick(now) {
let _ = backend.send_frame(&out);
send_or_warn(backend, &out);
}
}

Expand Down
Loading