diff --git a/bluer/src/gatt/local.rs b/bluer/src/gatt/local.rs index c259074..0ec6451 100644 --- a/bluer/src/gatt/local.rs +++ b/bluer/src/gatt/local.rs @@ -618,9 +618,9 @@ impl CharacteristicWriteIoRequest { /// Accept the write request. pub fn accept(self) -> Result { let CharacteristicWriteIoRequest { adapter_name, device_address, mtu, tx, .. } = self; - let (fd, stream) = make_socket_pair(false)?; + let (fd, socket) = make_socket_pair(false)?; let _ = tx.send(Ok(fd)); - Ok(CharacteristicReader { adapter_name, device_address, mtu: mtu.into(), stream, buf: Vec::new() }) + Ok(CharacteristicReader { adapter_name, device_address, mtu: mtu.into(), socket, buf: Vec::new() }) } /// Reject the write request. @@ -941,13 +941,13 @@ impl RegisteredCharacteristic { Some(CharacteristicNotify { method: CharacteristicNotifyMethod::Io, .. }) => { // BlueZ has already confirmed the start of the notification session. // So there is no point in making this fail-able by our users. - let (fd, stream) = make_socket_pair(true).map_err(|_| ReqError::Failed)?; + let (fd, socket) = make_socket_pair(true).map_err(|_| ReqError::Failed)?; let mtu = mtu_workaround(options.mtu.into()); let writer = CharacteristicWriter { adapter_name: options.adapter_name.clone(), device_address: options.device_address, mtu, - stream, + socket, }; let _ = reg .c diff --git a/bluer/src/gatt/mod.rs b/bluer/src/gatt/mod.rs index e65dddf..e7e29d0 100644 --- a/bluer/src/gatt/mod.rs +++ b/bluer/src/gatt/mod.rs @@ -13,7 +13,7 @@ use std::{ use strum::{Display, EnumString}; use tokio::{ io::{AsyncRead, AsyncWrite, ReadBuf}, - net::UnixStream, + net::UnixDatagram, }; use crate::Address; @@ -112,7 +112,7 @@ pub struct CharacteristicReader { device_address: Address, mtu: usize, #[pin] - stream: UnixStream, + socket: UnixDatagram, buf: Vec, } @@ -134,7 +134,7 @@ impl CharacteristicReader { /// Wait for a new characteristic value to become available. pub async fn recvable(&self) -> std::io::Result<()> { - self.stream.readable().await + self.socket.readable().await } /// Try to receive the characteristic value from a single notify or write operation. @@ -142,7 +142,7 @@ impl CharacteristicReader { /// Does not wait for new data to arrive. pub fn try_recv(&self) -> std::io::Result> { let mut buf = Vec::with_capacity(self.mtu); - let n = self.stream.try_read_buf(&mut buf)?; + let n = self.socket.try_recv_buf(&mut buf)?; buf.truncate(n); Ok(buf) } @@ -162,7 +162,7 @@ impl CharacteristicReader { /// Consumes this object, returning the raw underlying file descriptor. pub fn into_raw_fd(self) -> std::io::Result { - Ok(self.stream.into_std()?.into_raw_fd()) + Ok(self.socket.into_std()?.into_raw_fd()) } } @@ -189,7 +189,7 @@ impl AsyncRead for CharacteristicReader { // If provided buffer is too small, read into temporary buffer. let mut mtu_buf: Vec> = vec![MaybeUninit::uninit(); *this.mtu]; let mut mtu_read_buf = ReadBuf::uninit(&mut mtu_buf); - ready!(this.stream.poll_read(cx, &mut mtu_read_buf))?; + ready!(this.socket.poll_recv(cx, &mut mtu_read_buf))?; let n = mtu_read_buf.filled().len(); mtu_buf.truncate(n); let mut mtu_buf: Vec = mtu_buf.into_iter().map(|v| unsafe { v.assume_init() }).collect(); @@ -201,14 +201,14 @@ impl AsyncRead for CharacteristicReader { Poll::Ready(Ok(())) } else { - self.project().stream.poll_read(cx, buf) + self.project().socket.poll_recv(cx, buf) } } } impl AsRawFd for CharacteristicReader { fn as_raw_fd(&self) -> RawFd { - self.stream.as_raw_fd() + self.socket.as_raw_fd() } } @@ -226,7 +226,7 @@ pub struct CharacteristicWriter { device_address: Address, mtu: usize, #[pin] - stream: UnixStream, + socket: UnixDatagram, } impl CharacteristicWriter { @@ -247,13 +247,13 @@ impl CharacteristicWriter { /// Waits for the remote device to stop the notification session. pub async fn closed(&self) -> std::io::Result<()> { - self.stream.readable().await + self.socket.readable().await } /// Checks if the remote device has stopped the notification session. pub fn is_closed(&self) -> std::io::Result { let mut buf = [0u8]; - match self.stream.try_read(&mut buf) { + match self.socket.try_recv(&mut buf) { Ok(_) => Ok(true), Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Ok(false), Err(err) => Err(err), @@ -262,7 +262,7 @@ impl CharacteristicWriter { /// Waits for send space to become available. pub async fn sendable(&self) -> std::io::Result<()> { - self.stream.writable().await + self.socket.writable().await } /// Tries to send the characteristic value using a single write or notify operation. @@ -274,7 +274,7 @@ impl CharacteristicWriter { if buf.len() > self.mtu { return Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "data length exceeds MTU")); } - match self.stream.try_write(buf) { + match self.socket.try_send(buf) { Ok(n) if n == buf.len() => Ok(()), Ok(_) => Err(std::io::Error::new(std::io::ErrorKind::Other, "partial write occured")), Err(err) => Err(err), @@ -298,7 +298,7 @@ impl CharacteristicWriter { /// Consumes this object, returning the raw underlying file descriptor. pub fn into_raw_fd(self) -> std::io::Result { - Ok(self.stream.into_std()?.into_raw_fd()) + Ok(self.socket.into_std()?.into_raw_fd()) } } @@ -310,21 +310,21 @@ impl AsyncWrite for CharacteristicWriter { fn poll_write(self: Pin<&mut Self>, cx: &mut std::task::Context, buf: &[u8]) -> Poll> { let max_len = buf.len().min(self.mtu); let buf = &buf[..max_len]; - self.project().stream.poll_write(cx, buf) + self.project().socket.poll_send(cx, buf) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll> { - self.project().stream.poll_flush(cx) + fn poll_flush(self: Pin<&mut Self>, _cx: &mut std::task::Context) -> Poll> { + Poll::Ready(Ok(())) } - fn poll_shutdown(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll> { - self.project().stream.poll_shutdown(cx) + fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut std::task::Context) -> Poll> { + Poll::Ready(Ok(())) } } impl AsRawFd for CharacteristicWriter { fn as_raw_fd(&self) -> RawFd { - self.stream.as_raw_fd() + self.socket.as_raw_fd() } } @@ -335,7 +335,7 @@ impl IntoRawFd for CharacteristicWriter { } /// Creates a UNIX socket pair for communication with bluetoothd. -pub(crate) fn make_socket_pair(non_block: bool) -> std::io::Result<(OwnedFd, UnixStream)> { +pub(crate) fn make_socket_pair(non_block: bool) -> std::io::Result<(OwnedFd, UnixDatagram)> { let mut sv: [RawFd; 2] = [0; 2]; let mut ty = SOCK_SEQPACKET | SOCK_CLOEXEC; if non_block { @@ -347,10 +347,10 @@ pub(crate) fn make_socket_pair(non_block: bool) -> std::io::Result<(OwnedFd, Uni let [fd1, fd2] = sv; let fd1 = unsafe { OwnedFd::new(fd1) }; - let us = unsafe { std::os::unix::net::UnixStream::from_raw_fd(fd2) }; + let us = unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(fd2) }; us.set_nonblocking(true)?; - let us = UnixStream::from_std(us)?; + let us = UnixDatagram::from_std(us)?; Ok((fd1, us)) } diff --git a/bluer/src/gatt/remote.rs b/bluer/src/gatt/remote.rs index 6f301df..bd42e2c 100644 --- a/bluer/src/gatt/remote.rs +++ b/bluer/src/gatt/remote.rs @@ -7,7 +7,7 @@ use dbus::{ }; use futures::{Stream, StreamExt}; use std::{fmt, os::unix::prelude::FromRawFd, sync::Arc}; -use tokio::net::UnixStream; +use tokio::net::UnixDatagram; use uuid::Uuid; use super::{ @@ -351,15 +351,15 @@ impl Characteristic { pub async fn write_io(&self) -> Result { let options = PropMap::new(); let (fd, mtu): (OwnedFd, u16) = self.call_method("AcquireWrite", (options,)).await?; - let stream = unsafe { std::os::unix::net::UnixStream::from_raw_fd(fd.into_fd()) }; - stream.set_nonblocking(true)?; - let stream = UnixStream::from_std(stream)?; + let socket = unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(fd.into_fd()) }; + socket.set_nonblocking(true)?; + let socket = UnixDatagram::from_std(socket)?; let mtu = mtu_workaround(mtu.into()); Ok(CharacteristicWriter { adapter_name: self.adapter_name().to_string(), device_address: self.device_address, mtu, - stream, + socket, }) } @@ -431,14 +431,14 @@ impl Characteristic { pub async fn notify_io(&self) -> Result { let options = PropMap::new(); let (fd, mtu): (OwnedFd, u16) = self.call_method("AcquireNotify", (options,)).await?; - let stream = unsafe { std::os::unix::net::UnixStream::from_raw_fd(fd.into_fd()) }; - stream.set_nonblocking(true)?; - let stream = UnixStream::from_std(stream)?; + let socket = unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(fd.into_fd()) }; + socket.set_nonblocking(true)?; + let socket = UnixDatagram::from_std(socket)?; Ok(CharacteristicReader { adapter_name: self.adapter_name().to_string(), device_address: self.device_address, mtu: mtu.into(), - stream, + socket, buf: Vec::new(), }) }