Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ members = [
"apps/tokio-tcp-echo",
"apps/tcp-synthetic-bench",
"apps/plain-tcp-echo",
"apps/tcp-kernel-client",
]

[workspace.package]
Expand Down
13 changes: 13 additions & 0 deletions apps/tcp-kernel-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "tcp-kernel-client"
version.workspace = true
edition.workspace = true
license.workspace = true
publish = false

[[bin]]
name = "tcp-kernel-client"
path = "src/main.rs"

[dependencies]
clap = { version = "4", features = ["derive"] }
97 changes: 97 additions & 0 deletions apps/tcp-kernel-client/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//! Pure-kernel (`std::net`) TCP echo client — the "reference" peer for the TCP
//! smoke tiers.
//!
//! It uses the OS kernel TCP stack (NO DPDK), so it exercises our DPDK
//! `tcp-echo` server against a known-good, independent TCP implementation over
//! the real NIC. This is the exact scenario that surfaced the codec padding bug:
//! a standard stack's bare ACKs are NIC-padded to 60 bytes, which our parser
//! must not mistake for payload.
//!
//! Prints `TCP_KERNEL_OK round_trips=N` on full success; on any error/mismatch
//! prints `TCP_KERNEL_FAIL <reason>` and exits non-zero.

use clap::Parser;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpStream};
use std::time::Duration;

// Command-line options for the kernel-stack echo client, parsed via clap's
// derive API (`Args::parse()` in `main`).
//
// NOTE: with clap's derive API the `///` doc comments on the fields below are
// used as the `--help` text, so editing them changes user-visible output.
// Maintainer notes therefore use plain `//` comments.
#[derive(Parser)]
#[command(name = "tcp-kernel-client")]
#[command(about = "Pure-kernel std::net TCP echo client (reference peer for TCP smoke tests)")]
struct Args {
// `main` joins this with `port` as "{target}:{port}" and parses the result as
// a `SocketAddr`, so it must be an IP literal (no hostname resolution is done).
/// Target IP address
#[arg(long, default_value = "10.0.0.2")]
target: String,

/// Target port
#[arg(long, default_value_t = 9000)]
port: u16,

// Each iteration opens a fresh connection; `main` stops at the first failure.
/// Number of connect → echo → close iterations
#[arg(long, default_value_t = 5)]
count: u32,

// `main` fills the payload with the repeating byte pattern `i % 256`.
/// Payload size in bytes per round-trip
#[arg(long, default_value_t = 64)]
payload_size: usize,

// Converted with `Duration::from_secs` in `main`; the same value is used for
// the connect, read and write timeouts of every round-trip.
/// Connect/read/write timeout in seconds
#[arg(long, default_value_t = 10)]
timeout_secs: u64,
}

/// One connect → send → recv-echo → verify → close cycle over the kernel stack.
///
/// Opens a fresh `TcpStream` to `addr`, writes all of `payload`, then reads
/// until exactly `payload.len()` bytes have come back and compares them with
/// what was sent. The socket is closed when `stream` is dropped on return.
///
/// `timeout` bounds the connect and is also installed as the per-call read and
/// write timeout on the socket.
///
/// # Errors
///
/// Returns a human-readable reason when the connect, timeout setup, write or
/// read fails, when the peer closes before the full echo has arrived, or when
/// the echoed bytes differ from `payload`.
fn one_round_trip(addr: &SocketAddr, payload: &[u8], timeout: Duration) -> Result<(), String> {
    let mut stream =
        TcpStream::connect_timeout(addr, timeout).map_err(|e| format!("connect failed: {e}"))?;
    stream
        .set_read_timeout(Some(timeout))
        .map_err(|e| format!("set_read_timeout failed: {e}"))?;
    stream
        .set_write_timeout(Some(timeout))
        .map_err(|e| format!("set_write_timeout failed: {e}"))?;

    stream
        .write_all(payload)
        .map_err(|e| format!("write failed: {e}"))?;

    // The echo may arrive split across several segments, so keep reading until
    // the whole payload is back. A manual loop (rather than `read_exact`) keeps
    // the "how many bytes did we get" detail available for the error message.
    let mut recv = vec![0u8; payload.len()];
    let mut got = 0;
    while got < payload.len() {
        match stream.read(&mut recv[got..]) {
            Ok(0) => return Err(format!("server closed after {got}/{} bytes", payload.len())),
            Ok(n) => got += n,
            // A read interrupted by a signal is not a failure: the `Read`
            // contract says `Interrupted` is non-fatal and should be retried.
            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(format!("read failed after {got} bytes: {e}")),
        }
    }
    if recv != payload {
        return Err("echo mismatch (bytes differ)".to_string());
    }
    Ok(())
}

/// Entry point: parses the CLI options, runs `count` sequential round-trips
/// against the target and reports the outcome on stdout.
///
/// Prints `TCP_KERNEL_OK round_trips=N` when every round-trip succeeds. On a
/// bad address or the first failing round-trip it prints
/// `TCP_KERNEL_FAIL <reason>` and exits with status 1.
fn main() {
    let args = Args::parse();

    // `--target` is documented as an IP address, so try it as a bare IP literal
    // first. This also accepts unbracketed IPv6 such as `::1`, which the
    // "{target}:{port}" socket-address syntax rejects. Anything else falls back
    // to the original string form, so previously accepted inputs (including
    // bracketed IPv6) and the failure message are unchanged.
    let addr: SocketAddr = match args.target.parse::<std::net::IpAddr>() {
        Ok(ip) => SocketAddr::new(ip, args.port),
        Err(_) => match format!("{}:{}", args.target, args.port).parse() {
            Ok(a) => a,
            Err(e) => {
                println!("TCP_KERNEL_FAIL bad address {}:{}: {e}", args.target, args.port);
                std::process::exit(1);
            }
        },
    };
    let timeout = Duration::from_secs(args.timeout_secs);
    // Deterministic repeating 0..=255 byte pattern.
    let payload: Vec<u8> = (0..args.payload_size).map(|i| (i % 256) as u8).collect();

    println!(
        "tcp-kernel-client: {} round-trips of {}B to {}",
        args.count, args.payload_size, addr
    );
    for i in 0..args.count {
        // Stop at the first failure and report it with a 1-based iteration number.
        if let Err(reason) = one_round_trip(&addr, &payload, timeout) {
            println!("TCP_KERNEL_FAIL iteration {}/{}: {reason}", i + 1, args.count);
            std::process::exit(1);
        }
    }
    println!("TCP_KERNEL_OK round_trips={}", args.count);
}
8 changes: 8 additions & 0 deletions deploy/cdk/lib/dpdk-test-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ export class DpdkTestStack extends cdk.Stack {
'ICMP traffic between instances'
);

// Allow TCP between DPDK interfaces (tcp-echo smoke tiers on port 9000).
// Without this, Tier-1 DPDK<->DPDK TCP handshakes are silently dropped.
dpdkSecurityGroup.addIngressRule(
dpdkSecurityGroup,
ec2.Port.allTcp(),
'TCP between DPDK interfaces (tcp-echo smoke tiers)'
);

// Allow UDP from management interfaces (test-client sends from primary ENI
// which is in the mgmt security group, targeting the DPDK ENI)
dpdkSecurityGroup.addIngressRule(
Expand Down
18 changes: 18 additions & 0 deletions dpdk-stdlib-tcp/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,24 @@ impl TcpEngine {
entry_end.gt(tcb.snd_una)
});

// Drain acknowledged bytes off the front of send_buf and rebase
// the surviving retransmit-entry offsets. Without this, acked
// bytes linger in send_buf: the next write pushes send_buf past
// one MSS while has_unacked_data is (wrongly) still set, so Nagle
// withholds every subsequent segment and the connection stalls
// after the first exchange (the sustained bidir_multi hang).
let drain_n = (bytes_acked as usize).min(tcb.send_buf.len());
if drain_n > 0 {
tcb.send_buf.drain(..drain_n);
for entry in tcb.retransmit_queue.iter_mut() {
entry.offset = entry.offset.saturating_sub(drain_n);
}
}
// Nothing left in flight → clear the Nagle "unacked data" latch.
if tcb.snd_una == tcb.snd_nxt {
tcb.has_unacked_data = false;
}

// Update congestion control
let mss = tcb.effective_mss();
tcb.congestion.on_ack(bytes_acked, mss);
Expand Down
170 changes: 170 additions & 0 deletions dpdk-stdlib-tcp/tests/loopback_stall_repro.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
//! Offline loopback regression for the sustained-exchange (bidir_multi) stall.
//!
//! Wires a CLIENT `TcpEngine` and a SERVER `TcpEngine` through in-memory frame
//! queues (no NIC, no EC2). Drives N sequential 64B echo exchanges on ONE
//! connection and asserts they all complete — mirroring
//! `tcp-test-client --mode bidir --count 20` against `tcp-echo`.
//!
//! FAILS before the fix (stalls at the 2nd exchange), PASSES after.
//!
//! Root cause: `handle_established`'s ACK path prunes the retransmit queue but
//! never drains acknowledged bytes from the front of `send_buf`, and never
//! clears `has_unacked_data`. After the first send+ACK, `send_buf` still holds
//! the stale acked bytes while `already_sent_offset` (derived from the now-empty
//! retransmit queue) resets to 0, so the next small write is misclassified as
//! "unsent" behind Nagle (`has_unacked_data == true`, len < MSS) and is never
//! transmitted.

use std::collections::VecDeque;
use std::net::SocketAddr;
use std::sync::{mpsc, Arc};
use std::time::Duration;

use dpdk_stdlib_tcp::clock::{Clock, MockClock};
use dpdk_stdlib_tcp::codec::parse_tcp_packet;
use dpdk_stdlib_tcp::contract::{
oneshot_channel, CommandSender, ConnectionHandle, EngineCommand, EngineWakeup,
};
use dpdk_stdlib_tcp::engine::{EngineConfig, TcpEngine};
use dpdk_stdlib_tcp::state::{FourTuple, TcpState};

const CLIENT_MAC: [u8; 6] = [0x02, 0, 0, 0, 0, 0x01];
const SERVER_MAC: [u8; 6] = [0x02, 0, 0, 0, 0, 0x02];

/// Push each raw frame through `engine` and gather every frame the engine
/// emits in reply, in order.
///
/// Frames shorter than 14 bytes, and frames that `parse_tcp_packet` rejects,
/// are skipped silently. The destination MAC is taken from bytes 0..6 and the
/// source MAC from bytes 6..12 of each frame.
fn deliver(engine: &mut TcpEngine, frames: &[Vec<u8>]) -> Vec<Vec<u8>> {
    /// Copies the 6-byte MAC address starting at `start` out of `frame`.
    fn mac_at(frame: &[u8], start: usize) -> [u8; 6] {
        let mut mac = [0u8; 6];
        mac.copy_from_slice(&frame[start..start + 6]);
        mac
    }

    let mut replies = Vec::new();
    for raw in frames.iter().filter(|candidate| candidate.len() >= 14) {
        let dst_mac = mac_at(raw, 0);
        let src_mac = mac_at(raw, 6);
        if let Ok(segment) = parse_tcp_packet(raw) {
            let emitted = engine.on_segment_with_macs(&segment, src_mac, dst_mac);
            replies.extend(emitted);
        }
    }
    replies
}

// Regression test: a client and a server `TcpEngine` are wired back-to-back
// through two in-memory frame queues, the handshake is pumped to completion,
// and then 20 sequential 64-byte echo exchanges are driven over that single
// connection. Each exchange must complete within 200 simulated 1 ms ticks.
#[test]
fn loopback_20_echo_exchanges_do_not_stall() {
// One mock clock shared by both engines; time only moves via `clock.advance`.
let clock = Arc::new(MockClock::new());

// `_crx` / `_srx` are bound to names (not `_`) so the receiving ends stay
// alive until the end of the test instead of being dropped immediately.
let (ctx_raw, _crx) = mpsc::channel();
let client_cmd = CommandSender::new(ctx_raw, Arc::new(EngineWakeup::new()));
let (stx_raw, _srx) = mpsc::channel();
let server_cmd = CommandSender::new(stx_raw, Arc::new(EngineWakeup::new()));

let mut client =
TcpEngine::with_cmd_tx(clock.clone(), EngineConfig::default(), client_cmd.clone());
let mut server =
TcpEngine::with_cmd_tx(clock.clone(), EngineConfig::default(), server_cmd.clone());

let client_addr: SocketAddr = "10.0.0.1:50000".parse().unwrap();
let server_addr: SocketAddr = "10.0.0.2:9000".parse().unwrap();

// Server: Listen.
let (resp_tx, resp_rx) = oneshot_channel();
server.on_command(EngineCommand::Listen {
addr: server_addr,
backlog: 16,
response: resp_tx,
});
// The listen response is deliberately discarded here.
// NOTE(review): a failed listen would presumably only surface later, when the
// handshake assertions below fail — confirm that is acceptable.
let _ = resp_rx.recv();

// Client: Connect.
let key = FourTuple { local: client_addr, remote: server_addr };
let client_handle = Arc::new(ConnectionHandle::new(65536, 65536, client_cmd.clone(), key));
let (cresp_tx, _cresp_rx) = oneshot_channel();
let mut client_out = client.on_command(EngineCommand::Connect {
local: client_addr,
remote: server_addr,
src_mac: CLIENT_MAC,
dst_mac: SERVER_MAC,
handle: client_handle.clone(),
response: cresp_tx,
});

// The two directions of the in-memory "wire". Frames emitted by the Connect
// command seed the client → server queue.
let mut to_server: VecDeque<Vec<u8>> = client_out.drain(..).collect();
let mut to_client: VecDeque<Vec<u8>> = VecDeque::new();
// The server indexes the same connection with local/remote swapped.
let server_key = FourTuple { local: server_addr, remote: client_addr };

// Pump the three-way handshake to completion.
// Each of the (at most 20) rounds: deliver pending frames to the server, then
// to the client, advance the clock by 1 ms, and tick both engines.
for _ in 0..20 {
let s_out = deliver(&mut server, &to_server.drain(..).collect::<Vec<_>>());
to_client.extend(s_out);
let c_out = deliver(&mut client, &to_client.drain(..).collect::<Vec<_>>());
to_server.extend(c_out);

clock.advance(Duration::from_millis(1));
to_client.extend(server.on_tick(clock.now()));
to_server.extend(client.on_tick(clock.now()));

// Done once both sides are Established and no frame is still in flight.
let c_est = client.tcbs.get(&key).map(|t| t.state) == Some(TcpState::Established);
let s_est = server.tcbs.get(&server_key).map(|t| t.state) == Some(TcpState::Established);
if c_est && s_est && to_server.is_empty() && to_client.is_empty() {
break;
}
}

// If the pump loop ran out of rounds, fail here with a clear message.
assert_eq!(
client.tcbs.get(&key).map(|t| t.state),
Some(TcpState::Established),
"client failed to establish"
);
// Clone the server-side connection handle out of its TCB; it plays the role of
// the echo application's socket below.
let server_handle = {
let tcb = server.tcbs.get(&server_key).expect("server TCB");
assert_eq!(tcb.state, TcpState::Established, "server failed to establish");
tcb.handle.clone()
};

// 64-byte payload holding the byte values 0..=63.
let payload: Vec<u8> = (0..64u16).map(|i| (i % 256) as u8).collect();

// 20 sequential echo exchanges on ONE connection.
for iter in 0..20 {
// Client app: queue one payload for sending.
let w = client_handle.tx_ring.write(&payload);
assert_eq!(w, 64, "iter {iter}: client tx_ring write short");

let mut got_back = 0usize;
let mut recv_buf = vec![0u8; 64];
// Stays true unless the full echo arrives within the tick budget below.
let mut stalled = true;
for _ in 0..200 {
clock.advance(Duration::from_millis(1));

// Client engine drains its tx_ring → frames to server.
to_server.extend(client.on_tick(clock.now()));
let s_out = deliver(&mut server, &to_server.drain(..).collect::<Vec<_>>());
to_client.extend(s_out);

// Server app: read available bytes and echo them back.
let mut sbuf = vec![0u8; 4096];
let n = server_handle.rx_ring.read(&mut sbuf);
if n > 0 {
let ww = server_handle.tx_ring.write(&sbuf[..n]);
assert_eq!(ww, n, "iter {iter}: server tx_ring full");
}

// Server engine drains its tx_ring (echo) + ACKs → frames to client.
to_client.extend(server.on_tick(clock.now()));
let c_out = deliver(&mut client, &to_client.drain(..).collect::<Vec<_>>());
to_server.extend(c_out);

// Client app: read the echo.
// Reads append after what was already received, so a partial echo
// accumulates across ticks.
got_back += client_handle.rx_ring.read(&mut recv_buf[got_back..]);
if got_back >= 64 {
stalled = false;
break;
}
}

assert!(
!stalled,
"STALL at iteration {iter}: client only got {got_back}/64 echoed bytes back \
after 200 ticks (send_buf never drained on ACK → next write stuck behind Nagle)"
);
// The echo must match the sent payload byte for byte.
assert_eq!(&recv_buf[..], &payload[..], "iter {iter}: echo mismatch");
}
}
Loading
Loading