fn poll()

in dc/s2n-quic-dc/src/stream/server/tokio/tcp/worker.rs [231:394]


    fn poll<Sub, Pub>(
        &mut self,
        cx: &mut task::Context,
        context: &mut Context<Sub>,
        stream: &mut Option<(TcpStream, SocketAddress)>,
        subscriber_ctx: &mut Option<Sub::ConnectionContext>,
        queue_time: Timestamp,
        now: Timestamp,
        publisher: &Pub,
    ) -> Poll<Result<ControlFlow<()>, Option<io::Error>>>
    where
        Sub: event::Subscriber + Clone,
        Pub: EndpointPublisher,
    {
        let sojourn_time = now.saturating_duration_since(queue_time);

        loop {
            // figure out where to put the received bytes
            let (recv_buffer, blocked_count) = match self {
                // borrow the context's recv buffer initially
                WorkerState::Init => (&mut context.recv_buffer, 0),
                // we have our own recv buffer to use
                WorkerState::Buffering {
                    buffer,
                    blocked_count,
                } => (buffer, *blocked_count),
                // we encountered an error so try and send it back
                WorkerState::Erroring { offset, buffer, .. } => {
                    let (stream, _remote_address) = stream.as_mut().unwrap();
                    let len = ready!(Pin::new(stream).poll_write(cx, &buffer[*offset..]))?;

                    *offset += len;

                    // if we still need to send part of the buffer then loop back around
                    if *offset < buffer.len() {
                        continue;
                    }

                    // io::Error doesn't implement clone so we have to take the error to return it
                    let WorkerState::Erroring { error, .. } = core::mem::replace(self, Self::Init)
                    else {
                        unreachable!()
                    };

                    return Err(Some(error)).into();
                }
            };

            // try to read an initial packet from the socket
            let res = {
                let (stream, remote_address) = stream.as_mut().unwrap();
                Self::poll_initial_packet(
                    cx,
                    stream,
                    remote_address,
                    recv_buffer,
                    sojourn_time,
                    publisher,
                )
            };

            let Poll::Ready(res) = res else {
                // if we got `Pending` but we don't own the recv buffer then we need to copy it
                // into the worker so we can resume where we left off last time
                if blocked_count == 0 {
                    let buffer = recv_buffer.take();
                    *self = Self::Buffering {
                        buffer,
                        blocked_count,
                    };
                }

                if let Self::Buffering { blocked_count, .. } = self {
                    *blocked_count += 1;
                }

                return Poll::Pending;
            };

            let initial_packet = res?;

            let subscriber_ctx = subscriber_ctx.take().unwrap();
            let (socket, remote_address) = stream.take().unwrap();

            let recv_buffer = recv::buffer::Local::new(recv_buffer.take(), None);
            let recv_buffer = Either::A(recv_buffer);

            let peer = env::tcp::Reregistered {
                socket,
                peer_addr: remote_address,
                local_port: context.local_port,
                recv_buffer,
            };

            let stream_builder = match endpoint::accept_stream(
                now,
                &context.env,
                peer,
                &initial_packet,
                &context.secrets,
                subscriber_ctx,
                None,
            ) {
                Ok(stream) => stream,
                Err(error) => {
                    if let Some(env::tcp::Reregistered { socket, .. }) = error.peer {
                        if !error.secret_control.is_empty() {
                            // if we need to send an error then update the state and loop back
                            // around
                            *stream = Some((socket, remote_address));
                            *self = WorkerState::Erroring {
                                offset: 0,
                                buffer: error.secret_control,
                                error: error.error,
                            };
                            continue;
                        } else {
                            // close the stream immediately and send a reset to the client
                            let _ = socket.set_linger(Some(Duration::ZERO));
                            drop(socket);
                        }
                    }
                    return Err(Some(error.error)).into();
                }
            };

            {
                let remote_address: SocketAddress = stream_builder.shared.remote_addr();
                let remote_address = &remote_address;
                let creds = stream_builder.shared.credentials();
                let credential_id = &*creds.id;
                let stream_id = creds.key_id.as_u64();
                publisher.on_acceptor_tcp_stream_enqueued(
                    event::builder::AcceptorTcpStreamEnqueued {
                        remote_address,
                        credential_id,
                        stream_id,
                        sojourn_time,
                        blocked_count,
                    },
                );
            }

            let res = match context.accept_flavor {
                accept::Flavor::Fifo => context.sender.send_back(stream_builder),
                accept::Flavor::Lifo => context.sender.send_front(stream_builder),
            };

            return Poll::Ready(Ok(match res {
                Ok(prev) => {
                    if let Some(stream) = prev {
                        stream.prune(
                            event::builder::AcceptorStreamPruneReason::AcceptQueueCapacityExceeded,
                        );
                    }
                    ControlFlow::Continue(())
                }
                Err(_err) => {
                    debug!("application accept queue dropped; shutting down");
                    ControlFlow::Break(())
                }
            }));
        }
    }