in dc/s2n-quic-dc/src/stream/server/tokio/tcp/worker.rs [231:394]
    fn poll<Sub, Pub>(
        &mut self,
        cx: &mut task::Context,
        context: &mut Context<Sub>,
        stream: &mut Option<(TcpStream, SocketAddress)>,
        subscriber_ctx: &mut Option<Sub::ConnectionContext>,
        queue_time: Timestamp,
        now: Timestamp,
        publisher: &Pub,
    ) -> Poll<Result<ControlFlow<()>, Option<io::Error>>>
    where
        Sub: event::Subscriber + Clone,
        Pub: EndpointPublisher,
    {
        let sojourn_time = now.saturating_duration_since(queue_time);
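
        // Each pass through the loop drives a small state machine: `Init`
        // borrows the context's recv buffer, `Buffering` owns a buffer that
        // persists across wakeups, and `Erroring` flushes a secret control
        // response before reporting the error.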
        loop {
            // figure out where to put the received bytes
            let (recv_buffer, blocked_count) = match self {
                // borrow the context's recv buffer initially
                WorkerState::Init => (&mut context.recv_buffer, 0),
                // we have our own recv buffer to use; `blocked_count` records
                // how many polls have returned `Pending` so far
                WorkerState::Buffering {
                    buffer,
                    blocked_count,
                } => (buffer, *blocked_count),
                // we encountered an error so try to send it back
                WorkerState::Erroring { offset, buffer, .. } => {
                    let (stream, _remote_address) = stream.as_mut().unwrap();
                    let len = ready!(Pin::new(stream).poll_write(cx, &buffer[*offset..]))?;

                    *offset += len;

                    // if we still need to send part of the buffer then loop back around
                    if *offset < buffer.len() {
                        continue;
                    }

                    // io::Error doesn't implement Clone so we have to take the error to return it
                    let WorkerState::Erroring { error, .. } = core::mem::replace(self, Self::Init)
                    else {
                        unreachable!()
                    };

                    return Err(Some(error)).into();
                }
            };
            // try to read an initial packet from the socket
            let res = {
                let (stream, remote_address) = stream.as_mut().unwrap();
                Self::poll_initial_packet(
                    cx,
                    stream,
                    remote_address,
                    recv_buffer,
                    sojourn_time,
                    publisher,
                )
            };

            let Poll::Ready(res) = res else {
                // if we got `Pending` but we don't own the recv buffer then we need to take it
                // into the worker so we can resume where we left off last time
                if blocked_count == 0 {
                    let buffer = recv_buffer.take();
                    *self = Self::Buffering {
                        buffer,
                        blocked_count,
                    };
                }

                if let Self::Buffering { blocked_count, .. } = self {
                    *blocked_count += 1;
                }

                return Poll::Pending;
            };
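
            // an error reading or parsing the initial packet is terminal for
            // this connection attempt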
            let initial_packet = res?;

            let subscriber_ctx = subscriber_ctx.take().unwrap();
            let (socket, remote_address) = stream.take().unwrap();

            let recv_buffer = recv::buffer::Local::new(recv_buffer.take(), None);
            let recv_buffer = Either::A(recv_buffer);

            let peer = env::tcp::Reregistered {
                socket,
                peer_addr: remote_address,
                local_port: context.local_port,
                recv_buffer,
            };
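
            // hand the socket, its buffered bytes, and the initial packet to
            // the endpoint to build a stream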
            let stream_builder = match endpoint::accept_stream(
                now,
                &context.env,
                peer,
                &initial_packet,
                &context.secrets,
                subscriber_ctx,
                None,
            ) {
                Ok(stream) => stream,
                Err(error) => {
                    if let Some(env::tcp::Reregistered { socket, .. }) = error.peer {
                        if !error.secret_control.is_empty() {
                            // if we need to send an error then update the state and loop back
                            // around
                            *stream = Some((socket, remote_address));
                            *self = WorkerState::Erroring {
                                offset: 0,
                                buffer: error.secret_control,
                                error: error.error,
                            };
                            continue;
                        } else {
                            // close the stream immediately and send a reset to the client
                            let _ = socket.set_linger(Some(Duration::ZERO));
                            drop(socket);
                        }
                    }

                    return Err(Some(error.error)).into();
                }
            };
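
            // publish how long the stream waited and how many times the
            // worker blocked before the stream was enqueued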
            {
                let remote_address: SocketAddress = stream_builder.shared.remote_addr();
                let remote_address = &remote_address;
                let creds = stream_builder.shared.credentials();
                let credential_id = &*creds.id;
                let stream_id = creds.key_id.as_u64();
                publisher.on_acceptor_tcp_stream_enqueued(
                    event::builder::AcceptorTcpStreamEnqueued {
                        remote_address,
                        credential_id,
                        stream_id,
                        sojourn_time,
                        blocked_count,
                    },
                );
            }
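
            // `Fifo` pushes to the back of the accept queue so streams are
            // accepted in arrival order; `Lifo` pushes to the front so the
            // application sees the freshest stream first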
            let res = match context.accept_flavor {
                accept::Flavor::Fifo => context.sender.send_back(stream_builder),
                accept::Flavor::Lifo => context.sender.send_front(stream_builder),
            };

            return Poll::Ready(Ok(match res {
                Ok(prev) => {
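                    // the sender returned a displaced stream, which means the
                    // queue was at capacity; prune it instead of leaving it
                    // waiting indefinitely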
                    if let Some(stream) = prev {
                        stream.prune(
                            event::builder::AcceptorStreamPruneReason::AcceptQueueCapacityExceeded,
                        );
                    }

                    ControlFlow::Continue(())
                }
                Err(_err) => {
                    debug!("application accept queue dropped; shutting down");

                    ControlFlow::Break(())
                }
            }));
        }
    }