diff --git a/src/sources/channel.rs b/src/sources/channel.rs index 49f439ea..fc84fea6 100644 --- a/src/sources/channel.rs +++ b/src/sources/channel.rs @@ -10,7 +10,8 @@ use std::cmp; use std::fmt; -use std::sync::mpsc; +use std::ops; +use std::sync::{mpsc, Arc}; use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory}; @@ -30,13 +31,32 @@ pub enum Event { Closed, } +#[derive(Debug)] +struct PingOnDrop(Ping); + +impl ops::Deref for PingOnDrop { + type Target = Ping; + + fn deref(&self) -> &Ping { + &self.0 + } +} + +impl Drop for PingOnDrop { + fn drop(&mut self) { + self.0.ping(); + } +} + /// The sender end of a channel /// /// It can be cloned and sent accross threads (if `T` is). #[derive(Debug)] pub struct Sender { sender: mpsc::Sender, - ping: Ping, + // Dropped after `sender` so receiver is guaranteed to get `Disconnected` + // after ping. + ping: PingOnDrop, } impl Clone for Sender { @@ -44,7 +64,7 @@ impl Clone for Sender { fn clone(&self) -> Sender { Sender { sender: self.sender.clone(), - ping: self.ping.clone(), + ping: PingOnDrop(self.ping.clone()), } } } @@ -59,20 +79,15 @@ impl Sender { } } -impl Drop for Sender { - fn drop(&mut self) { - // ping on drop, to notify about channel closure - self.ping.ping(); - } -} - /// The sender end of a synchronous channel /// /// It can be cloned and sent accross threads (if `T` is). #[derive(Debug)] pub struct SyncSender { sender: mpsc::SyncSender, - ping: Ping, + // Dropped after `sender` so receiver is guaranteed to get `Disconnected` + // after ping. + ping: Arc, } impl Clone for SyncSender { @@ -164,7 +179,7 @@ pub fn channel() -> (Sender, Channel) { ( Sender { sender, - ping: ping.clone(), + ping: PingOnDrop(ping.clone()), }, Channel { receiver, @@ -182,7 +197,7 @@ pub fn sync_channel(bound: usize) -> (SyncSender, Channel) { ( SyncSender { sender, - ping: ping.clone(), + ping: Arc::new(PingOnDrop(ping.clone())), }, Channel { receiver,