in src/DurableTask.ServiceBus/Common/ServiceBusUtils.cs [196:257]
/// <summary>
/// Deserializes the payload carried by <paramref name="message"/> into an instance of
/// <typeparamref name="T"/>, choosing the decoding path from the message's
/// <c>FrameworkConstants.CompressionTypePropertyName</c> user property.
/// </summary>
/// <typeparam name="T">The type the payload is deserialized to.</typeparam>
/// <param name="message">The message to read. Must not be <c>null</c>.</param>
/// <param name="orchestrationServiceBlobStore">
/// Handed unchanged to <c>LoadMessageStreamAsync</c> when the message declares a compression type.
/// NOTE(review): presumably used to fetch payloads stored outside the message - confirm against that helper.
/// </param>
/// <returns>The deserialized object.</returns>
/// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">
/// The message declares gzip compression but <c>Utils.IsGzipStream</c> rejects its content,
/// or it declares a compression type that is neither the gzip nor the "none" value.
/// </exception>
public static async Task<T> GetObjectFromBrokeredMessageAsync<T>(Message message, IOrchestrationServiceBlobStore orchestrationServiceBlobStore)
{
    if (message == null)
    {
        throw new ArgumentNullException(nameof(message));
    }

    // An absent property leaves the compression type empty, which selects the legacy path below.
    // The direct cast is intentional: a non-string property value surfaces as InvalidCastException.
    string compressionType = message.UserProperties.TryGetValue(FrameworkConstants.CompressionTypePropertyName, out object declaredCompression)
        ? (string)declaredCompression
        : string.Empty;

    if (string.IsNullOrWhiteSpace(compressionType))
    {
        // no compression, legacy style: the body is read directly, without LoadMessageStreamAsync
#if NETSTANDARD2_0
        var bodySerializer = new DataContractSerializer(typeof(T));
        using (var bodyStream = new MemoryStream(message.Body))
        {
            return (T)bodySerializer.ReadObject(bodyStream);
        }
#else
        return message.GetBody<T>();
#endif
    }

    bool isGzip = string.Equals(
        compressionType,
        FrameworkConstants.CompressionTypeGzipPropertyValue,
        StringComparison.OrdinalIgnoreCase);
    bool isUncompressed = string.Equals(
        compressionType,
        FrameworkConstants.CompressionTypeNonePropertyValue,
        StringComparison.OrdinalIgnoreCase);

    // Reject unknown compression types before any payload is loaded.
    if (!isGzip && !isUncompressed)
    {
        throw new ArgumentException(
            $"message specifies an invalid CompressionType: {compressionType}",
            nameof(message));
    }

    using (Stream payloadStream = await LoadMessageStreamAsync(message, orchestrationServiceBlobStore))
    {
        if (!isGzip)
        {
            // Declared as uncompressed: the loaded stream is deserialized as-is.
            return SafeReadFromStream<T>(payloadStream);
        }

        // Declared as gzip: verify the content before attempting to decompress it.
        if (!Utils.IsGzipStream(payloadStream))
        {
            throw new ArgumentException(
                $"message specifies a CompressionType of {compressionType} but content is not compressed",
                nameof(message));
        }

        using (Stream inflatedStream = await Utils.GetDecompressedStreamAsync(payloadStream))
        {
            return SafeReadFromStream<T>(inflatedStream);
        }
    }
}