fn try_from()

in rust/azure_iot_operations_protocol/src/rpc_command/invoker.rs [298:511]


    fn try_from(value: Publish) -> Result<CommandResult<TResp>, Self::Error> {
        // NOTE: User properties are parsed out into a new HashMap because:
        // 1) It makes the code more readable/maintanable to do HashMap lookups
        // 2) When this logic is extracted to a ChunkBuffer, it will be more memory efficient as
        //  we won't want to keep entire copies of all Publishes, so we will just copy the
        //  properties once.

        let publish_properties =
            value
                .properties
                .ok_or(AIOProtocolError::new_header_missing_error(
                    "Properties",
                    false,
                    Some("Properties missing from MQTT message".to_string()),
                    None,
                ))?;

        // Parse user properties
        let expected_aio_properties = [
            UserProperty::Timestamp,
            UserProperty::Status,
            UserProperty::StatusMessage,
            UserProperty::IsApplicationError,
            UserProperty::InvalidPropertyName,
            UserProperty::InvalidPropertyValue,
            UserProperty::ProtocolVersion,
            UserProperty::SupportedMajorVersions,
            UserProperty::RequestProtocolVersion,
        ];
        let mut response_custom_user_data = vec![];
        let mut response_aio_data = HashMap::new();
        for (key, value) in publish_properties.user_properties {
            match UserProperty::from_str(&key) {
                Ok(p) if expected_aio_properties.contains(&p) => {
                    response_aio_data.insert(p, value);
                }
                Ok(_) => {
                    log::warn!(
                        "Response should not contain MQTT user property '{key}'. Value is '{value}'"
                    );
                    response_custom_user_data.push((key, value));
                }
                Err(()) => {
                    response_custom_user_data.push((key, value));
                }
            }
        }

        // Check the protocol version.
        // If the protocol version is not supported, or cannot be parsed, all bets are off
        // regarding what anything else even means, so this *must* be done first
        let protocol_version = {
            match response_aio_data.get(&UserProperty::ProtocolVersion) {
                Some(protocol_version) => {
                    if let Some(version) = ProtocolVersion::parse_protocol_version(protocol_version)
                    {
                        version
                    } else {
                        return Err(AIOProtocolError::new_unsupported_version_error(
                            Some(format!(
                                "Received a response with an unparsable protocol version number: {protocol_version}"
                            )),
                            protocol_version.to_string(),
                            SUPPORTED_PROTOCOL_VERSIONS.to_vec(),
                            None,
                            false,
                            false,
                        ));
                    }
                }
                None => DEFAULT_RPC_COMMAND_PROTOCOL_VERSION,
            }
        };
        if !protocol_version.is_supported(SUPPORTED_PROTOCOL_VERSIONS) {
            return Err(AIOProtocolError::new_unsupported_version_error(
                None,
                protocol_version.to_string(),
                SUPPORTED_PROTOCOL_VERSIONS.to_vec(),
                None,
                false,
                false,
            ));
        }

        // Check the status code.
        // We will use this to determine which data format to serialize to.
        let status_code = {
            match response_aio_data.get(&UserProperty::Status) {
                Some(s) => match StatusCode::from_str(s) {
                    Ok(code) => code,
                    Err(StatusCodeParseError::UnparsableStatusCode(s)) => {
                        return Err(AIOProtocolError::new_header_invalid_error(
                            &UserProperty::Status.to_string(),
                            &s,
                            false,
                            Some(format!(
                                "Could not parse status in response '{s}' as an integer"
                            )),
                            None,
                        ));
                    }
                    Err(StatusCodeParseError::UnknownStatusCode(_)) => {
                        let status_message = response_aio_data
                            .remove(&UserProperty::StatusMessage)
                            .unwrap_or(String::from("Unknown"));
                        let mut unknown_err = AIOProtocolError::new_unknown_error(
                            true,
                            false,
                            None,
                            Some(status_message),
                            None,
                        );
                        // Add any invalid properties that might be included for extra information
                        unknown_err.property_name =
                            response_aio_data.remove(&UserProperty::InvalidPropertyName);
                        unknown_err.property_value = response_aio_data
                            .remove(&UserProperty::InvalidPropertyValue)
                            .map(Value::String);
                        return Err(unknown_err);
                    }
                },
                None => {
                    return Err(AIOProtocolError::new_header_missing_error(
                        &UserProperty::Status.to_string(),
                        false,
                        Some(format!(
                            "Response missing MQTT user property '{}'",
                            UserProperty::Status
                        )),
                        None,
                    ));
                }
            }
        };

        // Get HLC here since we will need it no matter what type of result we are processing
        let timestamp = response_aio_data
            .get(&UserProperty::Timestamp)
            .map(|s| HybridLogicalClock::from_str(s))
            .transpose()?;

        // Process result based on status code
        let command_result = match status_code {
            // Response with payload
            StatusCode::Ok | StatusCode::NoContent => {
                let content_type = publish_properties.content_type;
                let format_indicator = publish_properties.payload_format_indicator.try_into().unwrap_or_else(|e| {
                    log::error!("Received invalid payload format indicator: {e}. This should not be possible to receive from the broker. Using default.");
                    FormatIndicator::default()
                });

                if matches!(status_code, StatusCode::NoContent) && !value.payload.is_empty() {
                    return Err(AIOProtocolError::new_payload_invalid_error(
                        false,
                        false,
                        None,
                        Some("Status code 204 (No Content) should not have a payload".to_string()),
                        None,
                    ));
                }

                let payload = match TResp::deserialize(
                    &value.payload,
                    content_type.as_ref(),
                    &format_indicator,
                ) {
                    Ok(payload) => payload,
                    Err(DeserializationError::InvalidPayload(e)) => {
                        return Err(AIOProtocolError::new_payload_invalid_error(
                            false,
                            false,
                            Some(e.into()),
                            None,
                            None,
                        ));
                    }
                    Err(DeserializationError::UnsupportedContentType(message)) => {
                        return Err(AIOProtocolError::new_header_invalid_error(
                            "Content Type",
                            &content_type.unwrap_or("None".to_string()),
                            false,
                            Some(message),
                            None,
                        ));
                    }
                };

                Self::Ok(Response {
                    payload,
                    content_type,
                    format_indicator,
                    custom_user_data: response_custom_user_data,
                    timestamp,
                })
            }
            // RemoteError
            _ => Self::Err(RemoteError {
                status_code,
                protocol_version,
                status_message: response_aio_data.remove(&UserProperty::StatusMessage),
                is_application_error: response_aio_data
                    .get(&UserProperty::IsApplicationError)
                    .is_some_and(|v| v == "true"),
                invalid_property_name: response_aio_data.remove(&UserProperty::InvalidPropertyName),
                invalid_property_value: response_aio_data
                    .remove(&UserProperty::InvalidPropertyValue),
                timestamp,
                supported_protocol_major_versions: response_aio_data
                    .get(&UserProperty::SupportedMajorVersions)
                    .map(|s| parse_supported_protocol_major_versions(s)),
            }),
        };
        Ok(command_result)
    }