in iothub/service/src/Messaging/Models/MessageConverter.cs [29:119]
public static void UpdateMessageHeaderAndProperties(AmqpMessage amqpMessage, Message data)
{
Fx.AssertAndThrow(amqpMessage.DeliveryTag != null, "AmqpMessage should always contain delivery tag.");
data.DeliveryTag = amqpMessage.DeliveryTag;
SectionFlag sections = amqpMessage.Sections;
if ((sections & SectionFlag.Properties) != 0)
{
// Extract only the Properties that we support
data.MessageId = amqpMessage.Properties.MessageId?.ToString();
data.To = amqpMessage.Properties.To?.ToString();
if (amqpMessage.Properties.AbsoluteExpiryTime.HasValue)
{
data.ExpiryTimeUtc = amqpMessage.Properties.AbsoluteExpiryTime.Value;
}
data.CorrelationId = amqpMessage.Properties.CorrelationId?.ToString();
data.UserId = amqpMessage.Properties.UserId.Array != null ? Encoding.UTF8.GetString(amqpMessage.Properties.UserId.Array) : null;
if (!string.IsNullOrWhiteSpace(amqpMessage.Properties.ContentType.Value))
{
data.ContentType = amqpMessage.Properties.ContentType.Value;
}
if (!string.IsNullOrWhiteSpace(amqpMessage.Properties.ContentEncoding.Value))
{
data.ContentEncoding = amqpMessage.Properties.ContentEncoding.Value;
}
}
if ((sections & SectionFlag.MessageAnnotations) != 0)
{
if (amqpMessage.MessageAnnotations.Map.TryGetValue(LockTokenName, out string lockToken))
{
data.LockToken = lockToken;
}
if (amqpMessage.MessageAnnotations.Map.TryGetValue(SequenceNumberName, out ulong sequenceNumber))
{
data.SequenceNumber = sequenceNumber;
}
if (amqpMessage.MessageAnnotations.Map.TryGetValue(MessageSystemPropertyNames.EnqueuedTime, out DateTime enqueuedTime))
{
data.EnqueuedTimeUtc = enqueuedTime;
}
if (amqpMessage.MessageAnnotations.Map.TryGetValue(MessageSystemPropertyNames.DeliveryCount, out byte deliveryCount))
{
data.DeliveryCount = deliveryCount;
}
}
if ((sections & SectionFlag.ApplicationProperties) != 0)
{
foreach (KeyValuePair<MapKey, object> pair in amqpMessage.ApplicationProperties.Map)
{
if (TryGetNetObjectFromAmqpObject(pair.Value, MappingType.ApplicationProperty, out object netObject))
{
if (netObject is string stringObject)
{
switch (pair.Key.ToString())
{
case MessageSystemPropertyNames.Operation:
data.SystemProperties[pair.Key.ToString()] = stringObject;
break;
case MessageSystemPropertyNames.MessageSchema:
data.MessageSchema = stringObject;
break;
case MessageSystemPropertyNames.CreationTimeUtc:
data.CreationTimeUtc = DateTime.Parse(stringObject, CultureInfo.InvariantCulture);
break;
default:
data.Properties[pair.Key.ToString()] = stringObject;
break;
}
}
else
{
// TODO: RDBug 4093369 Handling of non-string property values in Amqp messages
// Drop non-string properties and log an error
Fx.Exception.TraceHandled(new InvalidDataException("IotHub does not accept non-string Amqp properties"), "MessageConverter.UpdateMessageHeaderAndProperties");
}
}
}
}
}