in src/DurableTask.Core/WorkItemDispatcher.cs [357:459]
async Task ProcessWorkItemAsync(WorkItemDispatcherContext context, object workItemObj)
{
var workItem = (T) workItemObj;
var abortWorkItem = true;
string workItemId = string.Empty;
try
{
workItemId = this.workItemIdentifier(workItem);
this.LogHelper.ProcessWorkItemStarting(context, workItemId);
TraceHelper.Trace(
TraceEventType.Information,
"WorkItemDispatcherProcess-Begin",
this.GetFormattedLog(context.DispatcherId, $"Starting to process workItem {workItemId}"));
await this.ProcessWorkItem(workItem);
this.AdjustDelayModifierOnSuccess();
this.LogHelper.ProcessWorkItemCompleted(context, workItemId);
TraceHelper.Trace(
TraceEventType.Information,
"WorkItemDispatcherProcess-End",
this.GetFormattedLog(context.DispatcherId, $"Finished processing workItem {workItemId}"));
abortWorkItem = false;
}
catch (TypeMissingException exception)
{
this.LogHelper.ProcessWorkItemFailed(
context,
workItemId,
$"Backing off for {BackOffIntervalOnInvalidOperationSecs} seconds",
exception);
TraceHelper.TraceException(
TraceEventType.Error,
"WorkItemDispatcherProcess-TypeMissingException",
exception,
this.GetFormattedLog(context.DispatcherId, $"Exception while processing workItem {workItemId}"));
TraceHelper.Trace(
TraceEventType.Error,
"WorkItemDispatcherProcess-TypeMissingBackingOff",
"Backing off after invalid operation by " + BackOffIntervalOnInvalidOperationSecs);
// every time we hit invalid operation exception we back off the dispatcher
this.AdjustDelayModifierOnFailure(BackOffIntervalOnInvalidOperationSecs);
}
catch (Exception exception) when (!Utils.IsFatal(exception))
{
TraceHelper.TraceException(
TraceEventType.Error,
"WorkItemDispatcherProcess-Exception",
exception,
this.GetFormattedLog(context.DispatcherId, $"Exception while processing workItem {workItemId}"));
int delayInSecs = this.GetDelayInSecondsAfterOnProcessException(exception);
if (delayInSecs > 0)
{
this.LogHelper.ProcessWorkItemFailed(
context,
workItemId,
$"Backing off for {delayInSecs} seconds until {CountDownToZeroDelay} successful operations",
exception);
TraceHelper.Trace(
TraceEventType.Error,
"WorkItemDispatcherProcess-BackingOff",
"Backing off after exception by at least " + delayInSecs + " until " + CountDownToZeroDelay +
" successful operations");
this.AdjustDelayModifierOnFailure(delayInSecs);
}
else
{
// if the derived dispatcher doesn't think this exception worthy of back-off then
// count it as a 'successful' operation
this.AdjustDelayModifierOnSuccess();
}
}
finally
{
Interlocked.Decrement(ref this.concurrentWorkItemCount);
this.concurrencyLock.Release();
}
if (abortWorkItem && this.AbortWorkItem != null)
{
await this.ExceptionTraceWrapperAsync(
context,
workItemId,
nameof(this.AbortWorkItem),
() => this.AbortWorkItem(workItem));
}
if (this.SafeReleaseWorkItem != null)
{
await this.ExceptionTraceWrapperAsync(
context,
workItemId,
nameof(this.SafeReleaseWorkItem),
() => this.SafeReleaseWorkItem(workItem));
}
}