fn try_from()

in rust/azure_iot_operations_protocol/src/telemetry/receiver.rs [231:322]


    fn try_from(value: Publish) -> Result<Message<T>, Self::Error> {
        // NOTE: User properties are parsed out into a new HashMap because:
        // 1) It makes the code more readable/maintanable to do HashMap lookups
        // 2) When this logic is extracted to a ChunkBuffer, it will be more memory efficient as
        //  we won't want to keep entire copies of all Publishes, so we will just copy the
        //  properties once.

        let publish_properties = value.properties.ok_or("Publish contains no properties")?;

        // Parse user properties
        let expected_aio_properties = [
            UserProperty::Timestamp,
            UserProperty::ProtocolVersion,
            UserProperty::SourceId,
        ];
        let mut telemetry_custom_user_data = vec![];
        let mut telemetry_aio_data = HashMap::new();
        for (key, value) in publish_properties.user_properties {
            match UserProperty::from_str(&key) {
                Ok(p) if expected_aio_properties.contains(&p) => {
                    telemetry_aio_data.insert(p, value);
                }
                Ok(_) => {
                    log::warn!(
                        "Telemetry should not contain MQTT user property '{key}'. Value is '{value}'"
                    );
                    telemetry_custom_user_data.push((key, value));
                }
                Err(()) => {
                    telemetry_custom_user_data.push((key, value));
                }
            }
        }

        // Check the protocol version.
        // If the protocol version is not supported, or cannot be parsed, all bets are off
        // regarding what anything else even means, so this *must* be done first
        let protocol_version = {
            match telemetry_aio_data.get(&UserProperty::ProtocolVersion) {
                Some(protocol_version) => {
                    if let Some(version) = ProtocolVersion::parse_protocol_version(protocol_version)
                    {
                        version
                    } else {
                        return Err(format!(
                            "Received a telemetry with an unparsable protocol version number: {protocol_version}"
                        ));
                    }
                }
                None => DEFAULT_TELEMETRY_PROTOCOL_VERSION,
            }
        };
        if !protocol_version.is_supported(SUPPORTED_PROTOCOL_VERSIONS) {
            return Err(format!(
                "Unsupported protocol version '{protocol_version}'. Only major protocol versions '{SUPPORTED_PROTOCOL_VERSIONS:?}' are supported"
            ));
        }

        // Format HLC timestamp
        let timestamp = telemetry_aio_data
            .get(&UserProperty::Timestamp)
            .map(|s| HybridLogicalClock::from_str(s))
            .transpose()
            .map_err(|e| e.to_string())?;

        // Parse topic
        let topic = std::str::from_utf8(&value.topic)
            .map_err(|e| e.to_string())?
            .to_string();

        // Deserialize payload
        let format_indicator = publish_properties.payload_format_indicator.try_into().unwrap_or_else(|e| {
            log::error!("Received invalid payload format indicator: {e}. This should not be possible to receive from the broker. Using default.");
            FormatIndicator::default()
        });
        let content_type = publish_properties.content_type;
        let payload = T::deserialize(&value.payload, content_type.as_ref(), &format_indicator)
            .map_err(|e| format!("{e:?}"))?;

        let telemetry_message = Message {
            payload,
            content_type,
            format_indicator,
            custom_user_data: telemetry_custom_user_data,
            sender_id: telemetry_aio_data.remove(&UserProperty::SourceId),
            timestamp,
            // NOTE: Topic Tokens cannot be created from just a Publish, they need additional information
            topic_tokens: HashMap::default(),
            topic,
        };
        Ok(telemetry_message)
    }