in src/DurableTask.Core/TaskActivityDispatcher.cs [95:292]
/// <summary>
/// Processes a single activity work item: validates the incoming message, runs the requested
/// activity through the dispatch middleware pipeline, and completes the work item with either a
/// <see cref="TaskCompletedEvent"/> or a <see cref="TaskFailedEvent"/> response message.
/// </summary>
/// <remarks>
/// <para>
/// Validation failures (missing orchestration instance, unsupported event type, missing activity
/// name) are logged and then thrown via <c>TraceHelper.TraceException</c>.
/// </para>
/// <para>
/// Exceptions other than <see cref="TaskFailureException"/> that escape the middleware pipeline are
/// logged and rethrown. A <see cref="SessionAbortedException"/> causes the work item to be abandoned
/// instead of completed.
/// </para>
/// <para>
/// The distributed-tracing activity is stopped on every exit path: right before completion on the
/// success path, and in the <c>finally</c> block otherwise.
/// </para>
/// </remarks>
/// <param name="workItem">The activity work item to process.</param>
/// <returns>A task that completes once the work item has been completed, abandoned, or has failed.</returns>
async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem)
{
    Task? renewTask = null;
    using var renewCancellationTokenSource = new CancellationTokenSource();

    TaskMessage taskMessage = workItem.TaskMessage;
    OrchestrationInstance orchestrationInstance = taskMessage.OrchestrationInstance;
    TaskScheduledEvent? scheduledEvent = null;
    Activity? diagnosticActivity = null;

    // Declared outside the try block so that the finally block can stop it on failure paths.
    Activity? traceActivity = null;

    try
    {
        if (orchestrationInstance == null || string.IsNullOrWhiteSpace(orchestrationInstance.InstanceId))
        {
            this.logHelper.TaskActivityDispatcherError(
                workItem,
                $"The activity worker received a message that does not have any OrchestrationInstance information.");
            throw TraceHelper.TraceException(
                TraceEventType.Error,
                "TaskActivityDispatcher-MissingOrchestrationInstance",
                new InvalidOperationException("Message does not contain any OrchestrationInstance information"));
        }

        if (taskMessage.Event.EventType != EventType.TaskScheduled)
        {
            this.logHelper.TaskActivityDispatcherError(
                workItem,
                $"The activity worker received an event of type '{taskMessage.Event.EventType}' but only '{EventType.TaskScheduled}' is supported.");
            throw TraceHelper.TraceException(
                TraceEventType.Critical,
                "TaskActivityDispatcher-UnsupportedEventType",
                new NotSupportedException("Activity worker does not support event of type: " +
                                          taskMessage.Event.EventType));
        }

        scheduledEvent = (TaskScheduledEvent)taskMessage.Event;

        // Distributed tracing: start a new trace activity derived from the orchestration's trace context.
        traceActivity = TraceHelper.StartTraceActivityForTaskExecution(scheduledEvent, orchestrationInstance);

        if (scheduledEvent.Name == null)
        {
            string message = $"The activity worker received a {nameof(EventType.TaskScheduled)} event that does not specify an activity name.";
            this.logHelper.TaskActivityDispatcherError(workItem, message);
            throw TraceHelper.TraceException(
                TraceEventType.Error,
                "TaskActivityDispatcher-MissingActivityName",
                new InvalidOperationException(message));
        }

        this.logHelper.TaskActivityStarting(orchestrationInstance, scheduledEvent);

        // May be null when the activity is not registered; that case is handled inside the pipeline below.
        TaskActivity? taskActivity = this.objectManager.GetObject(scheduledEvent.Name, scheduledEvent.Version);

        if (workItem.LockedUntilUtc < DateTime.MaxValue)
        {
            // Start a background task that runs RenewUntil while the activity executes.
            // NOTE(review): if RenewUntil returns a Task, StartNew produces a Task<Task> here and the
            // await in the finally block only observes the outer task, not the renewal loop itself.
            // Confirm against RenewUntil's signature and consider Unwrap()/Task.Run if so.
            renewTask = Task.Factory.StartNew(
                () => this.RenewUntil(workItem, renewCancellationTokenSource.Token),
                renewCancellationTokenSource.Token);
        }

        var dispatchContext = new DispatchMiddlewareContext();
        dispatchContext.SetProperty(taskMessage.OrchestrationInstance);
        dispatchContext.SetProperty(taskActivity);
        dispatchContext.SetProperty(scheduledEvent);

        // In transitionary phase (activity queued from old code, accessed in new code) context can be null.
        if (taskMessage.OrchestrationExecutionContext != null)
        {
            dispatchContext.SetProperty(taskMessage.OrchestrationExecutionContext);
        }

        // Correlation: make the work item's trace context current and remember its activity
        // so that it can be stopped in the finally block.
        CorrelationTraceClient.Propagate(() =>
        {
            workItem.TraceContextBase?.SetActivityToCurrent();
            diagnosticActivity = workItem.TraceContextBase?.CurrentActivity;
        });

        ActivityExecutionResult? result;
        try
        {
            await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
            {
                if (taskActivity == null)
                {
                    // This likely indicates a deployment error of some kind. Because these unhandled exceptions are
                    // automatically retried, resolving this may require redeploying the app code so that the activity exists again.
                    // CONSIDER: Should this be changed into a permanent error that fails the orchestration? Perhaps
                    // the app owner doesn't care to preserve existing instances when doing code deployments?
                    throw new TypeMissingException($"TaskActivity {scheduledEvent.Name} version {scheduledEvent.Version} was not found");
                }

                var context = new TaskContext(
                    taskMessage.OrchestrationInstance,
                    scheduledEvent.Name,
                    scheduledEvent.Version,
                    scheduledEvent.EventId);
                context.ErrorPropagationMode = this.errorPropagationMode;

                HistoryEvent? responseEvent;
                try
                {
                    string? output = await taskActivity.RunAsync(context, scheduledEvent.Input);
                    responseEvent = new TaskCompletedEvent(-1, scheduledEvent.EventId, output);
                }
                catch (Exception e) when (e is not TaskFailureException && !Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
                {
                    // These are unexpected exceptions that occur in the task activity abstraction. Normal exceptions from
                    // activities are expected to be translated into TaskFailureException and handled outside the middleware
                    // context (see further below).
                    TraceHelper.TraceExceptionInstance(TraceEventType.Error, "TaskActivityDispatcher-ProcessException", taskMessage.OrchestrationInstance, e);
                    string? details = this.IncludeDetails
                        ? $"Unhandled exception while executing task: {e}"
                        : null;
                    responseEvent = new TaskFailedEvent(-1, scheduledEvent.EventId, e.Message, details, new FailureDetails(e));

                    traceActivity?.SetStatus(ActivityStatusCode.Error, e.Message);

                    this.logHelper.TaskActivityFailure(orchestrationInstance, scheduledEvent.Name, (TaskFailedEvent)responseEvent, e);
                }

                // Hand the result back to the dispatcher through the middleware context.
                var executionResult = new ActivityExecutionResult { ResponseEvent = responseEvent };
                dispatchContext.SetProperty(executionResult);
            });

            result = dispatchContext.GetProperty<ActivityExecutionResult>();
        }
        catch (TaskFailureException e)
        {
            // These are normal task activity failures. They can come from Activity implementations or from middleware.
            TraceHelper.TraceExceptionInstance(TraceEventType.Error, "TaskActivityDispatcher-ProcessTaskFailure", taskMessage.OrchestrationInstance, e);
            string? details = this.IncludeDetails ? e.Details : null;
            var failureEvent = new TaskFailedEvent(-1, scheduledEvent.EventId, e.Message, details, e.FailureDetails);

            traceActivity?.SetStatus(ActivityStatusCode.Error, e.Message);

            this.logHelper.TaskActivityFailure(orchestrationInstance, scheduledEvent.Name, failureEvent, e);
            CorrelationTraceClient.Propagate(() => CorrelationTraceClient.TrackException(e));
            result = new ActivityExecutionResult { ResponseEvent = failureEvent };
        }
        catch (Exception middlewareException) when (!Utils.IsFatal(middlewareException))
        {
            traceActivity?.SetStatus(ActivityStatusCode.Error, middlewareException.Message);

            // These are considered retriable
            this.logHelper.TaskActivityDispatcherError(workItem, $"Unhandled exception in activity middleware pipeline: {middlewareException}");
            throw;
        }

        HistoryEvent? eventToRespond = result?.ResponseEvent;

        if (eventToRespond is TaskCompletedEvent completedEvent)
        {
            this.logHelper.TaskActivityCompleted(orchestrationInstance, scheduledEvent.Name, completedEvent);
        }
        else if (eventToRespond is null)
        {
            // Default response if middleware prevents a response from being generated
            eventToRespond = new TaskCompletedEvent(-1, scheduledEvent.EventId, null);
        }

        var responseTaskMessage = new TaskMessage
        {
            Event = eventToRespond,
            OrchestrationInstance = orchestrationInstance
        };

        // Stop the trace activity here to avoid including the completion time in the latency calculation.
        // Clearing the reference afterwards keeps the finally block from stopping it a second time.
        traceActivity?.Stop();
        traceActivity = null;

        await this.orchestrationService.CompleteTaskActivityWorkItemAsync(workItem, responseTaskMessage);
    }
    catch (SessionAbortedException e)
    {
        // The activity aborted its execution
        this.logHelper.TaskActivityAborted(orchestrationInstance, scheduledEvent!, e.Message);
        TraceHelper.TraceInstance(TraceEventType.Warning, "TaskActivityDispatcher-ExecutionAborted", orchestrationInstance, "{0}: {1}", scheduledEvent?.Name ?? "", e.Message);
        await this.orchestrationService.AbandonTaskActivityWorkItemAsync(workItem);
    }
    finally
    {
        // Failure paths (validation errors after the trace activity was started, rethrown middleware
        // exceptions, aborted sessions) never reach the success-path Stop above, so stop it here.
        // It is stopped before the diagnostic activity, matching the order used on the success path.
        traceActivity?.Stop();

        diagnosticActivity?.Stop(); // Ensure the activity is stopped here to prevent it from leaking out.

        if (renewTask != null)
        {
            renewCancellationTokenSource.Cancel();
            try
            {
                // Wait for the renew task to finish.
                await renewTask;
            }
            catch (OperationCanceledException)
            {
                // Expected: cancellation was requested just above.
            }
        }
    }
}