in src/DurableTask.Core/TaskEntityDispatcher.cs [115:183]
async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
{
try
{
if (workItem.Session == null)
{
// Legacy behavior
await this.OnProcessWorkItemAsync(workItem);
return;
}
var isExtendedSession = false;
var processCount = 0;
try
{
while (true)
{
// While the work item contains messages that need to be processed, execute them.
if (workItem.NewMessages?.Count > 0)
{
bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem);
if (isCompletedOrInterrupted)
{
break;
}
processCount++;
}
// Fetches beyond the first require getting an extended session lock, used to prevent starvation.
if (processCount > 0 && !isExtendedSession)
{
isExtendedSession = this.concurrentSessionLock.Acquire();
if (!isExtendedSession)
{
break;
}
}
Stopwatch timer = Stopwatch.StartNew();
// Wait for new messages to arrive for the session. This call is expected to block (asynchronously)
// until either new messages are available or until a provider-specific timeout has expired.
workItem.NewMessages = await workItem.Session.FetchNewOrchestrationMessagesAsync(workItem);
if (workItem.NewMessages == null)
{
break;
}
workItem.OrchestrationRuntimeState.NewEvents.Clear();
}
}
finally
{
if (isExtendedSession)
{
this.concurrentSessionLock.Release();
}
}
}
catch (SessionAbortedException e)
{
// Either the orchestration or the orchestration service explicitly abandoned the session.
OrchestrationInstance instance = workItem.OrchestrationRuntimeState?.OrchestrationInstance ?? new OrchestrationInstance { InstanceId = workItem.InstanceId };
this.logHelper.OrchestrationAborted(instance, e.Message);
await this.orchestrationService.AbandonTaskOrchestrationWorkItemAsync(workItem);
}
}