private async Task MessageReceivedCallbackAsync()

in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs [111:289]


        /// <summary>
        /// Handles a received MQTT message. When the message topic matches this executor's command
        /// topic filter, the request headers are validated, a cached response is reused if one is
        /// available, and otherwise the request is deserialized and the command handler is executed,
        /// with the resulting response stored in the response cache and published.
        /// </summary>
        /// <remarks>
        /// For matching messages auto-acknowledgement is turned off; on every path the acknowledgement
        /// is handed to the dispatcher returned by <c>GetDispatcher()</c> as its second delegate, and the
        /// work that produces the response as its first. Messages whose topic does not match the filter
        /// are ignored and left untouched.
        /// </remarks>
        /// <param name="args">Event arguments describing the received application message.</param>
        /// <returns>A task that completes when the dispatcher invocation for this message has completed.</returns>
        private async Task MessageReceivedCallbackAsync(MqttApplicationMessageReceivedEventArgs args)
        {
            string requestTopicFilter = GetCommandTopic(null);

            // Not a request for this command: do nothing and leave acknowledgement settings alone.
            if (!MqttTopicProcessor.DoesTopicMatchFilter(args.ApplicationMessage.Topic, requestTopicFilter))
            {
                return;
            }

            // Acknowledgement is performed explicitly through the dispatcher (see AcknowledgeRequestAsync).
            args.AutoAcknowledge = false;

            // Shared acknowledgement callback passed to the dispatcher on every code path below.
            async Task AcknowledgeRequestAsync()
            {
                await args.AcknowledgeAsync(CancellationToken.None).ConfigureAwait(false);
            }

            DateTime messageReceivedTime = WallClock.UtcNow;

            // MessageExpiryInterval is required; if it is missing, this.ExecutionTimeout is substituted as a fail-safe value when sending the error response.
            TimeSpan commandTimeout = args.ApplicationMessage.MessageExpiryInterval != default ? TimeSpan.FromSeconds(args.ApplicationMessage.MessageExpiryInterval) : ExecutionTimeout;
            DateTime commandExpirationTime = messageReceivedTime + commandTimeout;

            string? requestedProtocolVersion = args.ApplicationMessage.UserProperties?.FirstOrDefault(p => p.Name == AkriSystemProperties.ProtocolVersion)?.Value;
            if (!TryValidateRequestHeaders(args.ApplicationMessage, out CommandStatusCode? status, out string? statusMessage, out string? invalidPropertyName, out string? invalidPropertyValue))
            {
                Trace.TraceWarning($"Command '{_commandName}' header validation failed. Status message: {statusMessage}");

                // When validation yields no status code, no response is published; the request is only acknowledged.
                await GetDispatcher()(
                    status != null ? async () => { await GenerateAndPublishResponseAsync(commandExpirationTime, args.ApplicationMessage.ResponseTopic!, args.ApplicationMessage.CorrelationData!, (CommandStatusCode)status, statusMessage, null, null, false, invalidPropertyName, invalidPropertyValue, requestedProtocolVersion).ConfigureAwait(false); }
                    : null,
                    AcknowledgeRequestAsync).ConfigureAwait(false);
                return;
            }

            // This validation is handled above, so assume a response topic is provided beyond this point.
            Debug.Assert(args.ApplicationMessage.ResponseTopic != null);
            Debug.Assert(args.ApplicationMessage.CorrelationData != null);

            string? clientId = _mqttClient.ClientId;
            Debug.Assert(!string.IsNullOrEmpty(clientId));
            string executorId = ExecutorId ?? clientId;

            // A request whose topic contains the executor ID is treated as executor-specific,
            // which disables reuse of its cached response across invokers.
            bool isExecutorSpecific = args.ApplicationMessage.Topic.Contains(executorId, StringComparison.Ordinal);
            string sourceId = args.ApplicationMessage.UserProperties?.FirstOrDefault(p => p.Name == AkriSystemProperties.SourceId)?.Value ?? string.Empty;

            Task<MqttApplicationMessage>? cachedResponse =
                await _commandResponseCache.RetrieveAsync(
                    _commandName,
                    sourceId,
                    args.ApplicationMessage.ResponseTopic,
                    args.ApplicationMessage.CorrelationData,
                    args.ApplicationMessage.Payload,
                    isCacheable: CacheTtl > TimeSpan.Zero,
                    canReuseAcrossInvokers: !isExecutorSpecific)
                .ConfigureAwait(false);

            if (cachedResponse != null)
            {
                Trace.TraceInformation($"Command '{_commandName}' has a cached response. Will use cached response instead of executing the command again.");

                await GetDispatcher()(
                    async () =>
                    {
                        MqttApplicationMessage cachedMessage = await cachedResponse.ConfigureAwait(false);
                        await GenerateAndPublishResponse(commandExpirationTime, args.ApplicationMessage.ResponseTopic, args.ApplicationMessage.CorrelationData, cachedMessage.Payload, cachedMessage.UserProperties, cachedMessage.ContentType, (int)cachedMessage.PayloadFormatIndicator).ConfigureAwait(false);
                    },
                    AcknowledgeRequestAsync).ConfigureAwait(false);

                return;
            }

            TReq request;
            CommandRequestMetadata requestMetadata;
            try
            {
                requestMetadata = new CommandRequestMetadata(args.ApplicationMessage, RequestTopicPattern)
                {
                    ContentType = args.ApplicationMessage.ContentType,
                    PayloadFormatIndicator = args.ApplicationMessage.PayloadFormatIndicator,
                };
                request = _serializer.FromBytes<TReq>(args.ApplicationMessage.Payload, requestMetadata.ContentType, requestMetadata.PayloadFormatIndicator);

                // Update application HLC against received timestamp
                if (requestMetadata.Timestamp != null)
                {
                    await _applicationContext.ApplicationHlc.UpdateWithOtherAsync(requestMetadata.Timestamp).ConfigureAwait(false);
                }
                else
                {
                    Trace.TraceInformation("No timestamp present in command request metadata.");
                }
            }
            catch (Exception ex)
            {
                Trace.TraceWarning($"Command '{_commandName}' invocation failed during request message processing. Error message: {ex.Message}");
                AkriMqttException? amex = ex as AkriMqttException;
                CommandStatusCode statusCode = amex != null ? ErrorKindToStatusCode(amex.Kind) : CommandStatusCode.InternalServerError;

                // An invalid "Content Type" header is reported as UnsupportedMediaType rather than the generic mapping.
                if (amex != null
                    && amex.Kind == AkriMqttErrorKind.HeaderInvalid
                    && string.Equals(amex.HeaderName, "Content Type", StringComparison.Ordinal))
                {
                    statusCode = CommandStatusCode.UnsupportedMediaType;
                }

                await GetDispatcher()(
                    async () => { await GenerateAndPublishResponseAsync(commandExpirationTime, args.ApplicationMessage.ResponseTopic, args.ApplicationMessage.CorrelationData, statusCode, ex.Message, null, null, false, amex?.HeaderName, amex?.HeaderValue, requestedProtocolVersion).ConfigureAwait(false); },
                    AcknowledgeRequestAsync).ConfigureAwait(false);

                return;
            }

            ExtendedRequest<TReq> extendedRequest = new() { Request = request, RequestMetadata = requestMetadata };

            // Executes the command handler, then caches and publishes the response; any failure is
            // converted into an error response instead of propagating to the dispatcher.
            async Task CmdFunc()
            {
                DateTime executionStartTime = WallClock.UtcNow;

                // Time already spent between receipt and execution start counts against the command timeout.
                TimeSpan startupDelay = executionStartTime - messageReceivedTime;
                TimeSpan remainingCommandTimeout = commandTimeout - startupDelay;
                TimeSpan cancellationTimeout = remainingCommandTimeout < ExecutionTimeout ? remainingCommandTimeout : ExecutionTimeout;

                // The using declaration disposes the token source when CmdFunc exits, so no explicit Dispose is needed.
                using CancellationTokenSource commandCts = WallClock.CreateCancellationTokenSource(cancellationTimeout);

                try
                {
                    // NOTE(review): the wait below is bounded by ExecutionTimeout, while the cancellation token and the
                    // timeout message use cancellationTimeout, which may be shorter — confirm this is intended.
                    ExtendedResponse<TResp> extended = await Task.Run(() => OnCommandReceived(extendedRequest, commandCts.Token)).WaitAsync(ExecutionTimeout).ConfigureAwait(false);

                    var serializedPayloadContext = _serializer.ToBytes(extended.Response);

                    // An empty serialized payload is reported as NoContent, anything else as OK.
                    MqttApplicationMessage? responseMessage = await GenerateResponseAsync(commandExpirationTime, args.ApplicationMessage.ResponseTopic, args.ApplicationMessage.CorrelationData, !serializedPayloadContext.SerializedPayload.IsEmpty ? CommandStatusCode.OK : CommandStatusCode.NoContent, null, serializedPayloadContext, extended.ResponseMetadata).ConfigureAwait(false);
                    await _commandResponseCache.StoreAsync(
                        _commandName,
                        sourceId,
                        args.ApplicationMessage.ResponseTopic,
                        args.ApplicationMessage.CorrelationData,
                        args.ApplicationMessage.Payload,
                        responseMessage,
                        IsIdempotent,
                        commandExpirationTime,
                        WallClock.UtcNow - executionStartTime).ConfigureAwait(false);

                    await PublishResponseAsync(args.ApplicationMessage.ResponseTopic, args.ApplicationMessage.CorrelationData, responseMessage).ConfigureAwait(false);
                }
                catch (Exception ex)
                {
                    // Failure details are kept in catch-local variables rather than reusing the
                    // header-validation out-variables captured from the enclosing method.
                    CommandStatusCode failureStatusCode;
                    string? failureMessage;
                    bool isAppError;
                    string? failedPropertyName;
                    string? failedPropertyValue;
                    switch (ex)
                    {
                        case OperationCanceledException:
                        case TimeoutException:
                            failureStatusCode = CommandStatusCode.RequestTimeout;
                            failureMessage = $"Executor timed out after {cancellationTimeout.TotalSeconds} seconds.";
                            isAppError = false;
                            failedPropertyName = nameof(ExecutionTimeout);
                            failedPropertyValue = XmlConvert.ToString(ExecutionTimeout);
                            Trace.TraceWarning($"Command '{_commandName}' execution timed out after {cancellationTimeout.TotalSeconds} seconds.");
                            break;
                        case AkriMqttException amex:
                            failureStatusCode = CommandStatusCode.InternalServerError;
                            failureMessage = amex.Message;
                            isAppError = true;
                            failedPropertyName = amex.HeaderName ?? amex.PropertyName;
                            failedPropertyValue = amex.HeaderValue ?? amex.PropertyValue?.ToString();
                            Trace.TraceWarning($"Command '{_commandName}' execution failed due to Akri Mqtt error: {amex}.");
                            break;
                        default:
                            failureStatusCode = CommandStatusCode.InternalServerError;
                            failureMessage = ex.Message;
                            isAppError = true;
                            failedPropertyName = null;
                            failedPropertyValue = null;
                            Trace.TraceWarning($"Command '{_commandName}' execution failed due to error: {ex}.");
                            break;
                    }

                    await GenerateAndPublishResponseAsync(commandExpirationTime, args.ApplicationMessage.ResponseTopic, args.ApplicationMessage.CorrelationData, failureStatusCode, failureMessage, null, null, isAppError, failedPropertyName, failedPropertyValue, requestedProtocolVersion).ConfigureAwait(false);
                }
            }

            await GetDispatcher()(CmdFunc, AcknowledgeRequestAsync).ConfigureAwait(false);
        }