in edge-modules/api-proxy-module/rust-sdk/azure-iot-mqtt/src/module.rs [197:399]
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = &mut *self;
loop {
log::trace!(" {:?}", this.state);
while let std::task::Poll::Ready(Some(direct_method_response)) =
std::pin::Pin::new(&mut this.direct_method_response_recv).poll_next(cx)
{
let crate::DirectMethodResponse {
request_id,
status,
payload,
ack_sender,
} = direct_method_response;
let payload = serde_json::to_vec(&payload)
.expect("cannot fail to serialize serde_json::Value");
let publication = mqtt3::proto::Publication {
topic_name: format!("$iothub/methods/res/{}/?$rid={}", status, request_id),
qos: mqtt3::proto::QoS::AtLeastOnce,
retain: false,
payload: payload.into(),
};
if ack_sender
.send(Box::new(this.inner.publish(publication)))
.is_err()
{
log::debug!("could not send ack for direct method response because ack receiver has been dropped");
}
}
match &mut this.state {
State::WaitingForSubscriptions {
reset_session,
acked,
} => {
if *reset_session {
match std::pin::Pin::new(&mut this.inner).poll_next(cx) {
std::task::Poll::Ready(Some(Ok(mqtt3::Event::NewConnection { .. } | mqtt3::Event::Disconnected(_)))) => (),
std::task::Poll::Ready(Some(Ok(mqtt3::Event::Publication(publication)))) => match InternalMessage::parse(publication) {
Ok(InternalMessage::DirectMethod { name, payload, request_id }) =>
return std::task::Poll::Ready(Some(Ok(Message::DirectMethod { name, payload, request_id }))),
Ok(message @ InternalMessage::TwinState(_)) =>
log::debug!("Discarding message {:?} because we haven't finished subscribing yet", message),
Err(err) =>
log::warn!("Discarding message that could not be parsed: {}", err),
},
std::task::Poll::Ready(Some(Ok(mqtt3::Event::SubscriptionUpdates(updates)))) => {
log::debug!("subscriptions acked by server: {:?}", updates);
*acked += updates.len();
log::debug!("waiting for {} more subscriptions", this.num_default_subscriptions - *acked);
if *acked == this.num_default_subscriptions {
this.state = State::Idle;
}
},
std::task::Poll::Ready(Some(Err(err))) => return std::task::Poll::Ready(Some(Err(err))),
std::task::Poll::Ready(None) => return std::task::Poll::Ready(None),
std::task::Poll::Pending => return std::task::Poll::Pending,
}
} else {
this.state = State::Idle;
}
}
State::Idle => {
let mut continue_loop = false;
let mut twin_state_message = match std::pin::Pin::new(&mut this.inner)
.poll_next(cx)
{
std::task::Poll::Ready(Some(Ok(mqtt3::Event::NewConnection {
reset_session,
}))) => {
this.state = State::WaitingForSubscriptions {
reset_session,
acked: 0,
};
this.desired_properties.new_connection();
this.reported_properties.new_connection();
continue;
}
std::task::Poll::Ready(Some(Ok(mqtt3::Event::Publication(
publication,
)))) => match InternalMessage::parse(publication) {
Ok(InternalMessage::DirectMethod {
name,
payload,
request_id,
}) => {
return std::task::Poll::Ready(Some(Ok(Message::DirectMethod {
name,
payload,
request_id,
})))
}
Ok(InternalMessage::TwinState(message)) => {
// There may be more messages, so continue the loop
continue_loop = true;
Some(message)
}
Err(err) => {
log::warn!("Discarding message that could not be parsed: {}", err);
continue;
}
},
// Don't expect any subscription updates at this point
std::task::Poll::Ready(Some(Ok(mqtt3::Event::SubscriptionUpdates(_)))) => {
unreachable!()
}
std::task::Poll::Ready(Some(Ok(mqtt3::Event::Disconnected(_)))) => continue,
std::task::Poll::Ready(Some(Err(err))) => {
return std::task::Poll::Ready(Some(Err(err)))
}
std::task::Poll::Ready(None) => return std::task::Poll::Ready(None),
std::task::Poll::Pending => None,
};
match this.desired_properties.poll(
cx,
&mut this.inner,
&mut twin_state_message,
&mut this.previous_request_id,
) {
Ok(crate::twin_state::Response::Message(
crate::twin_state::desired::Message::Initial(twin_state),
)) => {
this.reported_properties
.set_initial_state(twin_state.reported.properties.clone());
return std::task::Poll::Ready(Some(Ok(Message::TwinInitial(
twin_state,
))));
}
Ok(crate::twin_state::Response::Message(
crate::twin_state::desired::Message::Patch(properties),
)) => {
return std::task::Poll::Ready(Some(Ok(Message::TwinPatch(properties))))
}
Ok(crate::twin_state::Response::Continue) => continue_loop = true,
Ok(crate::twin_state::Response::NotReady) => (),
Err(err) => {
log::warn!("Discarding message that could not be parsed: {}", err);
}
}
match this.reported_properties.poll(
cx,
&mut this.inner,
&mut twin_state_message,
&mut this.previous_request_id,
) {
Ok(crate::twin_state::Response::Message(message)) => match message {
crate::twin_state::reported::Message::Reported(version) => {
return std::task::Poll::Ready(Some(Ok(
Message::ReportedTwinState(version),
)))
}
},
Ok(crate::twin_state::Response::Continue) => continue_loop = true,
Ok(crate::twin_state::Response::NotReady) => (),
Err(err) => {
log::warn!("Discarding message that could not be parsed: {}", err);
}
}
if let Some(twin_state_message) = twin_state_message {
// This can happen if the Azure IoT Hub responded to a reported property request that we aren't waiting for
// because we have since sent a new one
log::debug!("unconsumed twin state message {:?}", twin_state_message);
}
if !continue_loop {
return std::task::Poll::Pending;
}
}
}
}
}