dc/s2n-quic-dc/src/stream/recv/dispatch/queue.rs (266 lines of code) (raw):

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use crate::{ stream::Actor, sync::ring_deque::{Capacity, Closed, RecvWaker}, }; use core::{ fmt, task::{Context, Poll}, }; use s2n_quic_core::ensure; use std::{collections::VecDeque, ops::ControlFlow, sync::Mutex, task::Waker}; #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum Error { /// The queue ID is not associated with a stream Unallocated, /// The queue has been closed and won't reopen Closed, } #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum Half { Stream, Control, } impl s2n_quic_core::probe::Arg for Half { #[inline] fn into_usdt(self) -> isize { match self { Half::Stream => 0, Half::Control => 1, } } } impl From<Closed> for Error { #[inline] fn from(_: Closed) -> Self { Self::Closed } } struct Inner<T> { queue: VecDeque<T>, capacity: usize, is_open: bool, has_receiver: bool, app_waker: Option<Waker>, worker_waker: Option<Waker>, } impl<T> Inner<T> { fn take_wakers(&mut self) -> Wakers { Wakers { app_waker: self.app_waker.take(), worker_waker: self.worker_waker.take(), } } } impl<T> fmt::Debug for Inner<T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Inner") .field("queue_len", &self.queue.len()) .field("capacity", &self.capacity) .field("is_open", &self.is_open) .field("has_receiver", &self.has_receiver) .field("app_waker", &self.app_waker.is_some()) .field("worker_waker", &self.worker_waker.is_some()) .finish() } } pub struct Queue<T> { inner: Mutex<Inner<T>>, #[cfg(debug_assertions)] half: Half, } impl<T> Queue<T> { #[inline] pub fn new(capacity: Capacity, half: Half) -> Self { let _ = half; Self { inner: Mutex::new(Inner { queue: VecDeque::with_capacity(capacity.initial), capacity: capacity.max, is_open: true, has_receiver: false, app_waker: None, worker_waker: None, }), #[cfg(debug_assertions)] half, } } #[inline] pub fn push(&self, value: T) -> Result<Option<T>, Error> { let mut inner = self.lock()?; // check if the queue is permanently closed ensure!(inner.is_open, Err(Error::Closed)); // check if the queue is temporarily closed ensure!(inner.has_receiver, Err(Error::Unallocated)); let prev = if inner.capacity == inner.queue.len() { inner.queue.pop_front() } else { None }; inner.queue.push_back(value); let wakers = inner.take_wakers(); drop(inner); drop(wakers); Ok(prev) } /// Bypasses closed checks and pushes items into the queue #[inline] pub fn force_push(&self, value: T) -> Option<T> { let Ok(mut inner) = self.lock() else { return Some(value); }; let prev = if inner.capacity == inner.queue.len() { inner.queue.pop_front() } else { None }; inner.queue.push_back(value); let wakers = inner.take_wakers(); drop(inner); drop(wakers); prev } #[inline] pub fn pop(&self) -> Result<Option<T>, Closed> { let mut inner = self.lock()?; if let Some(item) = inner.queue.pop_front() { return Ok(Some(item)); } ensure!(inner.is_open, Err(Closed)); Ok(None) } #[inline] pub fn poll_pop(&self, cx: &mut Context, actor: Actor) -> Poll<Result<T, Closed>> { let mut inner = self.lock()?; if let Some(item) = inner.queue.pop_front() { return Ok(item).into(); } ensure!(inner.is_open, Err(Closed).into()); match actor { Actor::Application => &mut inner.app_waker, Actor::Worker => &mut inner.worker_waker, } .update(cx); Poll::Pending } #[inline] pub fn poll_swap( &self, cx: &mut Context, actor: Actor, items: &mut VecDeque<T>, ) -> Poll<Result<(), Closed>> { debug_assert!(items.is_empty(), "destination items should be empty"); let mut inner = self.lock()?; if inner.queue.is_empty() { ensure!(inner.is_open, Err(Closed).into()); match actor { Actor::Application => &mut inner.app_waker, Actor::Worker => &mut inner.worker_waker, } .update(cx); return Poll::Pending; } core::mem::swap(items, &mut inner.queue); Ok(()).into() } #[inline] pub fn open_receivers(&self, control: &Self) -> Result<(), Closed> { #[cfg(debug_assertions)] { assert_eq!(self.half, Half::Stream); assert_eq!(control.half, Half::Control); } // perform locks in the same order to avoid deadlocks let Ok(mut stream_inner) = self.lock() else { return Err(Closed); }; let Ok(mut control_inner) = control.lock() else { return Err(Closed); }; // make sure the stream hasn't been permanently closed ensure!(stream_inner.is_open, Err(Closed)); ensure!(control_inner.is_open, Err(Closed)); debug_assert!( !stream_inner.has_receiver && !control_inner.has_receiver, "receiver already open!\n stream: {stream_inner:?}\ncontrol: {control_inner:?}" ); stream_inner.has_receiver = true; control_inner.has_receiver = true; Ok(()) } #[inline] pub fn close_receiver(&self, control: &Self, half: Half) -> ControlFlow<()> { #[cfg(debug_assertions)] { assert_eq!(self.half, Half::Stream); assert_eq!(control.half, Half::Control); } // the Control half owns freeing in the case of poisoning let on_poisoned = if matches!(half, Half::Control) { ControlFlow::Continue(()) } else { ControlFlow::Break(()) }; // acquire both locks in the same order to avoid deadlocks or races let Ok(stream_inner) = self.lock() else { return on_poisoned; }; let Ok(control_inner) = control.lock() else { return on_poisoned; }; let (mut inner, other) = match half { Half::Stream => (stream_inner, control_inner), Half::Control => (control_inner, stream_inner), }; debug_assert!( inner.has_receiver, "receiver already closed:\n{inner:?}\nother: {other:?}" ); // observe the other half receiver status before dropping the `other` lock let has_other_receiver = other.has_receiver; drop(other); let wakers = inner.take_wakers(); inner.has_receiver = false; // take the queue items out of the lock to avoid mutex poisoning. // note that most of the time this should be empty, which would be a no-op let mut queue = VecDeque::new(); queue.append(&mut inner.queue); drop(inner); // drop wakers after the lock to avoid potential mutex poisoning wakers.dont_wake(); if has_other_receiver { // the other queue still has the receiver don't put it back yet ControlFlow::Break(()) } else { // we're the last receiver so free the queue ControlFlow::Continue(()) } } #[inline] pub fn close(&self) { let Ok(mut inner) = self.lock() else { return; }; inner.is_open = false; // Leave the remaining items in the queue in case the receiver wants them. // Notify the receiver that the queue is now closed let wakers = inner.take_wakers(); drop(inner); drop(wakers); } #[inline] fn lock(&self) -> Result<std::sync::MutexGuard<Inner<T>>, Closed> { self.inner.lock().map_err(|_| Closed) } } struct Wakers { app_waker: Option<Waker>, worker_waker: Option<Waker>, } impl Wakers { #[inline] fn dont_wake(mut self) { self.app_waker = None; self.worker_waker = None; } } impl Drop for Wakers { #[inline] fn drop(&mut self) { if let Some(waker) = self.app_waker.take() { waker.wake(); } if let Some(waker) = self.worker_waker.take() { waker.wake(); } } }