in src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs [1597:1667]
/// <summary>
/// Writes the serialized orchestration runtime state to the message session, or clears the
/// session state when there is no running orchestration to persist.
/// </summary>
/// <param name="workItem">Work item being processed; only its <c>InstanceId</c> is used, for tracing.</param>
/// <param name="newOrchestrationRuntimeState">
/// State to persist. When it is <c>null</c>, has no <c>ExecutionStartedEvent</c>, or is not in the
/// <c>Running</c> status, the session state is set to <c>null</c> instead.
/// </param>
/// <param name="runtimeState">
/// Runtime state passed to the stream converter; its <c>CompressedSize</c> is used for the size
/// warning and the termination reason, and its orchestration instance id addresses the forced
/// terminate message.
/// </param>
/// <param name="session">Message session whose state is set.</param>
/// <returns>
/// <c>true</c> when the session state was set or cleared; <c>false</c> when an
/// <c>OrchestrationException</c> was caught while producing or storing the state, in which case a
/// forced terminate message has been sent to the orchestrator queue.
/// </returns>
async Task<bool> TrySetSessionStateAsync(
    TaskOrchestrationWorkItem workItem,
    OrchestrationRuntimeState newOrchestrationRuntimeState,
    OrchestrationRuntimeState runtimeState,
    IMessageSession session)
{
    // Early warning: the size recorded on runtimeState at entry is above the warning level but
    // still below the overflow threshold.
    if (runtimeState.CompressedSize > SessionStreamWarningSizeInBytes && runtimeState.CompressedSize < this.Settings.SessionSettings.SessionOverflowThresholdInBytes)
    {
        TraceHelper.TraceSession(
            TraceEventType.Error,
            "ServiceBusOrchestrationService-SessionStateThresholdApproaching",
            workItem.InstanceId,
            $"Size of session state ({runtimeState.CompressedSize}B) is nearing session size limit of {this.Settings.SessionSettings.SessionOverflowThresholdInBytes}B");
    }

    // Nothing to persist unless the orchestration has started and is still running.
    if (newOrchestrationRuntimeState == null ||
        newOrchestrationRuntimeState.ExecutionStartedEvent == null ||
        newOrchestrationRuntimeState.OrchestrationStatus != OrchestrationStatus.Running)
    {
        await session.SetStateAsync(null);
        return true;
    }

    try
    {
        // The converter hands back a stream that this method owns; dispose it once its content
        // has been copied so it is released on both the success and the failure path.
        using (Stream rawStream = await
            RuntimeStateStreamConverter.OrchestrationRuntimeStateToRawStream(
                newOrchestrationRuntimeState,
                runtimeState,
                JsonDataConverter.Default,
                this.Settings.TaskOrchestrationDispatcherSettings.CompressOrchestrationState,
                this.Settings.SessionSettings,
                this.BlobStore,
                session.SessionId))
        using (var ms = new MemoryStream())
        {
            await rawStream.CopyToAsync(ms);
            await session.SetStateAsync(ms.ToArray());
        }

        this.ServiceStats.OrchestrationDispatcherStats.SessionSets.Increment();
        return true;
    }
    catch (OrchestrationException exception)
    {
        // basic idea is to simply enqueue a terminate message just like how we do it from TaskHubClient
        // it is possible to have other messages in front of the queue and those will get processed before
        // the terminate message gets processed. but that is ok since in the worst case scenario we will
        // simply land in this catch block again and end up queuing up another terminate message.
        //
        // the interesting scenario is when the second time we *don't* land in this catch block because e.g.
        // the new messages that we processed caused a new generation to be created. in that case
        // it is still ok because the worst case scenario is that we will terminate a newly created generation
        // which shouldn't have been created at all in the first place
        string reason = $"Session state size of {runtimeState.CompressedSize} exceeded the termination threshold of {this.Settings.SessionSettings.SessionMaxSizeInBytes} bytes. More info: {exception.StackTrace}";
        TraceHelper.TraceSession(TraceEventType.Critical, "ServiceBusOrchestrationService-SessionSizeExceeded", workItem.InstanceId, reason);

        Message forcedTerminateMessage = await CreateForcedTerminateMessageAsync(runtimeState.OrchestrationInstance.InstanceId, reason);
        await this.orchestratorQueueClient.SendAsync(forcedTerminateMessage);

        this.ServiceStats.OrchestrationDispatcherStats.MessagesSent.Increment();
        this.ServiceStats.OrchestrationDispatcherStats.MessageBatchesSent.Increment();

        // Signal the caller that the session state was not persisted.
        return false;
    }
}