in quic/s2n-quic-core/src/sync/spsc/tests.rs [84:232]
/// Applies one `operation` to the channel halves held by this model and checks the
/// outcome against `self.oracle`.
///
/// * `operation` - the step to perform: a sync/async push or pop, a clear, or dropping
///   one of the two halves.
/// * `generator` - invoked once per attempted push to produce the next value.
///
/// Successfully pushed values are appended to the oracle; every popped value must equal
/// the oracle's front element. Whenever at least one value was pushed (or removed), the
/// opposite side's waker is checked with `assert_wake`, and that check only runs after
/// the slice used for the operation has gone out of scope.
///
/// Panics (through `assert!`/`assert_eq!`) when the channel and the oracle disagree.
fn apply(&mut self, operation: Operation, mut generator: impl FnMut() -> T) {
    match operation {
        Operation::Push(count) => {
            let mut wrote_any = false;
            if let Some(sender) = self.send.as_mut() {
                match sender.try_slice() {
                    Err(_) => {
                        // an error is only expected once the receiving half is gone
                        assert!(self.recv.is_none());
                    }
                    Ok(None) => {
                        assert_eq!(
                            self.oracle.len(),
                            self.capacity,
                            "slice should return None when at capacity"
                        );
                    }
                    Ok(Some(mut slice)) => {
                        for _ in 0..count {
                            let item = generator();
                            // only values the channel accepted are mirrored in the oracle
                            let accepted = slice.push(item.clone()).is_ok();
                            if accepted {
                                self.oracle.push_back(item);
                                wrote_any = true;
                            }
                        }
                    }
                }
            }
            if wrote_any {
                self.recv_waker.assert_wake();
            }
        }
        Operation::AsyncPush(count) => {
            let mut wrote_any = false;
            if let Some(sender) = self.send.as_mut() {
                match sender.poll_slice(&mut self.send_waker.context()) {
                    Poll::Pending => {
                        assert_eq!(
                            self.oracle.len(),
                            self.capacity,
                            "slice should return Pending when at capacity"
                        );
                        self.send_waker.snapshot();
                    }
                    Poll::Ready(Err(_)) => {
                        // an error is only expected once the receiving half is gone
                        assert!(self.recv.is_none());
                    }
                    Poll::Ready(Ok(mut slice)) => {
                        for _ in 0..count {
                            let item = generator();
                            // only values the channel accepted are mirrored in the oracle
                            let accepted = slice.push(item.clone()).is_ok();
                            if accepted {
                                self.oracle.push_back(item);
                                wrote_any = true;
                            }
                        }
                    }
                }
            }
            if wrote_any {
                self.recv_waker.assert_wake();
            }
        }
        Operation::Pop(count) => {
            let mut read_any = false;
            if let Some(receiver) = self.recv.as_mut() {
                match receiver.try_slice() {
                    Err(_) => {
                        // an error requires a dropped sender and nothing left to read
                        assert!(self.send.is_none());
                        assert!(self.oracle.is_empty());
                    }
                    Ok(None) => {
                        assert!(self.oracle.is_empty());
                    }
                    Ok(Some(mut slice)) => {
                        for _ in 0..count {
                            // the channel and the oracle must yield the same element,
                            // including agreeing on when they run dry
                            let actual = slice.pop();
                            let expected = self.oracle.pop_front();
                            assert_eq!(actual, expected);
                            if actual.is_some() {
                                read_any = true;
                            }
                        }
                    }
                }
            }
            if read_any {
                self.send_waker.assert_wake();
            }
        }
        Operation::AsyncPop(count) => {
            let mut read_any = false;
            if let Some(receiver) = self.recv.as_mut() {
                match receiver.poll_slice(&mut self.recv_waker.context()) {
                    Poll::Pending => {
                        assert!(self.oracle.is_empty());
                        self.recv_waker.snapshot();
                    }
                    Poll::Ready(Err(_)) => {
                        // an error requires a dropped sender and nothing left to read
                        assert!(self.send.is_none());
                        assert!(self.oracle.is_empty());
                    }
                    Poll::Ready(Ok(mut slice)) => {
                        for _ in 0..count {
                            // the channel and the oracle must yield the same element,
                            // including agreeing on when they run dry
                            let actual = slice.pop();
                            let expected = self.oracle.pop_front();
                            assert_eq!(actual, expected);
                            if actual.is_some() {
                                read_any = true;
                            }
                        }
                    }
                }
            }
            if read_any {
                self.send_waker.assert_wake();
            }
        }
        Operation::Clear => {
            let mut removed_any = false;
            if let Some(receiver) = self.recv.as_mut() {
                match receiver.try_slice() {
                    Err(_) => {
                        // an error requires a dropped sender and nothing left to read
                        assert!(self.send.is_none());
                        assert!(self.oracle.is_empty());
                    }
                    Ok(None) => {
                        assert!(self.oracle.is_empty());
                    }
                    Ok(Some(mut slice)) => {
                        // keep clearing until a round removes nothing; the `peek`
                        // between rounds pulls in the latest values so that
                        // everything ends up cleared
                        while slice.clear() > 0 {
                            removed_any = true;
                            let _ = slice.peek();
                        }
                        self.oracle.clear();
                    }
                }
            }
            if removed_any {
                self.send_waker.assert_wake();
            }
        }
        Operation::DropSend => {
            // discard the sending half; later push operations become no-ops
            self.send = None;
        }
        Operation::DropRecv => {
            // discard the receiving half; later pop/clear operations become no-ops
            self.recv = None;
        }
    }
}