in iothub/device/src/Transport/AmqpIot/AmqpIotMessageConverter.cs [69:179]
/// <summary>
/// Copies the delivery tag and the supported values from the Properties, MessageAnnotations and
/// ApplicationProperties sections of an AMQP message onto an IoT hub <see cref="Message"/>.
/// </summary>
/// <remarks>
/// Only sections flagged as present in <c>amqpMessage.Sections</c> are read. Application properties
/// whose converted value is not a string are dropped and traced rather than copied.
/// </remarks>
/// <param name="amqpMessage">The AMQP message to read from.</param>
/// <param name="message">The message whose header fields and properties are populated.</param>
public static void UpdateMessageHeaderAndProperties(AmqpMessage amqpMessage, Message message)
{
    Fx.AssertAndThrow(amqpMessage.DeliveryTag != null, "AmqpMessage should always contain delivery tag.");
    message.DeliveryTag = amqpMessage.DeliveryTag;

    SectionFlag sections = amqpMessage.Sections;

    if ((sections & SectionFlag.Properties) != 0)
    {
        // Extract only the Properties that we support
        message.MessageId = amqpMessage.Properties.MessageId?.ToString();
        message.To = amqpMessage.Properties.To?.ToString();

        if (amqpMessage.Properties.AbsoluteExpiryTime.HasValue)
        {
            message.ExpiryTimeUtc = amqpMessage.Properties.AbsoluteExpiryTime.Value;
        }

        message.CorrelationId = amqpMessage.Properties.CorrelationId?.ToString();

        // Blank content type/encoding values are ignored so that whatever is already on the message is kept.
        if (!string.IsNullOrWhiteSpace(amqpMessage.Properties.ContentType.Value))
        {
            message.ContentType = amqpMessage.Properties.ContentType.Value;
        }

        if (!string.IsNullOrWhiteSpace(amqpMessage.Properties.ContentEncoding.Value))
        {
            message.ContentEncoding = amqpMessage.Properties.ContentEncoding.Value;
        }

        // The user id is a segment that may be a slice of a larger backing buffer, so decode only the
        // bytes in [Offset, Offset + Count) rather than the whole underlying array.
        ArraySegment<byte> userId = amqpMessage.Properties.UserId;
        message.UserId = userId.Array == null
            ? null
            : Encoding.UTF8.GetString(userId.Array, userId.Offset, userId.Count);
    }

    if ((sections & SectionFlag.MessageAnnotations) != 0)
    {
        // Each annotation is optional; a field on the message is only assigned when its annotation is found.
        if (amqpMessage.MessageAnnotations.Map.TryGetValue(LockTokenName, out string lockToken))
        {
            message.LockToken = lockToken;
        }

        if (amqpMessage.MessageAnnotations.Map.TryGetValue(SequenceNumberName, out ulong sequenceNumber))
        {
            message.SequenceNumber = sequenceNumber;
        }

        if (amqpMessage.MessageAnnotations.Map.TryGetValue(MessageSystemPropertyNames.EnqueuedTime, out DateTime enqueuedTime))
        {
            message.EnqueuedTimeUtc = enqueuedTime;
        }

        if (amqpMessage.MessageAnnotations.Map.TryGetValue(MessageSystemPropertyNames.DeliveryCount, out byte deliveryCount))
        {
            message.DeliveryCount = deliveryCount;
        }

        if (amqpMessage.MessageAnnotations.Map.TryGetValue(InputName, out string inputName))
        {
            message.InputName = inputName;
        }

        if (amqpMessage.MessageAnnotations.Map.TryGetValue(MessageSystemPropertyNames.ConnectionDeviceId, out string connectionDeviceId))
        {
            message.ConnectionDeviceId = connectionDeviceId;
        }

        if (amqpMessage.MessageAnnotations.Map.TryGetValue(MessageSystemPropertyNames.ConnectionModuleId, out string connectionModuleId))
        {
            message.ConnectionModuleId = connectionModuleId;
        }
    }

    if ((sections & SectionFlag.ApplicationProperties) != 0)
    {
        foreach (KeyValuePair<MapKey, object> pair in amqpMessage.ApplicationProperties.Map)
        {
            // Values that cannot be converted to a .NET object are skipped silently.
            if (!TryGetNetObjectFromAmqpObject(pair.Value, MappingType.ApplicationProperty, out object netObject))
            {
                continue;
            }

            if (netObject is string stringObject)
            {
                string key = pair.Key.ToString();

                // A few well-known keys are routed to dedicated message fields; everything else is a user property.
                switch (key)
                {
                    case MessageSystemPropertyNames.Operation:
                        message.SystemProperties[key] = stringObject;
                        break;

                    case MessageSystemPropertyNames.MessageSchema:
                        message.MessageSchema = stringObject;
                        break;

                    case MessageSystemPropertyNames.CreationTimeUtc:
                        // NOTE(review): DateTime.Parse throws FormatException on a malformed value and no
                        // DateTimeStyles are supplied here - confirm this is the intended handling.
                        message.CreationTimeUtc = DateTime.Parse(stringObject, CultureInfo.InvariantCulture);
                        break;

                    default:
                        message.Properties[key] = stringObject;
                        break;
                }
            }
            else
            {
                // TODO: RDBug 4093369 Handling of non-string property values in AMQP messages
                // Drop non-string properties and log an error
                Fx.Exception.TraceHandled(new InvalidDataException("IotHub does not accept non-string Amqp properties"), "MessageConverter.UpdateMessageHeaderAndProperties");
            }
        }
    }
}