fn poll_next()

in mqtt/mqtt3/src/client/mod.rs [236:471]


    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        use futures_util::Sink;

        let reason = loop {
            match &mut self.0 {
                ClientState::Up {
                    client_id,
                    username,
                    will,
                    keep_alive,

                    shutdown_recv,

                    packet_identifiers,

                    connect,
                    ping,
                    publish,
                    subscriptions,

                    packets_waiting_to_be_sent,
                    ..
                } => {
                    match std::pin::Pin::new(shutdown_recv).poll_next(cx) {
                        std::task::Poll::Ready(Some(())) => break None,

                        std::task::Poll::Ready(None) | std::task::Poll::Pending => (),
                    }

                    let connect::Connected {
                        framed,
                        new_connection,
                        reset_session,
                    } = match connect.poll(
                        cx,
                        username.as_ref().map(AsRef::as_ref),
                        will.as_ref(),
                        client_id,
                        *keep_alive,
                    ) {
                        std::task::Poll::Ready(framed) => framed,
                        std::task::Poll::Pending => return std::task::Poll::Pending,
                    };

                    if new_connection {
                        log::debug!("New connection established");

                        *packets_waiting_to_be_sent = Default::default();

                        ping.new_connection();

                        packets_waiting_to_be_sent
                            .extend(publish.new_connection(reset_session, packet_identifiers));

                        packets_waiting_to_be_sent.extend(
                            subscriptions.new_connection(reset_session, packet_identifiers),
                        );

                        return std::task::Poll::Ready(Some(Ok(Event::NewConnection {
                            reset_session,
                        })));
                    }

                    match client_poll(
                        cx,
                        framed,
                        *keep_alive,
                        packets_waiting_to_be_sent,
                        packet_identifiers,
                        ping,
                        publish,
                        subscriptions,
                    ) {
                        std::task::Poll::Ready(Ok(event)) => {
                            return std::task::Poll::Ready(Some(Ok(event)))
                        }

                        std::task::Poll::Ready(Err(err)) => {
                            if err.is_user_error() {
                                break Some(err);
                            }
                            log::warn!("client will reconnect because of error: {}", err);

                            if !err.session_is_resumable() {
                                // Ensure clean session if the error is such that the session is not resumable.
                                //
                                // DEVNOTE: subscriptions::State relies on the fact that the session is reset here.
                                // Update that if this ever changes.
                                *client_id = match std::mem::replace(
                                    client_id,
                                    crate::proto::ClientId::ServerGenerated,
                                ) {
                                    id @ (crate::proto::ClientId::ServerGenerated
                                    | crate::proto::ClientId::IdWithCleanSession(_)) => id,
                                    crate::proto::ClientId::IdWithExistingSession(id) => {
                                        crate::proto::ClientId::IdWithCleanSession(id)
                                    }
                                };
                            }

                            connect.reconnect();

                            if err.is_connection_error() {
                                return std::task::Poll::Ready(Some(Ok(Event::Disconnected(
                                    err.into(),
                                ))));
                            }
                        }

                        std::task::Poll::Pending => return std::task::Poll::Pending,
                    }
                }

                ClientState::ShuttingDown {
                    client_id,
                    username,
                    will,
                    keep_alive,

                    connect,

                    sent_disconnect,

                    reason,
                } => {
                    let connect::Connected { mut framed, .. } = match connect.poll(
                        cx,
                        username.as_ref().map(AsRef::as_ref),
                        will.as_ref(),
                        client_id,
                        *keep_alive,
                    ) {
                        std::task::Poll::Ready(framed) => framed,
                        std::task::Poll::Pending => {
                            // Already disconnected
                            self.0 = ClientState::ShutDown {
                                reason: reason.take(),
                            };
                            continue;
                        }
                    };

                    loop {
                        if *sent_disconnect {
                            match std::pin::Pin::new(&mut framed).poll_flush(cx) {
                                std::task::Poll::Ready(Ok(())) => {
                                    self.0 = ClientState::ShutDown {
                                        reason: reason.take(),
                                    };
                                    break;
                                }

                                std::task::Poll::Ready(Err(err)) => {
                                    let err = Error::EncodePacket(err);
                                    log::warn!("couldn't send DISCONNECT: {}", err);
                                    self.0 = ClientState::ShutDown {
                                        reason: reason.take(),
                                    };
                                    break;
                                }

                                std::task::Poll::Pending => return std::task::Poll::Pending,
                            }
                        }
                        match std::pin::Pin::new(&mut framed).poll_ready(cx) {
                            std::task::Poll::Ready(Ok(())) => {
                                let packet =
                                    crate::proto::Packet::Disconnect(crate::proto::Disconnect);
                                match std::pin::Pin::new(&mut framed).start_send(packet) {
                                    Ok(()) => *sent_disconnect = true,

                                    Err(err) => {
                                        log::warn!("couldn't send DISCONNECT: {}", err);
                                        self.0 = ClientState::ShutDown {
                                            reason: reason.take(),
                                        };
                                        break;
                                    }
                                }
                            }

                            std::task::Poll::Ready(Err(err)) => {
                                log::warn!("couldn't send DISCONNECT: {}", err);
                                self.0 = ClientState::ShutDown {
                                    reason: reason.take(),
                                };
                                break;
                            }

                            std::task::Poll::Pending => return std::task::Poll::Pending,
                        }
                    }
                }

                ClientState::ShutDown { reason } => match reason.take() {
                    Some(err) => return std::task::Poll::Ready(Some(Err(err))),
                    None => return std::task::Poll::Ready(None),
                },
            }
        };

        // If we're here, then we're transitioning from Up to ShuttingDown

        match std::mem::replace(&mut self.0, ClientState::ShutDown { reason: None }) {
            ClientState::Up {
                client_id,
                username,
                will,
                keep_alive,

                connect,
                ..
            } => {
                log::warn!("Shutting down...");

                self.0 = ClientState::ShuttingDown {
                    client_id,
                    username,
                    will,
                    keep_alive,

                    connect,

                    sent_disconnect: false,

                    reason,
                };
                self.poll_next(cx)
            }

            _ => unreachable!(),
        }
    }