diff --git a/src/uucore/src/lib/features/pipes.rs b/src/uucore/src/lib/features/pipes.rs index 076e4fbdd5..e6be54c81d 100644 --- a/src/uucore/src/lib/features/pipes.rs +++ b/src/uucore/src/lib/features/pipes.rs @@ -82,24 +82,22 @@ pub fn might_fuse(source: &impl AsFd) -> bool { rustix::fs::fstatfs(source).map_or(true, |stats| stats.f_type == 0x6573_5546) // FUSE magic number, too many platform specific clippy warning with const } -/// splice all of source to dest -/// returns Ok(()) at end of file -#[inline] -fn splice_unbounded(source: &impl AsFd, dest: &mut impl AsFd) -> rustix::io::Result<()> { - while splice(&source, &dest, MAX_ROOTLESS_PIPE_SIZE)? > 0 {} - Ok(()) -} - /// force-splice source to dest even both of them are not pipe via broker pipe /// -/// This should not be used if one of them are pipe to save resources +/// throughput is better than direct splice for the case one of in/output is pipe by unknown reason +/// This includes read ahead and optimization for stdout's pipe size #[inline] -fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes { +pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes { static PIPE_CACHE: OnceLock> = OnceLock::new(); let Some((pipe_rd, pipe_wr)) = PIPE_CACHE.get_or_init(|| pipe::().ok()) else { return Ok(Err(())); }; - + // fcntl for input would not improve throughput since + // - sender with splice probably increased size already + // - sender without splice is bottleneck + let _ = fcntl_setpipe_size(&mut *dest, MAX_ROOTLESS_PIPE_SIZE); + // pre-generate page caches for splice + let _ = rustix::fs::fadvise(source, 0, None, rustix::fs::Advice::Sequential); loop { match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) { Ok(0) => return Ok(Ok(())), @@ -113,23 +111,6 @@ fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes } } -/// try splice_unbounded 1st and splice_unbounded_broker if both of in/output are not pipe -/// This includes read ahead and optimization for stdout's pipe size -#[inline] -pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes { - // fcntl for input would not improve throughput since - // - sender with splice probably increased size already - // - sender without splice is bottleneck - let is_pipe_out = fcntl_setpipe_size(&mut *dest, MAX_ROOTLESS_PIPE_SIZE).is_ok(); - // pre-generate page caches for splice - let is_file_in = rustix::fs::fadvise(source, 0, None, rustix::fs::Advice::Sequential).is_ok(); - if (is_file_in && !is_pipe_out) || splice_unbounded(source, dest).is_err() { - // both of in/output are not pipe - return splice_unbounded_broker(source, dest); - } - Ok(Ok(())) -} - /// splice `n` bytes with read/write fallback /// return actually sent bytes #[inline] @@ -143,52 +124,37 @@ pub fn send_n_bytes(input: impl AsFd, target: impl AsFd, n: u64) -> std::io::Res } let mut n = n; let mut bytes_written: u64 = 0; - let succeed_or_fuse = loop { - if n == 0 { - // avoid unnecessary syscall - return Ok(bytes_written); - } - match splice(&input, &target, n as usize) { - Ok(0) => break true, - Ok(s) => { - n -= s as u64; - bytes_written += s as u64; + let succeed_or_fuse = if let Some((broker_r, broker_w)) = PIPE_CACHE + .get_or_init(|| { + // use std::io::pipe to avoid unnecessary fcntl + let pair = std::io::pipe().ok()?; + if pipe_size > KERNEL_DEFAULT_PIPE_SIZE { + let _ = fcntl_setpipe_size(&pair.0, pipe_size); } - _ => break false, // input or output is not pipe - } - }; - let succeed_or_fuse = succeed_or_fuse - || if let Some((broker_r, broker_w)) = PIPE_CACHE - .get_or_init(|| { - // use std::io::pipe to avoid unnecessary fcntl - let pair = std::io::pipe().ok()?; - if pipe_size > KERNEL_DEFAULT_PIPE_SIZE { - let _ = fcntl_setpipe_size(&pair.0, pipe_size); - } - Some(pair) - }) - .as_ref() - { - // todo: create fn splice_bounded_broker - loop { - if n == 0 { - return Ok(bytes_written); - } - match splice(&input, &broker_w, n as usize) { - Ok(0) => break true, - Ok(s) => { - n -= s as u64; - bytes_written += s as u64; - if drain_pipe(broker_r, &target, s)?.is_err() { - break false; - } + Some(pair) + }) + .as_ref() + { + // todo: create fn splice_bounded_broker + loop { + if n == 0 { + return Ok(bytes_written); + } + match splice(&input, &broker_w, n as usize) { + Ok(0) => break true, + Ok(s) => { + n -= s as u64; + bytes_written += s as u64; + if drain_pipe(broker_r, &target, s)?.is_err() { + break false; } - _ => break false, } + _ => break false, } - } else { - false - }; + } + } else { + false + }; // do not always fallback to write for fuse, or 2 Ctrl+D is required to exit on tty // todo: move fuse patch to callers if !succeed_or_fuse || might_fuse(&input) {