diff --git a/pinchy-ebpf/src/util.rs b/pinchy-ebpf/src/util.rs index e0ad373..22af40c 100644 --- a/pinchy-ebpf/src/util.rs +++ b/pinchy-ebpf/src/util.rs @@ -1,7 +1,6 @@ use core::{mem::size_of, ops::DerefMut as _}; use aya_ebpf::{ - bindings::BPF_RB_FORCE_WAKEUP, helpers::{bpf_probe_read_user, bpf_probe_read_user_buf}, programs::TracePointContext, EbpfContext as _, @@ -12,7 +11,7 @@ use pinchy_common::{ WireEventHeader, WIRE_VERSION, }; -use crate::{ENTER_MAP, EVENTS, SYSCALL_RETURN_OFFSET}; +use crate::{ENTER_MAP, EVENTS, SYSCALL_OFFSET, SYSCALL_RETURN_OFFSET}; mod efficiency { #[cfg(feature = "efficiency-metrics")] @@ -112,9 +111,9 @@ pub fn read_epoll_events(events_ptr: *const EpollEvent, nevents: usize, events: #[inline(always)] pub fn get_syscall_nr(ctx: &TracePointContext) -> Result { - unsafe { ENTER_MAP.get(&ctx.pid()) } - .map(|data| data.syscall_nr) - .ok_or(1) + // The syscall number is right there in the sys_exit tracepoint record; + // no need for an ENTER_MAP lookup. + unsafe { ctx.read_at::(SYSCALL_OFFSET).map_err(|_| 1u32) } } #[inline(always)] @@ -212,7 +211,10 @@ where record_compact_submit_size(core::mem::size_of::>() as u64); - entry.submit(BPF_RB_FORCE_WAKEUP.into()); + // Flag 0 lets the kernel use adaptive notification: the consumer is + // only woken when it has caught up, instead of an irq_work + epoll + // wakeup for every single event. + entry.submit(0); Ok(()) } diff --git a/pinchy/src/client.rs b/pinchy/src/client.rs index cfb3d68..b197fd5 100644 --- a/pinchy/src/client.rs +++ b/pinchy/src/client.rs @@ -122,7 +122,10 @@ async fn main() -> Result<()> { } async fn relay_trace(fd: OwnedFd, formatting_style: FormattingStyle) -> Result<()> { - let mut reader = tokio::io::BufReader::new(tokio::fs::File::from(std::fs::File::from(fd))); + let mut reader = tokio::io::BufReader::with_capacity( + 64 * 1024, + tokio::fs::File::from(std::fs::File::from(fd)), + ); let mut stdout = tokio::io::BufWriter::new(tokio::io::stdout()); let mut header_buf = [0u8; std::mem::size_of::()]; let mut payload = Vec::new(); @@ -190,7 +193,11 @@ async fn relay_trace(fd: OwnedFd, formatting_style: FormattingStyle) -> Result<( stdout.write_all(&output).await?; pending_flush_bytes += output.len(); - if pending_flush_bytes >= flush_bytes { + // Flush on threshold, but also whenever we have caught up + // with the pipe (empty read buffer means the next read + // would block); otherwise output lags behind a slow tracee + // by up to the threshold. + if pending_flush_bytes >= flush_bytes || reader.buffer().is_empty() { stdout.flush().await?; pending_flush_bytes = 0; } diff --git a/pinchy/src/server.rs b/pinchy/src/server.rs index 90420a4..89a824b 100644 --- a/pinchy/src/server.rs +++ b/pinchy/src/server.rs @@ -37,16 +37,26 @@ pub fn open_pidfd(pid: libc::pid_t) -> io::Result { } } -pub fn uid_from_pidfd(fd: &OwnedFd) -> io::Result { - let mut stat = std::mem::MaybeUninit::::uninit(); - let ret = unsafe { libc::fstat(fd.as_raw_fd(), stat.as_mut_ptr()) }; - if ret != 0 { - return Err(io::Error::last_os_error()); - } +pub fn pidfd_has_exited(fd: &OwnedFd) -> io::Result { + let mut poll_fd = libc::pollfd { + fd: fd.as_raw_fd(), + events: libc::POLLIN, + revents: 0, + }; - let stat = unsafe { stat.assume_init() }; + loop { + let ret = unsafe { libc::poll(&mut poll_fd, 1, 0) }; - Ok(stat.st_uid) + if ret >= 0 { + return Ok(poll_fd.revents & libc::POLLIN != 0); + } + + let err = io::Error::last_os_error(); + + if err.kind() != io::ErrorKind::Interrupted { + return Err(err); + } + } } pub fn uid_from_pid(pid: u32) -> io::Result { @@ -60,7 +70,7 @@ async fn validate_same_user_or_root( header: &Header<'_>, conn: &zbus::Connection, pid: u32, -) -> io::Result> { +) -> io::Result> { trace!("validate_same_user_or_root for PID {}", pid as libc::pid_t); // Use a pidfd to ensure we know what process we are talking about. @@ -69,19 +79,13 @@ async fn validate_same_user_or_root( // User who owns the PID. let pid_uid = uid_from_pid(pid)?; - // Check that the pidfd is still valid after reading the uid from /proc/ to ensure the - // PID hasn't been changed from under us between opening the fd and checking the user id. - let fd = pidfd.as_raw_fd(); - let pidfd_still_valid = tokio::task::spawn_blocking(move || { - if unsafe { libc::fcntl(fd, libc::F_GETFD) } == -1 { - Err(io::Error::last_os_error()) - } else { - Ok(()) - } - }) - .await - .unwrap_or_else(|e| Err(io::Error::other(format!("Join error: {e}")))); - pidfd_still_valid?; + // A pidfd becomes readable when its process exits. If the process is + // still alive at this point, the PID cannot have been recycled between + // opening the pidfd and reading the UID from /proc/, so the UID we + // read belongs to the process the pidfd refers to. + if pidfd_has_exited(&pidfd)? { + return Err(io::Error::from_raw_os_error(libc::ESRCH)); + } trace!("pidfd {} has uid {}", pidfd.as_raw_fd(), pid_uid); @@ -97,7 +101,7 @@ async fn validate_same_user_or_root( trace!("dbus request came from uid {caller_uid}"); if caller_uid == pid_uid || caller_uid == 0 { - Ok(Some(pidfd)) + Ok(Some((pidfd, caller_uid))) } else { Ok(None) } @@ -118,9 +122,30 @@ impl PinchyDBus { pid: u32, syscalls: Vec, ) -> zbus::fdo::Result> { - let Some(pidfd) = validate_same_user_or_root(&header, conn, pid) + // The eBPF SYSCALL_FILTER is a 512-bit bitmap indexed by syscall + // number; reject anything outside that range before it reaches the + // bitmap arithmetic. + const MAX_SYSCALL_NR: i64 = 512; + if let Some(&bad) = syscalls + .iter() + .find(|&&nr| !(0..MAX_SYSCALL_NR).contains(&nr)) + { + return Err(zbus::fdo::Error::InvalidArgs(format!( + "invalid syscall number: {bad}" + ))); + } + + let Some((pidfd, caller_uid)) = validate_same_user_or_root(&header, conn, pid) .await - .map_err(|e| zbus::fdo::Error::AuthFailed(e.to_string()))? + .map_err(|e| { + if e.raw_os_error() == Some(libc::ESRCH) { + zbus::fdo::Error::UnknownObject(format!( + "no process with PID {pid} (it may have exited)" + )) + } else { + zbus::fdo::Error::AuthFailed(e.to_string()) + } + })? else { return Err(zbus::fdo::Error::AccessDenied("Not authorized".to_string())); }; @@ -139,7 +164,7 @@ impl PinchyDBus { .dispatch .write() .await - .register_client(pid, writer, syscalls, Some(pidfd)) + .register_client(pid, writer, syscalls, Some(pidfd), caller_uid) .await .map_err(|e| zbus::fdo::Error::Failed(e.to_string()))?; diff --git a/pinchy/src/tracing.rs b/pinchy/src/tracing.rs index e4e8229..92e18bd 100644 --- a/pinchy/src/tracing.rs +++ b/pinchy/src/tracing.rs @@ -391,7 +391,7 @@ impl WriterTask { revents: 0, }; - loop { + 'run: loop { tokio::select! { event_bytes = self.event_rx.recv() => { let Some(event_bytes) = event_bytes else { @@ -424,7 +424,7 @@ impl WriterTask { self.stats.writer_write_error(); log::trace!("Writer task ended: write error"); - return; + break 'run; } self.stats.writer_event_written(event_bytes.len()); @@ -432,6 +432,16 @@ impl WriterTask { log::trace!("Writer task wrote: {} bytes", event_bytes.len()); } + // Caught up with the queue: flush now. A steady trickle + // of events would otherwise keep resetting the sleep + // below and the output would sit in the buffer until it + // fills. + if self.event_rx.is_empty() && self.writer.flush().await.is_err() { + self.stats.writer_flush_error(); + + log::trace!("Writer task ended: flush error"); + break 'run; + } }, _ = sleep(Duration::from_millis(WRITER_FLUSH_POLL_MS)) => { if self.event_rx.is_empty() { @@ -468,6 +478,9 @@ struct Client { client_id: u64, #[allow(unused)] pid: u32, + // UID the trace was authorized for; 0 means a root caller, which stays + // authorized across privilege transitions of the traced process. + uid: u32, sender: tokio::sync::mpsc::Sender>, syscalls: HashSet, queue_stats: Arc, @@ -633,6 +646,7 @@ impl EventDispatch { let event_stats = stats.clone(); tokio::spawn(async move { let mut dispatch_batch: Vec<(WireEventHeader, Arc<[u8]>)> = Vec::with_capacity(256); + let mut revalidate_pids: Vec = Vec::new(); loop { tokio::select! { @@ -647,6 +661,7 @@ impl EventDispatch { let ring = guard.get_inner_mut(); let dispatch = event_dispatch.read().await; dispatch_batch.clear(); + revalidate_pids.clear(); while let Some(item) = ring.next() { let event = &*item; @@ -661,6 +676,17 @@ impl EventDispatch { continue; }; + // A successful exec may be a privilege + // transition; revalidate authorization + // once the batch is dispatched. + if (header.syscall_nr == syscalls::SYS_execve + || header.syscall_nr == syscalls::SYS_execveat) + && header.return_value == 0 + && !revalidate_pids.contains(&header.pid) + { + revalidate_pids.push(header.pid); + } + let framed_event = Arc::<[u8]>::from(event); event_stats.framed_event(framed_event.len()); @@ -674,6 +700,19 @@ impl EventDispatch { } guard.clear_ready(); + drop(dispatch); + + if !revalidate_pids.is_empty() { + let mut dispatch = event_dispatch.write().await; + + for pid in revalidate_pids.drain(..) { + if let Err(e) = + dispatch.revalidate_pid_authorization(pid).await + { + eprintln!("Error revalidating PID {pid}: {e}"); + } + } + } } Err(e) => { eprintln!("RingBuf read error: {e}"); @@ -685,7 +724,14 @@ impl EventDispatch { let _ = event_dispatch.write().await.remove_client(client_id).await; }, Some(pid) = timeout_rx.recv() => { - let _ = event_dispatch.write().await.start_pid_timeout(pid).await; + let mut dispatch = event_dispatch.write().await; + + // The process exited: stop capturing the PID right + // away so a recycled PID is not traced; clients stay + // registered through the grace period so buffered + // events still get dispatched. + let _ = dispatch.remove_pid_from_filter(pid).await; + let _ = dispatch.start_pid_timeout(pid).await; }, } } @@ -778,7 +824,24 @@ impl EventDispatch { writer: tokio::io::BufWriter, mut syscalls: Vec, pidfd: Option, // Pass pidfd from server for monitoring + uid: u32, ) -> anyhow::Result { + // Each registration holds a pipe, a writer task and a queue in the + // daemon; bound what a single user can pin down. + const MAX_CLIENTS_PER_UID: usize = 64; + if uid != 0 { + let existing = self + .clients_map + .values() + .flatten() + .filter(|client| client.uid == uid) + .count(); + + if existing >= MAX_CLIENTS_PER_UID { + anyhow::bail!("too many concurrent traces for uid {uid}"); + } + } + let client_id = self.next_client_id; self.next_client_id += 1; @@ -794,6 +857,7 @@ impl EventDispatch { let client_info = Client { client_id, pid, + uid, sender: event_tx, syscalls: syscalls.into_iter().collect(), queue_stats: queue_stats.clone(), @@ -861,6 +925,41 @@ impl EventDispatch { Ok(()) } + // After a successful execve the traced process may have gained + // privileges (setuid binary). /proc/ ownership follows the + // process's effective credentials (and turns to root when it becomes + // non-dumpable), so drop any client whose authorizing UID no longer + // matches. Root-authorized clients (uid 0) stay. + pub async fn revalidate_pid_authorization(&mut self, pid: u32) -> anyhow::Result<()> { + let Some(clients) = self.clients_map.get(&pid) else { + return Ok(()); + }; + + // If the process is already gone the pidfd monitoring handles + // cleanup; nothing to decide here. + let Ok(meta) = tokio::fs::metadata(format!("/proc/{pid}")).await else { + return Ok(()); + }; + + let current_uid = std::os::unix::fs::MetadataExt::uid(&meta); + + let to_remove: Vec = clients + .iter() + .filter(|client| client.uid != 0 && client.uid != current_uid) + .map(|client| client.client_id) + .collect(); + + for client_id in to_remove { + log::warn!( + "PID {pid} changed ownership to uid {current_uid} after exec; \ + detaching client {client_id}" + ); + self.remove_client(client_id).await?; + } + + Ok(()) + } + pub async fn remove_all_clients_for_pid(&mut self, pid: u32) -> anyhow::Result<()> { // Remove all clients for this PID if self.clients_map.remove(&pid).is_some() { @@ -899,6 +998,16 @@ impl EventDispatch { bitmap[(syscall_nr / 8) as usize] |= 1 << (syscall_nr % 8); }); + // Always capture execve/execveat while anything is traced: a + // successful exec can be a privilege transition, which the dispatch + // loop must observe to revalidate authorization. dispatch_event() + // still drops these events for clients that did not subscribe. + if !self.clients_map.is_empty() { + for nr in [syscalls::SYS_execve, syscalls::SYS_execveat] { + bitmap[(nr / 8) as usize] |= 1 << (nr % 8); + } + } + let mut map: aya::maps::Array<_, u8> = Array::try_from(self.ebpf.map_mut("SYSCALL_FILTER").unwrap()).unwrap();