fn send()

in src/sync/mpsc.rs [134:233]


    fn send(&self, message: T) -> Result<(), SendError<T>> {
        let me = ExecutionState::me();
        let mut state = self.state.borrow_mut();

        trace!(
            state = ?state,
            "sender {:?} starting send on channel {:p}",
            me,
            self,
        );
        if state.known_receivers == 0 {
            // No receivers are left, so the channel is disconnected.  Stop and return failure.
            return Err(SendError(message));
        }

        let (is_rendezvous, is_full) = if let Some(bound) = self.bound {
            // For a rendezvous channel (bound = 0), "is_full" holds when there is a message in the channel.
            // For a non-rendezvous channel (bound > 0), "is_full" holds when the capacity is reached.
            // We cover both these cases at once using max(bound, 1) below.
            (bound == 0, state.messages.len() >= std::cmp::max(bound, 1))
        } else {
            (false, false)
        };

        // The sender should block in any of the following situations:
        //    the channel is full (as defined above)
        //    there are already waiting senders
        //    this is a rendezvous channel and there are no waiting receivers
        let sender_should_block =
            is_full || !state.waiting_senders.is_empty() || (is_rendezvous && state.waiting_receivers.is_empty());

        state.waiting_senders.push(me);
        if sender_should_block {
            trace!(
                state = ?state,
                "blocking sender {:?} on channel {:p}",
                me,
                self,
            );
            ExecutionState::with(|s| s.current_mut().block());
            drop(state);

            thread::switch();

            state = self.state.borrow_mut();
            trace!(
                state = ?state,
                "unblocked sender {:?} on channel {:p}",
                me,
                self,
            );

            // Check again that we still have a receiver; if not, return with error.
            // We repeat this check because the receivers may have disconnected while the sender was blocked.
            if state.known_receivers == 0 {
                // No receivers are left, so the channel is disconnected.  Stop and return failure.
                return Err(SendError(message));
            }
        }

        let head = state.waiting_senders.remove(0);
        assert_eq!(head, me);

        ExecutionState::with(|s| {
            let clock = s.increment_clock();
            state.messages.push(TimestampedValue::new(message, clock.clone()));
        });

        // The sender has just added a message to the channel, so unblock the first waiting receiver if any
        if let Some(&tid) = state.waiting_receivers.first() {
            ExecutionState::with(|s| {
                s.get_mut(tid).unblock();

                // When a sender successfully sends on a rendezvous channel, it knows that the receiver will perform
                // the matching receive, so we need to update the sender's clock with the receiver's.
                if is_rendezvous {
                    let recv_clock = s.get_clock(tid).clone();
                    s.update_clock(&recv_clock);
                }
            });
        } else {
            assert!(!is_rendezvous); // Sender on a rendezvous channel is only unblocked if there's a waiting receiver
        }
        // Check and unblock the next the waiting sender, if eligible
        if let Some(&tid) = state.waiting_senders.first() {
            let bound = self.bound.expect("can't have waiting senders on an unbounded channel");
            if state.messages.len() < bound {
                ExecutionState::with(|s| s.get_mut(tid).unblock());
            }
        }

        if !is_rendezvous {
            if let Some(receiver_clock) = &mut state.receiver_clock {
                let recv_clock = receiver_clock.remove(0);
                ExecutionState::with(|s| s.update_clock(&recv_clock));
            }
        }

        Ok(())
    }