From 16fdbf44f564a24d87658b29d81089d1e84821e4 Mon Sep 17 00:00:00 2001 From: Gerard Date: Thu, 2 Jul 2026 05:39:35 -0400 Subject: [PATCH 1/3] test(tcp): add TCP Tier 1/2 functionality smoke tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit connect->echo->close smoke coverage for the DPDK TCP stack, modeled on the existing UDP Tier 1/2 integration tiers (two-host, JUnit, SSM-driven): - apps/tcp-kernel-client: pure std::net reference client (the 'not our TCP stack' peer). Prints TCP_KERNEL_OK/FAIL, exit-coded. - tier1-tcp-echo.sh: DPDK<->DPDK, sync (tcp-echo) + async (tokio-tcp-echo) servers via --server-binary; bidir echo + 20x multi round-trip. - tier2-tcp-echo.sh: kernel client -> DPDK tcp-echo server — the standard-stack interop that exposed the codec padding bug (#110); mirrors the passing UDP Tier-2 kernel->DPDK direction. - run-integration-tests.sh: run_tier1_tcp / _async / run_tier2_tcp registered in the default CI set (+ --tier tcp1/tcp1a/tcp2); CLEANUP_CMD extended to kill the TCP binaries so a stale process can't hold the DPDK EAL primary lock; --tier validation also accepts 4 (was dispatched but rejected). - CDK: DPDK<->DPDK allTcp self-ingress (else Tier-1 TCP handshakes are dropped). Verified locally: bash -n, cargo build, markers match binary output. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.toml | 1 + apps/tcp-kernel-client/Cargo.toml | 13 ++ apps/tcp-kernel-client/src/main.rs | 97 +++++++++ deploy/cdk/lib/dpdk-test-stack.ts | 8 + scripts/integration-tests/tier1-tcp-echo.sh | 227 ++++++++++++++++++++ scripts/integration-tests/tier2-tcp-echo.sh | 192 +++++++++++++++++ scripts/run-integration-tests.sh | 141 +++++++++++- 7 files changed, 674 insertions(+), 5 deletions(-) create mode 100644 apps/tcp-kernel-client/Cargo.toml create mode 100644 apps/tcp-kernel-client/src/main.rs create mode 100755 scripts/integration-tests/tier1-tcp-echo.sh create mode 100755 scripts/integration-tests/tier2-tcp-echo.sh diff --git a/Cargo.toml b/Cargo.toml index 6b88149..daa9b18 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ members = [ "apps/tokio-tcp-echo", "apps/tcp-synthetic-bench", "apps/plain-tcp-echo", + "apps/tcp-kernel-client", ] [workspace.package] diff --git a/apps/tcp-kernel-client/Cargo.toml b/apps/tcp-kernel-client/Cargo.toml new file mode 100644 index 0000000..88087a8 --- /dev/null +++ b/apps/tcp-kernel-client/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "tcp-kernel-client" +version.workspace = true +edition.workspace = true +license.workspace = true +publish = false + +[[bin]] +name = "tcp-kernel-client" +path = "src/main.rs" + +[dependencies] +clap = { version = "4", features = ["derive"] } diff --git a/apps/tcp-kernel-client/src/main.rs b/apps/tcp-kernel-client/src/main.rs new file mode 100644 index 0000000..2e6045f --- /dev/null +++ b/apps/tcp-kernel-client/src/main.rs @@ -0,0 +1,97 @@ +//! Pure-kernel (`std::net`) TCP echo client — the "reference" peer for the TCP +//! smoke tiers. +//! +//! It uses the OS kernel TCP stack (NO DPDK), so it exercises our DPDK +//! `tcp-echo` server against a known-good, independent TCP implementation over +//! the real NIC. This is the exact scenario that surfaced the codec padding bug: +//! 
a standard stack's bare ACKs are NIC-padded to 60 bytes, which our parser +//! must not mistake for payload. +//! +//! Prints `TCP_KERNEL_OK round_trips=N` on full success; on any error/mismatch +//! prints `TCP_KERNEL_FAIL ` and exits non-zero. + +use clap::Parser; +use std::io::{Read, Write}; +use std::net::{SocketAddr, TcpStream}; +use std::time::Duration; + +#[derive(Parser)] +#[command(name = "tcp-kernel-client")] +#[command(about = "Pure-kernel std::net TCP echo client (reference peer for TCP smoke tests)")] +struct Args { + /// Target IP address + #[arg(long, default_value = "10.0.0.2")] + target: String, + + /// Target port + #[arg(long, default_value_t = 9000)] + port: u16, + + /// Number of connect → echo → close iterations + #[arg(long, default_value_t = 5)] + count: u32, + + /// Payload size in bytes per round-trip + #[arg(long, default_value_t = 64)] + payload_size: usize, + + /// Connect/read/write timeout in seconds + #[arg(long, default_value_t = 10)] + timeout_secs: u64, +} + +/// One connect → send → recv-echo → verify → close cycle over the kernel stack. 
+fn one_round_trip(addr: &SocketAddr, payload: &[u8], timeout: Duration) -> Result<(), String> { + let mut stream = + TcpStream::connect_timeout(addr, timeout).map_err(|e| format!("connect failed: {e}"))?; + stream + .set_read_timeout(Some(timeout)) + .map_err(|e| format!("set_read_timeout failed: {e}"))?; + stream + .set_write_timeout(Some(timeout)) + .map_err(|e| format!("set_write_timeout failed: {e}"))?; + + stream + .write_all(payload) + .map_err(|e| format!("write failed: {e}"))?; + + let mut recv = vec![0u8; payload.len()]; + let mut got = 0; + while got < payload.len() { + match stream.read(&mut recv[got..]) { + Ok(0) => return Err(format!("server closed after {got}/{} bytes", payload.len())), + Ok(n) => got += n, + Err(e) => return Err(format!("read failed after {got} bytes: {e}")), + } + } + if recv != payload { + return Err("echo mismatch (bytes differ)".to_string()); + } + Ok(()) +} + +fn main() { + let args = Args::parse(); + + let addr: SocketAddr = match format!("{}:{}", args.target, args.port).parse() { + Ok(a) => a, + Err(e) => { + println!("TCP_KERNEL_FAIL bad address {}:{}: {e}", args.target, args.port); + std::process::exit(1); + } + }; + let timeout = Duration::from_secs(args.timeout_secs); + let payload: Vec = (0..args.payload_size).map(|i| (i % 256) as u8).collect(); + + println!( + "tcp-kernel-client: {} round-trips of {}B to {}", + args.count, args.payload_size, addr + ); + for i in 0..args.count { + if let Err(reason) = one_round_trip(&addr, &payload, timeout) { + println!("TCP_KERNEL_FAIL iteration {}/{}: {reason}", i + 1, args.count); + std::process::exit(1); + } + } + println!("TCP_KERNEL_OK round_trips={}", args.count); +} diff --git a/deploy/cdk/lib/dpdk-test-stack.ts b/deploy/cdk/lib/dpdk-test-stack.ts index 44a846b..09c0928 100644 --- a/deploy/cdk/lib/dpdk-test-stack.ts +++ b/deploy/cdk/lib/dpdk-test-stack.ts @@ -89,6 +89,14 @@ export class DpdkTestStack extends cdk.Stack { 'ICMP traffic between instances' ); + // Allow TCP between 
DPDK interfaces (tcp-echo smoke tiers on port 9000). + // Without this, Tier-1 DPDK<->DPDK TCP handshakes are silently dropped. + dpdkSecurityGroup.addIngressRule( + dpdkSecurityGroup, + ec2.Port.allTcp(), + 'TCP between DPDK interfaces (tcp-echo smoke tiers)' + ); + // Allow UDP from management interfaces (test-client sends from primary ENI // which is in the mgmt security group, targeting the DPDK ENI) dpdkSecurityGroup.addIngressRule( diff --git a/scripts/integration-tests/tier1-tcp-echo.sh b/scripts/integration-tests/tier1-tcp-echo.sh new file mode 100755 index 0000000..8d6f16a --- /dev/null +++ b/scripts/integration-tests/tier1-tcp-echo.sh @@ -0,0 +1,227 @@ +#!/usr/bin/env bash +# tier1-tcp-echo.sh - Tier 1: TCP DPDK <-> DPDK echo smoke test +# +# Tests TCP connect -> echo -> close between two dpdk-stdlib-tcp instances, both +# using the DPDK kernel-bypass stack. Verifies the handshake, data-phase echo +# integrity, and multi-round-trip stability over the real NIC. +# +# --server-binary selects the listener implementation: tcp-echo (sync, default) +# or tokio-tcp-echo (async) — same client either way. 
+# +# Usage: +# # On Instance B (listener): +# ./tier1-tcp-echo.sh --role listener --bind-ip 10.0.1.100 --port 9000 \ +# [--server-binary tcp-echo|tokio-tcp-echo] +# +# # On Instance A (sender): +# ./tier1-tcp-echo.sh --role sender --bind-ip 10.0.1.50 --peer-ip 10.0.1.100 \ +# --port 9000 --output /tmp/test-results/tier1-tcp-echo.xml + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +source "$SCRIPT_DIR/harness-common.sh" + +# ── Defaults ───────────────────────────────────────────────────────────────── + +PROJECT_DIR="/opt/dpdk-stdlib" +SERVER_BINARY="$PROJECT_DIR/target/release/tcp-echo" +CLIENT_BINARY="$PROJECT_DIR/target/release/tcp-test-client" +ROLE="" +BIND_IP="" +PEER_IP="" +PORT=9000 +OUTPUT="" +GATEWAY_MAC="" +TEST_TIMEOUT=60 +CLASSNAME="tier1.tcp_echo" + +# ── Argument parsing ───────────────────────────────────────────────────────── + +while [[ $# -gt 0 ]]; do + case "$1" in + --role) ROLE="$2"; shift 2 ;; + --bind-ip) BIND_IP="$2"; shift 2 ;; + --peer-ip) PEER_IP="$2"; shift 2 ;; + --port) PORT="$2"; shift 2 ;; + --output) OUTPUT="$2"; shift 2 ;; + --gateway-mac) GATEWAY_MAC="$2"; shift 2 ;; + --server-binary) SERVER_BINARY="$PROJECT_DIR/target/release/$2"; shift 2 ;; + *) + echo "Unknown argument: $1" >&2 + exit 1 + ;; + esac +done + +if [[ -z "$ROLE" || -z "$BIND_IP" ]]; then + echo "Missing required arguments: --role and --bind-ip are required" >&2 + exit 1 +fi + +if [[ "$ROLE" == "sender" && -z "$PEER_IP" ]]; then + echo "Sender role requires --peer-ip" >&2 + exit 1 +fi + +if [[ -z "$OUTPUT" ]]; then + OUTPUT=$(result_path "tier1" "tcp-echo") +fi + +# ── Discover gateway MAC if not provided ───────────────────────────────────── + +discover_gateway_mac() { + if [[ -n "$GATEWAY_MAC" ]]; then + return + fi + # In AWS VPC, the gateway is always .1 of the subnet + local subnet_gw + subnet_gw=$(echo "$BIND_IP" | sed 's/\.[0-9]*$/.1/') + ping -c 1 -W 2 "$subnet_gw" >/dev/null 2>&1 || true + sleep 1 + GATEWAY_MAC=$(awk -v ip="$subnet_gw" 
'$1 == ip && $4 != "00:00:00:00:00:00" {print $4}' /proc/net/arp | head -1) + if [[ -z "$GATEWAY_MAC" ]]; then + log_error "Could not discover gateway MAC for $subnet_gw" + GATEWAY_MAC="00:00:00:00:00:00" + fi + log_info "Discovered gateway MAC: $GATEWAY_MAC" +} + +# ── Listener role ──────────────────────────────────────────────────────────── + +run_listener() { + log_info "Starting TCP echo server ($SERVER_BINARY) on ${BIND_IP}:${PORT}" + discover_gateway_mac + ulimit -c unlimited 2>/dev/null || true + + # Ensure a clean DPDK primary (a prior server variant may have run here). + rm -rf /var/run/dpdk/ 2>/dev/null || true + + local server_log="/tmp/tcp-echo-server.log" + log_info "Launching: $SERVER_BINARY --ip $BIND_IP --port $PORT --gateway-mac $GATEWAY_MAC" + "$SERVER_BINARY" --ip "$BIND_IP" --port "$PORT" --gateway-mac "$GATEWAY_MAC" \ + > /tmp/tcp-echo-server-stdout.log 2>"$server_log" & + local server_pid=$! + log_info "TCP echo server started with PID $server_pid" + + # The servers print " listening on " to stderr — gate on that. + local waited=0 + while ! grep -q "listening on" "$server_log" 2>/dev/null; do + sleep 1 + waited=$((waited + 1)) + if [[ $waited -ge 30 ]]; then + log_error "TCP echo server did not become ready within 30s" + cat "$server_log" >&2 || true + kill "$server_pid" 2>/dev/null || true + return 1 + fi + if ! kill -0 "$server_pid" 2>/dev/null; then + log_error "TCP echo server process died during startup" + cat "$server_log" >&2 || true + check_process_crash "$server_pid" "tcp-echo" || true + return 1 + fi + done + + log_info "TCP echo server ready, waiting for client tests..." + + # Keep running until the sender finishes or we hit the ceiling. + local max_wait=120 + waited=0 + while kill -0 "$server_pid" 2>/dev/null && [[ $waited -lt $max_wait ]]; do + sleep 5 + waited=$((waited + 5)) + done + + if ! 
kill -0 "$server_pid" 2>/dev/null; then + check_process_crash "$server_pid" "tcp-echo" || true + else + kill "$server_pid" 2>/dev/null || true + wait "$server_pid" 2>/dev/null || true + fi + log_info "Listener finished" +} + +# ── Sender role ────────────────────────────────────────────────────────────── + +run_sender() { + log_info "Starting Tier 1 TCP sender: ${BIND_IP} -> ${PEER_IP}:${PORT}" + discover_gateway_mac + + local client_log="/tmp/tcp-test-client.log" + exec > >(tee -a "$client_log") 2>&1 + + junit_start_suite "tier1-tcp-echo" 2 + + # Give the listener time to bind. + sleep 5 + + # ── Test 1: bidir echo (single round-trip) ─────────────────────────── + log_info "Test: bidir_echo" + local start end elapsed out ok err + start=$(_timer_now) + ok=true; err="" + out=$(run_with_timeout "$TEST_TIMEOUT" "$CLIENT_BINARY" \ + --target "$PEER_IP" --port "$PORT" --local-ip "$BIND_IP" \ + --gateway-mac "$GATEWAY_MAC" --mode bidir --count 1 2>&1) || { ok=false; err="client exited non-zero"; } + if [[ "$ok" == "true" ]] && ! echo "$out" | grep -q "echo round-trip"; then + ok=false; err="no 'echo round-trip' result line" + fi + end=$(_timer_now); elapsed=$(_timer_elapsed "$start" "$end") + if [[ "$ok" == "true" ]]; then + log_info "PASS: bidir echo" + junit_add_pass "bidir_echo" "$CLASSNAME" "$elapsed" + else + log_error "FAIL: bidir echo — $err" + junit_add_failure "bidir_echo" "$CLASSNAME" "$elapsed" "$err" "$out" + fi + + # Clean up DPDK shared memory so the next process can reinitialize EAL. 
+ rm -rf /var/run/dpdk/ 2>/dev/null || true + + # ── Test 2: bidir multi round-trip (20) ────────────────────────────── + log_info "Test: bidir_multi" + start=$(_timer_now) + ok=true; err="" + out=$(run_with_timeout "$TEST_TIMEOUT" "$CLIENT_BINARY" \ + --target "$PEER_IP" --port "$PORT" --local-ip "$BIND_IP" \ + --gateway-mac "$GATEWAY_MAC" --mode bidir --count 20 2>&1) || { ok=false; err="client exited non-zero"; } + if [[ "$ok" == "true" ]]; then + local rtt + rtt=$(echo "$out" | sed -n 's/^Result: \([0-9]*\) echo round-trips.*/\1/p' | head -1) + if [[ -z "$rtt" || "$rtt" -lt 20 ]]; then + ok=false; err="expected >=20 round-trips, got '${rtt:-none}'" + fi + fi + end=$(_timer_now); elapsed=$(_timer_elapsed "$start" "$end") + if [[ "$ok" == "true" ]]; then + log_info "PASS: bidir multi (20 round-trips)" + junit_add_pass "bidir_multi" "$CLASSNAME" "$elapsed" + else + log_error "FAIL: bidir multi — $err" + junit_add_failure "bidir_multi" "$CLASSNAME" "$elapsed" "$err" "$out" + fi + + rm -rf /var/run/dpdk/ 2>/dev/null || true + + junit_end_suite + junit_write "$OUTPUT" + + if [[ $_JUNIT_FAILURE_COUNT -eq 0 ]]; then + log_info "All tests passed ($_JUNIT_TEST_COUNT/$_JUNIT_TEST_COUNT)" + else + log_error "$_JUNIT_FAILURE_COUNT/$_JUNIT_TEST_COUNT tests failed" + exit 1 + fi +} + +# ── Main ───────────────────────────────────────────────────────────────────── + +case "$ROLE" in + listener) run_listener ;; + sender) run_sender ;; + *) + echo "Invalid role: $ROLE (must be 'listener' or 'sender')" >&2 + exit 1 + ;; +esac diff --git a/scripts/integration-tests/tier2-tcp-echo.sh b/scripts/integration-tests/tier2-tcp-echo.sh new file mode 100755 index 0000000..8a03d6a --- /dev/null +++ b/scripts/integration-tests/tier2-tcp-echo.sh @@ -0,0 +1,192 @@ +#!/usr/bin/env bash +# tier2-tcp-echo.sh - Tier 2: kernel TCP client -> DPDK TCP server smoke test +# +# A pure-kernel (std::net) reference client connects to our DPDK tcp-echo server +# and performs connect -> echo -> close. 
Proves our DPDK TCP stack interoperates +# with a standard, independent TCP implementation over the real NIC — the exact +# scenario (NIC-padded bare ACKs from a normal stack) that exposed the codec +# padding bug. Mirrors the working kernel->DPDK direction of the UDP Tier 2. +# +# Usage: +# # On Instance B (listener, DPDK): +# ./tier2-tcp-echo.sh --role listener --bind-ip 10.0.1.100 --port 9000 +# +# # On Instance A (sender, kernel networking): +# ./tier2-tcp-echo.sh --role sender --peer-ip 10.0.1.100 --port 9000 \ +# --output /tmp/test-results/tier2-tcp-echo.xml + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +source "$SCRIPT_DIR/harness-common.sh" + +# ── Defaults ───────────────────────────────────────────────────────────────── + +PROJECT_DIR="/opt/dpdk-stdlib" +SERVER_BINARY="$PROJECT_DIR/target/release/tcp-echo" +KERNEL_CLIENT_BINARY="$PROJECT_DIR/target/release/tcp-kernel-client" +ROLE="" +BIND_IP="" +PEER_IP="" +PORT=9000 +OUTPUT="" +GATEWAY_MAC="" +TEST_TIMEOUT=60 +CLASSNAME="tier2.tcp_echo" + +# ── Argument parsing ───────────────────────────────────────────────────────── + +while [[ $# -gt 0 ]]; do + case "$1" in + --role) ROLE="$2"; shift 2 ;; + --bind-ip) BIND_IP="$2"; shift 2 ;; + --peer-ip) PEER_IP="$2"; shift 2 ;; + --port) PORT="$2"; shift 2 ;; + --output) OUTPUT="$2"; shift 2 ;; + --gateway-mac) GATEWAY_MAC="$2"; shift 2 ;; + *) + echo "Unknown argument: $1" >&2 + exit 1 + ;; + esac +done + +if [[ -z "$ROLE" ]]; then + echo "Missing required argument: --role" >&2 + exit 1 +fi +if [[ "$ROLE" == "listener" && -z "$BIND_IP" ]]; then + echo "Listener role requires --bind-ip" >&2 + exit 1 +fi +if [[ "$ROLE" == "sender" && -z "$PEER_IP" ]]; then + echo "Sender role requires --peer-ip" >&2 + exit 1 +fi + +if [[ -z "$OUTPUT" ]]; then + OUTPUT=$(result_path "tier2" "tcp-echo") +fi + +# ── Discover gateway MAC if not provided (listener/DPDK side only) ──────────── + +discover_gateway_mac() { + if [[ -n "$GATEWAY_MAC" ]]; then + return + 
fi + local subnet_gw + subnet_gw=$(echo "$BIND_IP" | sed 's/\.[0-9]*$/.1/') + ping -c 1 -W 2 "$subnet_gw" >/dev/null 2>&1 || true + sleep 1 + GATEWAY_MAC=$(awk -v ip="$subnet_gw" '$1 == ip && $4 != "00:00:00:00:00:00" {print $4}' /proc/net/arp | head -1) + if [[ -z "$GATEWAY_MAC" ]]; then + log_error "Could not discover gateway MAC for $subnet_gw" + GATEWAY_MAC="00:00:00:00:00:00" + fi + log_info "Discovered gateway MAC: $GATEWAY_MAC" +} + +# ── Listener role (DPDK tcp-echo server) ───────────────────────────────────── + +run_listener() { + log_info "Starting DPDK TCP echo server on ${BIND_IP}:${PORT}" + discover_gateway_mac + ulimit -c unlimited 2>/dev/null || true + + rm -rf /var/run/dpdk/ 2>/dev/null || true + + local server_log="/tmp/tcp-echo-server.log" + log_info "Launching: $SERVER_BINARY --ip $BIND_IP --port $PORT --gateway-mac $GATEWAY_MAC" + "$SERVER_BINARY" --ip "$BIND_IP" --port "$PORT" --gateway-mac "$GATEWAY_MAC" \ + > /tmp/tcp-echo-server-stdout.log 2>"$server_log" & + local server_pid=$! + log_info "DPDK TCP echo server started with PID $server_pid" + + local waited=0 + while ! grep -q "listening on" "$server_log" 2>/dev/null; do + sleep 1 + waited=$((waited + 1)) + if [[ $waited -ge 30 ]]; then + log_error "DPDK TCP echo server did not become ready within 30s" + cat "$server_log" >&2 || true + kill "$server_pid" 2>/dev/null || true + return 1 + fi + if ! kill -0 "$server_pid" 2>/dev/null; then + log_error "DPDK TCP echo server died during startup" + cat "$server_log" >&2 || true + check_process_crash "$server_pid" "tcp-echo" || true + return 1 + fi + done + + log_info "DPDK TCP echo server ready, waiting for kernel client..." + local max_wait=120 + waited=0 + while kill -0 "$server_pid" 2>/dev/null && [[ $waited -lt $max_wait ]]; do + sleep 5 + waited=$((waited + 5)) + done + + if ! 
kill -0 "$server_pid" 2>/dev/null; then + check_process_crash "$server_pid" "tcp-echo" || true + else + kill "$server_pid" 2>/dev/null || true + wait "$server_pid" 2>/dev/null || true + fi + log_info "Listener finished" +} + +# ── Sender role (pure-kernel client) ───────────────────────────────────────── + +run_sender() { + log_info "Starting Tier 2 kernel TCP client -> ${PEER_IP}:${PORT}" + + local client_log="/tmp/tcp-kernel-client.log" + exec > >(tee -a "$client_log") 2>&1 + + junit_start_suite "tier2-tcp-echo" 1 + + # Give the listener time to bind. + sleep 5 + + # ── Test: kernel client -> DPDK server echo ────────────────────────── + log_info "Test: kernel_to_dpdk_echo" + local start end elapsed out ok err + start=$(_timer_now) + ok=true; err="" + out=$(run_with_timeout "$TEST_TIMEOUT" "$KERNEL_CLIENT_BINARY" \ + --target "$PEER_IP" --port "$PORT" --count 20 --payload-size 64 2>&1) || { ok=false; err="kernel client exited non-zero"; } + if [[ "$ok" == "true" ]] && ! echo "$out" | grep -q "TCP_KERNEL_OK"; then + ok=false; err="no TCP_KERNEL_OK marker" + fi + end=$(_timer_now); elapsed=$(_timer_elapsed "$start" "$end") + if [[ "$ok" == "true" ]]; then + log_info "PASS: kernel client -> DPDK server echo" + junit_add_pass "kernel_to_dpdk_echo" "$CLASSNAME" "$elapsed" + else + log_error "FAIL: kernel_to_dpdk_echo — $err" + junit_add_failure "kernel_to_dpdk_echo" "$CLASSNAME" "$elapsed" "$err" "$out" + fi + + junit_end_suite + junit_write "$OUTPUT" + + if [[ $_JUNIT_FAILURE_COUNT -eq 0 ]]; then + log_info "All tests passed ($_JUNIT_TEST_COUNT/$_JUNIT_TEST_COUNT)" + else + log_error "$_JUNIT_FAILURE_COUNT/$_JUNIT_TEST_COUNT tests failed" + exit 1 + fi +} + +# ── Main ───────────────────────────────────────────────────────────────────── + +case "$ROLE" in + listener) run_listener ;; + sender) run_sender ;; + *) + echo "Invalid role: $ROLE (must be 'listener' or 'sender')" >&2 + exit 1 + ;; +esac diff --git a/scripts/run-integration-tests.sh 
b/scripts/run-integration-tests.sh index d8c8dc7..7f6db52 100755 --- a/scripts/run-integration-tests.sh +++ b/scripts/run-integration-tests.sh @@ -81,10 +81,13 @@ while [[ $# -gt 0 ]]; do --json-summary) FLAG_JSON_SUMMARY=true; shift ;; --tier) TIER_FILTER="$2" - if [[ "$TIER_FILTER" != "1" && "$TIER_FILTER" != "2" && "$TIER_FILTER" != "3" ]]; then - echo "ERROR: --tier must be 1, 2, or 3, got: $TIER_FILTER" >&2 - exit 2 - fi + case "$TIER_FILTER" in + 1|2|3|4|tcp1|tcp1a|tcp2) ;; + *) + echo "ERROR: --tier must be one of: 1 2 3 4 tcp1 tcp1a tcp2, got: $TIER_FILTER" >&2 + exit 2 + ;; + esac shift 2 ;; -h|--help) @@ -158,7 +161,7 @@ post_pr_comment() { # ── Process cleanup and ARP warming ────────────────────────────────────────── -CLEANUP_CMD="pkill -f 'target/release/echo' 2>/dev/null || true; pkill -f 'target/release/test-client' 2>/dev/null || true; pkill -f 'target/release/tokio-echo' 2>/dev/null || true; pkill -f 'python3.*udp_echo' 2>/dev/null || true; sleep 2; rm -rf /var/run/dpdk/ 2>/dev/null || true" +CLEANUP_CMD="pkill -f 'target/release/echo' 2>/dev/null || true; pkill -f 'target/release/test-client' 2>/dev/null || true; pkill -f 'target/release/tokio-echo' 2>/dev/null || true; pkill -f 'target/release/tcp-echo' 2>/dev/null || true; pkill -f 'target/release/tokio-tcp-echo' 2>/dev/null || true; pkill -f 'target/release/tcp-test-client' 2>/dev/null || true; pkill -f 'target/release/tcp-kernel-client' 2>/dev/null || true; pkill -f 'target/release/plain-tcp-echo' 2>/dev/null || true; pkill -f 'python3.*udp_echo' 2>/dev/null || true; sleep 2; rm -rf /var/run/dpdk/ 2>/dev/null || true" # Warm the kernel ARP cache so DPDK can seed from /proc/net/arp. # In AWS VPC, the gateway MAC is needed for all DPDK outbound frames. @@ -903,6 +906,109 @@ run_tier4() { log_info "Tier 4 execution complete" } +# ── TCP smoke tiers ────────────────────────────────────────────────────────── + +# Shared driver for a DPDK<->DPDK TCP echo pair (both instances on DPDK). 
+# $1 = server binary name (tcp-echo | tokio-tcp-echo), $2 = output XML basename, +# $3 = suite name for synthetic failure XML. +_tier1_tcp_impl() { + local server_binary="$1" + local out_xml="$2" + local suite="$3" + + log_info "Binding ENIs for $suite..." + if ! configure_eni "$SENDER_INSTANCE_ID" "bind"; then + log_error "Failed to bind ENI on sender" + generate_failure_xml "$suite" "ENI bind failed on sender instance" + return 1 + fi + if ! configure_eni "$RECEIVER_INSTANCE_ID" "bind"; then + log_error "Failed to bind ENI on receiver" + generate_failure_xml "$suite" "ENI bind failed on receiver instance" + return 1 + fi + + warm_arp_cache + + log_info "Starting TCP listener on receiver (server=$server_binary)..." + local listener_cmd="cd /opt/dpdk-stdlib && bash scripts/integration-tests/tier1-tcp-echo.sh --role listener --bind-ip $RECEIVER_DPDK_ENI_IP --port 9000 --server-binary $server_binary" + local listener_cmd_id + listener_cmd_id=$(ssm_run_command_async "$RECEIVER_INSTANCE_ID" "$TEST_TIMEOUT" "$listener_cmd") + + if [[ -z "$listener_cmd_id" ]]; then + log_error "Failed to start TCP listener ($server_binary)" + generate_failure_xml "$suite" "Failed to start TCP listener on receiver" + return 1 + fi + + sleep 10 + + log_info "Starting TCP sender on sender..." + local sender_cmd="cd /opt/dpdk-stdlib && bash scripts/integration-tests/tier1-tcp-echo.sh --role sender --bind-ip $SENDER_DPDK_ENI_IP --peer-ip $RECEIVER_DPDK_ENI_IP --port 9000 --output $RESULTS_REMOTE_DIR/$out_xml" + if ! ssm_run_command "$SENDER_INSTANCE_ID" "$TEST_TIMEOUT" "$sender_cmd"; then + log_error "TCP sender test execution failed ($server_binary)" + generate_failure_xml "$suite" "Sender test execution failed or timed out" + fi + + if ! 
ssm_wait_command "$RECEIVER_INSTANCE_ID" "$listener_cmd_id" 30; then + ssm_cancel_command "$RECEIVER_INSTANCE_ID" "$listener_cmd_id" + fi +} + +run_tier1_tcp() { + log_section "Tier 1 TCP (sync): DPDK <-> DPDK TCP echo test" + _tier1_tcp_impl "tcp-echo" "tier1-tcp-echo.xml" "tier1-tcp-echo" + log_info "Tier 1 TCP (sync) execution complete" +} + +run_tier1_tcp_async() { + log_section "Tier 1 TCP (async): DPDK <-> DPDK tokio TCP echo test" + _tier1_tcp_impl "tokio-tcp-echo" "tier1-tcp-echo-async.xml" "tier1-tcp-echo-async" + log_info "Tier 1 TCP (async) execution complete" +} + +run_tier2_tcp() { + log_section "Tier 2 TCP: kernel client -> DPDK TCP server test" + + # Bind receiver ENI (DPDK), unbind sender ENI (kernel) — mirrors run_tier2. + log_info "Configuring ENIs for Tier 2 TCP..." + if ! configure_eni "$RECEIVER_INSTANCE_ID" "bind"; then + log_error "Failed to bind ENI on receiver" + generate_failure_xml "tier2-tcp-echo" "ENI bind failed on receiver instance" + return 1 + fi + sleep 3 + configure_eni "$SENDER_INSTANCE_ID" "unbind" "$SENDER_DPDK_ENI_IP" || true + + warm_arp_cache + + log_info "Starting DPDK TCP listener on receiver..." + local listener_cmd="cd /opt/dpdk-stdlib && bash scripts/integration-tests/tier2-tcp-echo.sh --role listener --bind-ip $RECEIVER_DPDK_ENI_IP --port 9000" + local listener_cmd_id + listener_cmd_id=$(ssm_run_command_async "$RECEIVER_INSTANCE_ID" "$TEST_TIMEOUT" "$listener_cmd") + + if [[ -z "$listener_cmd_id" ]]; then + log_error "Failed to start TCP listener" + generate_failure_xml "tier2-tcp-echo" "Failed to start TCP listener on receiver" + return 1 + fi + + sleep 10 + + log_info "Starting kernel TCP client on sender (kernel networking)..." + local sender_cmd="cd /opt/dpdk-stdlib && bash scripts/integration-tests/tier2-tcp-echo.sh --role sender --peer-ip $RECEIVER_DPDK_ENI_IP --port 9000 --output $RESULTS_REMOTE_DIR/tier2-tcp-echo.xml" + if ! 
ssm_run_command "$SENDER_INSTANCE_ID" "$TEST_TIMEOUT" "$sender_cmd"; then + log_error "Kernel client test execution failed" + generate_failure_xml "tier2-tcp-echo" "Sender test execution failed or timed out" + fi + + if ! ssm_wait_command "$RECEIVER_INSTANCE_ID" "$listener_cmd_id" 30; then + ssm_cancel_command "$RECEIVER_INSTANCE_ID" "$listener_cmd_id" + fi + + log_info "Tier 2 TCP execution complete" +} + # ── Unbind ENIs between tiers ──────────────────────────────────────────────── unbind_all_enis() { @@ -1608,6 +1714,31 @@ Infrastructure ready. run_tier4 || true fi + # Unbind ENIs between tier 4 and the TCP tiers + if [[ -z "$TIER_FILTER" ]]; then + unbind_all_enis + fi + + if [[ -z "$TIER_FILTER" || "$TIER_FILTER" == "tcp1" ]]; then + run_tier1_tcp || true + fi + + if [[ -z "$TIER_FILTER" ]]; then + unbind_all_enis + fi + + if [[ -z "$TIER_FILTER" || "$TIER_FILTER" == "tcp1a" ]]; then + run_tier1_tcp_async || true + fi + + if [[ -z "$TIER_FILTER" ]]; then + unbind_all_enis + fi + + if [[ -z "$TIER_FILTER" || "$TIER_FILTER" == "tcp2" ]]; then + run_tier2_tcp || true + fi + # Step 6: Collect results collect_results From 8b6bf4ca4dc10d2b09598bedf5b22e40d270be48 Mon Sep 17 00:00:00 2001 From: Gerard Date: Thu, 2 Jul 2026 06:58:19 -0400 Subject: [PATCH 2/3] feat(ci): self-contained integration summary comment (real vs infra) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The PR summary comment previously showed only per-tier 'N tests, M failures' — to see WHICH testcase failed, WHY, or whether it was a real code issue vs an SSM/ENI infra flake, you had to download the JUnit artifact or dig through collapsed log
`<details>` sections. generate_markdown_summary now emits a self-contained test-results/summary.md (posted via the existing post_pr_comment) with: - a verdict separating REAL failures from INFRA flakes, - a per-tier per-testcase table (result + time + reason), - a Real-failures section with detail excerpts, - an Infra-flakes section (SSM/ENI setup, labeled task #10). Classification uses the JUnit failure type: real tier failures are type=AssertionError (junit_add_failure); synthetic setup failures are type=ExecutionError (generate_failure_xml), with a keyword fallback. Report-only — exit-code behavior unchanged. Scripts-only, no workflow change. Verified locally against sample pass/real-fail/infra-fail XMLs. Co-Authored-By: Claude Opus 4.8 (1M context) --- scripts/run-integration-tests.sh | 132 +++++++++++++++++++++++++++---- 1 file changed, 115 insertions(+), 17 deletions(-) diff --git a/scripts/run-integration-tests.sh b/scripts/run-integration-tests.sh index 7f6db52..cd503df 100755 --- a/scripts/run-integration-tests.sh +++ b/scripts/run-integration-tests.sh @@ -1321,6 +1321,115 @@ PYEOF log_info "JSON summary generated: $RESULTS_DIR/summary.json" } +# Generate a self-contained, analyzable Markdown summary of all tiers: per-tier, +# per-testcase pass/fail WITH the failure reason, classifying each failure as a +# real test failure (JUnit type=AssertionError, from the tier scripts) vs an +# infra/setup flake (type=ExecutionError, from generate_failure_xml — ENI bind / +# SSM / listener-start timeouts). Written to summary.md so a reviewer can analyze +# a run from the PR comment alone, without downloading artifacts or using gh. 
+generate_markdown_summary() { + local commit_hash run_url label + commit_hash=$(git -C "$REPO_ROOT" rev-parse --short HEAD 2>/dev/null || echo "unknown") + run_url="${GITHUB_SERVER_URL:-https://github.com}/${GITHUB_REPOSITORY:-gspivey/dpdk-stdlib-rust}/actions/runs/${GITHUB_RUN_ID:-0}" + label="${GITHUB_WORKFLOW:-Integration Tests}" + python3 - "$RESULTS_DIR" "$commit_hash" "$run_url" "$label" > "$RESULTS_DIR/summary.md" <<'PYEOF' +import re, os, sys, html + +results_dir, commit, run_url, label = sys.argv[1:5] +# Infra/setup failures use JUnit type=ExecutionError (generate_failure_xml); real +# test failures use type=AssertionError (harness junit_add_failure). Fall back to +# a keyword match if the type is missing. +INFRA_RE = re.compile(r'ENI|SSM|Polling timeout|configure-eni|assign-ip|Failed to start|unbind|IP assignment|listener|readiness|instance', re.I) + +def classify(ftype, msg): + if (ftype or '') == 'ExecutionError': + return 'infra' + if (ftype or '') == 'AssertionError': + return 'real' + return 'infra' if INFRA_RE.search(msg or '') else 'real' + +def cell(s): + s = html.unescape(s or '').replace('\n', ' ').replace('|', r'\|').strip() + return (s[:90] + '…') if len(s) > 91 else s + +rows, reals, infras = [], [], [] +total = passed = n_real = n_infra = 0 + +for xmlf in sorted(os.listdir(results_dir)): + if not xmlf.endswith('.xml'): + continue + with open(os.path.join(results_dir, xmlf)) as f: + content = f.read() + sm = re.search(r']+)>', content) + if not sm: + continue + m = re.search(r'name="([^"]*)"', sm.group(1)) + suite = m.group(1) if m else xmlf + for tc in re.finditer(r']*?)>(.*?)', content, re.DOTALL): + attrs, body = tc.group(1), tc.group(2) + m = re.search(r'name="([^"]*)"', attrs) + name = m.group(1) if m else 'unknown' + m = re.search(r'time="([^"]*)"', attrs) + tsec = m.group(1) if m else '0' + fm = re.search(r']*>(.*?)', body, re.DOTALL) + total += 1 + if not fm: + passed += 1 + rows.append((suite, name, '✅ pass', tsec, '')) + continue + 
msg = html.unescape(fm.group(1)) + kind = classify(fm.group(2), msg) + details = html.unescape(fm.group(3)).strip() + if kind == 'infra': + n_infra += 1 + rows.append((suite, name, '⚠️ infra', tsec, cell(msg))) + infras.append((suite, name, msg)) + else: + n_real += 1 + rows.append((suite, name, '❌ real', tsec, cell(msg))) + reals.append((suite, name, msg, details)) + +def plural(n): + return '' if n == 1 else 's' + +if n_real: + verdict = f'❌ {n_real} real failure{plural(n_real)}' + if n_infra: + verdict += f' · ⚠️ {n_infra} infra flake{plural(n_infra)}' +elif n_infra: + verdict = f'⚠️ {n_infra} infra flake{plural(n_infra)} · 0 real failures' +else: + verdict = '✅ all passed' + +o = [] +o.append(f'## Integration Tests — {verdict}') +o.append(f'`{commit}` · [run]({run_url}) · {label} · {total} tests: {passed} ✅ · {n_real} ❌ real · {n_infra} ⚠️ infra') +o.append('') +o.append('| Tier | Test | Result | Time | Reason |') +o.append('|------|------|--------|-----:|--------|') +for suite, name, result, tsec, reason in rows: + o.append(f'| {suite} | {name} | {result} | {tsec}s | {reason} |') +o.append('') +if reals: + o.append(f'### ❌ Real failures ({n_real}) — code/test issues') + for suite, name, msg, details in reals: + o.append(f'- **{suite} / {name}** — {html.unescape(msg)}') + if details: + o.append(f'
details\n\n```\n{details[:1500]}\n```\n
') + o.append('') +if infras: + o.append(f'### ⚠️ Infra flakes ({n_infra}) — SSM/ENI setup, not code (task #10)') + for suite, name, msg in infras: + o.append(f'- {suite} / {name} — {html.unescape(msg)}') + o.append('') +if not reals and not infras: + o.append(f'### ✅ All {total} tests passed') + +print('\n'.join(o)) +PYEOF + log_info "Markdown summary generated: $RESULTS_DIR/summary.md" +} + # ── Teardown ───────────────────────────────────────────────────────────────── teardown_infrastructure() { @@ -1750,24 +1859,13 @@ Infrastructure ready. collect_instance_logs || true write_step_summary || true - # Post final summary to PR - local summary_body="## [CI] Stage: Summary\n" - if [[ "$TEST_EXIT_CODE" -eq 0 ]]; then - summary_body+="All tests **PASSED**." - else - summary_body+="Some tests **FAILED** (exit code: $TEST_EXIT_CODE)." + # Post the final analyzable summary to the PR: per-tier, per-testcase pass/ + # fail with the failure reason, classifying real test failures vs infra/setup + # flakes. Self-contained — a run can be analyzed from this comment alone. 
+ generate_markdown_summary + if [[ -f "$RESULTS_DIR/summary.md" ]]; then + post_pr_comment "$(cat "$RESULTS_DIR/summary.md")" fi - summary_body+="\n\nARP seeding: kernel /proc/net/arp (automatic)" - # Include JUnit results summary - for xml_file in "$RESULTS_DIR"/*.xml; do - [[ -f "$xml_file" ]] || continue - local suite_name tests failures - suite_name=$(sed -n 's/.*name="\([^"]*\)".*/\1/p' "$xml_file" | head -1) - tests=$(sed -n 's/.*tests="\([^"]*\)".*/\1/p' "$xml_file" | head -1) - failures=$(sed -n 's/.*failures="\([^"]*\)".*/\1/p' "$xml_file" | head -1) - summary_body+="\n- **${suite_name:-unknown}**: ${tests:-0} tests, ${failures:-0} failures" - done - post_pr_comment "$(echo -e "$summary_body")" # Step 9: Teardown teardown_infrastructure From 7c3cfb15e176e22f88fb648d5cd962180f36ff8f Mon Sep 17 00:00:00 2001 From: Gerard Date: Thu, 2 Jul 2026 07:01:23 -0400 Subject: [PATCH 3/3] fix(tcp): drain acked bytes from send_buf on ACK (unstick sustained exchange) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit handle_established's ACK path pruned the retransmit queue but never drained the acknowledged bytes from the front of send_buf, nor cleared has_unacked_data. After the first send+ACK, send_buf still held the stale acked bytes while already_sent_offset (derived from the now-empty retransmit queue) reset to 0 — so the next small write was misclassified as unsent behind Nagle (has_unacked_data==true, len < MSS) and was never transmitted. Fix: on ACK, drain the acknowledged bytes off the front of send_buf, rebase the surviving retransmit-entry offsets, and clear has_unacked_data once snd_una == snd_nxt. Regression test: loopback_stall_repro.rs wires client and server engines through in-memory queues and drives 20 sequential 64B echoes on one connection: FAILS before (stalls at exchange 2), PASSES after. 221 unit + all property/ integration tests green. Follow-up: the same send_buf drain is missing in handle_fin_wait_1 / handle_close_wait (teardown states — a send_buf leak, not a stall). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- dpdk-stdlib-tcp/src/engine.rs | 18 ++ dpdk-stdlib-tcp/tests/loopback_stall_repro.rs | 170 ++++++++++++++++++ 2 files changed, 188 insertions(+) create mode 100644 dpdk-stdlib-tcp/tests/loopback_stall_repro.rs diff --git a/dpdk-stdlib-tcp/src/engine.rs b/dpdk-stdlib-tcp/src/engine.rs index 058b703..4d14005 100644 --- a/dpdk-stdlib-tcp/src/engine.rs +++ b/dpdk-stdlib-tcp/src/engine.rs @@ -1163,6 +1163,24 @@ impl TcpEngine { entry_end.gt(tcb.snd_una) }); + // Drain acknowledged bytes off the front of send_buf and rebase + // the surviving retransmit-entry offsets. Without this, acked + // bytes linger in send_buf: the next write pushes send_buf past + // one MSS while has_unacked_data is (wrongly) still set, so Nagle + // withholds every subsequent segment and the connection stalls + // after the first exchange (the sustained bidir_multi hang). + let drain_n = (bytes_acked as usize).min(tcb.send_buf.len()); + if drain_n > 0 { + tcb.send_buf.drain(..drain_n); + for entry in tcb.retransmit_queue.iter_mut() { + entry.offset = entry.offset.saturating_sub(drain_n); + } + } + // Nothing left in flight → clear the Nagle "unacked data" latch. + if tcb.snd_una == tcb.snd_nxt { + tcb.has_unacked_data = false; + } + // Update congestion control let mss = tcb.effective_mss(); tcb.congestion.on_ack(bytes_acked, mss); diff --git a/dpdk-stdlib-tcp/tests/loopback_stall_repro.rs b/dpdk-stdlib-tcp/tests/loopback_stall_repro.rs new file mode 100644 index 0000000..65a924d --- /dev/null +++ b/dpdk-stdlib-tcp/tests/loopback_stall_repro.rs @@ -0,0 +1,170 @@ +//! Offline loopback regression for the sustained-exchange (bidir_multi) stall. +//! +//! Wires a CLIENT `TcpEngine` and a SERVER `TcpEngine` through in-memory frame +//! queues (no NIC, no EC2). Drives N sequential 64B echo exchanges on ONE +//! connection and asserts they all complete — mirroring +//! `tcp-test-client --mode bidir --count 20` against `tcp-echo`. +//! +//! 
FAILS before the fix (stalls at the 2nd exchange), PASSES after. +//! +//! Root cause: `handle_established`'s ACK path prunes the retransmit queue but +//! never drains acknowledged bytes from the front of `send_buf`, and never +//! clears `has_unacked_data`. After the first send+ACK, `send_buf` still holds +//! the stale acked bytes while `already_sent_offset` (derived from the now-empty +//! retransmit queue) resets to 0, so the next small write is misclassified as +//! "unsent" behind Nagle (`has_unacked_data == true`, len < MSS) and is never +//! transmitted. + +use std::collections::VecDeque; +use std::net::SocketAddr; +use std::sync::{mpsc, Arc}; +use std::time::Duration; + +use dpdk_stdlib_tcp::clock::{Clock, MockClock}; +use dpdk_stdlib_tcp::codec::parse_tcp_packet; +use dpdk_stdlib_tcp::contract::{ + oneshot_channel, CommandSender, ConnectionHandle, EngineCommand, EngineWakeup, +}; +use dpdk_stdlib_tcp::engine::{EngineConfig, TcpEngine}; +use dpdk_stdlib_tcp::state::{FourTuple, TcpState}; + +const CLIENT_MAC: [u8; 6] = [0x02, 0, 0, 0, 0, 0x01]; +const SERVER_MAC: [u8; 6] = [0x02, 0, 0, 0, 0, 0x02]; + +/// Feed raw frames into an engine; return the frames it emits in response. 
+fn deliver(engine: &mut TcpEngine, frames: &[Vec]) -> Vec> { + let mut out = Vec::new(); + for frame in frames { + if frame.len() < 14 { + continue; + } + let mut dst_mac = [0u8; 6]; + let mut src_mac = [0u8; 6]; + dst_mac.copy_from_slice(&frame[0..6]); + src_mac.copy_from_slice(&frame[6..12]); + if let Ok(seg) = parse_tcp_packet(frame) { + out.extend(engine.on_segment_with_macs(&seg, src_mac, dst_mac)); + } + } + out +} + +#[test] +fn loopback_20_echo_exchanges_do_not_stall() { + let clock = Arc::new(MockClock::new()); + + let (ctx_raw, _crx) = mpsc::channel(); + let client_cmd = CommandSender::new(ctx_raw, Arc::new(EngineWakeup::new())); + let (stx_raw, _srx) = mpsc::channel(); + let server_cmd = CommandSender::new(stx_raw, Arc::new(EngineWakeup::new())); + + let mut client = + TcpEngine::with_cmd_tx(clock.clone(), EngineConfig::default(), client_cmd.clone()); + let mut server = + TcpEngine::with_cmd_tx(clock.clone(), EngineConfig::default(), server_cmd.clone()); + + let client_addr: SocketAddr = "10.0.0.1:50000".parse().unwrap(); + let server_addr: SocketAddr = "10.0.0.2:9000".parse().unwrap(); + + // Server: Listen. + let (resp_tx, resp_rx) = oneshot_channel(); + server.on_command(EngineCommand::Listen { + addr: server_addr, + backlog: 16, + response: resp_tx, + }); + let _ = resp_rx.recv(); + + // Client: Connect. 
+ let key = FourTuple { local: client_addr, remote: server_addr }; + let client_handle = Arc::new(ConnectionHandle::new(65536, 65536, client_cmd.clone(), key)); + let (cresp_tx, _cresp_rx) = oneshot_channel(); + let mut client_out = client.on_command(EngineCommand::Connect { + local: client_addr, + remote: server_addr, + src_mac: CLIENT_MAC, + dst_mac: SERVER_MAC, + handle: client_handle.clone(), + response: cresp_tx, + }); + + let mut to_server: VecDeque> = client_out.drain(..).collect(); + let mut to_client: VecDeque> = VecDeque::new(); + let server_key = FourTuple { local: server_addr, remote: client_addr }; + + // Pump the three-way handshake to completion. + for _ in 0..20 { + let s_out = deliver(&mut server, &to_server.drain(..).collect::>()); + to_client.extend(s_out); + let c_out = deliver(&mut client, &to_client.drain(..).collect::>()); + to_server.extend(c_out); + + clock.advance(Duration::from_millis(1)); + to_client.extend(server.on_tick(clock.now())); + to_server.extend(client.on_tick(clock.now())); + + let c_est = client.tcbs.get(&key).map(|t| t.state) == Some(TcpState::Established); + let s_est = server.tcbs.get(&server_key).map(|t| t.state) == Some(TcpState::Established); + if c_est && s_est && to_server.is_empty() && to_client.is_empty() { + break; + } + } + + assert_eq!( + client.tcbs.get(&key).map(|t| t.state), + Some(TcpState::Established), + "client failed to establish" + ); + let server_handle = { + let tcb = server.tcbs.get(&server_key).expect("server TCB"); + assert_eq!(tcb.state, TcpState::Established, "server failed to establish"); + tcb.handle.clone() + }; + + let payload: Vec = (0..64u16).map(|i| (i % 256) as u8).collect(); + + // 20 sequential echo exchanges on ONE connection. 
+ for iter in 0..20 { + let w = client_handle.tx_ring.write(&payload); + assert_eq!(w, 64, "iter {iter}: client tx_ring write short"); + + let mut got_back = 0usize; + let mut recv_buf = vec![0u8; 64]; + let mut stalled = true; + for _ in 0..200 { + clock.advance(Duration::from_millis(1)); + + // Client engine drains its tx_ring → frames to server. + to_server.extend(client.on_tick(clock.now())); + let s_out = deliver(&mut server, &to_server.drain(..).collect::>()); + to_client.extend(s_out); + + // Server app: read available bytes and echo them back. + let mut sbuf = vec![0u8; 4096]; + let n = server_handle.rx_ring.read(&mut sbuf); + if n > 0 { + let ww = server_handle.tx_ring.write(&sbuf[..n]); + assert_eq!(ww, n, "iter {iter}: server tx_ring full"); + } + + // Server engine drains its tx_ring (echo) + ACKs → frames to client. + to_client.extend(server.on_tick(clock.now())); + let c_out = deliver(&mut client, &to_client.drain(..).collect::>()); + to_server.extend(c_out); + + // Client app: read the echo. + got_back += client_handle.rx_ring.read(&mut recv_buf[got_back..]); + if got_back >= 64 { + stalled = false; + break; + } + } + + assert!( + !stalled, + "STALL at iteration {iter}: client only got {got_back}/64 echoed bytes back \ + after 200 ticks (send_buf never drained on ACK → next write stuck behind Nagle)" + ); + assert_eq!(&recv_buf[..], &payload[..], "iter {iter}: echo mismatch"); + } +}