in src/sync/mpsc.rs [134:233]
/// Sends `message` on this channel, blocking the calling task first when required.
///
/// The sender always enqueues itself in `waiting_senders`. It blocks when the channel is full,
/// when other senders are already queued ahead of it, or when this is a rendezvous channel
/// (`bound == Some(0)`) with no waiting receiver. Once it may proceed it removes itself from
/// the head of the queue, appends the message together with the incremented clock, unblocks
/// the first waiting receiver (if any), and unblocks the next waiting sender if the channel
/// still has room.
///
/// # Errors
///
/// Returns `Err(SendError(message))`, handing the message back to the caller, when
/// `known_receivers` is zero either on entry or after the sender has been unblocked.
///
/// # Panics
///
/// Panics if the head of `waiting_senders` is not the current task when the send proceeds,
/// if a send completes on a rendezvous channel with no waiting receiver, or if there are
/// waiting senders on a channel without a bound.
fn send(&self, message: T) -> Result<(), SendError<T>> {
let me = ExecutionState::me();
let mut state = self.state.borrow_mut();
trace!(
state = ?state,
"sender {:?} starting send on channel {:p}",
me,
self,
);
if state.known_receivers == 0 {
// No receivers are left, so the channel is disconnected. Stop and return failure.
return Err(SendError(message));
}
let (is_rendezvous, is_full) = if let Some(bound) = self.bound {
// For a rendezvous channel (bound = 0), "is_full" holds when there is a message in the channel.
// For a non-rendezvous channel (bound > 0), "is_full" holds when the capacity is reached.
// We cover both these cases at once using max(bound, 1) below.
(bound == 0, state.messages.len() >= std::cmp::max(bound, 1))
} else {
// No bound: the channel is never a rendezvous channel and is never full.
(false, false)
};
// The sender should block in any of the following situations:
// the channel is full (as defined above)
// there are already waiting senders
// this is a rendezvous channel and there are no waiting receivers
let sender_should_block =
is_full || !state.waiting_senders.is_empty() || (is_rendezvous && state.waiting_receivers.is_empty());
// Enqueue unconditionally (after computing the predicate above, which inspects the queue
// without this sender in it). A sender that does not block is removed again right below,
// so both paths share the same "pop the head, which must be me" step.
state.waiting_senders.push(me);
if sender_should_block {
trace!(
state = ?state,
"blocking sender {:?} on channel {:p}",
me,
self,
);
// Mark the current task as blocked, then release the borrow of the channel state before
// switching away so that it is not held while this task is suspended.
// NOTE(review): presumably `thread::switch()` yields to the scheduler and returns only
// after another task has unblocked this one -- confirm against its definition.
ExecutionState::with(|s| s.current_mut().block());
drop(state);
thread::switch();
// Re-acquire the channel state now that this task is running again.
state = self.state.borrow_mut();
trace!(
state = ?state,
"unblocked sender {:?} on channel {:p}",
me,
self,
);
// Check again that we still have a receiver; if not, return with error.
// We repeat this check because the receivers may have disconnected while the sender was blocked.
if state.known_receivers == 0 {
// No receivers are left, so the channel is disconnected. Stop and return failure.
// NOTE(review): this return leaves `me` in `waiting_senders`; presumably harmless because
// the channel is disconnected at this point -- confirm.
return Err(SendError(message));
}
}
// Senders proceed in FIFO order, so the sender at the head of the queue must be this one.
let head = state.waiting_senders.remove(0);
assert_eq!(head, me);
// Increment the clock and store the message together with a copy of that clock.
ExecutionState::with(|s| {
let clock = s.increment_clock();
state.messages.push(TimestampedValue::new(message, clock.clone()));
});
// The sender has just added a message to the channel, so unblock the first waiting receiver if any
if let Some(&tid) = state.waiting_receivers.first() {
ExecutionState::with(|s| {
s.get_mut(tid).unblock();
// When a sender successfully sends on a rendezvous channel, it knows that the receiver will perform
// the matching receive, so we need to update the sender's clock with the receiver's.
if is_rendezvous {
let recv_clock = s.get_clock(tid).clone();
s.update_clock(&recv_clock);
}
});
} else {
assert!(!is_rendezvous); // Sender on a rendezvous channel is only unblocked if there's a waiting receiver
}
// Check and unblock the next waiting sender, if eligible
if let Some(&tid) = state.waiting_senders.first() {
let bound = self.bound.expect("can't have waiting senders on an unbounded channel");
// With bound == 0 (rendezvous) this comparison can never hold, so this path never
// unblocks another sender on a rendezvous channel.
if state.messages.len() < bound {
ExecutionState::with(|s| s.get_mut(tid).unblock());
}
}
// On a non-rendezvous channel that tracks receiver clocks, take the first stored receiver
// clock and pass it to `update_clock`.
// NOTE(review): `remove(0)` presumably relies on an invariant, maintained outside this
// function, that an entry is available whenever a send completes -- confirm.
if !is_rendezvous {
if let Some(receiver_clock) = &mut state.receiver_clock {
let recv_clock = receiver_clock.remove(0);
ExecutionState::with(|s| s.update_clock(&recv_clock));
}
}
Ok(())
}