Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions pinchy-ebpf/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use core::{mem::size_of, ops::DerefMut as _};

use aya_ebpf::{
bindings::BPF_RB_FORCE_WAKEUP,
helpers::{bpf_probe_read_user, bpf_probe_read_user_buf},
programs::TracePointContext,
EbpfContext as _,
Expand All @@ -12,7 +11,7 @@ use pinchy_common::{
WireEventHeader, WIRE_VERSION,
};

use crate::{ENTER_MAP, EVENTS, SYSCALL_RETURN_OFFSET};
use crate::{ENTER_MAP, EVENTS, SYSCALL_OFFSET, SYSCALL_RETURN_OFFSET};

mod efficiency {
#[cfg(feature = "efficiency-metrics")]
Expand Down Expand Up @@ -112,9 +111,9 @@ pub fn read_epoll_events(events_ptr: *const EpollEvent, nevents: usize, events:

#[inline(always)]
pub fn get_syscall_nr(ctx: &TracePointContext) -> Result<i64, u32> {
unsafe { ENTER_MAP.get(&ctx.pid()) }
.map(|data| data.syscall_nr)
.ok_or(1)
// The syscall number is right there in the sys_exit tracepoint record;
// no need for an ENTER_MAP lookup.
unsafe { ctx.read_at::<i64>(SYSCALL_OFFSET).map_err(|_| 1u32) }
}

#[inline(always)]
Expand Down Expand Up @@ -212,7 +211,10 @@ where

record_compact_submit_size(core::mem::size_of::<WireCompactPayload<T>>() as u64);

entry.submit(BPF_RB_FORCE_WAKEUP.into());
// Flag 0 lets the kernel use adaptive notification: the consumer is
// only woken when it has caught up, instead of an irq_work + epoll
// wakeup for every single event.
entry.submit(0);

Ok(())
}
Expand Down
11 changes: 9 additions & 2 deletions pinchy/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ async fn main() -> Result<()> {
}

async fn relay_trace(fd: OwnedFd, formatting_style: FormattingStyle) -> Result<()> {
let mut reader = tokio::io::BufReader::new(tokio::fs::File::from(std::fs::File::from(fd)));
let mut reader = tokio::io::BufReader::with_capacity(
64 * 1024,
tokio::fs::File::from(std::fs::File::from(fd)),
);
let mut stdout = tokio::io::BufWriter::new(tokio::io::stdout());
let mut header_buf = [0u8; std::mem::size_of::<WireEventHeader>()];
let mut payload = Vec::new();
Expand Down Expand Up @@ -190,7 +193,11 @@ async fn relay_trace(fd: OwnedFd, formatting_style: FormattingStyle) -> Result<(
stdout.write_all(&output).await?;
pending_flush_bytes += output.len();

if pending_flush_bytes >= flush_bytes {
// Flush on threshold, but also whenever we have caught up
// with the pipe (empty read buffer means the next read
// would block); otherwise output lags behind a slow tracee
// by up to the threshold.
if pending_flush_bytes >= flush_bytes || reader.buffer().is_empty() {
stdout.flush().await?;
pending_flush_bytes = 0;
}
Expand Down
77 changes: 51 additions & 26 deletions pinchy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,26 @@ pub fn open_pidfd(pid: libc::pid_t) -> io::Result<OwnedFd> {
}
}

pub fn uid_from_pidfd(fd: &OwnedFd) -> io::Result<u32> {
let mut stat = std::mem::MaybeUninit::<libc::stat>::uninit();
let ret = unsafe { libc::fstat(fd.as_raw_fd(), stat.as_mut_ptr()) };
if ret != 0 {
return Err(io::Error::last_os_error());
}
pub fn pidfd_has_exited(fd: &OwnedFd) -> io::Result<bool> {
let mut poll_fd = libc::pollfd {
fd: fd.as_raw_fd(),
events: libc::POLLIN,
revents: 0,
};

let stat = unsafe { stat.assume_init() };
loop {
let ret = unsafe { libc::poll(&mut poll_fd, 1, 0) };

Ok(stat.st_uid)
if ret >= 0 {
return Ok(poll_fd.revents & libc::POLLIN != 0);
}

let err = io::Error::last_os_error();

if err.kind() != io::ErrorKind::Interrupted {
return Err(err);
}
}
}

pub fn uid_from_pid(pid: u32) -> io::Result<u32> {
Expand All @@ -60,7 +70,7 @@ async fn validate_same_user_or_root(
header: &Header<'_>,
conn: &zbus::Connection,
pid: u32,
) -> io::Result<Option<OwnedFd>> {
) -> io::Result<Option<(OwnedFd, u32)>> {
trace!("validate_same_user_or_root for PID {}", pid as libc::pid_t);

// Use a pidfd to ensure we know what process we are talking about.
Expand All @@ -69,19 +79,13 @@ async fn validate_same_user_or_root(
// User who owns the PID.
let pid_uid = uid_from_pid(pid)?;

// Check that the pidfd is still valid after reading the uid from /proc/<pid> to ensure the
// PID hasn't been changed from under us between opening the fd and checking the user id.
let fd = pidfd.as_raw_fd();
let pidfd_still_valid = tokio::task::spawn_blocking(move || {
if unsafe { libc::fcntl(fd, libc::F_GETFD) } == -1 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
})
.await
.unwrap_or_else(|e| Err(io::Error::other(format!("Join error: {e}"))));
pidfd_still_valid?;
// A pidfd becomes readable when its process exits. If the process is
// still alive at this point, the PID cannot have been recycled between
// opening the pidfd and reading the UID from /proc/<pid>, so the UID we
// read belongs to the process the pidfd refers to.
if pidfd_has_exited(&pidfd)? {
return Err(io::Error::from_raw_os_error(libc::ESRCH));
}

trace!("pidfd {} has uid {}", pidfd.as_raw_fd(), pid_uid);

Expand All @@ -97,7 +101,7 @@ async fn validate_same_user_or_root(
trace!("dbus request came from uid {caller_uid}");

if caller_uid == pid_uid || caller_uid == 0 {
Ok(Some(pidfd))
Ok(Some((pidfd, caller_uid)))
} else {
Ok(None)
}
Expand All @@ -118,9 +122,30 @@ impl PinchyDBus {
pid: u32,
syscalls: Vec<i64>,
) -> zbus::fdo::Result<Fd<'_>> {
let Some(pidfd) = validate_same_user_or_root(&header, conn, pid)
// The eBPF SYSCALL_FILTER is a 512-bit bitmap indexed by syscall
// number; reject anything outside that range before it reaches the
// bitmap arithmetic.
const MAX_SYSCALL_NR: i64 = 512;
if let Some(&bad) = syscalls
.iter()
.find(|&&nr| !(0..MAX_SYSCALL_NR).contains(&nr))
{
return Err(zbus::fdo::Error::InvalidArgs(format!(
"invalid syscall number: {bad}"
)));
}

let Some((pidfd, caller_uid)) = validate_same_user_or_root(&header, conn, pid)
.await
.map_err(|e| zbus::fdo::Error::AuthFailed(e.to_string()))?
.map_err(|e| {
if e.raw_os_error() == Some(libc::ESRCH) {
zbus::fdo::Error::UnknownObject(format!(
"no process with PID {pid} (it may have exited)"
))
} else {
zbus::fdo::Error::AuthFailed(e.to_string())
}
})?
else {
return Err(zbus::fdo::Error::AccessDenied("Not authorized".to_string()));
};
Expand All @@ -139,7 +164,7 @@ impl PinchyDBus {
.dispatch
.write()
.await
.register_client(pid, writer, syscalls, Some(pidfd))
.register_client(pid, writer, syscalls, Some(pidfd), caller_uid)
.await
.map_err(|e| zbus::fdo::Error::Failed(e.to_string()))?;

Expand Down
115 changes: 112 additions & 3 deletions pinchy/src/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ impl WriterTask {
revents: 0,
};

loop {
'run: loop {
tokio::select! {
event_bytes = self.event_rx.recv() => {
let Some(event_bytes) = event_bytes else {
Expand Down Expand Up @@ -424,14 +424,24 @@ impl WriterTask {
self.stats.writer_write_error();

log::trace!("Writer task ended: write error");
return;
break 'run;
}

self.stats.writer_event_written(event_bytes.len());

log::trace!("Writer task wrote: {} bytes", event_bytes.len());
}

// Caught up with the queue: flush now. A steady trickle
// of events would otherwise keep resetting the sleep
// below and the output would sit in the buffer until it
// fills.
if self.event_rx.is_empty() && self.writer.flush().await.is_err() {
self.stats.writer_flush_error();

log::trace!("Writer task ended: flush error");
break 'run;
}
},
_ = sleep(Duration::from_millis(WRITER_FLUSH_POLL_MS)) => {
if self.event_rx.is_empty() {
Expand Down Expand Up @@ -468,6 +478,9 @@ struct Client {
client_id: u64,
#[allow(unused)]
pid: u32,
// UID the trace was authorized for; 0 means a root caller, which stays
// authorized across privilege transitions of the traced process.
uid: u32,
sender: tokio::sync::mpsc::Sender<Arc<[u8]>>,
syscalls: HashSet<i64>,
queue_stats: Arc<ClientQueueStats>,
Expand Down Expand Up @@ -633,6 +646,7 @@ impl EventDispatch {
let event_stats = stats.clone();
tokio::spawn(async move {
let mut dispatch_batch: Vec<(WireEventHeader, Arc<[u8]>)> = Vec::with_capacity(256);
let mut revalidate_pids: Vec<u32> = Vec::new();

loop {
tokio::select! {
Expand All @@ -647,6 +661,7 @@ impl EventDispatch {
let ring = guard.get_inner_mut();
let dispatch = event_dispatch.read().await;
dispatch_batch.clear();
revalidate_pids.clear();

while let Some(item) = ring.next() {
let event = &*item;
Expand All @@ -661,6 +676,17 @@ impl EventDispatch {
continue;
};

// A successful exec may be a privilege
// transition; revalidate authorization
// once the batch is dispatched.
if (header.syscall_nr == syscalls::SYS_execve
|| header.syscall_nr == syscalls::SYS_execveat)
&& header.return_value == 0
&& !revalidate_pids.contains(&header.pid)
{
revalidate_pids.push(header.pid);
}

let framed_event = Arc::<[u8]>::from(event);

event_stats.framed_event(framed_event.len());
Expand All @@ -674,6 +700,19 @@ impl EventDispatch {
}

guard.clear_ready();
drop(dispatch);

if !revalidate_pids.is_empty() {
let mut dispatch = event_dispatch.write().await;

for pid in revalidate_pids.drain(..) {
if let Err(e) =
dispatch.revalidate_pid_authorization(pid).await
{
eprintln!("Error revalidating PID {pid}: {e}");
}
}
}
}
Err(e) => {
eprintln!("RingBuf read error: {e}");
Expand All @@ -685,7 +724,14 @@ impl EventDispatch {
let _ = event_dispatch.write().await.remove_client(client_id).await;
},
Some(pid) = timeout_rx.recv() => {
let _ = event_dispatch.write().await.start_pid_timeout(pid).await;
let mut dispatch = event_dispatch.write().await;

// The process exited: stop capturing the PID right
// away so a recycled PID is not traced; clients stay
// registered through the grace period so buffered
// events still get dispatched.
let _ = dispatch.remove_pid_from_filter(pid).await;
let _ = dispatch.start_pid_timeout(pid).await;
},
}
}
Expand Down Expand Up @@ -778,7 +824,24 @@ impl EventDispatch {
writer: tokio::io::BufWriter<tokio::fs::File>,
mut syscalls: Vec<i64>,
pidfd: Option<OwnedFd>, // Pass pidfd from server for monitoring
uid: u32,
) -> anyhow::Result<u64> {
// Each registration holds a pipe, a writer task and a queue in the
// daemon; bound what a single user can pin down.
const MAX_CLIENTS_PER_UID: usize = 64;
if uid != 0 {
let existing = self
.clients_map
.values()
.flatten()
.filter(|client| client.uid == uid)
.count();

if existing >= MAX_CLIENTS_PER_UID {
anyhow::bail!("too many concurrent traces for uid {uid}");
}
}

let client_id = self.next_client_id;
self.next_client_id += 1;

Expand All @@ -794,6 +857,7 @@ impl EventDispatch {
let client_info = Client {
client_id,
pid,
uid,
sender: event_tx,
syscalls: syscalls.into_iter().collect(),
queue_stats: queue_stats.clone(),
Expand Down Expand Up @@ -861,6 +925,41 @@ impl EventDispatch {
Ok(())
}

// After a successful execve the traced process may have gained
// privileges (setuid binary). /proc/<pid> ownership follows the
// process's effective credentials (and turns to root when it becomes
// non-dumpable), so drop any client whose authorizing UID no longer
// matches. Root-authorized clients (uid 0) stay.
pub async fn revalidate_pid_authorization(&mut self, pid: u32) -> anyhow::Result<()> {
let Some(clients) = self.clients_map.get(&pid) else {
return Ok(());
};

// If the process is already gone the pidfd monitoring handles
// cleanup; nothing to decide here.
let Ok(meta) = tokio::fs::metadata(format!("/proc/{pid}")).await else {
return Ok(());
};

let current_uid = std::os::unix::fs::MetadataExt::uid(&meta);

let to_remove: Vec<u64> = clients
.iter()
.filter(|client| client.uid != 0 && client.uid != current_uid)
.map(|client| client.client_id)
.collect();

for client_id in to_remove {
log::warn!(
"PID {pid} changed ownership to uid {current_uid} after exec; \
detaching client {client_id}"
);
self.remove_client(client_id).await?;
}

Ok(())
}

pub async fn remove_all_clients_for_pid(&mut self, pid: u32) -> anyhow::Result<()> {
// Remove all clients for this PID
if self.clients_map.remove(&pid).is_some() {
Expand Down Expand Up @@ -899,6 +998,16 @@ impl EventDispatch {
bitmap[(syscall_nr / 8) as usize] |= 1 << (syscall_nr % 8);
});

// Always capture execve/execveat while anything is traced: a
// successful exec can be a privilege transition, which the dispatch
// loop must observe to revalidate authorization. dispatch_event()
// still drops these events for clients that did not subscribe.
if !self.clients_map.is_empty() {
for nr in [syscalls::SYS_execve, syscalls::SYS_execveat] {
bitmap[(nr / 8) as usize] |= 1 << (nr % 8);
}
}

let mut map: aya::maps::Array<_, u8> =
Array::try_from(self.ebpf.map_mut("SYSCALL_FILTER").unwrap()).unwrap();

Expand Down
Loading