private async Task OnTelemetryReceived()

in dotnet/src/Azure.Iot.Operations.Services/StateStore/StateStoreClient.cs [53:130]


        /// <summary>
        /// Handles a received MQTT message. When the message arrived on this client's key-notification
        /// topic, it is parsed into a key-change notification and <c>KeyChangeMessageReceivedAsync</c> is raised.
        /// </summary>
        /// <remarks>
        /// Messages whose topic does not match the notification filter are ignored. A notification that is
        /// malformed (empty payload, topic without exactly 8 segments, last topic segment that is not a hex
        /// string, no "__ts" user property, or a payload the parser rejects) is logged as a warning and dropped.
        /// </remarks>
        /// <param name="args">The event args carrying the received MQTT application message.</param>
        /// <returns>A task that completes once any subscribed key-change handler has finished.</returns>
        private async Task OnTelemetryReceived(MqttApplicationMessageReceivedEventArgs args)
        {
            Debug.Assert(_mqttClient != null);

            // Guard before the first dereference: without a message there is no topic to match against.
            if (args.ApplicationMessage == null)
            {
                return;
            }

            string topic = args.ApplicationMessage.Topic;

            // Note that the client Id is expected to be encoded as a hex string in this topic
            if (!MqttTopicProcessor.DoesTopicMatchFilter(topic, string.Format(NotificationsTopicFilter, _clientIdHexString)))
            {
                return;
            }

            if (args.ApplicationMessage.Payload.IsEmpty)
            {
                Trace.TraceWarning("Received a message on the key-notify topic without any payload. Ignoring it.");
                return;
            }

            // The notification's version is carried in the "__ts" user property, if present.
            HybridLogicalClock? version = null;
            if (args.ApplicationMessage.UserProperties != null)
            {
                foreach (MqttUserProperty userProperty in args.ApplicationMessage.UserProperties)
                {
                    if (userProperty.Name.Equals("__ts"))
                    {
                        version = HybridLogicalClock.DecodeFromString(AkriSystemProperties.Timestamp, userProperty.Value);
                        break;
                    }
                }
            }

            // Split once and reuse; the last (8th) segment is the hex-encoded key being notified.
            string[] topicSegments = topic.Split('/');
            if (topicSegments.Length != 8)
            {
                Trace.TraceWarning("Received a message on the key-notify topic with an unexpected topic format. Ignoring it.");
                return;
            }

            byte[] keyBeingNotified;
            try
            {
                keyBeingNotified = Convert.FromHexString(topicSegments[7]);
            }
            catch (FormatException ex)
            {
                // The {0} placeholder is required; Trace.TraceWarning treats the first argument as a format string.
                Trace.TraceWarning("Received a message on the key-notify topic with an unexpected topic format. Could not decode the key that was notified. Ignoring it. Exception: {0}", ex);
                return;
            }

            if (version == null)
            {
                Trace.TraceWarning("Received a message on the key-notify topic without a timestamp. Ignoring it");
                return;
            }

            StateStoreKeyNotification notification;
            try
            {
                notification = StateStorePayloadParser.ParseKeyNotification(args.ApplicationMessage.Payload.ToArray(), keyBeingNotified, version);
            }
            catch (Exception ex)
            {
                Trace.TraceWarning("Received a message on the key-notify topic with an unexpected payload format. Ignoring it. Exception: {0}", ex);
                return;
            }

            var keyChangeArgs = new KeyChangeMessageReceivedEventArgs(notification.Key, notification.KeyState, version)
            {
                NewValue = notification.Value
            };

            // Snapshot the delegate so an unsubscribe between the null check and the invoke cannot cause a NullReferenceException.
            var handler = KeyChangeMessageReceivedAsync;
            if (handler != null)
            {
                await handler.Invoke(this, keyChangeArgs);
            }
        }