fn recv()

in src/sync/mpsc.rs [235:347]


    fn recv(&self) -> Result<T, RecvError> {
        let me = ExecutionState::me();
        let mut state = self.state.borrow_mut();

        trace!(
            state = ?state,
            "starting recv on channel {:p}",
            self,
        );
        // Check if there are any senders left; if not, and the channel is empty, fail with error
        // (If there are no senders, but the channel is nonempty, the receiver can successfully consume that message.)
        if state.messages.is_empty() && state.known_senders == 0 {
            return Err(RecvError);
        }

        // Pre-increment the receiver's clock before continuing
        //
        // Note: The reason for pre-incrementing the receiver's clock is to deal properly with rendezvous channels.
        // Here's the scenario we have to handle:
        //   1. the receiver arrives at a rendezvous channel and blocks
        //   2. the sender arrives, sees the receiver is waiting and does not block
        //   3. the sender drops the message in the channel and updates its clock with the receiver's clock and continues
        //   4. later, the receiver unblocks and picks up the message and updates its clock with the sender's
        // Without the pre-increment, in step 3, the sender would update its clock with the receiver's clock before
        // it is incremented.  (The increment records the fact that the receiver arrived at the synchronization point.)
        ExecutionState::with(|s| {
            let _ = s.increment_clock();
        });

        // If this is a rendezvous channel, and the channel is empty, and there are waiting senders,
        // notify the first waiting sender
        if self.bound == Some(0) && state.messages.is_empty() {
            if let Some(&tid) = state.waiting_senders.first() {
                // Note: another receiver may have unblocked the sender already
                ExecutionState::with(|s| s.get_mut(tid).unblock());
            }
        }

        // The receiver should block in any of the following situations:
        //    the channel is empty
        //    there are waiting receivers
        let should_block = state.messages.is_empty() || !state.waiting_receivers.is_empty();

        state.waiting_receivers.push(me);
        if should_block {
            trace!(
                state = ?state,
                "blocking receiver {:?} on channel {:p}",
                me,
                self,
            );
            ExecutionState::with(|s| s.current_mut().block());
            drop(state);

            thread::switch();

            state = self.state.borrow_mut();
            trace!(
                state = ?state,
                "unblocked receiver {:?} on channel {:p}",
                me,
                self,
            );

            // Check again if there are any senders left; if not, and the channel is empty, fail with error
            // (If there are no senders, but the channel is nonempty, the receiver can successfully consume that message.)
            // We repeat this check because the senders may have disconnected while the receiver was blocked.
            if state.messages.is_empty() && state.known_senders == 0 {
                return Err(RecvError);
            }
        }

        let head = state.waiting_receivers.remove(0);
        assert_eq!(head, me);

        let item = state.messages.remove(0);
        // The receiver has just removed an element from the channel.  Check if any waiting senders
        // need to be notified.
        if let Some(&tid) = state.waiting_senders.first() {
            let bound = self.bound.expect("can't have waiting senders on an unbounded channel");
            // Unblock the first waiting sender provided one of the following conditions hold:
            // - this is a non-rendezvous bounded channel (bound > 0)
            // - this is a rendezvous channel and we have additional waiting receivers
            if bound > 0 || !state.waiting_receivers.is_empty() {
                ExecutionState::with(|s| s.get_mut(tid).unblock());
            }
        }
        // Check and unblock the next the waiting receiver, if eligible
        // Note: this is a no-op for mpsc channels, since there can only be one receiver
        if let Some(&tid) = state.waiting_receivers.first() {
            if !state.messages.is_empty() {
                ExecutionState::with(|s| s.get_mut(tid).unblock());
            }
        }

        // Update receiver clock from the clock attached to the message received
        let TimestampedValue { value, clock } = item;
        ExecutionState::with(|s| {
            // Since we already incremented the receiver's clock above, just update it here
            s.get_clock_mut(me).update(&clock);

            // If this is a (non-rendezvous) bounded channel, propagate causality backwards to sender
            if let Some(receiver_clock) = &mut state.receiver_clock {
                let bound = self.bound.expect("unexpected internal error"); // must be defined for bounded channels
                if bound > 0 {
                    // non-rendezvous
                    assert!(receiver_clock.len() < bound);
                    receiver_clock.push(s.get_clock(me).clone());
                }
            }
        });
        Ok(value)
    }