Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 36 additions & 70 deletions src/uucore/src/lib/features/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,22 @@ pub fn might_fuse(source: &impl AsFd) -> bool {
rustix::fs::fstatfs(source).map_or(true, |stats| stats.f_type == 0x6573_5546) // FUSE magic number, too many platform specific clippy warning with const
}

/// splice all of source to dest
/// returns Ok(()) at end of file
#[inline]
fn splice_unbounded(source: &impl AsFd, dest: &mut impl AsFd) -> rustix::io::Result<()> {
while splice(&source, &dest, MAX_ROOTLESS_PIPE_SIZE)? > 0 {}
Ok(())
}

/// force-splice source to dest even both of them are not pipe via broker pipe
///
/// This should not be used if one of them are pipe to save resources
/// throughput is better than direct splice for the case one of in/output is pipe by unknown reason
/// This includes read ahead and optimization for stdout's pipe size
#[inline]
fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes {
pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes {
static PIPE_CACHE: OnceLock<Option<(PipeReader, PipeWriter)>> = OnceLock::new();
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE.get_or_init(|| pipe::<false>().ok()) else {
return Ok(Err(()));
};

// fcntl for input would not improve throughput since
// - sender with splice probably increased size already
// - sender without splice is bottleneck
let _ = fcntl_setpipe_size(&mut *dest, MAX_ROOTLESS_PIPE_SIZE);
// pre-generate page caches for splice
let _ = rustix::fs::fadvise(source, 0, None, rustix::fs::Advice::Sequential);
loop {
match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
Ok(0) => return Ok(Ok(())),
Expand All @@ -113,23 +111,6 @@ fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes
}
}

/// try splice_unbounded 1st and splice_unbounded_broker if both of in/output are not pipe
/// This includes read ahead and optimization for stdout's pipe size
#[inline]
pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes {
// fcntl for input would not improve throughput since
// - sender with splice probably increased size already
// - sender without splice is bottleneck
let is_pipe_out = fcntl_setpipe_size(&mut *dest, MAX_ROOTLESS_PIPE_SIZE).is_ok();
// pre-generate page caches for splice
let is_file_in = rustix::fs::fadvise(source, 0, None, rustix::fs::Advice::Sequential).is_ok();
if (is_file_in && !is_pipe_out) || splice_unbounded(source, dest).is_err() {
// both of in/output are not pipe
return splice_unbounded_broker(source, dest);
}
Ok(Ok(()))
}

/// splice `n` bytes with read/write fallback
/// return actually sent bytes
#[inline]
Expand All @@ -143,52 +124,37 @@ pub fn send_n_bytes(input: impl AsFd, target: impl AsFd, n: u64) -> std::io::Res
}
let mut n = n;
let mut bytes_written: u64 = 0;
let succeed_or_fuse = loop {
if n == 0 {
// avoid unnecessary syscall
return Ok(bytes_written);
}
match splice(&input, &target, n as usize) {
Ok(0) => break true,
Ok(s) => {
n -= s as u64;
bytes_written += s as u64;
let succeed_or_fuse = if let Some((broker_r, broker_w)) = PIPE_CACHE
.get_or_init(|| {
// use std::io::pipe to avoid unnecessary fcntl
let pair = std::io::pipe().ok()?;
if pipe_size > KERNEL_DEFAULT_PIPE_SIZE {
let _ = fcntl_setpipe_size(&pair.0, pipe_size);
}
_ => break false, // input or output is not pipe
}
};
let succeed_or_fuse = succeed_or_fuse
|| if let Some((broker_r, broker_w)) = PIPE_CACHE
.get_or_init(|| {
// use std::io::pipe to avoid unnecessary fcntl
let pair = std::io::pipe().ok()?;
if pipe_size > KERNEL_DEFAULT_PIPE_SIZE {
let _ = fcntl_setpipe_size(&pair.0, pipe_size);
}
Some(pair)
})
.as_ref()
{
// todo: create fn splice_bounded_broker
loop {
if n == 0 {
return Ok(bytes_written);
}
match splice(&input, &broker_w, n as usize) {
Ok(0) => break true,
Ok(s) => {
n -= s as u64;
bytes_written += s as u64;
if drain_pipe(broker_r, &target, s)?.is_err() {
break false;
}
Some(pair)
})
.as_ref()
{
// todo: create fn splice_bounded_broker
loop {
if n == 0 {
return Ok(bytes_written);
}
match splice(&input, &broker_w, n as usize) {
Ok(0) => break true,
Ok(s) => {
n -= s as u64;
bytes_written += s as u64;
if drain_pipe(broker_r, &target, s)?.is_err() {
break false;
}
_ => break false,
}
_ => break false,
}
} else {
false
};
}
} else {
false
};
// do not always fallback to write for fuse, or 2 Ctrl+D is required to exit on tty
// todo: move fuse patch to callers
if !succeed_or_fuse || might_fuse(&input) {
Expand Down
Loading