in src/DurableTask.ServiceBus/Common/ServiceBusUtils.cs [41:136]
/// <summary>
/// Serializes <paramref name="serializableObject"/> into a <see cref="Message"/> whose
/// <c>SessionId</c> is the orchestration instance id (or null when no instance is given).
/// Depending on <paramref name="compressionSettings"/> the payload is sent in the legacy
/// format, uncompressed, or gzip-compressed; payloads at or above
/// <c>MessageOverflowThresholdInBytes</c> are routed through
/// <c>GenerateBrokeredMessageWithBlobKeyPropertyAsync</c> together with
/// <paramref name="orchestrationServiceBlobStore"/>.
/// </summary>
/// <param name="serializableObject">The object to place in the message; must not be null.</param>
/// <param name="compressionSettings">Selects the compression style and, for Threshold, the byte threshold.</param>
/// <param name="messageSettings">Overflow settings; a default <c>ServiceBusMessageSettings</c> is used when null.</param>
/// <param name="instance">Orchestration instance used for the session id and tracing; may be null.</param>
/// <param name="messageType">Label used only in the compression-stats trace line; may be null.</param>
/// <param name="orchestrationServiceBlobStore">Blob store handed to the overflow helper for oversized payloads.</param>
/// <param name="messageFireTime">Fire time handed to the overflow helper.</param>
/// <returns>The message carrying (or referencing) the serialized payload.</returns>
/// <exception cref="ArgumentNullException"><paramref name="serializableObject"/> is null.</exception>
public static async Task<Message> GetBrokeredMessageFromObjectAsync(
    object serializableObject,
    CompressionSettings compressionSettings,
    ServiceBusMessageSettings messageSettings,
    OrchestrationInstance instance,
    string messageType,
    IOrchestrationServiceBlobStore orchestrationServiceBlobStore,
    DateTime messageFireTime)
{
    if (serializableObject == null)
    {
        throw new ArgumentNullException(nameof(serializableObject));
    }

    if (compressionSettings.Style == CompressionStyle.Legacy)
    {
#if NETSTANDARD2_0
        using (var ms = new MemoryStream())
        {
            // DataContractSerializer is not a generic type definition, so the previous
            // typeof(DataContractSerializer).MakeGenericType(...) call threw
            // InvalidOperationException on every legacy send. Serialize directly instead.
            // NOTE(review): binary XML is used on the assumption that legacy receivers expect
            // the format produced by the full-framework BrokeredMessage(object) constructor
            // (DataContractSerializer over a binary XmlDictionaryWriter) - confirm.
            var serializer = new DataContractSerializer(serializableObject.GetType());
            using (System.Xml.XmlDictionaryWriter writer =
                System.Xml.XmlDictionaryWriter.CreateBinaryWriter(ms, null, null, false))
            {
                serializer.WriteObject(writer, serializableObject);
                writer.Flush();
            }

            return new Message(ms.ToArray()) { SessionId = instance?.InstanceId };
        }
#else
        return new Message(serializableObject) { SessionId = instance?.InstanceId };
#endif
    }

    if (messageSettings == null)
    {
        messageSettings = new ServiceBusMessageSettings();
    }

    var disposeStream = true;
    var rawStream = new MemoryStream();
    try
    {
        // Inside the try so rawStream is still disposed if serialization throws.
        Utils.WriteObjectToStream(rawStream, serializableObject);

        Message brokeredMessage;
        if (compressionSettings.Style == CompressionStyle.Always ||
            (compressionSettings.Style == CompressionStyle.Threshold &&
             rawStream.Length > compressionSettings.ThresholdInBytes))
        {
            Stream compressedStream = Utils.GetCompressedStream(rawStream);
            if (compressedStream.Length < messageSettings.MessageOverflowThresholdInBytes)
            {
                // Snapshot both lengths now: the trace message is built by a deferred lambda
                // and rawStream is disposed in the finally block below.
                long rawLen = rawStream.Length;
                long compressedLen = compressedStream.Length;

                // compressedStream is intentionally left open here; the original code never
                // disposed it on this path (presumably the message keeps using it).
                brokeredMessage = GenerateBrokeredMessageWithCompressionTypeProperty(compressedStream, FrameworkConstants.CompressionTypeGzipPropertyValue);
                TraceHelper.TraceInstance(
                    TraceEventType.Information,
                    "GetBrokeredMessageFromObject-CompressionStats",
                    instance,
                    () =>
                        "Compression stats for " + (messageType ?? string.Empty) + " : " +
                        brokeredMessage?.MessageId +
                        ", uncompressed " + rawLen + " -> compressed " + compressedLen);
            }
            else
            {
                try
                {
                    brokeredMessage = await GenerateBrokeredMessageWithBlobKeyPropertyAsync(compressedStream, orchestrationServiceBlobStore, instance, messageSettings, messageFireTime, FrameworkConstants.CompressionTypeGzipPropertyValue);
                }
                finally
                {
                    // Mirrors the uncompressed overflow path, where the stream passed to the
                    // same helper is disposed once the helper has completed.
                    compressedStream.Dispose();
                }
            }
        }
        else
        {
            if (rawStream.Length < messageSettings.MessageOverflowThresholdInBytes)
            {
                brokeredMessage = GenerateBrokeredMessageWithCompressionTypeProperty(rawStream, FrameworkConstants.CompressionTypeNonePropertyValue);

                // The message was built from rawStream, so it must stay open after we return.
                disposeStream = false;
            }
            else
            {
                brokeredMessage = await GenerateBrokeredMessageWithBlobKeyPropertyAsync(rawStream, orchestrationServiceBlobStore, instance, messageSettings, messageFireTime, FrameworkConstants.CompressionTypeNonePropertyValue);
            }
        }

        brokeredMessage.SessionId = instance?.InstanceId;
        // TODO : Test more if this helps, initial tests shows not change in performance
        // brokeredMessage.ViaPartitionKey = instance?.InstanceId;
        return brokeredMessage;
    }
    finally
    {
        if (disposeStream)
        {
            rawStream.Dispose();
        }
    }
}