in sdk/core/Azure.Core.Amqp/src/Shared/AmqpAnnotatedMessageConverter.cs [281:463]
public static AmqpAnnotatedMessage FromAmqpMessage(AmqpMessage source)
{
    // Builds an AmqpAnnotatedMessage from the given AmqpMessage. Only the sections that
    // are flagged as present in source.Sections are read; everything else is left at the
    // defaults of the newly constructed message.

    // Body: the body kinds are probed in the order data -> sequence -> value, and the first
    // probe that succeeds decides the body type. When none succeeds, the message is created
    // with an empty data body.
    var message = source switch
    {
        _ when TryGetDataBody(source, out var dataBody) => new AmqpAnnotatedMessage(dataBody!),
        _ when TryGetSequenceBody(source, out var sequenceBody) => new AmqpAnnotatedMessage(sequenceBody!),
        _ when TryGetValueBody(source, out var valueBody) => new AmqpAnnotatedMessage(valueBody!),
        _ => new AmqpAnnotatedMessage(AmqpMessageBody.FromData(MessageBody.FromReadOnlyMemorySegment(ReadOnlyMemory<byte>.Empty)))
    };

    // Header
    if ((source.Sections & SectionFlag.Header) > 0)
    {
        if (source.Header.DeliveryCount.HasValue)
        {
            message.Header.DeliveryCount = source.Header.DeliveryCount;
        }

        if (source.Header.Durable.HasValue)
        {
            message.Header.Durable = source.Header.Durable;
        }

        if (source.Header.Priority.HasValue)
        {
            message.Header.Priority = source.Header.Priority;
        }

        if (source.Header.FirstAcquirer.HasValue)
        {
            message.Header.FirstAcquirer = source.Header.FirstAcquirer;
        }

        if (source.Header.Ttl.HasValue)
        {
            // The header TTL is interpreted as a number of milliseconds. It may be replaced
            // further down when the properties section carries both an absolute expiry time
            // and a creation time.
            message.Header.TimeToLive = TimeSpan.FromMilliseconds(source.Header.Ttl.Value);
        }
    }

    // Properties
    if ((source.Sections & SectionFlag.Properties) > 0)
    {
        if (source.Properties.AbsoluteExpiryTime.HasValue)
        {
            // Clamp to DateTimeOffset.MaxValue when the source value is at or beyond the
            // largest UTC instant a DateTimeOffset can represent.
            DateTimeOffset absoluteExpiryTime = source.Properties.AbsoluteExpiryTime >= DateTimeOffset.MaxValue.UtcDateTime
                ? DateTimeOffset.MaxValue
                : source.Properties.AbsoluteExpiryTime.Value;

            message.Properties.AbsoluteExpiryTime = absoluteExpiryTime;

            // The TTL from the header can be at most approximately 49 days (UInt32.MaxValue milliseconds) due
            // to the AMQP spec. In order to allow for larger TTLs set by the user, we take the difference of the AbsoluteExpiryTime
            // and the CreationTime (if both are set). If either of those properties is not set, we fall back to the
            // TTL from the header.
            if (source.Properties.CreationTime.HasValue)
            {
                message.Header.TimeToLive = absoluteExpiryTime - source.Properties.CreationTime.Value;
            }
        }

        if (!string.IsNullOrEmpty(source.Properties.ContentEncoding.Value))
        {
            message.Properties.ContentEncoding = source.Properties.ContentEncoding.Value;
        }

        if (!string.IsNullOrEmpty(source.Properties.ContentType.Value))
        {
            message.Properties.ContentType = source.Properties.ContentType.Value;
        }

        // Identifiers and addresses are carried over through their string representation.
        if (source.Properties.CorrelationId != null)
        {
            message.Properties.CorrelationId = new AmqpMessageId(source.Properties.CorrelationId.ToString()!);
        }

        if (source.Properties.CreationTime.HasValue)
        {
            message.Properties.CreationTime = source.Properties.CreationTime;
        }

        if (!string.IsNullOrEmpty(source.Properties.GroupId))
        {
            message.Properties.GroupId = source.Properties.GroupId;
        }

        if (source.Properties.GroupSequence.HasValue)
        {
            message.Properties.GroupSequence = source.Properties.GroupSequence;
        }

        if (source.Properties.MessageId != null)
        {
            message.Properties.MessageId = new AmqpMessageId(source.Properties.MessageId.ToString()!);
        }

        if (source.Properties.ReplyTo != null)
        {
            message.Properties.ReplyTo = new AmqpAddress(source.Properties.ReplyTo.ToString()!);
        }

        if (!string.IsNullOrEmpty(source.Properties.ReplyToGroupId))
        {
            message.Properties.ReplyToGroupId = source.Properties.ReplyToGroupId;
        }

        if (!string.IsNullOrEmpty(source.Properties.Subject))
        {
            message.Properties.Subject = source.Properties.Subject;
        }

        if (source.Properties.To != null)
        {
            message.Properties.To = new AmqpAddress(source.Properties.To.ToString()!);
        }

        if (source.Properties.UserId != default)
        {
            message.Properties.UserId = source.Properties.UserId;
        }
    }

    // For each of the map-based sections below, an entry is copied only when
    // TryCreateNetPropertyFromAmqpProperty reports success for its value; other entries
    // are skipped. Keys are stored by their string representation.

    // Application Properties
    if ((source.Sections & SectionFlag.ApplicationProperties) > 0)
    {
        foreach (var pair in source.ApplicationProperties.Map)
        {
            if (TryCreateNetPropertyFromAmqpProperty(pair.Value, out var propertyValue))
            {
                message.ApplicationProperties[pair.Key.ToString()] = propertyValue;
            }
        }
    }

    // Message Annotations
    if ((source.Sections & SectionFlag.MessageAnnotations) > 0)
    {
        foreach (var pair in source.MessageAnnotations.Map)
        {
            if (TryCreateNetPropertyFromAmqpProperty(pair.Value, out var propertyValue))
            {
                message.MessageAnnotations[pair.Key.ToString()] = propertyValue;
            }
        }
    }

    // Delivery Annotations
    if ((source.Sections & SectionFlag.DeliveryAnnotations) > 0)
    {
        foreach (var pair in source.DeliveryAnnotations.Map)
        {
            if (TryCreateNetPropertyFromAmqpProperty(pair.Value, out var eventValue))
            {
                message.DeliveryAnnotations[pair.Key.ToString()] = eventValue;
            }
        }
    }

    // Footer
    if ((source.Sections & SectionFlag.Footer) > 0)
    {
        foreach (var pair in source.Footer.Map)
        {
            if (TryCreateNetPropertyFromAmqpProperty(pair.Value, out var eventValue))
            {
                message.Footer[pair.Key.ToString()] = eventValue;
            }
        }
    }

    return message;
}