Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/codecov.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ignore:
- src/lib.rs
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ jobs:
run: cargo install cargo-llvm-cov

- name: Generate coverage report
run: ./scripts/generate_coverage.sh
run: |
./scripts/generate_coverage.sh
./scripts/generate_coverage_integration.sh

- name: Upload coverage artifacts
uses: actions/upload-artifact@v4
Expand Down
1 change: 1 addition & 0 deletions examples/c_ffi_example/quic_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ typedef struct {
const char *ca_cert_file;
const char *client_cert_file;
const char *client_key_file;
const char *alpn;
uint8_t insecure_skip_verify;
} MqttTlsOptionsC;

Expand Down
1 change: 1 addition & 0 deletions examples/c_ffi_example/tls_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ typedef struct {
const char *ca_cert_file;
const char *client_cert_file;
const char *client_key_file;
const char *alpn;
uint8_t insecure_skip_verify;
} MqttTlsOptionsC;

Expand Down
4 changes: 3 additions & 1 deletion mqtt_grpc_duality/run_integration_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# Exit immediately if a command exits with a non-zero status.
set -e

if [[ $CARGO_LLVM_COV==1 ]]; then
if [[ "$CARGO_LLVM_COV" == "1" ]]; then
echo "running with coverage test"
fi

Expand Down Expand Up @@ -65,10 +65,12 @@ cleanup() {
if [ -n "$R_PROXY_PID" ]; then
echo "Stopping r-proxy (PID: $R_PROXY_PID)..."
kill "$R_PROXY_PID" 2>/dev/null || true
wait "$R_PROXY_PID" 2>/dev/null || true
fi
if [ -n "$S_PROXY_PID" ]; then
echo "Stopping s-proxy (PID: $S_PROXY_PID)..."
kill "$S_PROXY_PID" 2>/dev/null || true
wait "$S_PROXY_PID" 2>/dev/null || true
fi
if [ -n "$BROKER_PID" ]; then
echo "Stopping Mosquitto broker (PID: $BROKER_PID)..."
Expand Down
68 changes: 56 additions & 12 deletions mqtt_grpc_duality/src/bin/r-proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use mqtt_grpc_proxy::{
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{TcpListener, TcpStream};
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
use tokio::spawn;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
Expand Down Expand Up @@ -75,21 +77,63 @@ async fn run_proxy(
grpc_client, // Share this client across all MQTT connections
});

wait_for_shutdown_signal(proxy_state, listener).await
}

async fn wait_for_shutdown_signal(
proxy_state: Arc<RProxyState>,
listener: TcpListener,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
loop {
info!("Accepting connection...");
let (incoming_stream, addr) = listener.accept().await?;
let state = proxy_state.clone();
info!("Accepted connection from {}", addr);
spawn(async move {
match handle_new_incoming_tcp(incoming_stream, state).await {
Ok(_) => {
info!("Client connection handled successfully");
}
Err(e) => {
error!("Error handling client connection: {}", e);
tokio::select! {
_ = signal_shutdown() => {
info!("Shutdown signal received, shutting down r-proxy");
break;
}
res = listener.accept() => {
match res {
Ok((incoming_stream, addr)) => {
let state = proxy_state.clone();
info!("Accepted connection from {}", addr);
spawn(async move {
match handle_new_incoming_tcp(incoming_stream, state).await {
Ok(_) => {
info!("Client connection handled successfully");
}
Err(e) => {
error!("Error handling client connection: {}", e);
}
}
});
}
Err(e) => {
error!("Error accepting connection: {}", e);
}
}
}
});
}
}
Ok(())
}

async fn signal_shutdown() {
#[cfg(unix)]
{
let mut sigterm =
signal(SignalKind::terminate()).expect("Failed to create SIGTERM listener");
let mut sigint = signal(SignalKind::interrupt()).expect("Failed to create SIGINT listener");
tokio::select! {
_ = sigterm.recv() => info!("Received SIGTERM"),
_ = sigint.recv() => info!("Received SIGINT"),
_ = tokio::signal::ctrl_c() => info!("Received Ctrl-C"),
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c()
.await
.expect("Failed to listen for Ctrl-C");
info!("Received Ctrl-C");
}
}

Expand Down
27 changes: 26 additions & 1 deletion mqtt_grpc_duality/src/bin/s-proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use dashmap::DashMap;
use std::sync::Arc;
use std::{env, net::SocketAddr};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
use tokio_util::sync::CancellationToken;
use tonic::{transport::Server, Request, Response, Status};
use tracing::{debug, error, info};
Expand Down Expand Up @@ -367,8 +369,31 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

Server::builder()
.add_service(MqttRelayServiceServer::new(relay))
.serve(grpc_addr)
.serve_with_shutdown(grpc_addr, signal_shutdown())
.await?;

Ok(())
}

async fn signal_shutdown() {
#[cfg(unix)]
{
let mut sigterm =
signal(SignalKind::terminate()).expect("Failed to create SIGTERM listener");
let mut sigint = signal(SignalKind::interrupt()).expect("Failed to create SIGINT listener");

tokio::select! {
_ = sigterm.recv() => info!("Received SIGTERM"),
_ = sigint.recv() => info!("Received SIGINT"),
_ = tokio::signal::ctrl_c() => info!("Received Ctrl-C"),
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c()
.await
.expect("Failed to listen for Ctrl-C");
info!("Received Ctrl-C");
}
info!("Shutting down s-proxy gRPC server...");
}
15 changes: 1 addition & 14 deletions scripts/generate_coverage.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mkdir -p target/llvm-cov-target

# 1. Collect coverage from Rust tests and examples
cargo +stable llvm-cov --workspace --no-report --tests
cargo +stable llvm-cov --workspace --no-report --tests -- --ignored
cargo +stable llvm-cov --workspace --no-report --examples --all-features
cargo +stable llvm-cov report --lcov --output-path lcov.info

Expand Down Expand Up @@ -65,17 +66,3 @@ fi

llvm-cov export -format=lcov --instr-profile target/llvm-cov-target/cargo-llvm-cov2.profdata -object target/debug/deps/libflowsdk_ffi.${LIB_EXT} > lcov2.info

# 4. Collect coverage from integration tests (Proxy binaries)
set -a
source <(cargo llvm-cov show-env --export-prefix)
set +a
cargo +stable llvm-cov clean --workspace
cargo build --workspace --bins --all-features --all

# Set profile output for integration tests
export LLVM_PROFILE_FILE="target/llvm-cov-target/integration-%p-%m.profraw"
cd mqtt_grpc_duality && ./run_integration_tests.sh Test.test_basic && cd ..
"$PROFDATA_TOOL" merge -sparse target/llvm-cov-target/*.profraw -o target/llvm-cov-target/integration-llvm-cov2.profdata
llvm-cov export -format=lcov --instr-profile target/llvm-cov-target/integration-llvm-cov2.profdata -object target/debug/r-proxy target/debug/s-proxy > lcov3.info

echo "Coverage report generated at lcov.info, lcov2.info and lcov3.info"
27 changes: 27 additions & 0 deletions scripts/generate_coverage_integration.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/bin/bash
set -euo pipefail

cd mqtt_grpc_duality

export PATH="$(rustc --print=target-libdir)/../bin:$PATH"
export PROFDATA_TOOL=$(which llvm-profdata 2>/dev/null || true)

# Clean AFTER setting environment (this is the correct order per llvm-cov docs)
cargo +stable llvm-cov clean --workspace

eval `cargo +stable llvm-cov show-env --export-prefix`
echo "LLVM_PROFILE_FILE=${LLVM_PROFILE_FILE}"
export CARGO_LLVM_COV=1

# Build proxy binaries WITH instrumentation (regular cargo build picks up the env vars)
cargo build --workspace --bins --all-features --all


# Set profile output for integration tests
./run_integration_tests.sh --mqtt-ver=both

cd ../

# Merge and export coverage for proxy binaries
"$PROFDATA_TOOL" merge -sparse target/*.profraw -o target/integration-llvm-cov2.profdata
llvm-cov export -format=lcov --instr-profile target/integration-llvm-cov2.profdata -object target/debug/r-proxy -object target/debug/s-proxy > lcov3.info
25 changes: 22 additions & 3 deletions src/mqtt_client/raw_packet/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,16 +313,35 @@ mod tests {
#[tokio::test]
#[ignore] // Requires running MQTT broker
async fn test_raw_client_connect() {
let result = RawTestClient::connect("localhost:1883").await;
let result = RawTestClient::connect("broker.emqx.io:1883").await;
assert!(result.is_ok());
}

#[tokio::test]
#[ignore] // Requires running MQTT broker
async fn test_raw_client_send_receive() {
let mut client = RawTestClient::connect("localhost:1883").await.unwrap();
use crate::mqtt_serde::control_packet::MqttPacket;
use crate::mqtt_serde::mqttv3::connectv3::MqttConnect as MqttConnect3;

// Send PINGREQ
let mut client = RawTestClient::connect("broker.emqx.io:1883").await.unwrap();

// Create a proper MQTT v3.1.1 CONNECT packet using the library
let connect = MqttConnect3::new(
"raw_test_client".to_string(),
60, // keep_alive
true, // clean_session
);
let connect_bytes = MqttPacket::Connect3(connect).to_bytes().unwrap();

// Send CONNECT
client.send_raw(connect_bytes).await.unwrap();

// Receive CONNACK
let connack = client.receive_raw(8192, 5000).await.unwrap();
assert!(connack.len() >= 4); // CONNACK is at least 4 bytes
assert_eq!(connack[0] & 0xF0, 0x20); // CONNACK packet type

// Now send PINGREQ
client.send_raw(vec![0xC0, 0x00]).await.unwrap();

// Receive PINGRESP
Expand Down
2 changes: 1 addition & 1 deletion src/mqtt_client/transport/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ mod tests {
#[ignore] // Requires external service
async fn test_tcp_transport_connect() {
// Test connecting to a public MQTT broker (non-TLS)
let result = TcpTransport::connect("broker.emqx.com:1883").await;
let result = TcpTransport::connect("broker.emqx.io:1883").await;
assert!(result.is_ok(), "Should connect to public broker");

if let Ok(mut transport) = result {
Expand Down
4 changes: 2 additions & 2 deletions src/mqtt_client/transport/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,10 @@ mod tests {
}

#[tokio::test]
#[ignore] // Requires network access
#[ignore] // Requires external service
async fn test_tls_transport_connect() {
// Test connecting to a public MQTT TLS broker
let result = TlsTransport::connect("mqtt.eclipseprojects.io:8883").await;
let result = TlsTransport::connect("broker.emqx.io:8883").await;

if let Ok(mut transport) = result {
// Verify peer address
Expand Down
2 changes: 1 addition & 1 deletion tests/transport_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async fn test_transport_abstraction_with_echo_server() {
// It would connect to an echo server and verify read/write operations

// For now, we just verify the type can be used through the trait
let result = TcpTransport::connect("broker.emqx.com:1883").await;
let result = TcpTransport::connect("broker.emqx.io:1883").await;
assert!(result.is_ok(), "Should connect through Transport trait");

if let Ok(transport) = result {
Expand Down