in iothub/device/src/Transport/AmqpIot/AmqpIotReceivingLink.cs [250:323]
/// <summary>
/// Handles a message received on the twin link: accepts the delivery, then reports either a
/// failure (status code 400 or above) or the parsed twin payload through <c>_onTwinMessageReceived</c>.
/// </summary>
/// <param name="amqpMessage">The AMQP message received on the twin receiving link.</param>
private void OnTwinChangesReceived(AmqpMessage amqpMessage)
{
    if (Logging.IsEnabled)
    {
        Logging.Enter(this, amqpMessage, nameof(OnTwinChangesReceived));
    }

    try
    {
        // Settle the delivery as accepted before the payload is inspected.
        _receivingAmqpLink.DisposeDelivery(amqpMessage, true, AmqpIotConstants.AcceptedOutcome);

        // The correlation id is null when the message carries no properties or no correlation id.
        string correlationId = amqpMessage.Properties?.CorrelationId?.ToString();
        int status = GetStatus(amqpMessage);

        // Reads the whole message body as UTF-8 text.
        string ReadBodyAsString()
        {
            using var reader = new StreamReader(amqpMessage.BodyStream, System.Text.Encoding.UTF8);
            return reader.ReadToEnd();
        }

        bool HasCorrelationPrefix(AmqpTwinMessageType messageType)
        {
            // Null-safe: a message without a correlation id matches no request type.
            return correlationId != null
                && correlationId.StartsWith(messageType.ToString(), StringComparison.OrdinalIgnoreCase);
        }

        if (status >= 400)
        {
            // Handle failures. Only Get and Patch failures are reported to the callback.
            if (HasCorrelationPrefix(AmqpTwinMessageType.Get)
                || HasCorrelationPrefix(AmqpTwinMessageType.Patch))
            {
                string error = ReadBodyAsString();

                // Retry for Http status code request timeout, Too many requests and server errors
                bool isTransient = status >= 500 || status == 429 || status == 408;
                var exception = new IotHubException(error, isTransient);
                _onTwinMessageReceived.Invoke(null, correlationId, null, exception);
            }
            else if (Logging.IsEnabled)
            {
                // Previously a missing correlation id caused a NullReferenceException here.
                Logging.Info("Received a failed twin response that does not correlate to a Get or Patch request; ignoring it", nameof(OnTwinChangesReceived));
            }

            return;
        }

        Twin twin = null;
        TwinCollection twinProperties = null;

        if (correlationId == null)
        {
            // Here we are getting desired property update notifications and want to handle it first
            string patch = ReadBodyAsString();
            twinProperties = JsonConvert.DeserializeObject<TwinCollection>(patch, JsonSerializerSettingsInitializer.GetJsonSerializerSettings());
        }
        else if (HasCorrelationPrefix(AmqpTwinMessageType.Get))
        {
            // This is a response to a GET TWIN request, so return (set) the full twin
            string body = ReadBodyAsString();
            TwinProperties properties = JsonConvert.DeserializeObject<TwinProperties>(body, JsonSerializerSettingsInitializer.GetJsonSerializerSettings());
            twin = new Twin(properties);
        }
        else if (HasCorrelationPrefix(AmqpTwinMessageType.Patch))
        {
            // This can be used to correlate a success response with updating reported properties.
            // However, currently we do not have it as a request-response style implementation.
            if (Logging.IsEnabled)
            {
                Logging.Info("Updated twin reported properties successfully", nameof(OnTwinChangesReceived));
            }
        }
        else if (HasCorrelationPrefix(AmqpTwinMessageType.Put))
        {
            // This is an acknowledgement received from service for subscribing to desired property updates
            if (Logging.IsEnabled)
            {
                Logging.Info("Subscribed for twin successfully", nameof(OnTwinChangesReceived));
            }
        }
        else
        {
            // This shouldn't happen
            if (Logging.IsEnabled)
            {
                Logging.Info("Received a correlation Id for Twin operation that does not match Get, Patch or Put request", nameof(OnTwinChangesReceived));
            }
        }

        // Every non-failure message is forwarded, even when neither twin nor twinProperties was populated.
        _onTwinMessageReceived.Invoke(twin, correlationId, twinProperties, null);
    }
    finally
    {
        if (Logging.IsEnabled)
        {
            Logging.Exit(this, amqpMessage, nameof(OnTwinChangesReceived));
        }
    }
}