in sdk/core/Azure.Core.Amqp/src/Shared/AmqpAnnotatedMessageConverter.cs [65:271]
public static AmqpMessage ToAmqpMessage(AmqpAnnotatedMessage sourceMessage)
{
var message = sourceMessage switch
{
_ when sourceMessage.Body.TryGetData(out var dataBody) => AmqpMessage.Create(TranslateDataBody(dataBody!)),
_ when sourceMessage.Body.TryGetSequence(out var sequenceBody) => AmqpMessage.Create(TranslateSequenceBody(sequenceBody!)),
_ when sourceMessage.Body.TryGetValue(out var valueBody) => AmqpMessage.Create(TranslateValueBody(valueBody!)),
_ => AmqpMessage.Create(new Data { Value = new ArraySegment<byte>(Array.Empty<byte>()) })
};
// Header
if (sourceMessage.HasSection(AmqpMessageSection.Header))
{
if (sourceMessage.Header.DeliveryCount.HasValue)
{
message.Header.DeliveryCount = sourceMessage.Header.DeliveryCount;
}
if (sourceMessage.Header.Durable.HasValue)
{
message.Header.Durable = sourceMessage.Header.Durable;
}
if (sourceMessage.Header.Priority.HasValue)
{
message.Header.Priority = sourceMessage.Header.Priority;
}
if (sourceMessage.Header.FirstAcquirer.HasValue)
{
message.Header.FirstAcquirer = sourceMessage.Header.FirstAcquirer;
}
}
// Properties
if (sourceMessage.HasSection(AmqpMessageSection.Properties))
{
if (sourceMessage.Properties.AbsoluteExpiryTime.HasValue)
{
message.Properties.AbsoluteExpiryTime = sourceMessage.Properties.AbsoluteExpiryTime.Value.UtcDateTime;
}
if (!string.IsNullOrEmpty(sourceMessage.Properties.ContentEncoding))
{
message.Properties.ContentEncoding = sourceMessage.Properties.ContentEncoding;
}
if (!string.IsNullOrEmpty(sourceMessage.Properties.ContentType))
{
message.Properties.ContentType = sourceMessage.Properties.ContentType;
}
if (sourceMessage.Properties.CorrelationId.HasValue)
{
message.Properties.CorrelationId = sourceMessage.Properties.CorrelationId.Value.ToString();
}
if (sourceMessage.Properties.CreationTime.HasValue)
{
message.Properties.CreationTime = sourceMessage.Properties.CreationTime.Value.UtcDateTime;
}
if (!string.IsNullOrEmpty(sourceMessage.Properties.GroupId))
{
message.Properties.GroupId = sourceMessage.Properties.GroupId;
}
if (sourceMessage.Properties.GroupSequence.HasValue)
{
message.Properties.GroupSequence = sourceMessage.Properties.GroupSequence;
}
if (sourceMessage.Properties.MessageId.HasValue)
{
message.Properties.MessageId = sourceMessage.Properties.MessageId.Value.ToString();
}
if (sourceMessage.Properties.ReplyTo.HasValue)
{
message.Properties.ReplyTo = sourceMessage.Properties.ReplyTo.Value.ToString();
}
if (!string.IsNullOrEmpty(sourceMessage.Properties.ReplyToGroupId))
{
message.Properties.ReplyToGroupId = sourceMessage.Properties.ReplyToGroupId;
}
if (!string.IsNullOrEmpty(sourceMessage.Properties.Subject))
{
message.Properties.Subject = sourceMessage.Properties.Subject;
}
if (sourceMessage.Properties.To.HasValue)
{
message.Properties.To = sourceMessage.Properties.To.Value.ToString();
}
if (sourceMessage.Properties.UserId.HasValue)
{
if (MemoryMarshal.TryGetArray(sourceMessage.Properties.UserId.Value, out var segment))
{
message.Properties.UserId = segment;
}
else
{
message.Properties.UserId = new ArraySegment<byte>(sourceMessage.Properties.UserId.Value.ToArray());
}
}
}
// Application Properties
if ((sourceMessage.HasSection(AmqpMessageSection.ApplicationProperties)) && (sourceMessage.ApplicationProperties.Count > 0))
{
message.ApplicationProperties ??= new ApplicationProperties();
foreach (var pair in sourceMessage.ApplicationProperties)
{
if (TryCreateAmqpPropertyValueFromNetProperty(pair.Value, out var amqpValue))
{
message.ApplicationProperties.Map[pair.Key] = amqpValue;
}
else
{
ThrowSerializationFailed(nameof(sourceMessage.ApplicationProperties), pair);
}
}
}
// Message Annotations
if (sourceMessage.HasSection(AmqpMessageSection.MessageAnnotations))
{
foreach (var pair in sourceMessage.MessageAnnotations)
{
if (TryCreateAmqpPropertyValueFromNetProperty(pair.Value, out var amqpValue))
{
message.MessageAnnotations.Map[pair.Key] = amqpValue;
}
else
{
ThrowSerializationFailed(nameof(sourceMessage.MessageAnnotations), pair);
}
}
}
// Delivery Annotations
if (sourceMessage.HasSection(AmqpMessageSection.DeliveryAnnotations))
{
foreach (var pair in sourceMessage.DeliveryAnnotations)
{
if (TryCreateAmqpPropertyValueFromNetProperty(pair.Value, out var amqpValue))
{
message.DeliveryAnnotations.Map[pair.Key] = amqpValue;
}
else
{
ThrowSerializationFailed(nameof(sourceMessage.DeliveryAnnotations), pair);
}
}
}
// Footer
if (sourceMessage.HasSection(AmqpMessageSection.Footer))
{
foreach (var pair in sourceMessage.Footer)
{
if (TryCreateAmqpPropertyValueFromNetProperty(pair.Value, out var amqpValue))
{
message.Footer.Map[pair.Key] = amqpValue;
}
else
{
ThrowSerializationFailed(nameof(sourceMessage.Footer), pair);
}
}
}
// There is a loss of fidelity in the TTL header if larger than uint.MaxValue. As a workaround
// we set the AbsoluteExpiryTime and CreationTime on the message based on the TTL. These
// values are then used to reconstruct the accurate TTL for received messages.
if (sourceMessage.Header.TimeToLive.HasValue)
{
var ttl = sourceMessage.Header.TimeToLive.Value;
message.Header.Ttl = ttl.TotalMilliseconds > uint.MaxValue
? uint.MaxValue
: (uint) ttl.TotalMilliseconds;
message.Properties.CreationTime = DateTime.UtcNow;
if (AmqpConstants.MaxAbsoluteExpiryTime - message.Properties.CreationTime.Value > ttl)
{
message.Properties.AbsoluteExpiryTime = message.Properties.CreationTime.Value + ttl;
}
else
{
message.Properties.AbsoluteExpiryTime = AmqpConstants.MaxAbsoluteExpiryTime;
}
}
return message;
}