private async Task MessageReceivedCallbackAsync()

in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandInvoker.cs [221:379]


        /// <summary>
        /// Handles a received MQTT message. When the message's correlation data identifies a pending
        /// request and its topic matches that request's response topic, the response is validated and the
        /// pending request's completion source is completed with either the deserialized response or an
        /// exception describing why the response could not be accepted.
        /// </summary>
        /// <remarks>
        /// Messages with missing or unparsable correlation data, or whose correlation ID is not present
        /// in the pending-request map, are ignored. Once a pending request is found, the message is
        /// marked for automatic acknowledgement.
        /// </remarks>
        /// <param name="args">The event arguments carrying the received MQTT application message.</param>
        /// <returns>A task that completes once the message has been processed.</returns>
        private async Task MessageReceivedCallbackAsync(MqttApplicationMessageReceivedEventArgs args)
        {
            // A message without parsable correlation data cannot be matched to a pending request.
            if (args.ApplicationMessage.CorrelationData == null
                || !GuidExtensions.TryParseBytes(args.ApplicationMessage.CorrelationData, out Guid? requestGuid))
            {
                return;
            }

            string requestGuidString = requestGuid!.Value.ToString();
            ResponsePromise? responsePromise;
            lock (_requestIdMapLock)
            {
                if (!_requestIdMap.TryGetValue(requestGuidString, out responsePromise))
                {
                    // No pending request with this correlation ID in the map.
                    return;
                }
            }

            args.AutoAcknowledge = true;
            if (!MqttTopicProcessor.DoesTopicMatchFilter(args.ApplicationMessage.Topic, responsePromise.ResponseTopic))
            {
                return;
            }

            var userProperties = args.ApplicationMessage.UserProperties;

            // Returns the first user property with the given name, or null when there is none.
            MqttUserProperty? FindUserProperty(string name) => userProperties?.FirstOrDefault(p => p.Name == name);

            // Assume a protocol version of 1.0 if no protocol version was specified
            string? responseProtocolVersion = FindUserProperty(AkriSystemProperties.ProtocolVersion)?.Value;

            // Builds the locally-raised exception shared by the "unparsable" and "unsupported" version paths.
            AkriMqttException CreateUnsupportedVersionException(string errorMessage) => new(errorMessage)
            {
                Kind = AkriMqttErrorKind.UnsupportedVersion,
                IsShallow = false,
                IsRemote = false,
                CommandName = _commandName,
                CorrelationId = requestGuid,
                SupportedMajorProtocolVersions = _supportedMajorProtocolVersions,
                ProtocolVersion = responseProtocolVersion,
            };

            if (!ProtocolVersion.TryParseProtocolVersion(responseProtocolVersion, out ProtocolVersion? protocolVersion))
            {
                SetExceptionSafe(
                    responsePromise.CompletionSource,
                    CreateUnsupportedVersionException($"Received a response with an unparsable protocol version number: {responseProtocolVersion}"));
                return;
            }

            if (!_supportedMajorProtocolVersions.Contains(protocolVersion!.MajorVersion))
            {
                SetExceptionSafe(
                    responsePromise.CompletionSource,
                    CreateUnsupportedVersionException($"Received a response with an unsupported protocol version number: {responseProtocolVersion}"));
                return;
            }

            MqttUserProperty? statusProperty = FindUserProperty(AkriSystemProperties.Status);

            if (!TryValidateResponseHeaders(statusProperty, requestGuidString, out AkriMqttErrorKind errorKind, out string message, out string? headerName, out string? headerValue))
            {
                AkriMqttException headerException = new(message)
                {
                    Kind = errorKind,
                    IsShallow = false,
                    IsRemote = false,
                    HeaderName = headerName,
                    HeaderValue = headerValue,
                    CommandName = _commandName,
                    CorrelationId = requestGuid,
                };

                SetExceptionSafe(responsePromise.CompletionSource, headerException);
                return;
            }

            int statusCode = int.Parse(statusProperty!.Value, CultureInfo.InvariantCulture);

            if (statusCode is not ((int)CommandStatusCode.OK) and not ((int)CommandStatusCode.NoContent))
            {
                // The remote side reported an error; translate its headers into an exception.
                MqttUserProperty? invalidNameProperty = FindUserProperty(AkriSystemProperties.InvalidPropertyName);
                MqttUserProperty? invalidValueProperty = FindUserProperty(AkriSystemProperties.InvalidPropertyValue);

                // When the property is present, any value other than "false" (ignoring case) counts as an application error.
                bool isApplicationError = (userProperties?.TryGetProperty(AkriSystemProperties.IsApplicationError, out string? isAppError) ?? false)
                    && !string.Equals(isAppError, "false", StringComparison.OrdinalIgnoreCase);
                string? statusMessage = FindUserProperty(AkriSystemProperties.StatusMessage)?.Value;

                errorKind = StatusCodeToErrorKind((CommandStatusCode)statusCode, isApplicationError, invalidNameProperty != null, invalidValueProperty != null);
                AkriMqttException akriException = new(statusMessage ?? "Error condition identified by remote service")
                {
                    Kind = errorKind,
                    IsShallow = false,
                    IsRemote = true,
                    HeaderName = UseHeaderFields(errorKind) ? invalidNameProperty?.Value : null,
                    HeaderValue = UseHeaderFields(errorKind) ? invalidValueProperty?.Value : null,
                    PropertyName = UsePropertyFields(errorKind) ? invalidNameProperty?.Value : null,
                    PropertyValue = UsePropertyFields(errorKind) ? invalidValueProperty?.Value : null,
                    TimeoutName = UseTimeoutFields(errorKind) ? invalidNameProperty?.Value : null,
                    TimeoutValue = UseTimeoutFields(errorKind) ? GetAsTimeSpan(invalidValueProperty?.Value) : null,
                    CommandName = _commandName,
                    CorrelationId = requestGuid,
                };

                if (errorKind == AkriMqttErrorKind.UnsupportedVersion)
                {
                    MqttUserProperty? supportedMajorVersions = FindUserProperty(AkriSystemProperties.SupportedMajorProtocolVersions);
                    MqttUserProperty? requestProtocolVersion = FindUserProperty(AkriSystemProperties.RequestedProtocolVersion);

                    if (requestProtocolVersion != null)
                    {
                        akriException.ProtocolVersion = requestProtocolVersion.Value;
                    }
                    else
                    {
                        Trace.TraceWarning("Command executor failed to provide the request's protocol version");
                    }

                    if (supportedMajorVersions != null
                        && ProtocolVersion.TryParseFromString(supportedMajorVersions.Value, out int[]? versions))
                    {
                        akriException.SupportedMajorProtocolVersions = versions;
                    }
                    else
                    {
                        Trace.TraceWarning("Command executor failed to provide the supported major protocol versions");
                    }
                }

                SetExceptionSafe(responsePromise.CompletionSource, akriException);
                return;
            }

            TResp response;
            CommandResponseMetadata responseMetadata;
            try
            {
                response = _serializer.FromBytes<TResp>(args.ApplicationMessage.Payload, args.ApplicationMessage.ContentType, args.ApplicationMessage.PayloadFormatIndicator);
                responseMetadata = new CommandResponseMetadata(args.ApplicationMessage);
            }
            catch (Exception ex)
            {
                SetExceptionSafe(responsePromise.CompletionSource, ex);
                return;
            }

            if (responseMetadata.Timestamp != null)
            {
                try
                {
                    await _applicationContext.ApplicationHlc.UpdateWithOtherAsync(responseMetadata.Timestamp).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    // A failed clock update must not escape the callback: that would leave the pending
                    // request's completion source uncompleted. Surface the failure to the caller instead.
                    SetExceptionSafe(responsePromise.CompletionSource, ex);
                    return;
                }
            }
            else
            {
                Trace.TraceInformation("No timestamp present in command response metadata.");
            }

            ExtendedResponse<TResp> extendedResponse = new() { Response = response, ResponseMetadata = responseMetadata };

            if (!responsePromise.CompletionSource.TrySetResult(extendedResponse))
            {
                Trace.TraceWarning("Failed to complete the command response promise. This may be because the operation was cancelled or finished with exception.");
            }
        }