diff --git a/.github/codecov.yml b/.github/codecov.yml new file mode 100644 index 00000000..e43d97dd --- /dev/null +++ b/.github/codecov.yml @@ -0,0 +1,2 @@ +ignore: + - src/lib.rs \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 79f66c93..8145d8b0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -157,7 +157,9 @@ jobs: run: cargo install cargo-llvm-cov - name: Generate coverage report - run: ./scripts/generate_coverage.sh + run: | + ./scripts/generate_coverage.sh + ./scripts/generate_coverage_integration.sh - name: Upload coverage artifacts uses: actions/upload-artifact@v4 diff --git a/examples/c_ffi_example/quic_main.c b/examples/c_ffi_example/quic_main.c index 89db43bf..071b03b4 100644 --- a/examples/c_ffi_example/quic_main.c +++ b/examples/c_ffi_example/quic_main.c @@ -20,6 +20,7 @@ typedef struct { const char *ca_cert_file; const char *client_cert_file; const char *client_key_file; + const char *alpn; uint8_t insecure_skip_verify; } MqttTlsOptionsC; diff --git a/examples/c_ffi_example/tls_main.c b/examples/c_ffi_example/tls_main.c index 92cfc821..e7bf9831 100644 --- a/examples/c_ffi_example/tls_main.c +++ b/examples/c_ffi_example/tls_main.c @@ -21,6 +21,7 @@ typedef struct { const char *ca_cert_file; const char *client_cert_file; const char *client_key_file; + const char *alpn; uint8_t insecure_skip_verify; } MqttTlsOptionsC; diff --git a/mqtt_grpc_duality/run_integration_tests.sh b/mqtt_grpc_duality/run_integration_tests.sh index 8eb4eb3a..260aeb81 100755 --- a/mqtt_grpc_duality/run_integration_tests.sh +++ b/mqtt_grpc_duality/run_integration_tests.sh @@ -15,7 +15,7 @@ # Exit immediately if a command exits with a non-zero status. set -e -if [[ $CARGO_LLVM_COV==1 ]]; then +if [[ "$CARGO_LLVM_COV" == "1" ]]; then echo "running with coverage test" fi @@ -65,10 +65,12 @@ cleanup() { if [ -n "$R_PROXY_PID" ]; then echo "Stopping r-proxy (PID: $R_PROXY_PID)..." kill "$R_PROXY_PID" 2>/dev/null || true + wait "$R_PROXY_PID" 2>/dev/null || true fi if [ -n "$S_PROXY_PID" ]; then echo "Stopping s-proxy (PID: $S_PROXY_PID)..." kill "$S_PROXY_PID" 2>/dev/null || true + wait "$S_PROXY_PID" 2>/dev/null || true fi if [ -n "$BROKER_PID" ]; then echo "Stopping Mosquitto broker (PID: $BROKER_PID)..." diff --git a/mqtt_grpc_duality/src/bin/r-proxy.rs b/mqtt_grpc_duality/src/bin/r-proxy.rs index 01e2a513..af019491 100644 --- a/mqtt_grpc_duality/src/bin/r-proxy.rs +++ b/mqtt_grpc_duality/src/bin/r-proxy.rs @@ -16,6 +16,8 @@ use mqtt_grpc_proxy::{ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::{TcpListener, TcpStream}; +#[cfg(unix)] +use tokio::signal::unix::{signal, SignalKind}; use tokio::spawn; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; @@ -75,21 +77,63 @@ async fn run_proxy( grpc_client, // Share this client across all MQTT connections }); + wait_for_shutdown_signal(proxy_state, listener).await +} + +async fn wait_for_shutdown_signal( + proxy_state: Arc, + listener: TcpListener, +) -> Result<(), Box> { loop { - info!("Accepting connection..."); - let (incoming_stream, addr) = listener.accept().await?; - let state = proxy_state.clone(); - info!("Accepted connection from {}", addr); - spawn(async move { - match handle_new_incoming_tcp(incoming_stream, state).await { - Ok(_) => { - info!("Client connection handled successfully"); - } - Err(e) => { - error!("Error handling client connection: {}", e); + tokio::select! { + _ = signal_shutdown() => { + info!("Shutdown signal received, shutting down r-proxy"); + break; + } + res = listener.accept() => { + match res { + Ok((incoming_stream, addr)) => { + let state = proxy_state.clone(); + info!("Accepted connection from {}", addr); + spawn(async move { + match handle_new_incoming_tcp(incoming_stream, state).await { + Ok(_) => { + info!("Client connection handled successfully"); + } + Err(e) => { + error!("Error handling client connection: {}", e); + } + } + }); + } + Err(e) => { + error!("Error accepting connection: {}", e); + } } } - }); + } + } + Ok(()) +} + +async fn signal_shutdown() { + #[cfg(unix)] + { + let mut sigterm = + signal(SignalKind::terminate()).expect("Failed to create SIGTERM listener"); + let mut sigint = signal(SignalKind::interrupt()).expect("Failed to create SIGINT listener"); + tokio::select! { + _ = sigterm.recv() => info!("Received SIGTERM"), + _ = sigint.recv() => info!("Received SIGINT"), + _ = tokio::signal::ctrl_c() => info!("Received Ctrl-C"), + } + } + #[cfg(not(unix))] + { + tokio::signal::ctrl_c() + .await + .expect("Failed to listen for Ctrl-C"); + info!("Received Ctrl-C"); } } diff --git a/mqtt_grpc_duality/src/bin/s-proxy.rs b/mqtt_grpc_duality/src/bin/s-proxy.rs index e6866130..c99a5d36 100644 --- a/mqtt_grpc_duality/src/bin/s-proxy.rs +++ b/mqtt_grpc_duality/src/bin/s-proxy.rs @@ -4,6 +4,8 @@ use dashmap::DashMap; use std::sync::Arc; use std::{env, net::SocketAddr}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +#[cfg(unix)] +use tokio::signal::unix::{signal, SignalKind}; use tokio_util::sync::CancellationToken; use tonic::{transport::Server, Request, Response, Status}; use tracing::{debug, error, info}; @@ -367,8 +369,31 @@ async fn main() -> Result<(), Box> { Server::builder() .add_service(MqttRelayServiceServer::new(relay)) - .serve(grpc_addr) + .serve_with_shutdown(grpc_addr, signal_shutdown()) .await?; Ok(()) } + +async fn signal_shutdown() { + #[cfg(unix)] + { + let mut sigterm = + signal(SignalKind::terminate()).expect("Failed to create SIGTERM listener"); + let mut sigint = signal(SignalKind::interrupt()).expect("Failed to create SIGINT listener"); + + tokio::select! { + _ = sigterm.recv() => info!("Received SIGTERM"), + _ = sigint.recv() => info!("Received SIGINT"), + _ = tokio::signal::ctrl_c() => info!("Received Ctrl-C"), + } + } + #[cfg(not(unix))] + { + tokio::signal::ctrl_c() + .await + .expect("Failed to listen for Ctrl-C"); + info!("Received Ctrl-C"); + } + info!("Shutting down s-proxy gRPC server..."); +} diff --git a/scripts/generate_coverage.sh b/scripts/generate_coverage.sh index 24f969be..615cdbc4 100755 --- a/scripts/generate_coverage.sh +++ b/scripts/generate_coverage.sh @@ -10,6 +10,7 @@ mkdir -p target/llvm-cov-target # 1. Collect coverage from Rust tests and examples cargo +stable llvm-cov --workspace --no-report --tests +cargo +stable llvm-cov --workspace --no-report --tests -- --ignored cargo +stable llvm-cov --workspace --no-report --examples --all-features cargo +stable llvm-cov report --lcov --output-path lcov.info @@ -65,17 +66,3 @@ fi llvm-cov export -format=lcov --instr-profile target/llvm-cov-target/cargo-llvm-cov2.profdata -object target/debug/deps/libflowsdk_ffi.${LIB_EXT} > lcov2.info -# 4. Collect coverage from integration tests (Proxy binaries) -set -a -source <(cargo llvm-cov show-env --export-prefix) -set +a -cargo +stable llvm-cov clean --workspace -cargo build --workspace --bins --all-features --all - -# Set profile output for integration tests -export LLVM_PROFILE_FILE="target/llvm-cov-target/integration-%p-%m.profraw" -cd mqtt_grpc_duality && ./run_integration_tests.sh Test.test_basic && cd .. -"$PROFDATA_TOOL" merge -sparse target/llvm-cov-target/*.profraw -o target/llvm-cov-target/integration-llvm-cov2.profdata -llvm-cov export -format=lcov --instr-profile target/llvm-cov-target/integration-llvm-cov2.profdata -object target/debug/r-proxy target/debug/s-proxy > lcov3.info - -echo "Coverage report generated at lcov.info, lcov2.info and lcov3.info" diff --git a/scripts/generate_coverage_integration.sh b/scripts/generate_coverage_integration.sh new file mode 100755 index 00000000..40d85cbe --- /dev/null +++ b/scripts/generate_coverage_integration.sh @@ -0,0 +1,27 @@ +#!/bin/bash +set -euo pipefail + +cd mqtt_grpc_duality + +export PATH="$(rustc --print=target-libdir)/../bin:$PATH" +export PROFDATA_TOOL=$(which llvm-profdata 2>/dev/null || true) + +# Clean AFTER setting environment (this is the correct order per llvm-cov docs) +cargo +stable llvm-cov clean --workspace + +eval `cargo +stable llvm-cov show-env --export-prefix` +echo "LLVM_PROFILE_FILE=${LLVM_PROFILE_FILE}" +export CARGO_LLVM_COV=1 + +# Build proxy binaries WITH instrumentation (regular cargo build picks up the env vars) +cargo build --workspace --bins --all-features --all + + +# Set profile output for integration tests + ./run_integration_tests.sh --mqtt-ver=both + + cd ../ + +# Merge and export coverage for proxy binaries +"$PROFDATA_TOOL" merge -sparse target/*.profraw -o target/integration-llvm-cov2.profdata +llvm-cov export -format=lcov --instr-profile target/integration-llvm-cov2.profdata -object target/debug/r-proxy -object target/debug/s-proxy > lcov3.info diff --git a/src/mqtt_client/raw_packet/test_client.rs b/src/mqtt_client/raw_packet/test_client.rs index 5e94f086..5371c45b 100644 --- a/src/mqtt_client/raw_packet/test_client.rs +++ b/src/mqtt_client/raw_packet/test_client.rs @@ -313,16 +313,35 @@ mod tests { #[tokio::test] #[ignore] // Requires running MQTT broker async fn test_raw_client_connect() { - let result = RawTestClient::connect("localhost:1883").await; + let result = RawTestClient::connect("broker.emqx.io:1883").await; assert!(result.is_ok()); } #[tokio::test] #[ignore] // Requires running MQTT broker async fn test_raw_client_send_receive() { - let mut client = RawTestClient::connect("localhost:1883").await.unwrap(); + use crate::mqtt_serde::control_packet::MqttPacket; + use crate::mqtt_serde::mqttv3::connectv3::MqttConnect as MqttConnect3; - // Send PINGREQ + let mut client = RawTestClient::connect("broker.emqx.io:1883").await.unwrap(); + + // Create a proper MQTT v3.1.1 CONNECT packet using the library + let connect = MqttConnect3::new( + "raw_test_client".to_string(), + 60, // keep_alive + true, // clean_session + ); + let connect_bytes = MqttPacket::Connect3(connect).to_bytes().unwrap(); + + // Send CONNECT + client.send_raw(connect_bytes).await.unwrap(); + + // Receive CONNACK + let connack = client.receive_raw(8192, 5000).await.unwrap(); + assert!(connack.len() >= 4); // CONNACK is at least 4 bytes + assert_eq!(connack[0] & 0xF0, 0x20); // CONNACK packet type + + // Now send PINGREQ client.send_raw(vec![0xC0, 0x00]).await.unwrap(); // Receive PINGRESP diff --git a/src/mqtt_client/transport/tcp.rs b/src/mqtt_client/transport/tcp.rs index e2f59eb4..948d2229 100644 --- a/src/mqtt_client/transport/tcp.rs +++ b/src/mqtt_client/transport/tcp.rs @@ -107,7 +107,7 @@ mod tests { #[ignore] // Requires external service async fn test_tcp_transport_connect() { // Test connecting to a public MQTT broker (non-TLS) - let result = TcpTransport::connect("broker.emqx.com:1883").await; + let result = TcpTransport::connect("broker.emqx.io:1883").await; assert!(result.is_ok(), "Should connect to public broker"); if let Ok(mut transport) = result { diff --git a/src/mqtt_client/transport/tls.rs b/src/mqtt_client/transport/tls.rs index 1d195ab8..62be5f23 100644 --- a/src/mqtt_client/transport/tls.rs +++ b/src/mqtt_client/transport/tls.rs @@ -311,10 +311,10 @@ mod tests { } #[tokio::test] - #[ignore] // Requires network access + #[ignore] // Requires external service async fn test_tls_transport_connect() { // Test connecting to a public MQTT TLS broker - let result = TlsTransport::connect("mqtt.eclipseprojects.io:8883").await; + let result = TlsTransport::connect("broker.emqx.io:8883").await; if let Ok(mut transport) = result { // Verify peer address diff --git a/tests/transport_integration_tests.rs b/tests/transport_integration_tests.rs index 657031a5..2e873bfd 100644 --- a/tests/transport_integration_tests.rs +++ b/tests/transport_integration_tests.rs @@ -11,7 +11,7 @@ async fn test_transport_abstraction_with_echo_server() { // It would connect to an echo server and verify read/write operations // For now, we just verify the type can be used through the trait - let result = TcpTransport::connect("broker.emqx.com:1883").await; + let result = TcpTransport::connect("broker.emqx.io:1883").await; assert!(result.is_ok(), "Should connect through Transport trait"); if let Ok(transport) = result {