auto merge of #13448 : alexcrichton/rust/rework-chan-return-values, r=brson

There are currently a number of return values from the std::comm methods, not
all of which are necessarily completely expressive:

 * `Sender::try_send(t: T) -> bool`
    This method currently doesn't transmit back the data `t` if the send fails
    due to the other end having disconnected. Additionally, this shares the name
    of the synchronous try_send method, but it differs in semantics in that it
    only has one failure case, not two (the buffer can never be full).

 * `SyncSender::try_send(t: T) -> TrySendResult<T>`
    This method accurately conveys all possible information, but it uses a
    custom type to the std::comm module with no convenience methods on it.
    Additionally, if you want to inspect the result you're forced to import
    something from `std::comm`.

 * `SyncSender::send_opt(t: T) -> Option<T>`
    This method uses Some(T) as an "error value" and None as a "success value",
    but almost all other uses of Option<T> have Some/None the other way

 * `Receiver::try_recv(t: T) -> TryRecvResult<T>`
    Similarly to the synchronous try_send, this custom return type is lacking in
    terms of usability (no convenience methods).

With this number of drawbacks in mind, I believed it was time to re-work the
return types of these methods. The new API for the comm module is:

    Sender::send(t: T) -> ()
    Sender::send_opt(t: T) -> Result<(), T>
    SyncSender::send(t: T) -> ()
    SyncSender::send_opt(t: T) -> Result<(), T>
    SyncSender::try_send(t: T) -> Result<(), TrySendError<T>>
    Receiver::recv() -> T
    Receiver::recv_opt() -> Result<T, ()>
    Receiver::try_recv() -> Result<T, TryRecvError>

The notable changes made are:

* Sender::try_send => Sender::send_opt. This renaming brings the semantics in
  line with the SyncSender::send_opt method. An asychronous send only has one
  failure case, unlike the synchronous try_send method which has two failure
  cases (full/disconnected).

* Sender::send_opt returns the data back to the caller if the send is guaranteed
  to fail. This method previously returned `bool`, but then it was unable to
  retrieve the data if the data was guaranteed to fail to send. There is still a
  race such that when `Ok(())` is returned the data could still fail to be
  received, but that's inherent to an asynchronous channel.

* Result is now the basis of all return values. This not only adds lots of
  convenience methods to all return values for free, but it also means that you
  can inspect the return values with no extra imports (Ok/Err are in the
  prelude). Additionally, it's now self documenting when something failed or not
  because the return value has "Err" in the name.

Things I'm a little uneasy about:

* The methods send_opt and recv_opt are not returning options, but rather
  results. I felt more strongly that Option was the wrong return type than the
  _opt prefix was wrong, and I coudn't think of a much better name for these
  methods. One possible way to think about them is to read the _opt suffix as
  "optionally".

* Result<T, ()> is often better expressed as Option<T>. This is only applicable
  to the recv_opt() method, but I thought it would be more consistent for
  everything to return Result rather than one method returning an Option.

Despite my two reasons to feel uneasy, I feel much better about the consistency
in return values at this point, and I think the only real open question is if
there's a better suffix for {send,recv}_opt.

Closes #11527
This commit is contained in:
bors 2014-04-12 12:21:58 -07:00
commit ab0d847277
27 changed files with 232 additions and 227 deletions

View file

@ -1011,7 +1011,6 @@ fn new_sched_rng() -> XorShiftRng {
mod test {
use rustuv;
use std::comm;
use std::task::TaskOpts;
use std::rt::task::Task;
use std::rt::local::Local;
@ -1428,7 +1427,7 @@ mod test {
// This task should not be able to starve the sender;
// The sender should get stolen to another thread.
spawn(proc() {
while rx.try_recv() != comm::Data(()) { }
while rx.try_recv().is_err() { }
});
tx.send(());
@ -1445,7 +1444,7 @@ mod test {
// This task should not be able to starve the other task.
// The sends should eventually yield.
spawn(proc() {
while rx1.try_recv() != comm::Data(()) {
while rx1.try_recv().is_err() {
tx2.send(());
}
});
@ -1499,7 +1498,7 @@ mod test {
let mut val = 20;
while val > 0 {
val = po.recv();
ch.try_send(val - 1);
let _ = ch.send_opt(val - 1);
}
}

View file

@ -515,7 +515,7 @@ mod tests {
let _tx = tx;
fail!()
});
assert_eq!(rx.recv_opt(), None);
assert_eq!(rx.recv_opt(), Err(()));
}
#[test]

View file

@ -46,7 +46,6 @@
//!
//! Note that all time units in this file are in *milliseconds*.
use std::comm::Data;
use libc;
use std::mem;
use std::os;
@ -119,7 +118,7 @@ fn helper(input: libc::c_int, messages: Receiver<Req>) {
Some(timer) => timer, None => return
};
let tx = timer.tx.take_unwrap();
if tx.try_send(()) && timer.repeat {
if tx.send_opt(()).is_ok() && timer.repeat {
timer.tx = Some(tx);
timer.target += timer.interval;
insert(timer, active);
@ -162,14 +161,14 @@ fn helper(input: libc::c_int, messages: Receiver<Req>) {
1 => {
loop {
match messages.try_recv() {
Data(Shutdown) => {
Ok(Shutdown) => {
assert!(active.len() == 0);
break 'outer;
}
Data(NewTimer(timer)) => insert(timer, &mut active),
Ok(NewTimer(timer)) => insert(timer, &mut active),
Data(RemoveTimer(id, ack)) => {
Ok(RemoveTimer(id, ack)) => {
match dead.iter().position(|&(i, _)| id == i) {
Some(i) => {
let (_, i) = dead.remove(i).unwrap();

View file

@ -28,7 +28,6 @@
//!
//! As with timer_other, all units in this file are in units of millseconds.
use std::comm::Data;
use libc;
use std::ptr;
use std::os;
@ -107,7 +106,7 @@ fn helper(input: libc::c_int, messages: Receiver<Req>) {
match list.as_slice().bsearch(|&(f, _, _)| f.cmp(&fd)) {
Some(i) => {
let (_, ref c, oneshot) = *list.get(i);
(!c.try_send(()) || oneshot, i)
(c.send_opt(()).is_err() || oneshot, i)
}
None => fail!("fd not active: {}", fd),
}
@ -121,7 +120,7 @@ fn helper(input: libc::c_int, messages: Receiver<Req>) {
while incoming {
match messages.try_recv() {
Data(NewTimer(fd, chan, one, timeval)) => {
Ok(NewTimer(fd, chan, one, timeval)) => {
// acknowledge we have the new channel, we will never send
// another message to the old channel
chan.send(());
@ -149,7 +148,7 @@ fn helper(input: libc::c_int, messages: Receiver<Req>) {
assert_eq!(ret, 0);
}
Data(RemoveTimer(fd, chan)) => {
Ok(RemoveTimer(fd, chan)) => {
match list.as_slice().bsearch(|&(f, _, _)| f.cmp(&fd)) {
Some(i) => {
drop(list.remove(i));
@ -160,7 +159,7 @@ fn helper(input: libc::c_int, messages: Receiver<Req>) {
chan.send(());
}
Data(Shutdown) => {
Ok(Shutdown) => {
assert!(list.len() == 0);
break 'outer;
}

View file

@ -20,7 +20,6 @@
//! Other than that, the implementation is pretty straightforward in terms of
//! the other two implementations of timers with nothing *that* new showing up.
use std::comm::Data;
use libc;
use std::ptr;
use std::rt::rtio;
@ -54,11 +53,11 @@ fn helper(input: libc::HANDLE, messages: Receiver<Req>) {
if idx == 0 {
loop {
match messages.try_recv() {
Data(NewTimer(obj, c, one)) => {
Ok(NewTimer(obj, c, one)) => {
objs.push(obj);
chans.push((c, one));
}
Data(RemoveTimer(obj, c)) => {
Ok(RemoveTimer(obj, c)) => {
c.send(());
match objs.iter().position(|&o| o == obj) {
Some(i) => {
@ -68,7 +67,7 @@ fn helper(input: libc::HANDLE, messages: Receiver<Req>) {
None => {}
}
}
Data(Shutdown) => {
Ok(Shutdown) => {
assert_eq!(objs.len(), 1);
assert_eq!(chans.len(), 0);
break 'outer;
@ -79,7 +78,7 @@ fn helper(input: libc::HANDLE, messages: Receiver<Req>) {
} else {
let remove = {
match chans.get(idx as uint - 1) {
&(ref c, oneshot) => !c.try_send(()) || oneshot
&(ref c, oneshot) => c.send_opt(()).is_err() || oneshot
}
};
if remove {

View file

@ -274,7 +274,7 @@ mod tests {
let _tx = tx;
fail!()
});
assert_eq!(rx.recv_opt(), None);
assert_eq!(rx.recv_opt(), Err(()));
}
#[test]

View file

@ -1065,7 +1065,7 @@ mod test {
}
reads += 1;
tx2.try_send(());
let _ = tx2.send_opt(());
}
// Make sure we had multiple reads

View file

@ -51,7 +51,7 @@ impl SignalWatcher {
extern fn signal_cb(handle: *uvll::uv_signal_t, signum: c_int) {
let s: &mut SignalWatcher = unsafe { UvHandle::from_uv_handle(&handle) };
assert_eq!(signum as int, s.signal as int);
s.channel.try_send(s.signal);
let _ = s.channel.send_opt(s.signal);
}
impl HomingIO for SignalWatcher {

View file

@ -140,9 +140,9 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {
let task = timer.blocker.take_unwrap();
let _ = task.wake().map(|t| t.reawaken());
}
SendOnce(chan) => { let _ = chan.try_send(()); }
SendOnce(chan) => { let _ = chan.send_opt(()); }
SendMany(chan, id) => {
let _ = chan.try_send(());
let _ = chan.send_opt(());
// Note that the above operation could have performed some form of
// scheduling. This means that the timer may have decided to insert
@ -196,8 +196,8 @@ mod test {
let oport = timer.oneshot(1);
let pport = timer.period(1);
timer.sleep(1);
assert_eq!(oport.recv_opt(), None);
assert_eq!(pport.recv_opt(), None);
assert_eq!(oport.recv_opt(), Err(()));
assert_eq!(pport.recv_opt(), Err(()));
timer.oneshot(1).recv();
}
@ -284,7 +284,7 @@ mod test {
let mut timer = TimerWatcher::new(local_loop());
timer.oneshot(1000)
};
assert_eq!(port.recv_opt(), None);
assert_eq!(port.recv_opt(), Err(()));
}
#[test]
@ -293,7 +293,7 @@ mod test {
let mut timer = TimerWatcher::new(local_loop());
timer.period(1000)
};
assert_eq!(port.recv_opt(), None);
assert_eq!(port.recv_opt(), Err(()));
}
#[test]

View file

@ -322,25 +322,19 @@ pub struct SyncSender<T> {
/// This enumeration is the list of the possible reasons that try_recv could not
/// return data when called.
#[deriving(Eq, Clone, Show)]
pub enum TryRecvResult<T> {
pub enum TryRecvError {
/// This channel is currently empty, but the sender(s) have not yet
/// disconnected, so data may yet become available.
Empty,
/// This channel's sending half has become disconnected, and there will
/// never be any more data received on this channel
Disconnected,
/// The channel had some data and we successfully popped it
Data(T),
}
/// This enumeration is the list of the possible outcomes for the
/// This enumeration is the list of the possible error outcomes for the
/// `SyncSender::try_send` method.
#[deriving(Eq, Clone, Show)]
pub enum TrySendResult<T> {
/// The data was successfully sent along the channel. This either means that
/// it was buffered in the channel, or handed off to a receiver. In either
/// case, the callee no longer has ownership of the data.
Sent,
pub enum TrySendError<T> {
/// The data could not be sent on the channel because it would require that
/// the callee block to send the data.
///
@ -365,7 +359,7 @@ enum Flavor<T> {
/// of `Receiver` and `Sender` to see what's possible with them.
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
(Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a)))
(Sender::new(Oneshot(b)), Receiver::new(Oneshot(a)))
}
/// Creates a new synchronous, bounded channel.
@ -401,7 +395,7 @@ pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
/// ```
pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
let (a, b) = UnsafeArc::new2(sync::Packet::new(bound));
(SyncSender::new(a), Receiver::my_new(Sync(b)))
(SyncSender::new(a), Receiver::new(Sync(b)))
}
////////////////////////////////////////////////////////////////////////////////
@ -409,7 +403,7 @@ pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
////////////////////////////////////////////////////////////////////////////////
impl<T: Send> Sender<T> {
fn my_new(inner: Flavor<T>) -> Sender<T> {
fn new(inner: Flavor<T>) -> Sender<T> {
Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare }
}
@ -433,25 +427,42 @@ impl<T: Send> Sender<T> {
/// The purpose of this functionality is to propagate failure among tasks.
/// If failure is not desired, then consider using the `try_send` method
pub fn send(&self, t: T) {
if !self.try_send(t) {
if self.send_opt(t).is_err() {
fail!("sending on a closed channel");
}
}
/// Attempts to send a value on this channel, returning whether it was
/// successfully sent.
/// Attempts to send a value on this channel, returning it back if it could
/// not be sent.
///
/// A successful send occurs when it is determined that the other end of
/// the channel has not hung up already. An unsuccessful send would be one
/// where the corresponding receiver has already been deallocated. Note
/// that a return value of `false` means that the data will never be
/// received, but a return value of `true` does *not* mean that the data
/// that a return value of `Err` means that the data will never be
/// received, but a return value of `Ok` does *not* mean that the data
/// will be received. It is possible for the corresponding receiver to
/// hang up immediately after this function returns `true`.
/// hang up immediately after this function returns `Ok`.
///
/// Like `send`, this method will never block. If the failure of send cannot
/// be tolerated, then this method should be used instead.
pub fn try_send(&self, t: T) -> bool {
/// Like `send`, this method will never block.
///
/// # Failure
///
/// This method will never fail, it will return the message back to the
/// caller if the other end is disconnected
///
/// # Example
///
/// ```
/// let (tx, rx) = channel();
///
/// // This send is always successful
/// assert_eq!(tx.send_opt(1), Ok(()));
///
/// // This send will fail because the receiver is gone
/// drop(rx);
/// assert_eq!(tx.send_opt(1), Err(1));
/// ```
pub fn send_opt(&self, t: T) -> Result<(), T> {
// In order to prevent starvation of other tasks in situations where
// a task sends repeatedly without ever receiving, we occassionally
// yield instead of doing a send immediately.
@ -475,16 +486,19 @@ impl<T: Send> Sender<T> {
return (*p).send(t);
} else {
let (a, b) = UnsafeArc::new2(stream::Packet::new());
match (*p).upgrade(Receiver::my_new(Stream(b))) {
match (*p).upgrade(Receiver::new(Stream(b))) {
oneshot::UpSuccess => {
(*a.get()).send(t);
(a, true)
let ret = (*a.get()).send(t);
(a, ret)
}
oneshot::UpDisconnected => (a, false),
oneshot::UpDisconnected => (a, Err(t)),
oneshot::UpWoke(task) => {
(*a.get()).send(t);
// This send cannot fail because the task is
// asleep (we're looking at it), so the receiver
// can't go away.
(*a.get()).send(t).unwrap();
task.wake().map(|t| t.reawaken());
(a, true)
(a, Ok(()))
}
}
}
@ -496,7 +510,7 @@ impl<T: Send> Sender<T> {
};
unsafe {
let mut tmp = Sender::my_new(Stream(new_inner));
let mut tmp = Sender::new(Stream(new_inner));
mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
}
return ret;
@ -508,21 +522,21 @@ impl<T: Send> Clone for Sender<T> {
let (packet, sleeper) = match self.inner {
Oneshot(ref p) => {
let (a, b) = UnsafeArc::new2(shared::Packet::new());
match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
oneshot::UpSuccess | oneshot::UpDisconnected => (b, None),
oneshot::UpWoke(task) => (b, Some(task))
}
}
Stream(ref p) => {
let (a, b) = UnsafeArc::new2(shared::Packet::new());
match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } {
match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
stream::UpSuccess | stream::UpDisconnected => (b, None),
stream::UpWoke(task) => (b, Some(task)),
}
}
Shared(ref p) => {
unsafe { (*p.get()).clone_chan(); }
return Sender::my_new(Shared(p.clone()));
return Sender::new(Shared(p.clone()));
}
Sync(..) => unreachable!(),
};
@ -530,10 +544,10 @@ impl<T: Send> Clone for Sender<T> {
unsafe {
(*packet.get()).inherit_blocker(sleeper);
let mut tmp = Sender::my_new(Shared(packet.clone()));
let mut tmp = Sender::new(Shared(packet.clone()));
mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner);
}
Sender::my_new(Shared(packet))
Sender::new(Shared(packet))
}
}
@ -579,7 +593,7 @@ impl<T: Send> SyncSender<T> {
/// `SyncSender::send_opt` method which will not fail if the receiver
/// disconnects.
pub fn send(&self, t: T) {
if self.send_opt(t).is_some() {
if self.send_opt(t).is_err() {
fail!("sending on a closed channel");
}
}
@ -595,11 +609,8 @@ impl<T: Send> SyncSender<T> {
/// # Failure
///
/// This function cannot fail.
pub fn send_opt(&self, t: T) -> Option<T> {
match unsafe { (*self.inner.get()).send(t) } {
Ok(()) => None,
Err(t) => Some(t),
}
pub fn send_opt(&self, t: T) -> Result<(), T> {
unsafe { (*self.inner.get()).send(t) }
}
/// Attempts to send a value on this channel without blocking.
@ -615,7 +626,7 @@ impl<T: Send> SyncSender<T> {
/// # Failure
///
/// This function cannot fail
pub fn try_send(&self, t: T) -> TrySendResult<T> {
pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
unsafe { (*self.inner.get()).try_send(t) }
}
}
@ -639,7 +650,7 @@ impl<T: Send> Drop for SyncSender<T> {
////////////////////////////////////////////////////////////////////////////////
impl<T: Send> Receiver<T> {
fn my_new(inner: Flavor<T>) -> Receiver<T> {
fn new(inner: Flavor<T>) -> Receiver<T> {
Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoShare }
}
@ -664,8 +675,8 @@ impl<T: Send> Receiver<T> {
/// peek at a value on this receiver.
pub fn recv(&self) -> T {
match self.recv_opt() {
Some(t) => t,
None => fail!("receiving on a closed channel"),
Ok(t) => t,
Err(()) => fail!("receiving on a closed channel"),
}
}
@ -679,7 +690,7 @@ impl<T: Send> Receiver<T> {
/// block on a receiver.
///
/// This function cannot fail.
pub fn try_recv(&self) -> TryRecvResult<T> {
pub fn try_recv(&self) -> Result<T, TryRecvError> {
// If a thread is spinning in try_recv, we should take the opportunity
// to reschedule things occasionally. See notes above in scheduling on
// sends for why this doesn't always hit TLS, and also for why this uses
@ -695,32 +706,32 @@ impl<T: Send> Receiver<T> {
let mut new_port = match self.inner {
Oneshot(ref p) => {
match unsafe { (*p.get()).try_recv() } {
Ok(t) => return Data(t),
Err(oneshot::Empty) => return Empty,
Err(oneshot::Disconnected) => return Disconnected,
Ok(t) => return Ok(t),
Err(oneshot::Empty) => return Err(Empty),
Err(oneshot::Disconnected) => return Err(Disconnected),
Err(oneshot::Upgraded(rx)) => rx,
}
}
Stream(ref p) => {
match unsafe { (*p.get()).try_recv() } {
Ok(t) => return Data(t),
Err(stream::Empty) => return Empty,
Err(stream::Disconnected) => return Disconnected,
Ok(t) => return Ok(t),
Err(stream::Empty) => return Err(Empty),
Err(stream::Disconnected) => return Err(Disconnected),
Err(stream::Upgraded(rx)) => rx,
}
}
Shared(ref p) => {
match unsafe { (*p.get()).try_recv() } {
Ok(t) => return Data(t),
Err(shared::Empty) => return Empty,
Err(shared::Disconnected) => return Disconnected,
Ok(t) => return Ok(t),
Err(shared::Empty) => return Err(Empty),
Err(shared::Disconnected) => return Err(Disconnected),
}
}
Sync(ref p) => {
match unsafe { (*p.get()).try_recv() } {
Ok(t) => return Data(t),
Err(sync::Empty) => return Empty,
Err(sync::Disconnected) => return Disconnected,
Ok(t) => return Ok(t),
Err(sync::Empty) => return Err(Empty),
Err(sync::Disconnected) => return Err(Disconnected),
}
}
};
@ -741,32 +752,32 @@ impl<T: Send> Receiver<T> {
/// In other words, this function has the same semantics as the `recv`
/// method except for the failure aspect.
///
/// If the channel has hung up, then `None` is returned. Otherwise `Some` of
/// If the channel has hung up, then `Err` is returned. Otherwise `Ok` of
/// the value found on the receiver is returned.
pub fn recv_opt(&self) -> Option<T> {
pub fn recv_opt(&self) -> Result<T, ()> {
loop {
let mut new_port = match self.inner {
Oneshot(ref p) => {
match unsafe { (*p.get()).recv() } {
Ok(t) => return Some(t),
Ok(t) => return Ok(t),
Err(oneshot::Empty) => return unreachable!(),
Err(oneshot::Disconnected) => return None,
Err(oneshot::Disconnected) => return Err(()),
Err(oneshot::Upgraded(rx)) => rx,
}
}
Stream(ref p) => {
match unsafe { (*p.get()).recv() } {
Ok(t) => return Some(t),
Ok(t) => return Ok(t),
Err(stream::Empty) => return unreachable!(),
Err(stream::Disconnected) => return None,
Err(stream::Disconnected) => return Err(()),
Err(stream::Upgraded(rx)) => rx,
}
}
Shared(ref p) => {
match unsafe { (*p.get()).recv() } {
Ok(t) => return Some(t),
Ok(t) => return Ok(t),
Err(shared::Empty) => return unreachable!(),
Err(shared::Disconnected) => return None,
Err(shared::Disconnected) => return Err(()),
}
}
Sync(ref p) => return unsafe { (*p.get()).recv() }
@ -873,7 +884,7 @@ impl<T: Send> select::Packet for Receiver<T> {
}
impl<'a, T: Send> Iterator<T> for Messages<'a, T> {
fn next(&mut self) -> Option<T> { self.rx.recv_opt() }
fn next(&mut self) -> Option<T> { self.rx.recv_opt().ok() }
}
#[unsafe_destructor]
@ -1022,7 +1033,7 @@ mod test {
assert_eq!(rx.recv(), 1);
}
match rx.try_recv() {
Data(..) => fail!(),
Ok(..) => fail!(),
_ => {}
}
dtx.send(());
@ -1136,45 +1147,45 @@ mod test {
test!(fn oneshot_single_thread_try_send_open() {
let (tx, rx) = channel::<int>();
assert!(tx.try_send(10));
assert!(tx.send_opt(10).is_ok());
assert!(rx.recv() == 10);
})
test!(fn oneshot_single_thread_try_send_closed() {
let (tx, rx) = channel::<int>();
drop(rx);
assert!(!tx.try_send(10));
assert!(tx.send_opt(10).is_err());
})
test!(fn oneshot_single_thread_try_recv_open() {
let (tx, rx) = channel::<int>();
tx.send(10);
assert!(rx.recv_opt() == Some(10));
assert!(rx.recv_opt() == Ok(10));
})
test!(fn oneshot_single_thread_try_recv_closed() {
let (tx, rx) = channel::<int>();
drop(tx);
assert!(rx.recv_opt() == None);
assert!(rx.recv_opt() == Err(()));
})
test!(fn oneshot_single_thread_peek_data() {
let (tx, rx) = channel::<int>();
assert_eq!(rx.try_recv(), Empty)
assert_eq!(rx.try_recv(), Err(Empty))
tx.send(10);
assert_eq!(rx.try_recv(), Data(10));
assert_eq!(rx.try_recv(), Ok(10));
})
test!(fn oneshot_single_thread_peek_close() {
let (tx, rx) = channel::<int>();
drop(tx);
assert_eq!(rx.try_recv(), Disconnected);
assert_eq!(rx.try_recv(), Disconnected);
assert_eq!(rx.try_recv(), Err(Disconnected));
assert_eq!(rx.try_recv(), Err(Disconnected));
})
test!(fn oneshot_single_thread_peek_open() {
let (_tx, rx) = channel::<int>();
assert_eq!(rx.try_recv(), Empty);
assert_eq!(rx.try_recv(), Err(Empty));
})
test!(fn oneshot_multi_task_recv_then_send() {
@ -1335,7 +1346,7 @@ mod test {
tx.send(2);
tx.send(2);
tx.send(2);
tx.try_send(2);
let _ = tx.send_opt(2);
drop(tx);
assert_eq!(count_rx.recv(), 4);
})
@ -1353,14 +1364,14 @@ mod test {
tx3.send(());
});
assert_eq!(rx1.try_recv(), Empty);
assert_eq!(rx1.try_recv(), Err(Empty));
tx2.send(());
rx3.recv();
assert_eq!(rx1.try_recv(), Data(1));
assert_eq!(rx1.try_recv(), Empty);
assert_eq!(rx1.try_recv(), Ok(1));
assert_eq!(rx1.try_recv(), Err(Empty));
tx2.send(());
rx3.recv();
assert_eq!(rx1.try_recv(), Disconnected);
assert_eq!(rx1.try_recv(), Err(Disconnected));
})
// This bug used to end up in a livelock inside of the Receiver destructor
@ -1409,9 +1420,9 @@ mod test {
let mut hits = 0;
while hits < 10 {
match rx.try_recv() {
Data(()) => { hits += 1; }
Empty => { Thread::yield_now(); }
Disconnected => return,
Ok(()) => { hits += 1; }
Err(Empty) => { Thread::yield_now(); }
Err(Disconnected) => return,
}
}
cdone.send(());
@ -1542,7 +1553,7 @@ mod sync_tests {
assert_eq!(rx.recv(), 1);
}
match rx.try_recv() {
Data(..) => fail!(),
Ok(..) => fail!(),
_ => {}
}
dtx.send(());
@ -1596,50 +1607,50 @@ mod sync_tests {
test!(fn oneshot_single_thread_try_send_open() {
let (tx, rx) = sync_channel::<int>(1);
assert_eq!(tx.try_send(10), Sent);
assert_eq!(tx.try_send(10), Ok(()));
assert!(rx.recv() == 10);
})
test!(fn oneshot_single_thread_try_send_closed() {
let (tx, rx) = sync_channel::<int>(0);
drop(rx);
assert_eq!(tx.try_send(10), RecvDisconnected(10));
assert_eq!(tx.try_send(10), Err(RecvDisconnected(10)));
})
test!(fn oneshot_single_thread_try_send_closed2() {
let (tx, _rx) = sync_channel::<int>(0);
assert_eq!(tx.try_send(10), Full(10));
assert_eq!(tx.try_send(10), Err(Full(10)));
})
test!(fn oneshot_single_thread_try_recv_open() {
let (tx, rx) = sync_channel::<int>(1);
tx.send(10);
assert!(rx.recv_opt() == Some(10));
assert!(rx.recv_opt() == Ok(10));
})
test!(fn oneshot_single_thread_try_recv_closed() {
let (tx, rx) = sync_channel::<int>(0);
drop(tx);
assert!(rx.recv_opt() == None);
assert!(rx.recv_opt() == Err(()));
})
test!(fn oneshot_single_thread_peek_data() {
let (tx, rx) = sync_channel::<int>(1);
assert_eq!(rx.try_recv(), Empty)
assert_eq!(rx.try_recv(), Err(Empty))
tx.send(10);
assert_eq!(rx.try_recv(), Data(10));
assert_eq!(rx.try_recv(), Ok(10));
})
test!(fn oneshot_single_thread_peek_close() {
let (tx, rx) = sync_channel::<int>(0);
drop(tx);
assert_eq!(rx.try_recv(), Disconnected);
assert_eq!(rx.try_recv(), Disconnected);
assert_eq!(rx.try_recv(), Err(Disconnected));
assert_eq!(rx.try_recv(), Err(Disconnected));
})
test!(fn oneshot_single_thread_peek_open() {
let (_tx, rx) = sync_channel::<int>(0);
assert_eq!(rx.try_recv(), Empty);
assert_eq!(rx.try_recv(), Err(Empty));
})
test!(fn oneshot_multi_task_recv_then_send() {
@ -1800,7 +1811,7 @@ mod sync_tests {
tx.send(2);
tx.send(2);
tx.send(2);
tx.try_send(2);
let _ = tx.try_send(2);
drop(tx);
assert_eq!(count_rx.recv(), 4);
})
@ -1818,14 +1829,14 @@ mod sync_tests {
tx3.send(());
});
assert_eq!(rx1.try_recv(), Empty);
assert_eq!(rx1.try_recv(), Err(Empty));
tx2.send(());
rx3.recv();
assert_eq!(rx1.try_recv(), Data(1));
assert_eq!(rx1.try_recv(), Empty);
assert_eq!(rx1.try_recv(), Ok(1));
assert_eq!(rx1.try_recv(), Err(Empty));
tx2.send(());
rx3.recv();
assert_eq!(rx1.try_recv(), Disconnected);
assert_eq!(rx1.try_recv(), Err(Disconnected));
})
// This bug used to end up in a livelock inside of the Receiver destructor
@ -1859,9 +1870,9 @@ mod sync_tests {
let mut hits = 0;
while hits < 10 {
match rx.try_recv() {
Data(()) => { hits += 1; }
Empty => { Thread::yield_now(); }
Disconnected => return,
Ok(()) => { hits += 1; }
Err(Empty) => { Thread::yield_now(); }
Err(Disconnected) => return,
}
}
cdone.send(());
@ -1876,20 +1887,20 @@ mod sync_tests {
test!(fn send_opt1() {
let (tx, rx) = sync_channel(0);
spawn(proc() { rx.recv(); });
assert_eq!(tx.send_opt(1), None);
assert_eq!(tx.send_opt(1), Ok(()));
})
test!(fn send_opt2() {
let (tx, rx) = sync_channel(0);
spawn(proc() { drop(rx); });
assert_eq!(tx.send_opt(1), Some(1));
assert_eq!(tx.send_opt(1), Err(1));
})
test!(fn send_opt3() {
let (tx, rx) = sync_channel(1);
assert_eq!(tx.send_opt(1), None);
assert_eq!(tx.send_opt(1), Ok(()));
spawn(proc() { drop(rx); });
assert_eq!(tx.send_opt(1), Some(1));
assert_eq!(tx.send_opt(1), Err(1));
})
test!(fn send_opt4() {
@ -1898,11 +1909,11 @@ mod sync_tests {
let (done, donerx) = channel();
let done2 = done.clone();
spawn(proc() {
assert_eq!(tx.send_opt(1), Some(1));
assert_eq!(tx.send_opt(1), Err(1));
done.send(());
});
spawn(proc() {
assert_eq!(tx2.send_opt(2), Some(2));
assert_eq!(tx2.send_opt(2), Err(2));
done2.send(());
});
drop(rx);
@ -1912,27 +1923,27 @@ mod sync_tests {
test!(fn try_send1() {
let (tx, _rx) = sync_channel(0);
assert_eq!(tx.try_send(1), Full(1));
assert_eq!(tx.try_send(1), Err(Full(1)));
})
test!(fn try_send2() {
let (tx, _rx) = sync_channel(1);
assert_eq!(tx.try_send(1), Sent);
assert_eq!(tx.try_send(1), Full(1));
assert_eq!(tx.try_send(1), Ok(()));
assert_eq!(tx.try_send(1), Err(Full(1)));
})
test!(fn try_send3() {
let (tx, rx) = sync_channel(1);
assert_eq!(tx.try_send(1), Sent);
assert_eq!(tx.try_send(1), Ok(()));
drop(rx);
assert_eq!(tx.try_send(1), RecvDisconnected(1));
assert_eq!(tx.try_send(1), Err(RecvDisconnected(1)));
})
test!(fn try_send4() {
let (tx, rx) = sync_channel(0);
spawn(proc() {
for _ in range(0, 1000) { task::deschedule(); }
assert_eq!(tx.try_send(1), Sent);
assert_eq!(tx.try_send(1), Ok(()));
});
assert_eq!(rx.recv(), 1);
} #[ignore(reason = "flaky on libnative")])

View file

@ -90,7 +90,7 @@ impl<T: Send> Packet<T> {
}
}
pub fn send(&mut self, t: T) -> bool {
pub fn send(&mut self, t: T) -> Result<(), T> {
// Sanity check
match self.upgrade {
NothingSent => {}
@ -102,14 +102,12 @@ impl<T: Send> Packet<T> {
match self.state.swap(DATA, atomics::SeqCst) {
// Sent the data, no one was waiting
EMPTY => true,
EMPTY => Ok(()),
// Couldn't send the data, the port hung up first. We need to be
// sure to deallocate the sent data (to not leave it stuck in the
// queue)
// Couldn't send the data, the port hung up first. Return the data
// back up the stack.
DISCONNECTED => {
self.data.take_unwrap();
false
Err(self.data.take_unwrap())
}
// Not possible, these are one-use channels
@ -121,7 +119,7 @@ impl<T: Send> Packet<T> {
n => unsafe {
let t = BlockedTask::cast_from_uint(n);
t.wake().map(|t| t.reawaken());
true
Ok(())
}
}
}

View file

@ -236,7 +236,7 @@ impl<'rx, T: Send> Handle<'rx, T> {
/// Block to receive a value on the underlying receiver, returning `Some` on
/// success or `None` if the channel disconnects. This function has the same
/// semantics as `Receiver.recv_opt`
pub fn recv_opt(&mut self) -> Option<T> { self.rx.recv_opt() }
pub fn recv_opt(&mut self) -> Result<T, ()> { self.rx.recv_opt() }
/// Adds this handle to the receiver set that the handle was created from. This
/// method can be called multiple times, but it has no effect if `add` was
@ -338,12 +338,12 @@ mod test {
)
drop(tx1);
select! (
foo = rx1.recv_opt() => { assert_eq!(foo, None); },
foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); },
_bar = rx2.recv() => { fail!() }
)
drop(tx2);
select! (
bar = rx2.recv_opt() => { assert_eq!(bar, None); }
bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); }
)
})
@ -370,7 +370,7 @@ mod test {
select! (
_a1 = rx1.recv_opt() => { fail!() },
a2 = rx2.recv_opt() => { assert_eq!(a2, None); }
a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); }
)
})
@ -392,7 +392,7 @@ mod test {
)
tx3.send(1);
select! (
a = rx1.recv_opt() => { assert_eq!(a, None); },
a = rx1.recv_opt() => { assert_eq!(a, Err(())); },
_b = rx2.recv() => { fail!() }
)
})
@ -417,8 +417,8 @@ mod test {
a = rx1.recv() => { assert_eq!(a, 1); },
a = rx2.recv() => { assert_eq!(a, 2); }
)
assert_eq!(rx1.try_recv(), Empty);
assert_eq!(rx2.try_recv(), Empty);
assert_eq!(rx1.try_recv(), Err(Empty));
assert_eq!(rx2.try_recv(), Err(Empty));
tx3.send(());
})
@ -456,7 +456,7 @@ mod test {
spawn(proc() {
rx3.recv();
tx1.clone();
assert_eq!(rx3.try_recv(), Empty);
assert_eq!(rx3.try_recv(), Err(Empty));
tx1.send(2);
rx3.recv();
});
@ -477,7 +477,7 @@ mod test {
spawn(proc() {
rx3.recv();
tx1.clone();
assert_eq!(rx3.try_recv(), Empty);
assert_eq!(rx3.try_recv(), Err(Empty));
tx1.send(2);
rx3.recv();
});

View file

@ -131,9 +131,9 @@ impl<T: Send> Packet<T> {
unsafe { self.select_lock.unlock_noguard() }
}
pub fn send(&mut self, t: T) -> bool {
pub fn send(&mut self, t: T) -> Result<(), T> {
// See Port::drop for what's going on
if self.port_dropped.load(atomics::SeqCst) { return false }
if self.port_dropped.load(atomics::SeqCst) { return Err(t) }
// Note that the multiple sender case is a little tricker
// semantically than the single sender case. The logic for
@ -161,7 +161,7 @@ impl<T: Send> Packet<T> {
// received". Once we get beyond this check, we have permanently
// entered the realm of "this may be received"
if self.cnt.load(atomics::SeqCst) < DISCONNECTED + FUDGE {
return false
return Err(t)
}
self.queue.push(t);
@ -213,7 +213,7 @@ impl<T: Send> Packet<T> {
_ => {}
}
true
Ok(())
}
pub fn recv(&mut self) -> Result<T, Failure> {

View file

@ -87,25 +87,27 @@ impl<T: Send> Packet<T> {
}
pub fn send(&mut self, t: T) -> bool {
pub fn send(&mut self, t: T) -> Result<(), T> {
// If the other port has deterministically gone away, then definitely
// must return the data back up the stack. Otherwise, the data is
// considered as being sent.
if self.port_dropped.load(atomics::SeqCst) { return Err(t) }
match self.do_send(Data(t)) {
UpSuccess => true,
UpDisconnected => false,
UpWoke(task) => {
task.wake().map(|t| t.reawaken());
true
}
UpSuccess | UpDisconnected => {},
UpWoke(task) => { task.wake().map(|t| t.reawaken()); }
}
Ok(())
}
pub fn upgrade(&mut self, up: Receiver<T>) -> UpgradeResult {
// If the port has gone away, then there's no need to proceed any
// further.
if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected }
self.do_send(GoUp(up))
}
fn do_send(&mut self, t: Message<T>) -> UpgradeResult {
// Use an acquire/release ordering to maintain the same position with
// respect to the atomic loads below
if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected }
self.queue.push(t);
match self.cnt.fetch_add(1, atomics::SeqCst) {
// As described in the mod's doc comment, -1 == wakeup

View file

@ -201,22 +201,22 @@ impl<T: Send> Packet<T> {
}
}
pub fn try_send(&self, t: T) -> super::TrySendResult<T> {
pub fn try_send(&self, t: T) -> Result<(), super::TrySendError<T>> {
let (guard, state) = self.lock();
if state.disconnected {
super::RecvDisconnected(t)
Err(super::RecvDisconnected(t))
} else if state.buf.size() == state.buf.cap() {
super::Full(t)
Err(super::Full(t))
} else if state.cap == 0 {
// With capacity 0, even though we have buffer space we can't
// transfer the data unless there's a receiver waiting.
match mem::replace(&mut state.blocker, NoneBlocked) {
NoneBlocked => super::Full(t),
NoneBlocked => Err(super::Full(t)),
BlockedSender(..) => unreachable!(),
BlockedReceiver(task) => {
state.buf.enqueue(t);
wakeup(task, guard);
super::Sent
Ok(())
}
}
} else {
@ -224,7 +224,7 @@ impl<T: Send> Packet<T> {
// just enqueue the data for later retrieval.
assert!(state.buf.size() < state.buf.cap());
state.buf.enqueue(t);
super::Sent
Ok(())
}
}
@ -232,7 +232,7 @@ impl<T: Send> Packet<T> {
//
// When reading this, remember that there can only ever be one receiver at
// time.
pub fn recv(&self) -> Option<T> {
pub fn recv(&self) -> Result<T, ()> {
let (guard, state) = self.lock();
// Wait for the buffer to have something in it. No need for a while loop
@ -242,13 +242,13 @@ impl<T: Send> Packet<T> {
wait(&mut state.blocker, BlockedReceiver, &self.lock);
waited = true;
}
if state.disconnected && state.buf.size() == 0 { return None }
if state.disconnected && state.buf.size() == 0 { return Err(()) }
// Pick up the data, wake up our neighbors, and carry on
assert!(state.buf.size() > 0);
let ret = state.buf.dequeue();
self.wakeup_senders(waited, guard, state);
return Some(ret);
return Ok(ret);
}
pub fn try_recv(&self) -> Result<T, Failure> {

View file

@ -73,7 +73,7 @@ impl Reader for ChanReader {
break;
}
self.pos = 0;
self.buf = self.rx.recv_opt();
self.buf = self.rx.recv_opt().ok();
self.closed = self.buf.is_none();
}
if self.closed && num_read == 0 {
@ -116,15 +116,13 @@ impl Clone for ChanWriter {
impl Writer for ChanWriter {
fn write(&mut self, buf: &[u8]) -> IoResult<()> {
if !self.tx.try_send(buf.to_owned()) {
Err(io::IoError {
self.tx.send_opt(buf.to_owned()).map_err(|_| {
io::IoError {
kind: io::BrokenPipe,
desc: "Pipe closed",
detail: None
})
} else {
Ok(())
}
}
})
}
}

View file

@ -422,13 +422,13 @@ mod test {
spawn(proc() {
let mut sock3 = sock3;
match sock3.sendto([1], addr2) {
Ok(..) => { let _ = tx2.try_send(()); }
Ok(..) => { let _ = tx2.send_opt(()); }
Err(..) => {}
}
done.send(());
});
match sock1.sendto([2], addr2) {
Ok(..) => { let _ = tx.try_send(()); }
Ok(..) => { let _ = tx.send_opt(()); }
Err(..) => {}
}
drop(tx);

View file

@ -149,6 +149,7 @@ impl Listener {
#[cfg(test, unix)]
mod test_unix {
use prelude::*;
use libc;
use comm::Empty;
use io::timer;
@ -199,7 +200,7 @@ mod test_unix {
s2.unregister(Interrupt);
sigint();
timer::sleep(10);
assert_eq!(s2.rx.try_recv(), Empty);
assert_eq!(s2.rx.try_recv(), Err(Empty));
}
}

View file

@ -137,7 +137,7 @@ mod test {
let rx1 = timer.oneshot(10000);
let rx = timer.oneshot(1);
rx.recv();
assert_eq!(rx1.recv_opt(), None);
assert_eq!(rx1.recv_opt(), Err(()));
})
iotest!(fn test_io_timer_oneshot_then_sleep() {
@ -145,7 +145,7 @@ mod test {
let rx = timer.oneshot(100000000000);
timer.sleep(1); // this should inalidate rx
assert_eq!(rx.recv_opt(), None);
assert_eq!(rx.recv_opt(), Err(()));
})
iotest!(fn test_io_timer_sleep_periodic() {
@ -170,11 +170,11 @@ mod test {
let rx = timer.oneshot(1);
rx.recv();
assert!(rx.recv_opt().is_none());
assert!(rx.recv_opt().is_err());
let rx = timer.oneshot(1);
rx.recv();
assert!(rx.recv_opt().is_none());
assert!(rx.recv_opt().is_err());
})
iotest!(fn override() {
@ -182,8 +182,8 @@ mod test {
let orx = timer.oneshot(100);
let prx = timer.periodic(100);
timer.sleep(1);
assert_eq!(orx.recv_opt(), None);
assert_eq!(prx.recv_opt(), None);
assert_eq!(orx.recv_opt(), Err(()));
assert_eq!(prx.recv_opt(), Err(()));
timer.oneshot(1).recv();
})
@ -226,7 +226,7 @@ mod test {
let timer_rx = timer.periodic(1000);
spawn(proc() {
timer_rx.recv_opt();
let _ = timer_rx.recv_opt();
});
// when we drop the TimerWatcher we're going to destroy the channel,
@ -239,7 +239,7 @@ mod test {
let timer_rx = timer.periodic(1000);
spawn(proc() {
timer_rx.recv_opt();
let _ = timer_rx.recv_opt();
});
timer.oneshot(1);
@ -251,7 +251,7 @@ mod test {
let timer_rx = timer.periodic(1000);
spawn(proc() {
timer_rx.recv_opt();
let _ = timer_rx.recv_opt();
});
timer.sleep(1);
@ -262,7 +262,7 @@ mod test {
let mut timer = Timer::new().unwrap();
timer.oneshot(1000)
};
assert_eq!(rx.recv_opt(), None);
assert_eq!(rx.recv_opt(), Err(()));
})
iotest!(fn sender_goes_away_period() {
@ -270,7 +270,7 @@ mod test {
let mut timer = Timer::new().unwrap();
timer.periodic(1000)
};
assert_eq!(rx.recv_opt(), None);
assert_eq!(rx.recv_opt(), Err(()));
})
iotest!(fn receiver_goes_away_oneshot() {

View file

@ -385,7 +385,7 @@ impl Death {
pub fn collect_failure(&mut self, result: TaskResult) {
match self.on_exit.take() {
Some(Execute(f)) => f(result),
Some(SendMessage(ch)) => { ch.try_send(result); }
Some(SendMessage(ch)) => { let _ = ch.send_opt(result); }
None => {}
}
}

View file

@ -37,16 +37,16 @@ impl<S:Send,R:Send> DuplexStream<S, R> {
pub fn send(&self, x: S) {
self.tx.send(x)
}
pub fn try_send(&self, x: S) -> bool {
self.tx.try_send(x)
pub fn send_opt(&self, x: S) -> Result<(), S> {
self.tx.send_opt(x)
}
pub fn recv(&self) -> R {
self.rx.recv()
}
pub fn try_recv(&self) -> comm::TryRecvResult<R> {
pub fn try_recv(&self) -> Result<R, comm::TryRecvError> {
self.rx.try_recv()
}
pub fn recv_opt(&self) -> Option<R> {
pub fn recv_opt(&self) -> Result<R, ()> {
self.rx.recv_opt()
}
}

View file

@ -800,7 +800,7 @@ mod tests {
// At this point, all spawned tasks should be blocked,
// so we shouldn't get anything from the port
assert!(match rx.try_recv() {
Empty => true,
Err(Empty) => true,
_ => false,
});

View file

@ -16,7 +16,6 @@
//! containing data.
use std::cast;
use std::comm;
use std::kinds::marker;
use std::mem::replace;
use std::sync::atomics;
@ -46,10 +45,10 @@ impl WaitQueue {
// Signals one live task from the queue.
fn signal(&self) -> bool {
match self.head.try_recv() {
comm::Data(ch) => {
Ok(ch) => {
// Send a wakeup signal. If the waiter was killed, its port will
// have closed. Keep trying until we get a live task.
if ch.try_send(()) {
if ch.send_opt(()).is_ok() {
true
} else {
self.signal()
@ -63,8 +62,8 @@ impl WaitQueue {
let mut count = 0;
loop {
match self.head.try_recv() {
comm::Data(ch) => {
if ch.try_send(()) {
Ok(ch) => {
if ch.send_opt(()).is_ok() {
count += 1;
}
}
@ -76,7 +75,7 @@ impl WaitQueue {
fn wait_end(&self) -> WaitEnd {
let (signal_end, wait_end) = channel();
assert!(self.tail.try_send(signal_end));
self.tail.send(signal_end);
wait_end
}
}

View file

@ -38,12 +38,12 @@ fn server(requests: &Receiver<request>, responses: &Sender<uint>) {
let mut done = false;
while !done {
match requests.recv_opt() {
Some(get_count) => { responses.send(count.clone()); }
Some(bytes(b)) => {
Ok(get_count) => { responses.send(count.clone()); }
Ok(bytes(b)) => {
//println!("server: received {:?} bytes", b);
count += b;
}
None => { done = true; }
Err(..) => { done = true; }
_ => { }
}
}

View file

@ -33,12 +33,12 @@ fn server(requests: &Receiver<request>, responses: &Sender<uint>) {
let mut done = false;
while !done {
match requests.recv_opt() {
Some(get_count) => { responses.send(count.clone()); }
Some(bytes(b)) => {
Ok(get_count) => { responses.send(count.clone()); }
Ok(bytes(b)) => {
//println!("server: received {:?} bytes", b);
count += b;
}
None => { done = true; }
Err(..) => { done = true; }
_ => { }
}
}

View file

@ -50,7 +50,7 @@ fn main() {
let (tx, rx) = channel();
child_generation(from_str::<uint>(*args.get(1)).unwrap(), tx);
if rx.recv_opt().is_none() {
if rx.recv_opt().is_err() {
fail!("it happened when we slumbered");
}
}

View file

@ -20,9 +20,9 @@ pub fn main() {
});
loop {
match rx.try_recv() {
comm::Data(()) => break,
comm::Empty => {}
comm::Disconnected => unreachable!()
Ok(()) => break,
Err(comm::Empty) => {}
Err(comm::Disconnected) => unreachable!()
}
}
}