in src/sync/mpsc.rs [235:347]
/// Receives the next message from the channel, blocking the calling task while none is available.
///
/// Returns `Err(RecvError)` if the channel holds no messages and `known_senders` is zero, either
/// on entry or when the task resumes after having blocked. Otherwise returns the message at the
/// front of the queue, after merging the clock attached to that message into the receiver's clock.
fn recv(&self) -> Result<T, RecvError> {
    let receiver = ExecutionState::me();
    let mut chan = self.state.borrow_mut();
    trace!(
        state = ?chan,
        "starting recv on channel {:p}",
        self,
    );

    // With no senders left and nothing buffered, no message can ever arrive: fail right away.
    // (Messages that are still buffered remain receivable even after every sender is gone.)
    if chan.known_senders == 0 && chan.messages.is_empty() {
        return Err(RecvError);
    }

    // Bump the receiver's clock *before* any synchronization takes place.
    //
    // This matters for rendezvous channels, where the following interleaving is possible:
    //   1. the receiver reaches the channel and blocks
    //   2. a sender arrives, finds the receiver waiting, and therefore does not block
    //   3. the sender deposits its message, merges the receiver's clock into its own, and moves on
    //   4. the receiver later wakes up, takes the message, and merges in the sender's clock
    // The increment records the receiver's arrival at the synchronization point. If it happened
    // later, the sender in step 3 would merge a receiver clock that does not yet reflect it.
    ExecutionState::with(|s| {
        let _ = s.increment_clock();
    });

    // Rendezvous channel with nothing in flight: wake the sender at the head of the wait queue,
    // if there is one. (Another receiver may already have unblocked that same sender.)
    if matches!(self.bound, Some(0)) && chan.messages.is_empty() {
        if let Some(sender) = chan.waiting_senders.first().copied() {
            ExecutionState::with(|s| s.get_mut(sender).unblock());
        }
    }

    // The receiver may proceed without blocking only when a message is already available AND no
    // other receiver is queued ahead of it. This must be decided before enqueueing ourselves.
    let can_take_now = !chan.messages.is_empty() && chan.waiting_receivers.is_empty();
    chan.waiting_receivers.push(receiver);

    if !can_take_now {
        trace!(
            state = ?chan,
            "blocking receiver {:?} on channel {:p}",
            receiver,
            self,
        );
        ExecutionState::with(|s| s.current_mut().block());
        // Release the borrow on the channel state before switching away, and take it again
        // once this task is running again.
        drop(chan);
        thread::switch();
        chan = self.state.borrow_mut();
        trace!(
            state = ?chan,
            "unblocked receiver {:?} on channel {:p}",
            receiver,
            self,
        );

        // The senders may all have disconnected while we were blocked, so repeat the entry check:
        // an empty channel without senders is an error, a nonempty one can still be drained.
        // NOTE(review): this return leaves `receiver` in `waiting_receivers`; presumably harmless
        // because later calls would fail the entry check above — confirm.
        if chan.known_senders == 0 && chan.messages.is_empty() {
            return Err(RecvError);
        }
    }

    // We must be the receiver at the front of the wait queue at this point.
    let front = chan.waiting_receivers.remove(0);
    assert_eq!(front, receiver);

    let item = chan.messages.remove(0);

    // A message was just taken out of the channel, so a waiting sender may be able to make progress.
    if let Some(sender) = chan.waiting_senders.first().copied() {
        let capacity = self.bound.expect("can't have waiting senders on an unbounded channel");
        let more_receivers_waiting = !chan.waiting_receivers.is_empty();
        // Wake the first waiting sender if either
        //   - the channel is bounded but not rendezvous (capacity > 0), or
        //   - the channel is rendezvous and further receivers are waiting.
        if capacity > 0 || more_receivers_waiting {
            ExecutionState::with(|s| s.get_mut(sender).unblock());
        }
    }

    // If messages remain, hand over to the next waiting receiver.
    // (For mpsc channels this never fires, since there is only a single receiver.)
    if !chan.messages.is_empty() {
        if let Some(next_receiver) = chan.waiting_receivers.first().copied() {
            ExecutionState::with(|s| s.get_mut(next_receiver).unblock());
        }
    }

    // Merge the clock that travelled with the message into the receiver's clock.
    let TimestampedValue {
        value: payload,
        clock: message_clock,
    } = item;
    ExecutionState::with(|s| {
        // The receiver's clock was already incremented above, so a plain update suffices here.
        s.get_clock_mut(receiver).update(&message_clock);
        // For a (non-rendezvous) bounded channel, record the receiver's clock so that causality
        // can be propagated back to the sender.
        if let Some(receiver_clock) = chan.receiver_clock.as_mut() {
            // `bound` is always defined for bounded channels.
            let bound = self.bound.expect("unexpected internal error");
            if bound > 0 {
                // non-rendezvous
                assert!(receiver_clock.len() < bound);
                receiver_clock.push(s.get_clock(receiver).clone());
            }
        }
    });

    Ok(payload)
}