fn client_poll()

in mqtt/mqtt3/src/client/mod.rs [606:706]


fn client_poll<S>(
    cx: &mut std::task::Context<'_>,

    framed: &mut crate::logging_framed::LoggingFramed<S>,
    keep_alive: std::time::Duration,
    packets_waiting_to_be_sent: &mut std::collections::VecDeque<crate::proto::Packet>,
    packet_identifiers: &mut PacketIdentifiers,
    ping: &mut ping::State,
    publish: &mut publish::State,
    subscriptions: &mut subscriptions::State,
) -> std::task::Poll<Result<Event, Error>>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    use futures_util::{Sink, Stream};

    loop {
        // Begin sending any packets waiting to be sent
        while let Some(packet) = packets_waiting_to_be_sent.pop_front() {
            match std::pin::Pin::new(&mut *framed).poll_ready(cx) {
                std::task::Poll::Ready(result) => {
                    let () = result.map_err(Error::EncodePacket)?;
                    let () = std::pin::Pin::new(&mut *framed)
                        .start_send(packet)
                        .map_err(Error::EncodePacket)?;
                }

                std::task::Poll::Pending => {
                    packets_waiting_to_be_sent.push_front(packet);
                    break;
                }
            }
        }

        // Finish sending any packets waiting to be sent.
        //
        // We don't care whether this returns Poll::Ready or Poll::Pending.
        let _: std::task::Poll<_> = std::pin::Pin::new(&mut *framed)
            .poll_flush(cx)
            .map_err(Error::EncodePacket)?;

        let mut continue_loop = false;

        let mut packet = match std::pin::Pin::new(&mut *framed).poll_next(cx) {
            std::task::Poll::Ready(Some(packet)) => {
                let packet = packet.map_err(Error::DecodePacket)?;

                // May have more packets after this one, so keep looping
                continue_loop = true;
                Some(packet)
            }
            std::task::Poll::Ready(None) => {
                return std::task::Poll::Ready(Err(Error::ServerClosedConnection))
            }
            std::task::Poll::Pending => None,
        };

        let mut new_packets_to_be_sent = vec![];

        // Ping
        let ping_packet = ping.poll(cx, &mut packet, keep_alive);
        new_packets_to_be_sent.extend(ping_packet);

        // Publish
        let (new_publish_packets, publication_received) =
            publish.poll(cx, &mut packet, packet_identifiers)?;
        new_packets_to_be_sent.extend(new_publish_packets);

        // Subscriptions
        let subscription_updates = if publication_received.is_some() {
            // Already have a new publication to return from this tick, so can't process pending subscription updates
            // because they might generate their own responses.
            vec![]
        } else {
            let (new_subscription_packets, subscription_updates) =
                subscriptions.poll(cx, &mut packet, packet_identifiers)?;
            new_packets_to_be_sent.extend(new_subscription_packets);
            subscription_updates
        };

        assert!(packet.is_none(), "unconsumed packet");

        if !new_packets_to_be_sent.is_empty() {
            // Have new packets to send, so keep looping
            continue_loop = true;
            packets_waiting_to_be_sent.extend(new_packets_to_be_sent);
        }

        if let Some(publication_received) = publication_received {
            return std::task::Poll::Ready(Ok(Event::Publication(publication_received)));
        }

        if !subscription_updates.is_empty() {
            return std::task::Poll::Ready(Ok(Event::SubscriptionUpdates(subscription_updates)));
        }

        if !continue_loop {
            return std::task::Poll::Pending;
        }
    }
}