in mqtt/mqtt3/src/client/mod.rs [606:706]
fn client_poll<S>(
cx: &mut std::task::Context<'_>,
framed: &mut crate::logging_framed::LoggingFramed<S>,
keep_alive: std::time::Duration,
packets_waiting_to_be_sent: &mut std::collections::VecDeque<crate::proto::Packet>,
packet_identifiers: &mut PacketIdentifiers,
ping: &mut ping::State,
publish: &mut publish::State,
subscriptions: &mut subscriptions::State,
) -> std::task::Poll<Result<Event, Error>>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
use futures_util::{Sink, Stream};
loop {
// Begin sending any packets waiting to be sent
while let Some(packet) = packets_waiting_to_be_sent.pop_front() {
match std::pin::Pin::new(&mut *framed).poll_ready(cx) {
std::task::Poll::Ready(result) => {
let () = result.map_err(Error::EncodePacket)?;
let () = std::pin::Pin::new(&mut *framed)
.start_send(packet)
.map_err(Error::EncodePacket)?;
}
std::task::Poll::Pending => {
packets_waiting_to_be_sent.push_front(packet);
break;
}
}
}
// Finish sending any packets waiting to be sent.
//
// We don't care whether this returns Poll::Ready or Poll::Pending.
let _: std::task::Poll<_> = std::pin::Pin::new(&mut *framed)
.poll_flush(cx)
.map_err(Error::EncodePacket)?;
let mut continue_loop = false;
let mut packet = match std::pin::Pin::new(&mut *framed).poll_next(cx) {
std::task::Poll::Ready(Some(packet)) => {
let packet = packet.map_err(Error::DecodePacket)?;
// May have more packets after this one, so keep looping
continue_loop = true;
Some(packet)
}
std::task::Poll::Ready(None) => {
return std::task::Poll::Ready(Err(Error::ServerClosedConnection))
}
std::task::Poll::Pending => None,
};
let mut new_packets_to_be_sent = vec![];
// Ping
let ping_packet = ping.poll(cx, &mut packet, keep_alive);
new_packets_to_be_sent.extend(ping_packet);
// Publish
let (new_publish_packets, publication_received) =
publish.poll(cx, &mut packet, packet_identifiers)?;
new_packets_to_be_sent.extend(new_publish_packets);
// Subscriptions
let subscription_updates = if publication_received.is_some() {
// Already have a new publication to return from this tick, so can't process pending subscription updates
// because they might generate their own responses.
vec![]
} else {
let (new_subscription_packets, subscription_updates) =
subscriptions.poll(cx, &mut packet, packet_identifiers)?;
new_packets_to_be_sent.extend(new_subscription_packets);
subscription_updates
};
assert!(packet.is_none(), "unconsumed packet");
if !new_packets_to_be_sent.is_empty() {
// Have new packets to send, so keep looping
continue_loop = true;
packets_waiting_to_be_sent.extend(new_packets_to_be_sent);
}
if let Some(publication_received) = publication_received {
return std::task::Poll::Ready(Ok(Event::Publication(publication_received)));
}
if !subscription_updates.is_empty() {
return std::task::Poll::Ready(Ok(Event::SubscriptionUpdates(subscription_updates)));
}
if !continue_loop {
return std::task::Poll::Pending;
}
}
}