in dotnet/src/Azure.Iot.Operations.Services/StateStore/StateStoreClient.cs [53:130]
/// <summary>
/// Handles an incoming MQTT message. When the message arrived on this client's key-notification
/// topic, decodes the notified key, its version and its payload, and raises
/// <see cref="KeyChangeMessageReceivedAsync"/> with the result.
/// </summary>
/// <remarks>
/// Messages whose topic does not match the notification filter are ignored silently.
/// Matching messages that have no payload, an unexpected topic shape, an undecodable key,
/// no "__ts" user property, or an unparsable payload are logged with
/// <see cref="Trace.TraceWarning(string)"/> and dropped.
/// </remarks>
/// <param name="args">The MQTT message-received event arguments.</param>
/// <returns>A task that completes once the subscribed key-change handler (if any) has completed.</returns>
private async Task OnTelemetryReceived(MqttApplicationMessageReceivedEventArgs args)
{
    // The topic is split on '/' and the final segment carries the hex-encoded key.
    const int ExpectedTopicSegmentCount = 8;

    Debug.Assert(_mqttClient != null);

    // Guard before touching the message: without a message there is no topic to match.
    if (args.ApplicationMessage == null)
    {
        return;
    }

    string topic = args.ApplicationMessage.Topic;

    // Note that the client Id is expected to be encoded as a hex string in this topic
    if (!MqttTopicProcessor.DoesTopicMatchFilter(topic, string.Format(NotificationsTopicFilter, _clientIdHexString)))
    {
        return;
    }

    if (args.ApplicationMessage.Payload.IsEmpty)
    {
        Trace.TraceWarning("Received a message on the key-notify topic without any payload. Ignoring it.");
        return;
    }

    // The version of the key change is carried in the "__ts" user property.
    HybridLogicalClock? version = null;
    if (args.ApplicationMessage.UserProperties != null)
    {
        foreach (MqttUserProperty userProperty in args.ApplicationMessage.UserProperties)
        {
            if (string.Equals(userProperty.Name, "__ts", StringComparison.Ordinal))
            {
                version = HybridLogicalClock.DecodeFromString(AkriSystemProperties.Timestamp, userProperty.Value);
                break;
            }
        }
    }

    // Split once and reuse the segments for both the shape check and the key extraction.
    string[] topicSegments = topic.Split('/');
    if (topicSegments.Length != ExpectedTopicSegmentCount)
    {
        Trace.TraceWarning("Received a message on the key-notify topic with an unexpected topic format. Ignoring it.");
        return;
    }

    byte[] keyBeingNotified;
    try
    {
        keyBeingNotified = Convert.FromHexString(topicSegments[ExpectedTopicSegmentCount - 1]);
    }
    catch (Exception ex)
    {
        // The "{0}" placeholder is required; without it TraceWarning discards the exception argument.
        Trace.TraceWarning("Received a message on the key-notify topic with an unexpected topic format. Could not decode the key that was notified. Ignoring it. {0}", ex);
        return;
    }

    if (version == null)
    {
        Trace.TraceWarning("Received a message on the key-notify topic without a timestamp. Ignoring it");
        return;
    }

    StateStoreKeyNotification notification;
    try
    {
        // The payload was already verified to be non-empty above.
        notification = StateStorePayloadParser.ParseKeyNotification(args.ApplicationMessage.Payload.ToArray(), keyBeingNotified, version);
    }
    catch (Exception ex)
    {
        // The "{0}" placeholder is required; without it TraceWarning discards the exception argument.
        Trace.TraceWarning("Received a message on the key-notify topic with an unexpected payload format. Ignoring it. {0}", ex);
        return;
    }

    var keyChangeArgs = new KeyChangeMessageReceivedEventArgs(notification.Key, notification.KeyState, version)
    {
        NewValue = notification.Value
    };

    // Snapshot the delegate so an unsubscribe between the null check and the invocation cannot cause a NullReferenceException.
    var handler = KeyChangeMessageReceivedAsync;
    if (handler != null)
    {
        await handler.Invoke(this, keyChangeArgs);
    }
}