Skip to content

QapiEvents<S>::poll_next do not work with tokio::select! #25

@xuehaonan27

Description

@xuehaonan27

I've been using qapi for communication with QEMU. What I have done is:

let stream = qapi::futures::QmpStreamTokio::open_uds(&socket_addr).await?;
let stream = stream.negotiate().await?;
let (service, events) = stream.into_parts();

To get a QapiEvents<S> instance.
Then I used it with tokio::select:

let qemu_exit_status: Result<ExitStatus> = loop {
    tokio::select! {
        // Wait for the QEMU process to exit
        result = self.child.wait() => {
            info!("{MONITOR_MSG_HEADER}QEMU process terminated");
            match result {
                Ok(exit_status) => {
                    // QEMU process has exited, sync one last time and return
                    break Ok(exit_status)
                }
                Err(e) => {
                    // Error waiting for QEMU process, return the error.
                    break Err(e.into())
                }
            }
        }
        // Monitor events sent by QEMU
        Some(event) = self.qmp.events.next() => {
            let event = match event {
                Ok(event) => { event }
                Err(e) => {
                    error!("{MONITOR_MSG_HEADER}{e}");
                    errs.push(e.into());
                    continue;
                }
            };
            //TODO: handle events
            info!("{MONITOR_MSG_HEADER}got event {:?}", event);
        }
        
        _ = interval.tick() => {
            heartbeat_sent += 1;
            if heartbeat_sent == 10 {
                info!("{MONITOR_MSG_HEADER}Sent 10 heartbeat");
                heartbeat_sent = 0;
            }
        }
    }
};

Then the tokio::select! failed to work (I cannot see Sent to heartbeat in log file, which means the whole tokio::select! failed).

Then I searched in the source file and found:

// In `qapi::futures`
#[cfg(feature = "qapi-qmp")]
impl<S: Stream<Item=io::Result<QmpMessageAny>>> Stream for QapiEvents<S> {
    type Item = io::Result<qapi_qmp::Event>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = unsafe { self.get_unchecked_mut() };
        let stream = unsafe { Pin::new_unchecked(&mut this.stream) };
        let shared = &this.shared;

        shared.poll_next(cx, |cx| Poll::Ready(match futures::ready!(stream.poll_next(cx)) {
            None => None, // eof
            Some(Err(e)) => Some(Err(e)),
            Some(Ok(QmpMessage::Event(e))) => Some(Ok(e)),
            Some(Ok(QmpMessage::Response(res))) => match handle_response(shared, res) {
                Err(e) => Some(Err(e)),
                Ok(()) => {
                    cx.waker().wake_by_ref(); // TODO: I've seen this not work with tokio?
                    return Poll::Pending
                },
            },
        }))
    }
}

TODO: I've seen this not work with tokio?
Yeah now I've seen this as well!
Do you have any plan to work on this bug? @arcnmx

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions