fn poll_next()

in edge-modules/api-proxy-module/rust-sdk/azure-iot-mqtt/src/module.rs [197:399]


    /// Drives the client and yields the next [`Message`] for the caller.
    ///
    /// Every pass of the internal loop does two things:
    ///
    /// 1. Drains all direct method responses that are ready on
    ///    `direct_method_response_recv`. Each one is serialized to JSON, published with
    ///    QoS "at least once" to `$iothub/methods/res/{status}/?$rid={request_id}`, and the
    ///    boxed result of `inner.publish` is sent back through the response's `ack_sender`.
    /// 2. Polls the inner MQTT client and reacts according to `state`:
    ///    * `WaitingForSubscriptions` — counts subscription acks until at least
    ///      `num_default_subscriptions` have been seen, then switches to `Idle`. Direct
    ///      method requests are still surfaced; twin state messages are discarded. If the
    ///      session was not reset, the state switches to `Idle` immediately.
    ///    * `Idle` — forwards direct method requests, and feeds twin state messages to the
    ///      desired- and reported-properties state machines, returning whatever message
    ///      they produce.
    ///
    /// # Returns
    ///
    /// * `Poll::Ready(Some(Ok(message)))` when a message is available.
    /// * `Poll::Ready(Some(Err(err)))` when the inner client yields an error.
    /// * `Poll::Ready(None)` when the inner client's stream ends.
    /// * `Poll::Pending` when the inner client is pending while waiting for subscriptions,
    ///   or when nothing in the `Idle` state asked for another loop iteration.
    ///
    /// # Panics
    ///
    /// Panics if the inner client reports subscription updates while in the `Idle` state,
    /// or if serializing a direct method response payload fails.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        let this = &mut *self;

        loop {
            log::trace!("    {:?}", this.state);

            // Flush every direct method response that is ready before touching the
            // MQTT event stream.
            while let std::task::Poll::Ready(Some(direct_method_response)) =
                std::pin::Pin::new(&mut this.direct_method_response_recv).poll_next(cx)
            {
                let crate::DirectMethodResponse {
                    request_id,
                    status,
                    payload,
                    ack_sender,
                } = direct_method_response;
                let payload = serde_json::to_vec(&payload)
                    .expect("cannot fail to serialize serde_json::Value");
                let publication = mqtt3::proto::Publication {
                    topic_name: format!("$iothub/methods/res/{}/?$rid={}", status, request_id),
                    qos: mqtt3::proto::QoS::AtLeastOnce,
                    retain: false,
                    payload: payload.into(),
                };

                // The publish is started regardless of whether the requester is still
                // listening for the ack; a dropped receiver is only worth a debug log.
                if ack_sender
                    .send(Box::new(this.inner.publish(publication)))
                    .is_err()
                {
                    log::debug!("could not send ack for direct method response because ack receiver has been dropped");
                }
            }

            match &mut this.state {
                State::WaitingForSubscriptions {
                    reset_session,
                    acked,
                } => {
                    if *reset_session {
                        match std::pin::Pin::new(&mut this.inner).poll_next(cx) {
                            // NOTE(review): `acked` is not reset when a new connection is
                            // reported while waiting, so acks may accumulate across
                            // connections — confirm against the mqtt3 client's behaviour.
                            std::task::Poll::Ready(Some(Ok(
                                mqtt3::Event::NewConnection { .. }
                                | mqtt3::Event::Disconnected(_),
                            ))) => (),

                            std::task::Poll::Ready(Some(Ok(mqtt3::Event::Publication(
                                publication,
                            )))) => match InternalMessage::parse(publication) {
                                Ok(InternalMessage::DirectMethod {
                                    name,
                                    payload,
                                    request_id,
                                }) => {
                                    return std::task::Poll::Ready(Some(Ok(
                                        Message::DirectMethod {
                                            name,
                                            payload,
                                            request_id,
                                        },
                                    )))
                                }

                                Ok(message @ InternalMessage::TwinState(_)) => log::debug!(
                                    "Discarding message {:?} because we haven't finished subscribing yet",
                                    message
                                ),

                                Err(err) => log::warn!(
                                    "Discarding message that could not be parsed: {}",
                                    err
                                ),
                            },

                            std::task::Poll::Ready(Some(Ok(mqtt3::Event::SubscriptionUpdates(
                                updates,
                            )))) => {
                                log::debug!("subscriptions acked by server: {:?}", updates);
                                *acked += updates.len();

                                // `acked` advances by whole batches, so it can step past
                                // the expected count. Saturate the remaining count instead
                                // of underflowing, and treat "at least as many as
                                // expected" as done so the client cannot get stuck here.
                                log::debug!(
                                    "waiting for {} more subscriptions",
                                    this.num_default_subscriptions.saturating_sub(*acked)
                                );
                                if *acked >= this.num_default_subscriptions {
                                    this.state = State::Idle;
                                }
                            }

                            std::task::Poll::Ready(Some(Err(err))) => {
                                return std::task::Poll::Ready(Some(Err(err)))
                            }

                            std::task::Poll::Ready(None) => return std::task::Poll::Ready(None),

                            std::task::Poll::Pending => return std::task::Poll::Pending,
                        }
                    } else {
                        // Session was not reset, so no subscription acks are awaited.
                        this.state = State::Idle;
                    }
                }

                State::Idle => {
                    // Set when some sub-step made progress and the loop should run again
                    // instead of returning `Pending`.
                    let mut continue_loop = false;

                    // At most one twin state message is pulled per iteration; it is
                    // offered to the desired- and then the reported-properties machines.
                    let mut twin_state_message = match std::pin::Pin::new(&mut this.inner)
                        .poll_next(cx)
                    {
                        std::task::Poll::Ready(Some(Ok(mqtt3::Event::NewConnection {
                            reset_session,
                        }))) => {
                            // Start counting subscription acks from scratch and let both
                            // twin state machines know about the new connection.
                            this.state = State::WaitingForSubscriptions {
                                reset_session,
                                acked: 0,
                            };
                            this.desired_properties.new_connection();
                            this.reported_properties.new_connection();
                            continue;
                        }

                        std::task::Poll::Ready(Some(Ok(mqtt3::Event::Publication(
                            publication,
                        )))) => match InternalMessage::parse(publication) {
                            Ok(InternalMessage::DirectMethod {
                                name,
                                payload,
                                request_id,
                            }) => {
                                return std::task::Poll::Ready(Some(Ok(Message::DirectMethod {
                                    name,
                                    payload,
                                    request_id,
                                })))
                            }

                            Ok(InternalMessage::TwinState(message)) => {
                                // There may be more messages, so continue the loop
                                continue_loop = true;

                                Some(message)
                            }

                            Err(err) => {
                                log::warn!("Discarding message that could not be parsed: {}", err);
                                continue;
                            }
                        },

                        // Don't expect any subscription updates at this point
                        // NOTE(review): this panics if the server sends a surplus ack
                        // after the switch to `Idle` — confirm that cannot happen.
                        std::task::Poll::Ready(Some(Ok(mqtt3::Event::SubscriptionUpdates(_)))) => {
                            unreachable!()
                        }

                        std::task::Poll::Ready(Some(Ok(mqtt3::Event::Disconnected(_)))) => continue,

                        std::task::Poll::Ready(Some(Err(err))) => {
                            return std::task::Poll::Ready(Some(Err(err)))
                        }

                        std::task::Poll::Ready(None) => return std::task::Poll::Ready(None),

                        // No event right now; the twin state machines are still polled
                        // below with no message.
                        std::task::Poll::Pending => None,
                    };

                    match this.desired_properties.poll(
                        cx,
                        &mut this.inner,
                        &mut twin_state_message,
                        &mut this.previous_request_id,
                    ) {
                        Ok(crate::twin_state::Response::Message(
                            crate::twin_state::desired::Message::Initial(twin_state),
                        )) => {
                            // Seed the reported-properties machine with the reported
                            // properties from the initial twin before handing it out.
                            this.reported_properties
                                .set_initial_state(twin_state.reported.properties.clone());
                            return std::task::Poll::Ready(Some(Ok(Message::TwinInitial(
                                twin_state,
                            ))));
                        }

                        Ok(crate::twin_state::Response::Message(
                            crate::twin_state::desired::Message::Patch(properties),
                        )) => {
                            return std::task::Poll::Ready(Some(Ok(Message::TwinPatch(properties))))
                        }

                        Ok(crate::twin_state::Response::Continue) => continue_loop = true,

                        Ok(crate::twin_state::Response::NotReady) => (),

                        Err(err) => {
                            log::warn!("Discarding message that could not be parsed: {}", err);
                        }
                    }

                    match this.reported_properties.poll(
                        cx,
                        &mut this.inner,
                        &mut twin_state_message,
                        &mut this.previous_request_id,
                    ) {
                        Ok(crate::twin_state::Response::Message(message)) => match message {
                            crate::twin_state::reported::Message::Reported(version) => {
                                return std::task::Poll::Ready(Some(Ok(
                                    Message::ReportedTwinState(version),
                                )))
                            }
                        },
                        Ok(crate::twin_state::Response::Continue) => continue_loop = true,
                        Ok(crate::twin_state::Response::NotReady) => (),
                        Err(err) => {
                            log::warn!("Discarding message that could not be parsed: {}", err);
                        }
                    }

                    if let Some(twin_state_message) = twin_state_message {
                        // This can happen if the Azure IoT Hub responded to a reported property request that we aren't waiting for
                        // because we have since sent a new one
                        log::debug!("unconsumed twin state message {:?}", twin_state_message);
                    }

                    if !continue_loop {
                        return std::task::Poll::Pending;
                    }
                }
            }
        }
    }