in mqtt/mqtt3/src/client/mod.rs [236:471]
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
use futures_util::Sink;
let reason = loop {
match &mut self.0 {
ClientState::Up {
client_id,
username,
will,
keep_alive,
shutdown_recv,
packet_identifiers,
connect,
ping,
publish,
subscriptions,
packets_waiting_to_be_sent,
..
} => {
match std::pin::Pin::new(shutdown_recv).poll_next(cx) {
std::task::Poll::Ready(Some(())) => break None,
std::task::Poll::Ready(None) | std::task::Poll::Pending => (),
}
let connect::Connected {
framed,
new_connection,
reset_session,
} = match connect.poll(
cx,
username.as_ref().map(AsRef::as_ref),
will.as_ref(),
client_id,
*keep_alive,
) {
std::task::Poll::Ready(framed) => framed,
std::task::Poll::Pending => return std::task::Poll::Pending,
};
if new_connection {
log::debug!("New connection established");
*packets_waiting_to_be_sent = Default::default();
ping.new_connection();
packets_waiting_to_be_sent
.extend(publish.new_connection(reset_session, packet_identifiers));
packets_waiting_to_be_sent.extend(
subscriptions.new_connection(reset_session, packet_identifiers),
);
return std::task::Poll::Ready(Some(Ok(Event::NewConnection {
reset_session,
})));
}
match client_poll(
cx,
framed,
*keep_alive,
packets_waiting_to_be_sent,
packet_identifiers,
ping,
publish,
subscriptions,
) {
std::task::Poll::Ready(Ok(event)) => {
return std::task::Poll::Ready(Some(Ok(event)))
}
std::task::Poll::Ready(Err(err)) => {
if err.is_user_error() {
break Some(err);
}
log::warn!("client will reconnect because of error: {}", err);
if !err.session_is_resumable() {
// Ensure clean session if the error is such that the session is not resumable.
//
// DEVNOTE: subscriptions::State relies on the fact that the session is reset here.
// Update that if this ever changes.
*client_id = match std::mem::replace(
client_id,
crate::proto::ClientId::ServerGenerated,
) {
id @ (crate::proto::ClientId::ServerGenerated
| crate::proto::ClientId::IdWithCleanSession(_)) => id,
crate::proto::ClientId::IdWithExistingSession(id) => {
crate::proto::ClientId::IdWithCleanSession(id)
}
};
}
connect.reconnect();
if err.is_connection_error() {
return std::task::Poll::Ready(Some(Ok(Event::Disconnected(
err.into(),
))));
}
}
std::task::Poll::Pending => return std::task::Poll::Pending,
}
}
ClientState::ShuttingDown {
client_id,
username,
will,
keep_alive,
connect,
sent_disconnect,
reason,
} => {
let connect::Connected { mut framed, .. } = match connect.poll(
cx,
username.as_ref().map(AsRef::as_ref),
will.as_ref(),
client_id,
*keep_alive,
) {
std::task::Poll::Ready(framed) => framed,
std::task::Poll::Pending => {
// Already disconnected
self.0 = ClientState::ShutDown {
reason: reason.take(),
};
continue;
}
};
loop {
if *sent_disconnect {
match std::pin::Pin::new(&mut framed).poll_flush(cx) {
std::task::Poll::Ready(Ok(())) => {
self.0 = ClientState::ShutDown {
reason: reason.take(),
};
break;
}
std::task::Poll::Ready(Err(err)) => {
let err = Error::EncodePacket(err);
log::warn!("couldn't send DISCONNECT: {}", err);
self.0 = ClientState::ShutDown {
reason: reason.take(),
};
break;
}
std::task::Poll::Pending => return std::task::Poll::Pending,
}
}
match std::pin::Pin::new(&mut framed).poll_ready(cx) {
std::task::Poll::Ready(Ok(())) => {
let packet =
crate::proto::Packet::Disconnect(crate::proto::Disconnect);
match std::pin::Pin::new(&mut framed).start_send(packet) {
Ok(()) => *sent_disconnect = true,
Err(err) => {
log::warn!("couldn't send DISCONNECT: {}", err);
self.0 = ClientState::ShutDown {
reason: reason.take(),
};
break;
}
}
}
std::task::Poll::Ready(Err(err)) => {
log::warn!("couldn't send DISCONNECT: {}", err);
self.0 = ClientState::ShutDown {
reason: reason.take(),
};
break;
}
std::task::Poll::Pending => return std::task::Poll::Pending,
}
}
}
ClientState::ShutDown { reason } => match reason.take() {
Some(err) => return std::task::Poll::Ready(Some(Err(err))),
None => return std::task::Poll::Ready(None),
},
}
};
// If we're here, then we're transitioning from Up to ShuttingDown
match std::mem::replace(&mut self.0, ClientState::ShutDown { reason: None }) {
ClientState::Up {
client_id,
username,
will,
keep_alive,
connect,
..
} => {
log::warn!("Shutting down...");
self.0 = ClientState::ShuttingDown {
client_id,
username,
will,
keep_alive,
connect,
sent_disconnect: false,
reason,
};
self.poll_next(cx)
}
_ => unreachable!(),
}
}