in rust/azure_iot_operations_protocol/src/rpc_command/invoker.rs [298:511]
fn try_from(value: Publish) -> Result<CommandResult<TResp>, Self::Error> {
// NOTE: User properties are parsed out into a new HashMap because:
// 1) It makes the code more readable/maintanable to do HashMap lookups
// 2) When this logic is extracted to a ChunkBuffer, it will be more memory efficient as
// we won't want to keep entire copies of all Publishes, so we will just copy the
// properties once.
let publish_properties =
value
.properties
.ok_or(AIOProtocolError::new_header_missing_error(
"Properties",
false,
Some("Properties missing from MQTT message".to_string()),
None,
))?;
// Parse user properties
let expected_aio_properties = [
UserProperty::Timestamp,
UserProperty::Status,
UserProperty::StatusMessage,
UserProperty::IsApplicationError,
UserProperty::InvalidPropertyName,
UserProperty::InvalidPropertyValue,
UserProperty::ProtocolVersion,
UserProperty::SupportedMajorVersions,
UserProperty::RequestProtocolVersion,
];
let mut response_custom_user_data = vec![];
let mut response_aio_data = HashMap::new();
for (key, value) in publish_properties.user_properties {
match UserProperty::from_str(&key) {
Ok(p) if expected_aio_properties.contains(&p) => {
response_aio_data.insert(p, value);
}
Ok(_) => {
log::warn!(
"Response should not contain MQTT user property '{key}'. Value is '{value}'"
);
response_custom_user_data.push((key, value));
}
Err(()) => {
response_custom_user_data.push((key, value));
}
}
}
// Check the protocol version.
// If the protocol version is not supported, or cannot be parsed, all bets are off
// regarding what anything else even means, so this *must* be done first
let protocol_version = {
match response_aio_data.get(&UserProperty::ProtocolVersion) {
Some(protocol_version) => {
if let Some(version) = ProtocolVersion::parse_protocol_version(protocol_version)
{
version
} else {
return Err(AIOProtocolError::new_unsupported_version_error(
Some(format!(
"Received a response with an unparsable protocol version number: {protocol_version}"
)),
protocol_version.to_string(),
SUPPORTED_PROTOCOL_VERSIONS.to_vec(),
None,
false,
false,
));
}
}
None => DEFAULT_RPC_COMMAND_PROTOCOL_VERSION,
}
};
if !protocol_version.is_supported(SUPPORTED_PROTOCOL_VERSIONS) {
return Err(AIOProtocolError::new_unsupported_version_error(
None,
protocol_version.to_string(),
SUPPORTED_PROTOCOL_VERSIONS.to_vec(),
None,
false,
false,
));
}
// Check the status code.
// We will use this to determine which data format to serialize to.
let status_code = {
match response_aio_data.get(&UserProperty::Status) {
Some(s) => match StatusCode::from_str(s) {
Ok(code) => code,
Err(StatusCodeParseError::UnparsableStatusCode(s)) => {
return Err(AIOProtocolError::new_header_invalid_error(
&UserProperty::Status.to_string(),
&s,
false,
Some(format!(
"Could not parse status in response '{s}' as an integer"
)),
None,
));
}
Err(StatusCodeParseError::UnknownStatusCode(_)) => {
let status_message = response_aio_data
.remove(&UserProperty::StatusMessage)
.unwrap_or(String::from("Unknown"));
let mut unknown_err = AIOProtocolError::new_unknown_error(
true,
false,
None,
Some(status_message),
None,
);
// Add any invalid properties that might be included for extra information
unknown_err.property_name =
response_aio_data.remove(&UserProperty::InvalidPropertyName);
unknown_err.property_value = response_aio_data
.remove(&UserProperty::InvalidPropertyValue)
.map(Value::String);
return Err(unknown_err);
}
},
None => {
return Err(AIOProtocolError::new_header_missing_error(
&UserProperty::Status.to_string(),
false,
Some(format!(
"Response missing MQTT user property '{}'",
UserProperty::Status
)),
None,
));
}
}
};
// Get HLC here since we will need it no matter what type of result we are processing
let timestamp = response_aio_data
.get(&UserProperty::Timestamp)
.map(|s| HybridLogicalClock::from_str(s))
.transpose()?;
// Process result based on status code
let command_result = match status_code {
// Response with payload
StatusCode::Ok | StatusCode::NoContent => {
let content_type = publish_properties.content_type;
let format_indicator = publish_properties.payload_format_indicator.try_into().unwrap_or_else(|e| {
log::error!("Received invalid payload format indicator: {e}. This should not be possible to receive from the broker. Using default.");
FormatIndicator::default()
});
if matches!(status_code, StatusCode::NoContent) && !value.payload.is_empty() {
return Err(AIOProtocolError::new_payload_invalid_error(
false,
false,
None,
Some("Status code 204 (No Content) should not have a payload".to_string()),
None,
));
}
let payload = match TResp::deserialize(
&value.payload,
content_type.as_ref(),
&format_indicator,
) {
Ok(payload) => payload,
Err(DeserializationError::InvalidPayload(e)) => {
return Err(AIOProtocolError::new_payload_invalid_error(
false,
false,
Some(e.into()),
None,
None,
));
}
Err(DeserializationError::UnsupportedContentType(message)) => {
return Err(AIOProtocolError::new_header_invalid_error(
"Content Type",
&content_type.unwrap_or("None".to_string()),
false,
Some(message),
None,
));
}
};
Self::Ok(Response {
payload,
content_type,
format_indicator,
custom_user_data: response_custom_user_data,
timestamp,
})
}
// RemoteError
_ => Self::Err(RemoteError {
status_code,
protocol_version,
status_message: response_aio_data.remove(&UserProperty::StatusMessage),
is_application_error: response_aio_data
.get(&UserProperty::IsApplicationError)
.is_some_and(|v| v == "true"),
invalid_property_name: response_aio_data.remove(&UserProperty::InvalidPropertyName),
invalid_property_value: response_aio_data
.remove(&UserProperty::InvalidPropertyValue),
timestamp,
supported_protocol_major_versions: response_aio_data
.get(&UserProperty::SupportedMajorVersions)
.map(|s| parse_supported_protocol_major_versions(s)),
}),
};
Ok(command_result)
}