Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/devices/src/virtio/block/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl BlockWorker {
let virtq_ev_fd = self.device_queue.event.as_raw_fd();
let stop_ev_fd = self.stop_fd.as_raw_fd();

let epoll = Epoll::new().unwrap();
let mut epoll = Epoll::new().unwrap();

let _ = epoll.ctl(
ControlOperation::Add,
Expand All @@ -105,8 +105,8 @@ impl BlockWorker {
&EpollEvent::new(EventSet::IN, stop_ev_fd as u64),
);

let mut epoll_events = vec![EpollEvent::new(EventSet::empty(), 0); 32];
loop {
let mut epoll_events = vec![EpollEvent::new(EventSet::empty(), 0); 32];
match epoll.wait(epoll_events.len(), -1, epoll_events.as_mut_slice()) {
Ok(ev_cnt) => {
for event in &epoll_events[0..ev_cnt] {
Expand Down
4 changes: 2 additions & 2 deletions src/devices/src/virtio/fs/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl FsWorker {
let virtq_req_ev_fd = self.queue_evts[REQ_INDEX].as_raw_fd();
let stop_ev_fd = self.stop_fd.as_raw_fd();

let epoll = Epoll::new().unwrap();
let mut epoll = Epoll::new().unwrap();

let _ = epoll.ctl(
ControlOperation::Add,
Expand All @@ -89,8 +89,8 @@ impl FsWorker {
&EpollEvent::new(EventSet::IN, stop_ev_fd as u64),
);

let mut epoll_events = vec![EpollEvent::new(EventSet::empty(), 0); 32];
loop {
let mut epoll_events = vec![EpollEvent::new(EventSet::empty(), 0); 32];
match epoll.wait(epoll_events.len(), -1, epoll_events.as_mut_slice()) {
Ok(ev_cnt) => {
for event in &epoll_events[0..ev_cnt] {
Expand Down
2 changes: 1 addition & 1 deletion src/devices/src/virtio/input/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl InputWorker {
const EVENTQ_USER: u64 = 3;
const QUIT: u64 = 4;
// Set up epoll to wait for events
let epoll = Epoll::new().expect("Failed to create epoll");
let mut epoll = Epoll::new().expect("Failed to create epoll");

let ready_fd = match events_instance.get_read_notify_fd() {
Ok(fd) => fd,
Expand Down
4 changes: 2 additions & 2 deletions src/devices/src/virtio/net/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl NetWorker {
let virtq_tx_ev_fd = self.tx_q.event.as_raw_fd();
let backend_socket = self.backend.raw_socket_fd();

let epoll = Epoll::new().unwrap();
let mut epoll = Epoll::new().unwrap();

let _ = epoll.ctl(
ControlOperation::Add,
Expand All @@ -125,8 +125,8 @@ impl NetWorker {
),
);

let mut epoll_events = vec![EpollEvent::new(EventSet::empty(), 0); 32];
loop {
let mut epoll_events = vec![EpollEvent::new(EventSet::empty(), 0); 32];
match epoll.wait(epoll_events.len(), -1, epoll_events.as_mut_slice()) {
Ok(ev_cnt) => {
for event in &epoll_events[0..ev_cnt] {
Expand Down
4 changes: 2 additions & 2 deletions src/devices/src/virtio/snd/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl SndWorker {
}

fn work(mut self) {
let epoll = Epoll::new().unwrap();
let mut epoll = Epoll::new().unwrap();

for idx in QUEUE_INDEXES {
let fd = self.queue_events[idx].as_raw_fd();
Expand All @@ -139,8 +139,8 @@ impl SndWorker {
)
.unwrap();

let mut epoll_events = vec![EpollEvent::new(EventSet::empty(), 0); 32];
loop {
let mut epoll_events = vec![EpollEvent::new(EventSet::empty(), 0); 32];
match epoll.wait(epoll_events.len(), -1, epoll_events.as_mut_slice()) {
Ok(ev_cnt) => {
for event in &epoll_events[0..ev_cnt] {
Expand Down
4 changes: 2 additions & 2 deletions src/devices/src/virtio/vsock/muxer_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,11 @@ impl MuxerThread {
}
}

fn work(self) {
fn work(mut self) {
let mut thread_rng = rng();
self.create_lisening_ipc_sockets();
let mut epoll_events = vec![EpollEvent::new(EventSet::empty(), 0); 32];
loop {
let mut epoll_events = vec![EpollEvent::new(EventSet::empty(), 0); 32];
match self
.epoll
.wait(epoll_events.len(), -1, epoll_events.as_mut_slice())
Expand Down
4 changes: 2 additions & 2 deletions src/utils/src/linux/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl Epoll {
/// * `events` points to a memory area that will be used for storing the events
/// returned by `epoll_wait()` call.
pub fn wait(
&self,
&mut self,
max_events: usize,
timeout: i32,
events: &mut [EpollEvent],
Expand Down Expand Up @@ -249,7 +249,7 @@ mod tests {
const EVENT_BUFFER_SIZE: usize = 128;
const MAX_EVENTS: usize = 10;

let epoll = Epoll::new().unwrap();
let mut epoll = Epoll::new().unwrap();
assert_eq!(epoll.epoll_fd, epoll.as_raw_fd());

// Let's test different scenarios for `epoll_ctl()` and `epoll_wait()` functionality.
Expand Down
39 changes: 32 additions & 7 deletions src/utils/src/macos/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ bitflags! {
#[derive(Clone, Copy)]
pub struct Kevent(libc::kevent);

// Safety: udata is used as an integer tag (cast from u64), never dereferenced.
// Needed because libc::kevent contains *mut c_void which is !Send, making
// Vec<Kevent> !Send which in turn makes Epoll !Send which is unwanted.
unsafe impl Send for Kevent {}

impl std::fmt::Debug for Kevent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{{ ident: {}, data: {} }}", self.ident(), self.data())
Expand Down Expand Up @@ -141,9 +146,19 @@ impl EpollEvent {
}
}

#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct Epoll {
queue: RawFd,
kevs: Vec<Kevent>,
}

impl Clone for Epoll {
fn clone(&self) -> Self {
Epoll {
queue: self.queue,
kevs: Vec::new(),
}
}
}

impl Epoll {
Expand All @@ -152,7 +167,10 @@ impl Epoll {
if queue == -1 {
Err(io::Error::last_os_error())
} else {
Ok(Epoll { queue })
Ok(Epoll {
queue,
kevs: Vec::new(),
})
}
}

Expand Down Expand Up @@ -259,11 +277,13 @@ impl Epoll {
}

pub fn wait(
&self,
&mut self,
max_events: usize,
timeout: i32,
events: &mut [EpollEvent],
) -> io::Result<usize> {
let max_events = events.len().min(max_events).min(i32::MAX as usize);

let _tout = if timeout >= 0 {
Some(Duration::from_millis(timeout as u64))
} else {
Expand All @@ -275,14 +295,16 @@ impl Epoll {
tv_nsec: 0,
};

let mut kevs = vec![Kevent::default(); events.len()];
debug!("kevs len: {}", kevs.len());
self.kevs.clear();
self.kevs.reserve_exact(max_events);
let spare = self.kevs.spare_capacity_mut();
debug_assert!(spare.len() >= max_events);
let ret = unsafe {
libc::kevent(
self.queue,
ptr::null(),
0,
kevs.as_mut_ptr() as *mut libc::kevent,
spare.as_mut_ptr().cast::<libc::kevent>(),
max_events as i32,
&ts as *const libc::timespec,
)
Expand All @@ -295,6 +317,9 @@ impl Epoll {
}

let nevents = ret as usize;
// Safety: kevent() initialized the first `nevents` elements of spare capacity.
unsafe { self.kevs.set_len(nevents) };
let kevs = &self.kevs;

for i in 0..nevents {
if kevs[i].0.filter == libc::EVFILT_READ {
Expand Down Expand Up @@ -378,7 +403,7 @@ mod tests {
const EVENT_BUFFER_SIZE: usize = 128;
const MAX_EVENTS: usize = 10;

let epoll = Epoll::new().unwrap();
let mut epoll = Epoll::new().unwrap();
assert_eq!(epoll.queue, epoll.as_raw_fd());

// Let's test different scenarios for `epoll_ctl()` and `epoll_wait()` functionality.
Expand Down
Loading