I've been using qapi to communicate with QEMU. Here is what I did:
let stream = qapi::futures::QmpStreamTokio::open_uds(&socket_addr).await?;
let stream = stream.negotiate().await?;
let (service, events) = stream.into_parts();
This gives me a QapiEvents<S> instance (events).
I then polled it inside tokio::select!:
let qemu_exit_status: Result<ExitStatus> = loop {
    tokio::select! {
        // Wait for the QEMU process to exit
        result = self.child.wait() => {
            info!("{MONITOR_MSG_HEADER}QEMU process terminated");
            match result {
                Ok(exit_status) => {
                    // QEMU process has exited, sync one last time and return
                    break Ok(exit_status)
                }
                Err(e) => {
                    // Error waiting for QEMU process, return the error.
                    break Err(e.into())
                }
            }
        }
        // Monitor events sent by QEMU
        Some(event) = self.qmp.events.next() => {
            let event = match event {
                Ok(event) => { event }
                Err(e) => {
                    error!("{MONITOR_MSG_HEADER}{e}");
                    errs.push(e.into());
                    continue;
                }
            };
            //TODO: handle events
            info!("{MONITOR_MSG_HEADER}got event {:?}", event);
        }
        _ = interval.tick() => {
            heartbeat_sent += 1;
            if heartbeat_sent == 10 {
                info!("{MONITOR_MSG_HEADER}Sent 10 heartbeat");
                heartbeat_sent = 0;
            }
        }
    }
};
After that, the tokio::select! loop stopped working: the "Sent 10 heartbeat" message never appears in the log file, which suggests that the whole tokio::select! is stalled, not just the events branch.
I then looked through the qapi source and found:
// In `qapi::futures`
#[cfg(feature = "qapi-qmp")]
impl<S: Stream<Item=io::Result<QmpMessageAny>>> Stream for QapiEvents<S> {
    type Item = io::Result<qapi_qmp::Event>;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = unsafe { self.get_unchecked_mut() };
        let stream = unsafe { Pin::new_unchecked(&mut this.stream) };
        let shared = &this.shared;
        shared.poll_next(cx, |cx| Poll::Ready(match futures::ready!(stream.poll_next(cx)) {
            None => None, // eof
            Some(Err(e)) => Some(Err(e)),
            Some(Ok(QmpMessage::Event(e))) => Some(Ok(e)),
            Some(Ok(QmpMessage::Response(res))) => match handle_response(shared, res) {
                Err(e) => Some(Err(e)),
                Ok(()) => {
                    cx.waker().wake_by_ref(); // TODO: I've seen this not work with tokio?
                    return Poll::Pending
                },
            },
        }))
    }
}
Note the comment in that code: "TODO: I've seen this not work with tokio?"
Yeah, now I've seen this as well!
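I have not confirmed that the self-wake (wake_by_ref followed by Poll::Pending) is the actual cause of the stall. Still, one alternative that does not rely on it is to keep polling the inner stream in a loop until it yields an event, reaches EOF, or is genuinely pending. Below is a minimal, std-only sketch of that pattern. Everything in it is hypothetical and for illustration only: `Msg` stands in for QmpMessage, `MiniStream` for futures::Stream, `Source` for the QMP socket, and `EventsOnly` for QapiEvents; none of these names exist in qapi.

```rust
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `QmpMessage`: an event or a command response.
#[derive(Debug, PartialEq)]
pub enum Msg {
    Event(u32),
    Response(u32),
}

/// Hypothetical, minimal stand-in for `futures::Stream` (keeps the sketch std-only).
pub trait MiniStream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// In-memory source that is always ready; it stands in for the QMP socket.
pub struct Source(pub VecDeque<Msg>);

impl MiniStream for Source {
    type Item = Msg;
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Msg>> {
        Poll::Ready(self.0.pop_front())
    }
}

/// Yields only events; responses are consumed internally.
pub struct EventsOnly<S> {
    pub stream: S,
    pub responses_handled: usize,
}

impl<S: MiniStream<Item = Msg> + Unpin> MiniStream for EventsOnly<S> {
    type Item = u32;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        loop {
            let next = Pin::new(&mut self.stream).poll_next(cx);
            match next {
                // The inner stream registered the waker itself; no self-wake needed.
                Poll::Pending => return Poll::Pending,
                Poll::Ready(None) => return Poll::Ready(None), // eof
                Poll::Ready(Some(Msg::Event(e))) => return Poll::Ready(Some(e)),
                // Handle the response, then poll again instead of
                // calling `wake_by_ref()` and returning `Pending`.
                Poll::Ready(Some(Msg::Response(_))) => self.responses_handled += 1,
            }
        }
    }
}

/// Waker that counts wake-ups, to check that the wrapper never self-wakes.
pub struct CountingWaker(pub AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Polls the wrapper once; returns (poll result, responses handled, wake count).
pub fn poll_once(input: Vec<Msg>) -> (Poll<Option<u32>>, usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    let mut events = EventsOnly { stream: Source(input.into()), responses_handled: 0 };
    let polled = Pin::new(&mut events).poll_next(&mut cx);
    (polled, events.responses_handled, counter.0.load(Ordering::SeqCst))
}
```

For example, poll_once(vec![Msg::Response(1), Msg::Response(2), Msg::Event(7)]) skips both responses and returns the event in a single poll, without touching the waker. The trade-off is that a long run of responses is processed inside one poll, so a real implementation might want to cap the number of iterations before yielding.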
Do you have any plans to work on this bug? @arcnmx