in rust/azure_iot_operations_protocol/src/telemetry/receiver.rs [231:322]
fn try_from(value: Publish) -> Result<Message<T>, Self::Error> {
// NOTE: User properties are parsed out into a new HashMap because:
// 1) It makes the code more readable/maintanable to do HashMap lookups
// 2) When this logic is extracted to a ChunkBuffer, it will be more memory efficient as
// we won't want to keep entire copies of all Publishes, so we will just copy the
// properties once.
let publish_properties = value.properties.ok_or("Publish contains no properties")?;
// Parse user properties
let expected_aio_properties = [
UserProperty::Timestamp,
UserProperty::ProtocolVersion,
UserProperty::SourceId,
];
let mut telemetry_custom_user_data = vec![];
let mut telemetry_aio_data = HashMap::new();
for (key, value) in publish_properties.user_properties {
match UserProperty::from_str(&key) {
Ok(p) if expected_aio_properties.contains(&p) => {
telemetry_aio_data.insert(p, value);
}
Ok(_) => {
log::warn!(
"Telemetry should not contain MQTT user property '{key}'. Value is '{value}'"
);
telemetry_custom_user_data.push((key, value));
}
Err(()) => {
telemetry_custom_user_data.push((key, value));
}
}
}
// Check the protocol version.
// If the protocol version is not supported, or cannot be parsed, all bets are off
// regarding what anything else even means, so this *must* be done first
let protocol_version = {
match telemetry_aio_data.get(&UserProperty::ProtocolVersion) {
Some(protocol_version) => {
if let Some(version) = ProtocolVersion::parse_protocol_version(protocol_version)
{
version
} else {
return Err(format!(
"Received a telemetry with an unparsable protocol version number: {protocol_version}"
));
}
}
None => DEFAULT_TELEMETRY_PROTOCOL_VERSION,
}
};
if !protocol_version.is_supported(SUPPORTED_PROTOCOL_VERSIONS) {
return Err(format!(
"Unsupported protocol version '{protocol_version}'. Only major protocol versions '{SUPPORTED_PROTOCOL_VERSIONS:?}' are supported"
));
}
// Format HLC timestamp
let timestamp = telemetry_aio_data
.get(&UserProperty::Timestamp)
.map(|s| HybridLogicalClock::from_str(s))
.transpose()
.map_err(|e| e.to_string())?;
// Parse topic
let topic = std::str::from_utf8(&value.topic)
.map_err(|e| e.to_string())?
.to_string();
// Deserialize payload
let format_indicator = publish_properties.payload_format_indicator.try_into().unwrap_or_else(|e| {
log::error!("Received invalid payload format indicator: {e}. This should not be possible to receive from the broker. Using default.");
FormatIndicator::default()
});
let content_type = publish_properties.content_type;
let payload = T::deserialize(&value.payload, content_type.as_ref(), &format_indicator)
.map_err(|e| format!("{e:?}"))?;
let telemetry_message = Message {
payload,
content_type,
format_indicator,
custom_user_data: telemetry_custom_user_data,
sender_id: telemetry_aio_data.remove(&UserProperty::SourceId),
timestamp,
// NOTE: Topic Tokens cannot be created from just a Publish, they need additional information
topic_tokens: HashMap::default(),
topic,
};
Ok(telemetry_message)
}