in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandExecutor.cs [111:289]
/// <summary>
/// Handles an MQTT message received by this executor. Messages whose topic does not match the
/// command topic filter are ignored. For matching messages the method, in order:
/// validates the request headers, looks for a cached response, deserializes the request, and
/// finally hands the command execution to the dispatcher. Every path hands the dispatcher a
/// second callback that acknowledges the message.
/// </summary>
/// <param name="args">Event arguments carrying the received application message and its acknowledgement method.</param>
/// <returns>
/// A task that completes when the dispatcher call made for this message completes, or
/// immediately when the message topic does not match the command topic filter.
/// </returns>
private async Task MessageReceivedCallbackAsync(MqttApplicationMessageReceivedEventArgs args)
{
    string requestTopicFilter = GetCommandTopic(null);
    if (!MqttTopicProcessor.DoesTopicMatchFilter(args.ApplicationMessage.Topic, requestTopicFilter))
    {
        // Not a request for this command; leave the message untouched.
        return;
    }

    // Acknowledgement is done explicitly via the callbacks handed to the dispatcher below.
    args.AutoAcknowledge = false;

    DateTime messageReceivedTime = WallClock.UtcNow;

    // MessageExpiryInterval is required; if it is missing, this.ExecutionTimeout is substituted as a fail-safe value when sending the error response.
    TimeSpan commandTimeout = args.ApplicationMessage.MessageExpiryInterval != default ? TimeSpan.FromSeconds(args.ApplicationMessage.MessageExpiryInterval) : ExecutionTimeout;
    DateTime commandExpirationTime = messageReceivedTime + commandTimeout;

    string? requestedProtocolVersion = args.ApplicationMessage.UserProperties?.FirstOrDefault(p => p.Name == AkriSystemProperties.ProtocolVersion)?.Value;

    if (!TryValidateRequestHeaders(args.ApplicationMessage, out CommandStatusCode? status, out string? statusMessage, out string? invalidPropertyName, out string? invalidPropertyValue))
    {
        Trace.TraceWarning($"Command '{_commandName}' header validation failed. Status message: {statusMessage}");

        // When validation yields no status code, no response callback is supplied (null); the message is only acknowledged.
        await GetDispatcher()(
            status != null ? async () => { await GenerateAndPublishResponseAsync(commandExpirationTime, args.ApplicationMessage.ResponseTopic!, args.ApplicationMessage.CorrelationData!, (CommandStatusCode)status, statusMessage, null, null, false, invalidPropertyName, invalidPropertyValue, requestedProtocolVersion).ConfigureAwait(false); }
            : null,
            async () => { await args.AcknowledgeAsync(CancellationToken.None).ConfigureAwait(false); }).ConfigureAwait(false);
        return;
    }

    // This validation is handled above, so assume a response topic is provided beyond this point.
    Debug.Assert(args.ApplicationMessage.ResponseTopic != null);
    Debug.Assert(args.ApplicationMessage.CorrelationData != null);

    string? clientId = _mqttClient.ClientId;
    Debug.Assert(!string.IsNullOrEmpty(clientId));
    string executorId = ExecutorId ?? clientId;

    // A request whose topic contains the executor ID is treated as executor-specific,
    // which disables reuse of the cached response across invokers.
    bool isExecutorSpecific = args.ApplicationMessage.Topic.Contains(executorId);
    string sourceId = args.ApplicationMessage.UserProperties?.FirstOrDefault(p => p.Name == AkriSystemProperties.SourceId)?.Value ?? string.Empty;

    Task<MqttApplicationMessage>? cachedResponse =
        await _commandResponseCache.RetrieveAsync(
            _commandName,
            sourceId,
            args.ApplicationMessage.ResponseTopic,
            args.ApplicationMessage.CorrelationData,
            args.ApplicationMessage.Payload,
            isCacheable: CacheTtl > TimeSpan.Zero,
            canReuseAcrossInvokers: !isExecutorSpecific)
        .ConfigureAwait(false);

    if (cachedResponse != null)
    {
        Trace.TraceInformation($"Command '{_commandName}' has a cached response. Will use cached response instead of executing the command again.");
        await GetDispatcher()(
            async () =>
            {
                MqttApplicationMessage cachedMessage = await cachedResponse.ConfigureAwait(false);
                await GenerateAndPublishResponse(commandExpirationTime, args.ApplicationMessage.ResponseTopic, args.ApplicationMessage.CorrelationData, cachedMessage.Payload, cachedMessage.UserProperties, cachedMessage.ContentType, (int)cachedMessage.PayloadFormatIndicator).ConfigureAwait(false);
            },
            async () => { await args.AcknowledgeAsync(CancellationToken.None).ConfigureAwait(false); }).ConfigureAwait(false);
        return;
    }

    TReq request;
    CommandRequestMetadata requestMetadata;
    try
    {
        requestMetadata = new CommandRequestMetadata(args.ApplicationMessage, RequestTopicPattern)
        {
            ContentType = args.ApplicationMessage.ContentType,
            PayloadFormatIndicator = args.ApplicationMessage.PayloadFormatIndicator,
        };

        request = _serializer.FromBytes<TReq>(args.ApplicationMessage.Payload, requestMetadata.ContentType, requestMetadata.PayloadFormatIndicator);

        // Update application HLC against received timestamp
        if (requestMetadata.Timestamp != null)
        {
            await _applicationContext.ApplicationHlc.UpdateWithOtherAsync(requestMetadata.Timestamp).ConfigureAwait(false);
        }
        else
        {
            Trace.TraceInformation($"No timestamp present in command request metadata.");
        }
    }
    catch (Exception ex)
    {
        Trace.TraceWarning($"Command '{_commandName}' invocation failed during response message contruction. Error message: {ex.Message}");
        AkriMqttException? amex = ex as AkriMqttException;
        CommandStatusCode statusCode = amex != null ? ErrorKindToStatusCode(amex.Kind) : CommandStatusCode.InternalServerError;

        // An invalid "Content Type" header is reported as UnsupportedMediaType rather than the generic mapping.
        if (amex != null
            && amex.Kind == AkriMqttErrorKind.HeaderInvalid
            && amex.HeaderName != null
            && amex.HeaderName.Equals("Content Type", StringComparison.Ordinal))
        {
            statusCode = CommandStatusCode.UnsupportedMediaType;
        }

        await GetDispatcher()(
            async () => { await GenerateAndPublishResponseAsync(commandExpirationTime, args.ApplicationMessage.ResponseTopic, args.ApplicationMessage.CorrelationData, statusCode, ex.Message, null, null, false, amex?.HeaderName, amex?.HeaderValue, requestedProtocolVersion).ConfigureAwait(false); },
            async () => { await args.AcknowledgeAsync(CancellationToken.None).ConfigureAwait(false); }).ConfigureAwait(false);
        return;
    }

    ExtendedRequest<TReq> extendedRequest = new() { Request = request, RequestMetadata = requestMetadata };

    // Runs the user command handler, stores the response in the cache, and publishes it.
    // Any failure is converted into an error response instead of propagating.
    async Task CmdFunc()
    {
        DateTime executionStartTime = WallClock.UtcNow;

        // Time that elapsed between receiving the message and actually starting execution.
        TimeSpan startupDelay = executionStartTime - messageReceivedTime;
        TimeSpan remainingCommandTimeout = commandTimeout - startupDelay;

        // The handler's token is bounded by whichever is shorter: the remaining command lifetime or ExecutionTimeout.
        // NOTE(review): remainingCommandTimeout is negative when startupDelay exceeds commandTimeout;
        // confirm that WallClock.CreateCancellationTokenSource tolerates a negative delay.
        TimeSpan cancellationTimeout = remainingCommandTimeout < ExecutionTimeout ? remainingCommandTimeout : ExecutionTimeout;

        // The using declaration disposes the token source when CmdFunc exits; no explicit Dispose is needed.
        using CancellationTokenSource commandCts = WallClock.CreateCancellationTokenSource(cancellationTimeout);

        try
        {
            // NOTE(review): the wait is bounded by ExecutionTimeout while the token (and the timeout message below)
            // use cancellationTimeout, which may be shorter — confirm this is intended.
            ExtendedResponse<TResp> extended = await Task.Run(() => OnCommandReceived(extendedRequest, commandCts.Token)).WaitAsync(ExecutionTimeout).ConfigureAwait(false);

            var serializedPayloadContext = _serializer.ToBytes(extended.Response);

            // An empty serialized payload is reported as NoContent, otherwise OK.
            MqttApplicationMessage? responseMessage = await GenerateResponseAsync(commandExpirationTime, args.ApplicationMessage.ResponseTopic, args.ApplicationMessage.CorrelationData, !serializedPayloadContext.SerializedPayload.IsEmpty ? CommandStatusCode.OK : CommandStatusCode.NoContent, null, serializedPayloadContext, extended.ResponseMetadata);

            // The response is stored in the cache before it is published.
            await _commandResponseCache.StoreAsync(
                _commandName,
                sourceId,
                args.ApplicationMessage.ResponseTopic,
                args.ApplicationMessage.CorrelationData,
                args.ApplicationMessage.Payload,
                responseMessage,
                IsIdempotent,
                commandExpirationTime,
                WallClock.UtcNow - executionStartTime).ConfigureAwait(false);

            await PublishResponseAsync(args.ApplicationMessage.ResponseTopic, args.ApplicationMessage.CorrelationData, responseMessage).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            CommandStatusCode statusCode;
            string? statusMessage;
            bool isAppError;

            // Error details are kept in locals scoped to this handler rather than
            // written into the variables produced by header validation.
            string? errorPropertyName;
            string? errorPropertyValue;

            switch (ex)
            {
                case OperationCanceledException:
                case TimeoutException:
                    statusCode = CommandStatusCode.RequestTimeout;
                    statusMessage = $"Executor timed out after {cancellationTimeout.TotalSeconds} seconds.";
                    isAppError = false;
                    errorPropertyName = nameof(ExecutionTimeout);
                    errorPropertyValue = XmlConvert.ToString(ExecutionTimeout);
                    Trace.TraceWarning($"Command '{_commandName}' execution timed out after {cancellationTimeout.TotalSeconds} seconds.");
                    break;
                case AkriMqttException amex:
                    statusCode = CommandStatusCode.InternalServerError;
                    statusMessage = amex.Message;
                    isAppError = true;
                    errorPropertyName = amex.HeaderName ?? amex.PropertyName;
                    errorPropertyValue = amex.HeaderValue ?? amex.PropertyValue?.ToString();
                    Trace.TraceWarning($"Command '{_commandName}' execution failed due to Akri Mqtt error: {amex}.");
                    break;
                default:
                    statusCode = CommandStatusCode.InternalServerError;
                    statusMessage = ex.Message;
                    isAppError = true;
                    errorPropertyName = null;
                    errorPropertyValue = null;
                    Trace.TraceWarning($"Command '{_commandName}' execution failed due to error: {ex}.");
                    break;
            }

            await GenerateAndPublishResponseAsync(commandExpirationTime, args.ApplicationMessage.ResponseTopic, args.ApplicationMessage.CorrelationData, statusCode, statusMessage, null, null, isAppError, errorPropertyName, errorPropertyValue, requestedProtocolVersion).ConfigureAwait(false);
        }
    }

    await GetDispatcher()(CmdFunc, async () => { await args.AcknowledgeAsync(CancellationToken.None).ConfigureAwait(false); }).ConfigureAwait(false);
}