Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/uu/cat/src/cat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,10 +481,10 @@ fn print_fast<R: FdReadable>(handle: &mut InputHandle<R>) -> CatResult<()> {
let mut stdout = stdout;
// Try to use the splice() system call for faster writing. If it works, we're done.
#[cfg(any(target_os = "linux", target_os = "android"))]
if uucore::pipes::splice_unbounded_auto(&handle.reader, &mut stdout)?.is_ok()
&& !uucore::pipes::might_fuse(&handle.reader)
{
return Ok(());
match uucore::pipes::splice_unbounded_auto(&handle.reader, &mut stdout) {
Ok(_) => return Ok(()),
Err(e) if e.kind() == ErrorKind::Other => {}
Err(e) => return Err(e.into()),
}

// If we're not on Linux or Android, or the splice() call failed,
Expand Down
8 changes: 5 additions & 3 deletions src/uu/tail/src/tail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,10 +596,12 @@ fn print_target_section<
}
} else {
#[cfg(any(target_os = "linux", target_os = "android"))]
if uucore::pipes::splice_unbounded_auto(file, &mut stdout)?.is_err() {
io::copy(file, &mut stdout)?;
match uucore::pipes::splice_unbounded_auto(file, &mut stdout) {
Ok(_) => return Ok(()),
Err(e) if e.kind() == ErrorKind::Other => {}
Err(e) => return Err(e.into()),
}
#[cfg(not(any(target_os = "linux", target_os = "android")))]

io::copy(file, &mut stdout)?;
}
Ok(())
Expand Down
11 changes: 7 additions & 4 deletions src/uucore/src/lib/features/buf_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ pub fn copy_stream(
dest: &mut impl AsFd,
) -> std::io::Result<()> {
// try to splice() system call for throughput
if crate::pipes::splice_unbounded_auto(src, dest)?.is_err() {
match crate::pipes::splice_unbounded_auto(src, dest) {
Ok(_) => Ok(()),
// fall back on writing "without buffering", or order of output would be wrong
// unrelated for cp /dev/stdin since cp does not have multiple input? <https://github.com/uutils/coreutils/issues/5186>
// RawWriter also removes io::copy's specialization slower than our splice
std::io::copy(src, &mut crate::io::RawWriter(dest))?;
// RawWriter also removes io::copy's specialization for proper read/write
Err(e) if e.kind() == std::io::ErrorKind::Other => {
Ok(std::io::copy(src, &mut crate::io::RawWriter(dest)).map(|_| ())?)
}
Err(e) => Err(e),
}
Ok(())
}

#[cfg(not(any(target_os = "linux", target_os = "android")))]
Expand Down
47 changes: 19 additions & 28 deletions src/uucore/src/lib/features/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,15 @@
use crate::io::{RawReader, RawWriter};
use rustix::pipe::{SpliceFlags, fcntl_setpipe_size};
use std::{
io::{PipeReader, PipeWriter, Read, Write},
io::{Error, ErrorKind, PipeReader, PipeWriter, Read, Write},
os::fd::AsFd,
sync::OnceLock,
};
pub const MAX_ROOTLESS_PIPE_SIZE: usize = 1024 * 1024;
const KERNEL_DEFAULT_PIPE_SIZE: usize = 64 * 1024;

/// A type allows to
/// - check that zero-copy succeed by ?.is_ok()
/// - check that zero-copy failed, but read/write fallback succeed by ?.is_err()
/// - catch the read/write fallback's error by ? or let Err(e)
///
/// use rustix::io::Result for functions without read/write fallback
type PipeRes = std::io::Result<Result<(), ()>>;
// Reuse the error kind never raised by std for read/write fallback
use ErrorKind::Other as Fallback;

/// return pipe and try to extend its size
/// SIZE_REQUIRED should be true if you want to fail when changing pipe size failed
Expand Down Expand Up @@ -57,22 +52,21 @@ pub fn splice(source: &impl AsFd, target: &impl AsFd, len: usize) -> rustix::io:

/// splice `len` bytes from `pipe` into `dest`.
#[inline]
pub fn drain_pipe(pipe: &PipeReader, dest: &impl AsFd, len: usize) -> PipeRes {
pub fn drain_pipe(pipe: &PipeReader, dest: &impl AsFd, len: usize) -> std::io::Result<()> {
debug_assert!(len <= MAX_ROOTLESS_PIPE_SIZE, "unexpected RAM usage");
let mut remaining = len;
while remaining > 0 {
if let Ok(s) = splice(pipe, dest, remaining) {
remaining -= s;
} else {
// read/write fallback
// use read_to_end to make pipe empty for the case write failed
let mut drain = Vec::with_capacity(remaining);
pipe.take(remaining as u64).read_to_end(&mut drain)?;
RawWriter(&dest).write_all(&drain)?;
return Ok(Err(()));
return Err(Error::from(Fallback));
}
}
Ok(Ok(()))
Ok(())
}

/// check that source is FUSE
Expand All @@ -94,29 +88,24 @@ fn splice_unbounded(source: &impl AsFd, dest: &mut impl AsFd) -> rustix::io::Res
///
/// This should not be used if one of them are pipe to save resources
#[inline]
fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes {
fn splice_unbounded_broker(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Result<()> {
static PIPE_CACHE: OnceLock<Option<(PipeReader, PipeWriter)>> = OnceLock::new();
let Some((pipe_rd, pipe_wr)) = PIPE_CACHE.get_or_init(|| pipe::<false>().ok()) else {
return Ok(Err(()));
return Err(Error::from(Fallback));
};

loop {
match splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE) {
Ok(0) => return Ok(Ok(())),
Ok(n) => {
if drain_pipe(pipe_rd, dest, n)?.is_err() {
return Ok(Err(()));
}
}
Err(_) => return Ok(Err(())),
}
while let n @ 1.. =
splice(&source, &pipe_wr, MAX_ROOTLESS_PIPE_SIZE).map_err(|_| Error::from(Fallback))?
{
drain_pipe(pipe_rd, dest, n)?;
}
Ok(())
}

/// try splice_unbounded 1st and splice_unbounded_broker if both of in/output are not pipe
/// This includes read ahead and optimization for stdout's pipe size
#[inline]
pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRes {
pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> std::io::Result<()> {
// fcntl for input would not improve throughput since
// - sender with splice probably increased size already
// - sender without splice is bottleneck
Expand All @@ -127,7 +116,7 @@ pub fn splice_unbounded_auto(source: &impl AsFd, dest: &mut impl AsFd) -> PipeRe
// both of in/output are not pipe
return splice_unbounded_broker(source, dest);
}
Ok(Ok(()))
Ok(())
}

/// splice `n` bytes with read/write fallback
Expand Down Expand Up @@ -179,8 +168,10 @@ pub fn send_n_bytes(input: impl AsFd, target: impl AsFd, n: u64) -> std::io::Res
Ok(s) => {
n -= s as u64;
bytes_written += s as u64;
if drain_pipe(broker_r, &target, s)?.is_err() {
break false;
match drain_pipe(broker_r, &target, s) {
Ok(_) => {}
Err(e) if e.kind() == ErrorKind::Other => break false,
Err(e) => return Err(e),
}
}
_ => break false,
Expand Down
Loading