diff --git a/src/uu/cat/src/cat.rs b/src/uu/cat/src/cat.rs index 226c43a7ceb..e119cda8ebd 100644 --- a/src/uu/cat/src/cat.rs +++ b/src/uu/cat/src/cat.rs @@ -481,10 +481,10 @@ fn print_fast(handle: &mut InputHandle) -> CatResult<()> { let mut stdout = stdout; // Try to use the splice() system call for faster writing. If it works, we're done. #[cfg(any(target_os = "linux", target_os = "android"))] - if uucore::pipes::splice_unbounded_auto(&handle.reader, &mut stdout)?.is_ok() - && !uucore::pipes::might_fuse(&handle.reader) - { - return Ok(()); + match uucore::pipes::splice_unbounded_auto(&handle.reader, &mut stdout) { + Ok(_) => return Ok(()), + Err(e) if e.kind() == ErrorKind::Other => {} + Err(e) => return Err(e.into()), } // If we're not on Linux or Android, or the splice() call failed, diff --git a/src/uu/tail/src/tail.rs b/src/uu/tail/src/tail.rs index 7ba1335ad70..551701c970c 100644 --- a/src/uu/tail/src/tail.rs +++ b/src/uu/tail/src/tail.rs @@ -596,10 +596,12 @@ fn print_target_section< } } else { #[cfg(any(target_os = "linux", target_os = "android"))] - if uucore::pipes::splice_unbounded_auto(file, &mut stdout)?.is_err() { - io::copy(file, &mut stdout)?; + match uucore::pipes::splice_unbounded_auto(file, &mut stdout) { + Ok(_) => return Ok(()), + Err(e) if e.kind() == ErrorKind::Other => {} + Err(e) => return Err(e.into()), } - #[cfg(not(any(target_os = "linux", target_os = "android")))] + io::copy(file, &mut stdout)?; } Ok(()) diff --git a/src/uucore/src/lib/features/buf_copy.rs b/src/uucore/src/lib/features/buf_copy.rs index 9943488c7ba..ff37098f5a6 100644 --- a/src/uucore/src/lib/features/buf_copy.rs +++ b/src/uucore/src/lib/features/buf_copy.rs @@ -17,13 +17,16 @@ pub fn copy_stream( dest: &mut impl AsFd, ) -> std::io::Result<()> { // try to splice() system call for throughput - if crate::pipes::splice_unbounded_auto(src, dest)?.is_err() { + match crate::pipes::splice_unbounded_auto(src, dest) { + Ok(_) => Ok(()), // fall back on writing "without buffering", or order of output would be wrong // unrelated for cp /dev/stdin since cp does not have multiple input? - // RawWriter also removes io::copy's specialization slower than our splice - std::io::copy(src, &mut crate::io::RawWriter(dest))?; + // RawWriter also removes io::copy's specialization for proper read/write + Err(e) if e.kind() == std::io::ErrorKind::Other => { + Ok(std::io::copy(src, &mut crate::io::RawWriter(dest)).map(|_| ())?) + } + Err(e) => Err(e), } - Ok(()) } #[cfg(not(any(target_os = "linux", target_os = "android")))] diff --git a/src/uucore/src/lib/features/pipes.rs b/src/uucore/src/lib/features/pipes.rs index 076e4fbdd59..0e339f69c83 100644 --- a/src/uucore/src/lib/features/pipes.rs +++ b/src/uucore/src/lib/features/pipes.rs @@ -10,20 +10,15 @@ use crate::io::{RawReader, RawWriter}; use rustix::pipe::{SpliceFlags, fcntl_setpipe_size}; use std::{ - io::{PipeReader, PipeWriter, Read, Write}, + io::{Error, ErrorKind, PipeReader, PipeWriter, Read, Write}, os::fd::AsFd, sync::OnceLock, }; pub const MAX_ROOTLESS_PIPE_SIZE: usize = 1024 * 1024; const KERNEL_DEFAULT_PIPE_SIZE: usize = 64 * 1024; -/// A type allows to -/// - check that zero-copy succeed by ?.is_ok() -/// - check that zero-copy failed, but read/write fallback succeed by ?.is_err() -/// - catch the read/write fallback's error by ? or let Err(e) -/// -/// use rustix::io::Result for functions without read/write fallback -type PipeRes = std::io::Result>; +// Reuse the error kind never raised by std for read/write fallback +use ErrorKind::Other as Fallback; /// return pipe and try to extend its size /// SIZE_REQUIRED should be true if you want to fail when changing pipe size failed @@ -57,22 +52,21 @@ pub fn splice(source: &impl AsFd, target: &impl AsFd, len: usize) -> rustix::io: /// splice `len` bytes from `pipe` into `dest`. #[inline] -pub fn drain_pipe(pipe: &PipeReader, dest: &impl AsFd, len: usize) -> PipeRes { +pub fn drain_pipe(pipe: &PipeReader, dest: &impl AsFd, len: usize) -> std::io::Result<()> { debug_assert!(len <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage"); let mut remaining = len; while remaining > 0 { if let Ok(s) = splice(pipe, dest, remaining) { remaining -= s; } else { - // read/write fallback // use read_to_end to make pipe empty for the case write failed let mut drain = Vec::with_capacity(remaining); pipe.take(remaining as u64).read_to_end(&mut drain)?; RawWriter(&dest).write_all(&drain)?; - return Ok(Err(())); + return Err(Error::from(Fallback)); } } - Ok(Ok(())) + Ok(()) } /// check that source is FUSE @@ -94,29 +88,24 @@ fn splice_unbounded(source: &impl AsFd, dest: &mut impl AsFd) -> rustix::io::Res /// /// This should not be used if one of them are pipe to save resources #[inline] -fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes { +fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Result<()> { static PIPE_CACHE: OnceLock> = OnceLock::new(); let Some((pipe_rd, pipe_wr)) = PIPE_CACHE.get_or_init(|| pipe::().ok()) else { - return Ok(Err(())); + return Err(Error::from(Fallback)); }; - loop { - match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) { - Ok(0) => return Ok(Ok(())), - Ok(n) => { - if drain_pipe(pipe_rd, dest, n)?.is_err() { - return Ok(Err(())); - } - } - Err(_) => return Ok(Err(())), - } + while let n @ 1.. = + splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE).map_err(|_| Error::from(Fallback))? + { + drain_pipe(pipe_rd, dest, n)?; } + Ok(()) } /// try splice_unbounded 1st and splice_unbounded_broker if both of in/output are not pipe /// This includes read ahead and optimization for stdout's pipe size #[inline] -pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes { +pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Result<()> { // fcntl for input would not improve throughput since // - sender with splice probably increased size already // - sender without splice is bottleneck @@ -127,7 +116,7 @@ pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRe // both of in/output are not pipe return splice_unbounded_broker(source, dest); } - Ok(Ok(())) + Ok(()) } /// splice `n` bytes with read/write fallback @@ -179,8 +168,10 @@ pub fn send_n_bytes(input: impl AsFd, target: impl AsFd, n: u64) -> std::io::Res Ok(s) => { n -= s as u64; bytes_written += s as u64; - if drain_pipe(broker_r, &target, s)?.is_err() { - break false; + match drain_pipe(broker_r, &target, s) { + Ok(_) => {} + Err(e) if e.kind() == ErrorKind::Other => break false, + Err(e) => return Err(e), } } _ => break false,