Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ jobs:
- name: Run cargo tests with kyber_only feature
run: nix develop --command cargo test --features kyber_only

- name: Run recvmsg_x test (macOS only)
if: matrix.os == 'macos-latest'
run: sudo nix develop --command cargo test batch_receiver --features batch_receive

- name: Run privileged tests (macOS only)
if: matrix.os == 'macos-latest'
run: sudo nix develop --command cargo test test_privileged -- --ignored
Expand Down
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions lightway-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ debug = ["lightway-core/debug","lightway-app-utils/debug"]
io-uring = ["lightway-app-utils/io-uring"]
postquantum = ["lightway-core/postquantum"]
kyber_only = ["postquantum", "lightway-core/kyber_only"]
batch_receive = ["rtrb"]

[lints]
workspace = true
Expand All @@ -40,6 +41,7 @@ tokio-util.workspace = true
tracing = { workspace = true, features = ["attributes"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
twelf.workspace = true
rtrb = { version = "0.3.3", optional = true }

[dev-dependencies]
more-asserts.workspace = true
Expand Down
10 changes: 10 additions & 0 deletions lightway-client/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,15 @@ fn main() {
// Backends
desktop: { any(windows, linux, macos) },
mobile: { any(android, ios, tvos) },
// Apple platform
apple: {
any(
macos,
ios,
tvos
)
},
// Feature alias
batch_receive: { all(apple, feature = "batch_receive") },
}
}
5 changes: 5 additions & 0 deletions lightway-client/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ pub struct Config {
#[clap(long)]
pub rcvbuf: Option<ByteSize>,

/// Enable batch receive (`recvmsg_x` on macOS)
#[cfg(batch_receive)]
#[clap(long, default_value_t = false)]
pub enable_batch_receive: bool,

/// Setup of route table
/// Modes:
/// default: Sets up routes as specified in server, tun_local_ip, tun_peer_ip, tun_dns_ip
Expand Down
9 changes: 9 additions & 0 deletions lightway-client/src/io/outside.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
pub mod tcp;
pub mod udp;
#[cfg(batch_receive)]
mod udp_batch_receiver;

pub use tcp::Tcp;
pub use udp::Udp;
Expand All @@ -16,6 +18,13 @@ pub trait OutsideIO: Sync + Send {

async fn poll(&self, interest: tokio::io::Interest) -> Result<tokio::io::Ready>;

/// Poll whenever this socket is readable or not. By default, it will call
/// `poll(tokio::io::Interest::READABLE)` on the socket itself.
async fn readable(&self) -> Result<()> {
self.poll(tokio::io::Interest::READABLE).await?;
Ok(())
}

fn recv_buf(&self, buf: &mut bytes::BytesMut) -> IOCallbackResult<usize>;

fn into_io_send_callback(self: Arc<Self>) -> OutsideIOSendCallbackArg;
Expand Down
4 changes: 2 additions & 2 deletions lightway-client/src/io/outside/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ use lightway_core::{IOCallbackResult, OutsideIOSendCallback, OutsideIOSendCallba
pub struct Tcp(tokio::net::TcpStream, SocketAddr);

impl Tcp {
pub async fn new(remote_addr: SocketAddr, maybe_sock: Option<TcpStream>) -> Result<Arc<Self>> {
pub async fn new(remote_addr: SocketAddr, maybe_sock: Option<TcpStream>) -> Result<Self> {
let sock = match maybe_sock {
Some(s) => s,
None => tokio::net::TcpStream::connect(remote_addr).await?,
};
sock.set_nodelay(true)?;
let peer_addr = sock.peer_addr()?;
Ok(Arc::new(Self(sock, peer_addr)))
Ok(Self(sock, peer_addr))
}

fn peer_addr(&self) -> SocketAddr {
Expand Down
59 changes: 48 additions & 11 deletions lightway-client/src/io/outside/udp.rs
Comment thread
kp-mariappan-ramasamy marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
use super::OutsideIO;
#[cfg(batch_receive)]
use crate::io::outside::udp_batch_receiver::BatchReceiver;
#[cfg(batch_receive)]
use crate::io::outside::udp_batch_receiver::BatchReceiverConsumerError;
use anyhow::{Result, anyhow};
use async_trait::async_trait;
use lightway_app_utils::sockopt;
use lightway_core::{IOCallbackResult, OutsideIOSendCallback, OutsideIOSendCallbackArg};
use std::{
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
sync::Arc,
};
use tokio::net::UdpSocket;

use super::OutsideIO;
use lightway_app_utils::sockopt;
use lightway_core::{IOCallbackResult, OutsideIOSendCallback, OutsideIOSendCallbackArg};

pub struct Udp {
sock: tokio::net::UdpSocket,
sock: Arc<tokio::net::UdpSocket>,
peer_addr: SocketAddr,
default_ip_pmtudisc: sockopt::IpPmtudisc,
#[cfg(batch_receive)]
batch_receiver: Option<BatchReceiver>,
}

impl Udp {
pub async fn new(remote_addr: SocketAddr, sock: Option<UdpSocket>) -> Result<Arc<Self>> {
pub async fn new(remote_addr: SocketAddr, sock: Option<UdpSocket>) -> Result<Self> {
let peer_addr = tokio::net::lookup_host(remote_addr)
.await?
.next()
Expand All @@ -38,11 +43,18 @@ impl Udp {
// successfuly in WolfSsl's `OutsideIOSendCallback` callback
sock.writable().await?;

Ok(Arc::new(Self {
sock,
Ok(Self {
sock: Arc::new(sock),
peer_addr,
default_ip_pmtudisc,
}))
#[cfg(batch_receive)]
batch_receiver: None,
})
}

#[cfg(batch_receive)]
pub fn enable_batch_receive(&mut self) {
self.batch_receiver = Some(BatchReceiver::new(self.sock.clone()));
}

fn peer_addr(&self) -> SocketAddr {
Expand All @@ -68,7 +80,32 @@ impl OutsideIO for Udp {
Ok(r)
}

#[cfg(batch_receive)]
/// If `batch_receiver` is on, it will try to acquire a permit
/// of a Semaphore to see if the ring buffer has any packets in it.
async fn readable(&self) -> Result<()> {
if let Some(receiver) = self.batch_receiver.as_ref() {
// Wait until the recv task has pushed at least one packet into recv_queue.
receiver.recv_queue_ready().await?;
} else {
self.poll(tokio::io::Interest::READABLE).await?;
}
Ok(())
}

fn recv_buf(&self, buf: &mut bytes::BytesMut) -> IOCallbackResult<usize> {
#[cfg(batch_receive)]
if let Some(receiver) = self.batch_receiver.as_ref() {
return match receiver.pop_recv_consumer() {
Ok(b) => {
let len = b.len();
*buf = b;
IOCallbackResult::Ok(len)
}
Err(BatchReceiverConsumerError::EmptyBuffer(_)) => IOCallbackResult::WouldBlock,
Err(BatchReceiverConsumerError::SemaphoreClosed(e)) => IOCallbackResult::Err(e),
};
}
match self.sock.try_recv_buf(buf) {
Ok(nr) => IOCallbackResult::Ok(nr),
Err(err) if matches!(err.kind(), std::io::ErrorKind::WouldBlock) => {
Expand Down Expand Up @@ -142,10 +179,10 @@ impl OutsideIOSendCallback for Udp {
}

fn enable_pmtud_probe(&self) -> std::io::Result<()> {
sockopt::set_ip_mtu_discover(&self.sock, sockopt::IpPmtudisc::Probe)
sockopt::set_ip_mtu_discover(self.sock.as_ref(), sockopt::IpPmtudisc::Probe)
}

fn disable_pmtud_probe(&self) -> std::io::Result<()> {
sockopt::set_ip_mtu_discover(&self.sock, self.default_ip_pmtudisc)
sockopt::set_ip_mtu_discover(self.sock.as_ref(), self.default_ip_pmtudisc)
}
}
Loading
Loading