public static SBMessage AmqpMessageToSBMessage()

in sdk/servicebus/Microsoft.Azure.ServiceBus/src/Amqp/AmqpMessageConverter.cs [168:360]


        /// <summary>
        /// Builds an <see cref="SBMessage"/> from an AMQP message by copying, in order, the body,
        /// the header, the properties, the application properties and the message annotations,
        /// and finally the lock token taken from the delivery tag.
        /// </summary>
        /// <param name="amqpMessage">
        /// The AMQP message to convert. It is disposed by this method once the conversion has
        /// completed successfully.
        /// </param>
        /// <param name="isPeeked">
        /// When <c>true</c> the header delivery count is copied unchanged; otherwise one is added
        /// to it before it is stored on the result.
        /// </param>
        /// <returns>The populated <see cref="SBMessage"/>.</returns>
        /// <remarks>
        /// When <paramref name="amqpMessage"/> is <c>null</c> the exception produced by
        /// <c>Fx.Exception.ArgumentNull</c> is thrown.
        /// </remarks>
        public static SBMessage AmqpMessageToSBMessage(AmqpMessage amqpMessage, bool isPeeked = false)
        {
            if (amqpMessage == null)
            {
                throw Fx.Exception.ArgumentNull(nameof(amqpMessage));
            }

            SBMessage sbMessage;

            if ((amqpMessage.BodyType & SectionFlag.AmqpValue) != 0
                && amqpMessage.ValueBody.Value != null)
            {
                // Value body: expose the mapped .NET object when a mapping exists,
                // otherwise hand back the raw AMQP object untouched.
                sbMessage = new SBMessage();

                if (TryGetNetObjectFromAmqpObject(amqpMessage.ValueBody.Value, MappingType.MessageBody, out var dotNetObject))
                {
                    sbMessage.SystemProperties.BodyObject = dotNetObject;
                }
                else
                {
                    sbMessage.SystemProperties.BodyObject = amqpMessage.ValueBody.Value;
                }
            }
            else if ((amqpMessage.BodyType & SectionFlag.Data) != 0
                && amqpMessage.DataBody != null)
            {
                // Data body: concatenate every binary section into a single byte array.
                // Sections are first normalised to ArraySegment<byte> so the total size is known
                // and the payload is copied exactly once into an exactly-sized buffer.
                var segments = new List<ArraySegment<byte>>();
                var totalLength = 0;

                foreach (var data in amqpMessage.DataBody)
                {
                    ArraySegment<byte> current;
                    if (data.Value is byte[] byteArrayValue)
                    {
                        current = new ArraySegment<byte>(byteArrayValue);
                    }
                    else if (data.Value is ArraySegment<byte> arraySegmentValue)
                    {
                        current = arraySegmentValue;
                    }
                    else
                    {
                        // Sections holding anything else (including null) contribute no bytes.
                        continue;
                    }

                    // A default ArraySegment<byte> has a null backing array; dereferencing it
                    // would throw, and it carries no bytes anyway, so skip it with empty segments.
                    if (current.Array == null || current.Count == 0)
                    {
                        continue;
                    }

                    segments.Add(current);
                    totalLength += current.Count;
                }

                var body = new byte[totalLength];
                var position = 0;
                foreach (var segment in segments)
                {
                    Buffer.BlockCopy(segment.Array, segment.Offset, body, position, segment.Count);
                    position += segment.Count;
                }

                sbMessage = new SBMessage(body);
            }
            else
            {
                // No usable body section.
                sbMessage = new SBMessage();
            }

            var sections = amqpMessage.Sections;
            if ((sections & SectionFlag.Header) != 0)
            {
                if (amqpMessage.Header.Ttl != null)
                {
                    // The header TTL is interpreted as a number of milliseconds.
                    sbMessage.TimeToLive = TimeSpan.FromMilliseconds(amqpMessage.Header.Ttl.Value);
                }

                if (amqpMessage.Header.DeliveryCount != null)
                {
                    // Peeked messages keep the header count as is; otherwise one is added.
                    sbMessage.SystemProperties.DeliveryCount = isPeeked ? (int)(amqpMessage.Header.DeliveryCount.Value) : (int)(amqpMessage.Header.DeliveryCount.Value + 1);
                }
            }

            if ((sections & SectionFlag.Properties) != 0)
            {
                if (amqpMessage.Properties.MessageId != null)
                {
                    sbMessage.MessageId = amqpMessage.Properties.MessageId.ToString();
                }

                if (amqpMessage.Properties.CorrelationId != null)
                {
                    sbMessage.CorrelationId = amqpMessage.Properties.CorrelationId.ToString();
                }

                if (amqpMessage.Properties.ContentType.Value != null)
                {
                    sbMessage.ContentType = amqpMessage.Properties.ContentType.Value;
                }

                if (amqpMessage.Properties.Subject != null)
                {
                    sbMessage.Label = amqpMessage.Properties.Subject;
                }

                if (amqpMessage.Properties.To != null)
                {
                    sbMessage.To = amqpMessage.Properties.To.ToString();
                }

                if (amqpMessage.Properties.ReplyTo != null)
                {
                    sbMessage.ReplyTo = amqpMessage.Properties.ReplyTo.ToString();
                }

                if (amqpMessage.Properties.GroupId != null)
                {
                    sbMessage.SessionId = amqpMessage.Properties.GroupId;
                }

                if (amqpMessage.Properties.ReplyToGroupId != null)
                {
                    sbMessage.ReplyToSessionId = amqpMessage.Properties.ReplyToGroupId;
                }

                if (amqpMessage.Properties.CreationTime.HasValue && amqpMessage.Properties.AbsoluteExpiryTime.HasValue)
                {
                    // Overwrite TimeToLive from AbsoluteExpiryTime: when both timestamps are
                    // present their difference replaces any TTL taken from the header above.
                    sbMessage.TimeToLive = amqpMessage.Properties.AbsoluteExpiryTime.Value - amqpMessage.Properties.CreationTime.Value;
                }
            }

            // Do application properties before message annotations, because the application properties
            // can be updated by entries from message annotation.
            if ((sections & SectionFlag.ApplicationProperties) != 0)
            {
                foreach (var pair in amqpMessage.ApplicationProperties.Map)
                {
                    // Entries without a .NET mapping are dropped.
                    if (TryGetNetObjectFromAmqpObject(pair.Value, MappingType.ApplicationProperty, out var netObject))
                    {
                        sbMessage.UserProperties[pair.Key.ToString()] = netObject;
                    }
                }
            }

            if ((sections & SectionFlag.MessageAnnotations) != 0)
            {
                foreach (var pair in amqpMessage.MessageAnnotations.Map)
                {
                    var key = pair.Key.ToString();

                    // Known annotation names map onto dedicated message fields through direct
                    // casts, so a value of an unexpected type makes the cast throw.
                    switch (key)
                    {
                        case EnqueuedTimeUtcName:
                            sbMessage.SystemProperties.EnqueuedTimeUtc = (DateTime)pair.Value;
                            break;
                        case ScheduledEnqueueTimeUtcName:
                            sbMessage.ScheduledEnqueueTimeUtc = (DateTime)pair.Value;
                            break;
                        case SequenceNumberName:
                            sbMessage.SystemProperties.SequenceNumber = (long)pair.Value;
                            break;
                        case EnqueueSequenceNumberName:
                            sbMessage.SystemProperties.EnqueuedSequenceNumber = (long)pair.Value;
                            break;
                        case LockedUntilName:
                            sbMessage.SystemProperties.LockedUntilUtc = (DateTime)pair.Value;
                            break;
                        case PartitionKeyName:
                            sbMessage.PartitionKey = (string)pair.Value;
                            break;
                        case PartitionIdName:
                            sbMessage.SystemProperties.PartitionId = (short)pair.Value;
                            break;
                        case ViaPartitionKeyName:
                            sbMessage.ViaPartitionKey = (string)pair.Value;
                            break;
                        case DeadLetterSourceName:
                            sbMessage.SystemProperties.DeadLetterSource = (string)pair.Value;
                            break;
                        case MessageStateName:
                            // Only values that correspond to a defined MessageState member are
                            // applied; anything else leaves the state untouched.
                            var stateValue = (int)pair.Value;
                            if (Enum.IsDefined(typeof(MessageState), stateValue))
                            {
                                sbMessage.SystemProperties.State = (MessageState)stateValue;
                            }

                            break;
                        default:
                            // Unrecognised annotations are surfaced as user properties and may
                            // replace an application property stored under the same key.
                            if (TryGetNetObjectFromAmqpObject(pair.Value, MappingType.ApplicationProperty, out var netObject))
                            {
                                sbMessage.UserProperties[key] = netObject;
                            }
                            break;
                    }
                }
            }

            // A delivery tag of exactly GuidSize bytes is read as the lock token GUID.
            if (amqpMessage.DeliveryTag.Count == GuidSize)
            {
                var guidBuffer = new byte[GuidSize];
                Buffer.BlockCopy(amqpMessage.DeliveryTag.Array, amqpMessage.DeliveryTag.Offset, guidBuffer, 0, GuidSize);
                sbMessage.SystemProperties.LockTokenGuid = new Guid(guidBuffer);
            }

            // NOTE(review): this is only reached when the conversion succeeds; if anything above
            // throws, the AMQP message is not disposed by this method.
            amqpMessage.Dispose();

            return sbMessage;
        }