in dotnet/src/Azure.Iot.Operations.Protocol/RPC/CommandInvoker.cs [221:379]
/// <summary>
/// Handles an incoming MQTT message that may be the response to a pending command invocation.
/// </summary>
/// <remarks>
/// The message is matched to a pending request through its correlation data. Messages without
/// parsable correlation data, or whose correlation id is not present in the pending-request map,
/// are ignored. Once a pending request is found, the message is flagged for automatic
/// acknowledgement and, provided its topic matches the request's response topic filter, the
/// pending promise is completed either with the deserialized response or with an exception
/// describing why the response was rejected.
/// </remarks>
/// <param name="args">The received-message event arguments.</param>
/// <returns>A task that completes once the message has been processed.</returns>
private async Task MessageReceivedCallbackAsync(MqttApplicationMessageReceivedEventArgs args)
{
    if (args.ApplicationMessage.CorrelationData == null
        || !GuidExtensions.TryParseBytes(args.ApplicationMessage.CorrelationData, out Guid? requestGuid))
    {
        // Without a parsable correlation id the message cannot be tied to any request.
        return;
    }

    string requestGuidString = requestGuid!.Value.ToString();
    ResponsePromise? responsePromise;
    lock (_requestIdMapLock)
    {
        if (!_requestIdMap.TryGetValue(requestGuidString, out responsePromise))
        {
            // No pending request is registered under this correlation id.
            return;
        }
    }

    args.AutoAcknowledge = true;

    if (!MqttTopicProcessor.DoesTopicMatchFilter(args.ApplicationMessage.Topic, responsePromise.ResponseTopic))
    {
        return;
    }

    // Assume a protocol version of 1.0 if no protocol version was specified
    string? responseProtocolVersion = FindUserProperty(AkriSystemProperties.ProtocolVersion)?.Value;
    if (!ProtocolVersion.TryParseProtocolVersion(responseProtocolVersion, out ProtocolVersion? protocolVersion))
    {
        SetExceptionSafe(
            responsePromise.CompletionSource,
            CreateUnsupportedVersionException(
                $"Received a response with an unparsable protocol version number: {responseProtocolVersion}",
                responseProtocolVersion));
        return;
    }

    if (!_supportedMajorProtocolVersions.Contains(protocolVersion!.MajorVersion))
    {
        SetExceptionSafe(
            responsePromise.CompletionSource,
            CreateUnsupportedVersionException(
                $"Received a response with an unsupported protocol version number: {responseProtocolVersion}",
                responseProtocolVersion));
        return;
    }

    MqttUserProperty? statusProperty = FindUserProperty(AkriSystemProperties.Status);
    if (!TryValidateResponseHeaders(statusProperty, requestGuidString, out AkriMqttErrorKind errorKind, out string message, out string? headerName, out string? headerValue))
    {
        // The response headers themselves are malformed; this is detected locally (IsRemote = false).
        AkriMqttException headerException = new(message)
        {
            Kind = errorKind,
            IsShallow = false,
            IsRemote = false,
            HeaderName = headerName,
            HeaderValue = headerValue,
            CommandName = _commandName,
            CorrelationId = requestGuid,
        };
        SetExceptionSafe(responsePromise.CompletionSource, headerException);
        return;
    }

    int statusCode = int.Parse(statusProperty!.Value, CultureInfo.InvariantCulture);
    if (statusCode is not ((int)CommandStatusCode.OK) and not ((int)CommandStatusCode.NoContent))
    {
        // Any status other than OK / NoContent is reported to the caller as a remote error.
        MqttUserProperty? invalidNameProperty = FindUserProperty(AkriSystemProperties.InvalidPropertyName);
        MqttUserProperty? invalidValueProperty = FindUserProperty(AkriSystemProperties.InvalidPropertyValue);

        // The flag counts as set when the property is present with any value other than
        // "false" (compared case-insensitively).
        bool isApplicationError = (args.ApplicationMessage.UserProperties?.TryGetProperty(AkriSystemProperties.IsApplicationError, out string? isAppError) ?? false)
            && !string.Equals(isAppError, "false", StringComparison.OrdinalIgnoreCase);
        string? statusMessage = FindUserProperty(AkriSystemProperties.StatusMessage)?.Value;

        AkriMqttErrorKind remoteErrorKind = StatusCodeToErrorKind((CommandStatusCode)statusCode, isApplicationError, invalidNameProperty != null, invalidValueProperty != null);
        AkriMqttException remoteException = new(statusMessage ?? "Error condition identified by remote service")
        {
            Kind = remoteErrorKind,
            IsShallow = false,
            IsRemote = true,
            // The same invalid name/value pair is surfaced through whichever field family
            // applies to the resolved error kind.
            HeaderName = UseHeaderFields(remoteErrorKind) ? invalidNameProperty?.Value : null,
            HeaderValue = UseHeaderFields(remoteErrorKind) ? invalidValueProperty?.Value : null,
            PropertyName = UsePropertyFields(remoteErrorKind) ? invalidNameProperty?.Value : null,
            PropertyValue = UsePropertyFields(remoteErrorKind) ? invalidValueProperty?.Value : null,
            TimeoutName = UseTimeoutFields(remoteErrorKind) ? invalidNameProperty?.Value : null,
            TimeoutValue = UseTimeoutFields(remoteErrorKind) ? GetAsTimeSpan(invalidValueProperty?.Value) : null,
            CommandName = _commandName,
            CorrelationId = requestGuid,
        };

        if (remoteErrorKind == AkriMqttErrorKind.UnsupportedVersion)
        {
            MqttUserProperty? supportedMajorVersions = FindUserProperty(AkriSystemProperties.SupportedMajorProtocolVersions);
            MqttUserProperty? requestProtocolVersion = FindUserProperty(AkriSystemProperties.RequestedProtocolVersion);

            if (requestProtocolVersion != null)
            {
                remoteException.ProtocolVersion = requestProtocolVersion.Value;
            }
            else
            {
                Trace.TraceWarning("Command executor failed to provide the request's protocol version");
            }

            if (supportedMajorVersions != null
                && ProtocolVersion.TryParseFromString(supportedMajorVersions.Value, out int[]? versions))
            {
                remoteException.SupportedMajorProtocolVersions = versions;
            }
            else
            {
                Trace.TraceWarning("Command executor failed to provide the supported major protocol versions");
            }
        }

        SetExceptionSafe(responsePromise.CompletionSource, remoteException);
        return;
    }

    TResp response;
    CommandResponseMetadata responseMetadata;
    try
    {
        response = _serializer.FromBytes<TResp>(args.ApplicationMessage.Payload, args.ApplicationMessage.ContentType, args.ApplicationMessage.PayloadFormatIndicator);
        responseMetadata = new CommandResponseMetadata(args.ApplicationMessage);
    }
    catch (Exception ex)
    {
        SetExceptionSafe(responsePromise.CompletionSource, ex);
        return;
    }

    if (responseMetadata.Timestamp != null)
    {
        try
        {
            await _applicationContext.ApplicationHlc.UpdateWithOtherAsync(responseMetadata.Timestamp);
        }
        catch (Exception ex)
        {
            // A failed clock update must be reported through the promise; if it escaped this
            // callback instead, the pending promise would be left uncompleted.
            SetExceptionSafe(responsePromise.CompletionSource, ex);
            return;
        }
    }
    else
    {
        Trace.TraceInformation("No timestamp present in command response metadata.");
    }

    ExtendedResponse<TResp> extendedResponse = new() { Response = response, ResponseMetadata = responseMetadata };
    if (!responsePromise.CompletionSource.TrySetResult(extendedResponse))
    {
        Trace.TraceWarning("Failed to complete the command response promise. This may be because the operation was cancelled or finished with exception.");
    }

    // Returns the first user property on the received message with the given name, or null
    // when the message has no user properties or none with that name.
    MqttUserProperty? FindUserProperty(string name) =>
        args.ApplicationMessage.UserProperties?.FirstOrDefault(p => p.Name == name);

    // Builds the locally-detected UnsupportedVersion exception shared by the "unparsable" and
    // "unsupported" protocol version checks; only the message differs between the two.
    AkriMqttException CreateUnsupportedVersionException(string exceptionMessage, string? receivedProtocolVersion) =>
        new(exceptionMessage)
        {
            Kind = AkriMqttErrorKind.UnsupportedVersion,
            IsShallow = false,
            IsRemote = false,
            CommandName = _commandName,
            CorrelationId = requestGuid,
            SupportedMajorProtocolVersions = _supportedMajorProtocolVersions,
            ProtocolVersion = receivedProtocolVersion,
        };
}