in src/DurableTask.Core/TaskOrchestrationDispatcher.cs [204:298]
/// <summary>
/// Processes a work item. When the work item carries a session, keeps fetching and processing
/// further message batches for that session until processing reports completion/interruption,
/// the fetch returns <c>null</c>, or the concurrent session lock cannot be acquired.
/// </summary>
/// <param name="workItem">The orchestration work item to process.</param>
/// <returns>A task that completes when processing of the work item (and its session, if any) has finished.</returns>
/// <remarks>
/// A <see cref="SessionAbortedException"/> raised during processing is logged and the work item is
/// abandoned through the orchestration service; the exception is not rethrown.
/// </remarks>
async Task OnProcessWorkItemSessionAsync(TaskOrchestrationWorkItem workItem)
{
    // DTFx history replay expects that ExecutionStarted comes before other events.
    // If this is not already the case, due to a race-condition, we re-order the
    // messages to enforce this expectation.
    EnsureExecutionStartedIsFirst(workItem.NewMessages);

    try
    {
        if (workItem.Session == null)
        {
            // Legacy behavior: no session, so process the single batch and stop.
            await this.OnProcessWorkItemAsync(workItem);
            return;
        }

        // True only while THIS invocation actually holds the concurrent session lock.
        // It is set exclusively by the Acquire() inside the loop, because the finally
        // block below releases the lock whenever this flag is set.
        var isExtendedSession = false;

        CorrelationTraceClient.Propagate(
            () =>
            {
                // Probe whether an extended session slot is currently available and record
                // the answer on the work item. The probe must leave the lock balanced:
                // release only what was really acquired, and keep the result out of
                // isExtendedSession so the finally block never releases a slot that this
                // invocation does not hold.
                // TODO: Consider removing this probe - the slot is handed back immediately,
                // so the recorded answer is only a point-in-time hint.
                bool extendedSessionAvailable = this.concurrentSessionLock.Acquire();
                if (extendedSessionAvailable)
                {
                    this.concurrentSessionLock.Release();
                }

                workItem.IsExtendedSession = extendedSessionAvailable;
            });

        var processCount = 0;
        try
        {
            while (true)
            {
                // If the provider provided work items, execute them.
                if (workItem.NewMessages?.Count > 0)
                {
                    bool isCompletedOrInterrupted = await this.OnProcessWorkItemAsync(workItem);
                    if (isCompletedOrInterrupted)
                    {
                        break;
                    }

                    processCount++;
                }

                // Fetches beyond the first require getting an extended session lock, used to prevent starvation.
                if (processCount > 0 && !isExtendedSession)
                {
                    isExtendedSession = this.concurrentSessionLock.Acquire();
                    if (!isExtendedSession)
                    {
                        TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-MaxOperations", "Failed to acquire concurrent session lock.");
                        break;
                    }
                }

                TraceHelper.Trace(TraceEventType.Verbose, "OnProcessWorkItemSession-StartFetch", "Starting fetch of existing session.");
                Stopwatch timer = Stopwatch.StartNew();

                // Wait for new messages to arrive for the session. This call is expected to block (asynchronously)
                // until either new messages are available or until a provider-specific timeout has expired.
                workItem.NewMessages = await workItem.Session.FetchNewOrchestrationMessagesAsync(workItem);
                if (workItem.NewMessages == null)
                {
                    // A null result ends the session loop.
                    break;
                }

                TraceHelper.Trace(
                    TraceEventType.Verbose,
                    "OnProcessWorkItemSession-EndFetch",
                    $"Fetched {workItem.NewMessages.Count} new message(s) after {timer.ElapsedMilliseconds} ms from existing session.");

                // Clear the new events recorded on the runtime state before the next batch is processed.
                workItem.OrchestrationRuntimeState.NewEvents.Clear();
            }
        }
        finally
        {
            // Release exactly once, and only if the loop above acquired the lock.
            if (isExtendedSession)
            {
                TraceHelper.Trace(
                    TraceEventType.Verbose,
                    "OnProcessWorkItemSession-Release",
                    $"Releasing extended session after {processCount} batch(es).");
                this.concurrentSessionLock.Release();
            }
        }
    }
    catch (SessionAbortedException e)
    {
        // Either the orchestration or the orchestration service explicitly abandoned the session.
        // Fall back to an instance built from the work item's ID when no runtime state is available.
        OrchestrationInstance instance = workItem.OrchestrationRuntimeState?.OrchestrationInstance ?? new OrchestrationInstance { InstanceId = workItem.InstanceId };
        this.logHelper.OrchestrationAborted(instance, e.Message);
        TraceHelper.TraceInstance(TraceEventType.Warning, "TaskOrchestrationDispatcher-ExecutionAborted", instance, "{0}", e.Message);
        await this.orchestrationService.AbandonTaskOrchestrationWorkItemAsync(workItem);
    }
}