in edge-hub/core/src/Microsoft.Azure.Devices.Edge.Hub.Amqp/AmqpMessageConverter.cs [20:104]
/// <summary>
/// Converts an <see cref="AmqpMessage"/> into an <see cref="EdgeMessage"/>.
/// The AMQP properties section and selected message annotations are copied into the
/// system properties, application properties are split between system and custom
/// properties, and the body stream is copied into a byte array.
/// </summary>
/// <param name="sourceMessage">The AMQP message to convert.</param>
/// <returns>
/// An <see cref="EdgeMessage"/> built from the body bytes, the custom properties and
/// the system properties read from <paramref name="sourceMessage"/>.
/// </returns>
public IMessage ToMessage(AmqpMessage sourceMessage)
{
    // Copies the whole body stream of the source message into a new byte array.
    byte[] GetMessageBody()
    {
        using (var ms = new MemoryStream())
        {
            sourceMessage.BodyStream.CopyTo(ms);
            return ms.ToArray();
        }
    }

    var systemProperties = new Dictionary<string, string>();
    var properties = new Dictionary<string, string>();

    // AMQP "properties" section -> system properties.
    // NOTE(review): AddIfNonEmpty is defined elsewhere; presumably it skips null/empty values - confirm.
    systemProperties.AddIfNonEmpty(SystemProperties.MessageId, sourceMessage.Properties.MessageId?.ToString());
    systemProperties.AddIfNonEmpty(SystemProperties.MsgCorrelationId, sourceMessage.Properties.CorrelationId?.ToString());
    systemProperties.AddIfNonEmpty(SystemProperties.ContentType, sourceMessage.Properties.ContentType.Value);
    systemProperties.AddIfNonEmpty(SystemProperties.ContentEncoding, sourceMessage.Properties.ContentEncoding.Value);
    systemProperties.AddIfNonEmpty(SystemProperties.To, sourceMessage.Properties.To?.ToString());

    // UserId is an array segment: decode only the bytes inside the segment (Offset/Count).
    // Decoding the whole backing array would pull in unrelated bytes whenever the segment
    // is a slice of a larger buffer.
    ArraySegment<byte> userId = sourceMessage.Properties.UserId;
    systemProperties.AddIfNonEmpty(
        SystemProperties.UserId,
        userId.Count > 0 ? Encoding.UTF8.GetString(userId.Array, userId.Offset, userId.Count) : null);

    // "o" is the round-trip date/time format.
    systemProperties.AddIfNonEmpty(SystemProperties.ExpiryTimeUtc, sourceMessage.Properties.AbsoluteExpiryTime?.ToString("o"));

    // Message annotations -> system properties. Each value is copied only when the
    // typed lookup for its key succeeds.
    if (sourceMessage.MessageAnnotations.Map.TryGetValue(Constants.MessageAnnotationsEnqueuedTimeKey, out DateTime enqueuedTime))
    {
        systemProperties[SystemProperties.EnqueuedTime] = enqueuedTime.ToString("o");
    }

    if (sourceMessage.MessageAnnotations.Map.TryGetValue(Constants.MessageAnnotationsDeliveryCountKey, out byte deliveryCount))
    {
        systemProperties[SystemProperties.DeliveryCount] = deliveryCount.ToString();
    }

    // A sequence number of 0 is not recorded.
    if (sourceMessage.MessageAnnotations.Map.TryGetValue(Constants.MessageAnnotationsSequenceNumberName, out ulong sequenceNumber) && sequenceNumber > 0)
    {
        systemProperties[SystemProperties.SequenceNumber] = sequenceNumber.ToString();
    }

    if (sourceMessage.MessageAnnotations.Map.TryGetValue(Constants.MessageAnnotationsLockTokenName, out string lockToken))
    {
        systemProperties.AddIfNonEmpty(SystemProperties.LockToken, lockToken);
    }

    if (sourceMessage.MessageAnnotations.Map.TryGetValue(SystemProperties.InterfaceId, out string hubInterfaceId))
    {
        systemProperties.AddIfNonEmpty(SystemProperties.InterfaceId, hubInterfaceId);
    }

    if (sourceMessage.MessageAnnotations.Map.TryGetValue(SystemProperties.ComponentName, out string componentName))
    {
        systemProperties.AddIfNonEmpty(SystemProperties.ComponentName, componentName);
    }

    // Application properties: a fixed set of keys is promoted to system properties,
    // everything else is kept as a custom property under its original key.
    if (sourceMessage.ApplicationProperties != null)
    {
        foreach (KeyValuePair<MapKey, object> property in sourceMessage.ApplicationProperties.Map)
        {
            string key = property.Key.ToString();

            // Values that are not strings are stored as null ("as" cast).
            string value = property.Value as string;
            switch (key)
            {
                case Constants.MessagePropertiesMessageSchemaKey:
                    systemProperties[SystemProperties.MessageSchema] = value;
                    break;
                case Constants.MessagePropertiesCreationTimeKey:
                    systemProperties[SystemProperties.CreationTime] = value;
                    break;
                case Constants.MessagePropertiesOperationKey:
                    systemProperties[SystemProperties.Operation] = value;
                    break;
                case Constants.MessagePropertiesOutputNameKey:
                    systemProperties[SystemProperties.OutputName] = value;
                    break;
                default:
                    properties[key] = value;
                    break;
            }
        }
    }

    return new EdgeMessage(GetMessageBody(), properties, systemProperties);
}