Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions bluer/src/gatt/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,9 +618,9 @@ impl CharacteristicWriteIoRequest {
/// Accept the write request.
pub fn accept(self) -> Result<CharacteristicReader> {
let CharacteristicWriteIoRequest { adapter_name, device_address, mtu, tx, .. } = self;
let (fd, stream) = make_socket_pair(false)?;
let (fd, socket) = make_socket_pair(false)?;
let _ = tx.send(Ok(fd));
Ok(CharacteristicReader { adapter_name, device_address, mtu: mtu.into(), stream, buf: Vec::new() })
Ok(CharacteristicReader { adapter_name, device_address, mtu: mtu.into(), socket, buf: Vec::new() })
}

/// Reject the write request.
Expand Down Expand Up @@ -941,13 +941,13 @@ impl RegisteredCharacteristic {
Some(CharacteristicNotify { method: CharacteristicNotifyMethod::Io, .. }) => {
// BlueZ has already confirmed the start of the notification session.
// So there is no point in making this fail-able by our users.
let (fd, stream) = make_socket_pair(true).map_err(|_| ReqError::Failed)?;
let (fd, socket) = make_socket_pair(true).map_err(|_| ReqError::Failed)?;
let mtu = mtu_workaround(options.mtu.into());
let writer = CharacteristicWriter {
adapter_name: options.adapter_name.clone(),
device_address: options.device_address,
mtu,
stream,
socket,
};
let _ = reg
.c
Expand Down
46 changes: 23 additions & 23 deletions bluer/src/gatt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{
use strum::{Display, EnumString};
use tokio::{
io::{AsyncRead, AsyncWrite, ReadBuf},
net::UnixStream,
net::UnixDatagram,
};

use crate::Address;
Expand Down Expand Up @@ -112,7 +112,7 @@ pub struct CharacteristicReader {
device_address: Address,
mtu: usize,
#[pin]
stream: UnixStream,
socket: UnixDatagram,
buf: Vec<u8>,
}

Expand All @@ -134,15 +134,15 @@ impl CharacteristicReader {

/// Wait for a new characteristic value to become available.
pub async fn recvable(&self) -> std::io::Result<()> {
self.stream.readable().await
self.socket.readable().await
}

/// Try to receive the characteristic value from a single notify or write operation.
///
/// Does not wait for new data to arrive.
pub fn try_recv(&self) -> std::io::Result<Vec<u8>> {
let mut buf = Vec::with_capacity(self.mtu);
let n = self.stream.try_read_buf(&mut buf)?;
let n = self.socket.try_recv_buf(&mut buf)?;
buf.truncate(n);
Ok(buf)
}
Expand All @@ -162,7 +162,7 @@ impl CharacteristicReader {

/// Consumes this object, returning the raw underlying file descriptor.
pub fn into_raw_fd(self) -> std::io::Result<RawFd> {
Ok(self.stream.into_std()?.into_raw_fd())
Ok(self.socket.into_std()?.into_raw_fd())
}
}

Expand All @@ -189,7 +189,7 @@ impl AsyncRead for CharacteristicReader {
// If provided buffer is too small, read into temporary buffer.
let mut mtu_buf: Vec<MaybeUninit<u8>> = vec![MaybeUninit::uninit(); *this.mtu];
let mut mtu_read_buf = ReadBuf::uninit(&mut mtu_buf);
ready!(this.stream.poll_read(cx, &mut mtu_read_buf))?;
ready!(this.socket.poll_recv(cx, &mut mtu_read_buf))?;
let n = mtu_read_buf.filled().len();
mtu_buf.truncate(n);
let mut mtu_buf: Vec<u8> = mtu_buf.into_iter().map(|v| unsafe { v.assume_init() }).collect();
Expand All @@ -201,14 +201,14 @@ impl AsyncRead for CharacteristicReader {

Poll::Ready(Ok(()))
} else {
self.project().stream.poll_read(cx, buf)
self.project().socket.poll_recv(cx, buf)
}
}
}

impl AsRawFd for CharacteristicReader {
fn as_raw_fd(&self) -> RawFd {
self.stream.as_raw_fd()
self.socket.as_raw_fd()
}
}

Expand All @@ -226,7 +226,7 @@ pub struct CharacteristicWriter {
device_address: Address,
mtu: usize,
#[pin]
stream: UnixStream,
socket: UnixDatagram,
}

impl CharacteristicWriter {
Expand All @@ -247,13 +247,13 @@ impl CharacteristicWriter {

/// Waits for the remote device to stop the notification session.
pub async fn closed(&self) -> std::io::Result<()> {
self.stream.readable().await
self.socket.readable().await
}

/// Checks if the remote device has stopped the notification session.
pub fn is_closed(&self) -> std::io::Result<bool> {
let mut buf = [0u8];
match self.stream.try_read(&mut buf) {
match self.socket.try_recv(&mut buf) {
Ok(_) => Ok(true),
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Ok(false),
Err(err) => Err(err),
Expand All @@ -262,7 +262,7 @@ impl CharacteristicWriter {

/// Waits for send space to become available.
pub async fn sendable(&self) -> std::io::Result<()> {
self.stream.writable().await
self.socket.writable().await
}

/// Tries to send the characteristic value using a single write or notify operation.
Expand All @@ -274,7 +274,7 @@ impl CharacteristicWriter {
if buf.len() > self.mtu {
return Err(std::io::Error::new(std::io::ErrorKind::WriteZero, "data length exceeds MTU"));
}
match self.stream.try_write(buf) {
match self.socket.try_send(buf) {
Ok(n) if n == buf.len() => Ok(()),
Ok(_) => Err(std::io::Error::new(std::io::ErrorKind::Other, "partial write occured")),
Err(err) => Err(err),
Expand All @@ -298,7 +298,7 @@ impl CharacteristicWriter {

/// Consumes this object, returning the raw underlying file descriptor.
pub fn into_raw_fd(self) -> std::io::Result<RawFd> {
Ok(self.stream.into_std()?.into_raw_fd())
Ok(self.socket.into_std()?.into_raw_fd())
}
}

Expand All @@ -310,21 +310,21 @@ impl AsyncWrite for CharacteristicWriter {
fn poll_write(self: Pin<&mut Self>, cx: &mut std::task::Context, buf: &[u8]) -> Poll<std::io::Result<usize>> {
let max_len = buf.len().min(self.mtu);
let buf = &buf[..max_len];
self.project().stream.poll_write(cx, buf)
self.project().socket.poll_send(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<std::io::Result<()>> {
self.project().stream.poll_flush(cx)
fn poll_flush(self: Pin<&mut Self>, _cx: &mut std::task::Context) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<std::io::Result<()>> {
self.project().stream.poll_shutdown(cx)
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut std::task::Context) -> Poll<std::io::Result<()>> {
Poll::Ready(Ok(()))
}
}

impl AsRawFd for CharacteristicWriter {
fn as_raw_fd(&self) -> RawFd {
self.stream.as_raw_fd()
self.socket.as_raw_fd()
}
}

Expand All @@ -335,7 +335,7 @@ impl IntoRawFd for CharacteristicWriter {
}

/// Creates a UNIX socket pair for communication with bluetoothd.
pub(crate) fn make_socket_pair(non_block: bool) -> std::io::Result<(OwnedFd, UnixStream)> {
pub(crate) fn make_socket_pair(non_block: bool) -> std::io::Result<(OwnedFd, UnixDatagram)> {
let mut sv: [RawFd; 2] = [0; 2];
let mut ty = SOCK_SEQPACKET | SOCK_CLOEXEC;
if non_block {
Expand All @@ -347,10 +347,10 @@ pub(crate) fn make_socket_pair(non_block: bool) -> std::io::Result<(OwnedFd, Uni
let [fd1, fd2] = sv;

let fd1 = unsafe { OwnedFd::new(fd1) };
let us = unsafe { std::os::unix::net::UnixStream::from_raw_fd(fd2) };
let us = unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(fd2) };

us.set_nonblocking(true)?;
let us = UnixStream::from_std(us)?;
let us = UnixDatagram::from_std(us)?;

Ok((fd1, us))
}
Expand Down
18 changes: 9 additions & 9 deletions bluer/src/gatt/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use dbus::{
};
use futures::{Stream, StreamExt};
use std::{fmt, os::unix::prelude::FromRawFd, sync::Arc};
use tokio::net::UnixStream;
use tokio::net::UnixDatagram;
use uuid::Uuid;

use super::{
Expand Down Expand Up @@ -351,15 +351,15 @@ impl Characteristic {
pub async fn write_io(&self) -> Result<CharacteristicWriter> {
let options = PropMap::new();
let (fd, mtu): (OwnedFd, u16) = self.call_method("AcquireWrite", (options,)).await?;
let stream = unsafe { std::os::unix::net::UnixStream::from_raw_fd(fd.into_fd()) };
stream.set_nonblocking(true)?;
let stream = UnixStream::from_std(stream)?;
let socket = unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(fd.into_fd()) };
socket.set_nonblocking(true)?;
let socket = UnixDatagram::from_std(socket)?;
let mtu = mtu_workaround(mtu.into());
Ok(CharacteristicWriter {
adapter_name: self.adapter_name().to_string(),
device_address: self.device_address,
mtu,
stream,
socket,
})
}

Expand Down Expand Up @@ -431,14 +431,14 @@ impl Characteristic {
pub async fn notify_io(&self) -> Result<CharacteristicReader> {
let options = PropMap::new();
let (fd, mtu): (OwnedFd, u16) = self.call_method("AcquireNotify", (options,)).await?;
let stream = unsafe { std::os::unix::net::UnixStream::from_raw_fd(fd.into_fd()) };
stream.set_nonblocking(true)?;
let stream = UnixStream::from_std(stream)?;
let socket = unsafe { std::os::unix::net::UnixDatagram::from_raw_fd(fd.into_fd()) };
socket.set_nonblocking(true)?;
let socket = UnixDatagram::from_std(socket)?;
Ok(CharacteristicReader {
adapter_name: self.adapter_name().to_string(),
device_address: self.device_address,
mtu: mtu.into(),
stream,
socket,
buf: Vec::new(),
})
}
Expand Down
Loading