in sdk/servicebus/Microsoft.Azure.ServiceBus/src/Amqp/AmqpMessageConverter.cs [168:360]
/// <summary>
/// Converts an <see cref="AmqpMessage"/> into an <see cref="SBMessage"/>, copying the body, the
/// header, the properties, the application properties, the message annotations and the
/// delivery tag (as the lock token) onto the new message.
/// </summary>
/// <param name="amqpMessage">
/// The AMQP message to convert. A null value is rejected via <c>Fx.Exception.ArgumentNull</c>.
/// The message is disposed at the end of a successful conversion; if the conversion throws,
/// it is left undisposed.
/// </param>
/// <param name="isPeeked">
/// When <c>true</c>, the header delivery count is copied as-is; otherwise it is incremented by
/// one before being stored.
/// </param>
/// <returns>The populated <see cref="SBMessage"/>.</returns>
public static SBMessage AmqpMessageToSBMessage(AmqpMessage amqpMessage, bool isPeeked = false)
{
    if (amqpMessage == null)
    {
        throw Fx.Exception.ArgumentNull(nameof(amqpMessage));
    }

    SBMessage sbMessage;

    if ((amqpMessage.BodyType & SectionFlag.AmqpValue) != 0
        && amqpMessage.ValueBody.Value != null)
    {
        // Value body: expose it through BodyObject, mapped to a .NET object when a mapping
        // exists and as the raw AMQP value otherwise.
        sbMessage = new SBMessage();

        if (TryGetNetObjectFromAmqpObject(amqpMessage.ValueBody.Value, MappingType.MessageBody, out var dotNetObject))
        {
            sbMessage.SystemProperties.BodyObject = dotNetObject;
        }
        else
        {
            sbMessage.SystemProperties.BodyObject = amqpMessage.ValueBody.Value;
        }
    }
    else if ((amqpMessage.BodyType & SectionFlag.Data) != 0
        && amqpMessage.DataBody != null)
    {
        // Data body: concatenate every data section into a single byte array. Each segment is
        // written straight into the stream, so no per-segment temporary array is needed.
        using (var bodyStream = new System.IO.MemoryStream())
        {
            foreach (var data in amqpMessage.DataBody)
            {
                if (data.Value is byte[] byteArrayValue)
                {
                    bodyStream.Write(byteArrayValue, 0, byteArrayValue.Length);
                }
                else if (data.Value is ArraySegment<byte> arraySegmentValue
                    && arraySegmentValue.Array != null)
                {
                    // A default ArraySegment<byte> has a null Array; it carries no bytes and
                    // is skipped instead of being dereferenced.
                    bodyStream.Write(arraySegmentValue.Array, arraySegmentValue.Offset, arraySegmentValue.Count);
                }
            }

            sbMessage = new SBMessage(bodyStream.ToArray());
        }
    }
    else
    {
        sbMessage = new SBMessage();
    }

    var sections = amqpMessage.Sections;

    if ((sections & SectionFlag.Header) != 0)
    {
        var header = amqpMessage.Header;

        if (header.Ttl != null)
        {
            sbMessage.TimeToLive = TimeSpan.FromMilliseconds(header.Ttl.Value);
        }

        if (header.DeliveryCount != null)
        {
            // Peeked messages keep the header count unchanged; otherwise the stored count is
            // the header count plus one.
            sbMessage.SystemProperties.DeliveryCount = isPeeked ? (int)(header.DeliveryCount.Value) : (int)(header.DeliveryCount.Value + 1);
        }
    }

    if ((sections & SectionFlag.Properties) != 0)
    {
        var properties = amqpMessage.Properties;

        if (properties.MessageId != null)
        {
            sbMessage.MessageId = properties.MessageId.ToString();
        }

        if (properties.CorrelationId != null)
        {
            sbMessage.CorrelationId = properties.CorrelationId.ToString();
        }

        if (properties.ContentType.Value != null)
        {
            sbMessage.ContentType = properties.ContentType.Value;
        }

        if (properties.Subject != null)
        {
            sbMessage.Label = properties.Subject;
        }

        if (properties.To != null)
        {
            sbMessage.To = properties.To.ToString();
        }

        if (properties.ReplyTo != null)
        {
            sbMessage.ReplyTo = properties.ReplyTo.ToString();
        }

        if (properties.GroupId != null)
        {
            sbMessage.SessionId = properties.GroupId;
        }

        if (properties.ReplyToGroupId != null)
        {
            sbMessage.ReplyToSessionId = properties.ReplyToGroupId;
        }

        if (properties.CreationTime.HasValue && properties.AbsoluteExpiryTime.HasValue)
        {
            // Overwrite TimeToLive (possibly set from the header above) with the span between
            // creation time and absolute expiry time.
            sbMessage.TimeToLive = properties.AbsoluteExpiryTime.Value - properties.CreationTime.Value;
        }
    }

    // Do application properties before message annotations, because the application properties
    // can be updated by entries from message annotation.
    if ((sections & SectionFlag.ApplicationProperties) != 0)
    {
        foreach (var pair in amqpMessage.ApplicationProperties.Map)
        {
            if (TryGetNetObjectFromAmqpObject(pair.Value, MappingType.ApplicationProperty, out var netObject))
            {
                sbMessage.UserProperties[pair.Key.ToString()] = netObject;
            }
        }
    }

    if ((sections & SectionFlag.MessageAnnotations) != 0)
    {
        foreach (var pair in amqpMessage.MessageAnnotations.Map)
        {
            var key = pair.Key.ToString();
            switch (key)
            {
                case EnqueuedTimeUtcName:
                    sbMessage.SystemProperties.EnqueuedTimeUtc = (DateTime)pair.Value;
                    break;
                case ScheduledEnqueueTimeUtcName:
                    sbMessage.ScheduledEnqueueTimeUtc = (DateTime)pair.Value;
                    break;
                case SequenceNumberName:
                    sbMessage.SystemProperties.SequenceNumber = (long)pair.Value;
                    break;
                case EnqueueSequenceNumberName:
                    sbMessage.SystemProperties.EnqueuedSequenceNumber = (long)pair.Value;
                    break;
                case LockedUntilName:
                    sbMessage.SystemProperties.LockedUntilUtc = (DateTime)pair.Value;
                    break;
                case PartitionKeyName:
                    sbMessage.PartitionKey = (string)pair.Value;
                    break;
                case PartitionIdName:
                    sbMessage.SystemProperties.PartitionId = (short)pair.Value;
                    break;
                case ViaPartitionKeyName:
                    sbMessage.ViaPartitionKey = (string)pair.Value;
                    break;
                case DeadLetterSourceName:
                    sbMessage.SystemProperties.DeadLetterSource = (string)pair.Value;
                    break;
                case MessageStateName:
                    // Values that do not correspond to a defined MessageState member are ignored.
                    var messageStateValue = (int)pair.Value;
                    if (Enum.IsDefined(typeof(MessageState), messageStateValue))
                    {
                        sbMessage.SystemProperties.State = (MessageState)messageStateValue;
                    }
                    break;
                default:
                    // Any other annotation is stored as a user property when it can be mapped.
                    if (TryGetNetObjectFromAmqpObject(pair.Value, MappingType.ApplicationProperty, out var netObject))
                    {
                        sbMessage.UserProperties[key] = netObject;
                    }
                    break;
            }
        }
    }

    // A delivery tag of exactly GuidSize bytes is interpreted as the lock token GUID.
    if (amqpMessage.DeliveryTag.Count == GuidSize)
    {
        var guidBuffer = new byte[GuidSize];
        Buffer.BlockCopy(amqpMessage.DeliveryTag.Array, amqpMessage.DeliveryTag.Offset, guidBuffer, 0, GuidSize);
        sbMessage.SystemProperties.LockTokenGuid = new Guid(guidBuffer);
    }

    amqpMessage.Dispose();
    return sbMessage;
}