async Task OnProcessWorkItemAsync()

in src/DurableTask.Core/TaskActivityDispatcher.cs [95:292]


        async Task OnProcessWorkItemAsync(TaskActivityWorkItem workItem)
        {
            Task? renewTask = null;
            using var renewCancellationTokenSource = new CancellationTokenSource();

            TaskMessage taskMessage = workItem.TaskMessage;
            OrchestrationInstance orchestrationInstance = taskMessage.OrchestrationInstance;
            TaskScheduledEvent? scheduledEvent = null;
            Activity? diagnosticActivity = null;
            try
            {
                if (orchestrationInstance == null || string.IsNullOrWhiteSpace(orchestrationInstance.InstanceId))
                {
                    this.logHelper.TaskActivityDispatcherError(
                        workItem,
                        $"The activity worker received a message that does not have any OrchestrationInstance information.");
                    throw TraceHelper.TraceException(
                        TraceEventType.Error,
                        "TaskActivityDispatcher-MissingOrchestrationInstance",
                        new InvalidOperationException("Message does not contain any OrchestrationInstance information"));
                }

                if (taskMessage.Event.EventType != EventType.TaskScheduled)
                {
                    this.logHelper.TaskActivityDispatcherError(
                        workItem, 
                        $"The activity worker received an event of type '{taskMessage.Event.EventType}' but only '{EventType.TaskScheduled}' is supported.");
                    throw TraceHelper.TraceException(
                        TraceEventType.Critical,
                        "TaskActivityDispatcher-UnsupportedEventType",
                        new NotSupportedException("Activity worker does not support event of type: " +
                                                  taskMessage.Event.EventType));
                }

                scheduledEvent = (TaskScheduledEvent)taskMessage.Event;

                // Distributed tracing: start a new trace activity derived from the orchestration's trace context.
                Activity? traceActivity = TraceHelper.StartTraceActivityForTaskExecution(scheduledEvent, orchestrationInstance);

                if (scheduledEvent.Name == null)
                {
                    string message = $"The activity worker received a {nameof(EventType.TaskScheduled)} event that does not specify an activity name.";
                    this.logHelper.TaskActivityDispatcherError(workItem, message);
                    throw TraceHelper.TraceException(
                        TraceEventType.Error,
                        "TaskActivityDispatcher-MissingActivityName",
                        new InvalidOperationException(message));
                }

                this.logHelper.TaskActivityStarting(orchestrationInstance, scheduledEvent);
                TaskActivity? taskActivity = this.objectManager.GetObject(scheduledEvent.Name, scheduledEvent.Version);

                if (workItem.LockedUntilUtc < DateTime.MaxValue)
                {
                    // start a task to run RenewUntil
                    renewTask = Task.Factory.StartNew(
                        () => this.RenewUntil(workItem, renewCancellationTokenSource.Token),
                        renewCancellationTokenSource.Token);
                }

                var dispatchContext = new DispatchMiddlewareContext();
                dispatchContext.SetProperty(taskMessage.OrchestrationInstance);
                dispatchContext.SetProperty(taskActivity);
                dispatchContext.SetProperty(scheduledEvent);

                // In transitionary phase (activity queued from old code, accessed in new code) context can be null.
                if (taskMessage.OrchestrationExecutionContext != null)
                {
                    dispatchContext.SetProperty(taskMessage.OrchestrationExecutionContext);
                }

                // correlation
                CorrelationTraceClient.Propagate(() =>
                {
                    workItem.TraceContextBase?.SetActivityToCurrent();
                    diagnosticActivity = workItem.TraceContextBase?.CurrentActivity;
                });

                ActivityExecutionResult? result;
                try
                {
                    await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>
                    {
                        if (taskActivity == null)
                        {
                            // This likely indicates a deployment error of some kind. Because these unhandled exceptions are
                            // automatically retried, resolving this may require redeploying the app code so that the activity exists again.
                            // CONSIDER: Should this be changed into a permanent error that fails the orchestration? Perhaps
                            //           the app owner doesn't care to preserve existing instances when doing code deployments?
                            throw new TypeMissingException($"TaskActivity {scheduledEvent.Name} version {scheduledEvent.Version} was not found");
                        }

                        var context = new TaskContext(
                            taskMessage.OrchestrationInstance,
                            scheduledEvent.Name,
                            scheduledEvent.Version,
                            scheduledEvent.EventId);
                        context.ErrorPropagationMode = this.errorPropagationMode;

                        HistoryEvent? responseEvent;

                        try
                        {
                            string? output = await taskActivity.RunAsync(context, scheduledEvent.Input);
                            responseEvent = new TaskCompletedEvent(-1, scheduledEvent.EventId, output);
                        }
                        catch (Exception e) when (e is not TaskFailureException && !Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
                        {
                            // These are unexpected exceptions that occur in the task activity abstraction. Normal exceptions from 
                            // activities are expected to be translated into TaskFailureException and handled outside the middleware
                            // context (see further below).
                            TraceHelper.TraceExceptionInstance(TraceEventType.Error, "TaskActivityDispatcher-ProcessException", taskMessage.OrchestrationInstance, e);
                            string? details = this.IncludeDetails
                                ? $"Unhandled exception while executing task: {e}"
                                : null;
                            responseEvent = new TaskFailedEvent(-1, scheduledEvent.EventId, e.Message, details, new FailureDetails(e));

                            traceActivity?.SetStatus(ActivityStatusCode.Error, e.Message);

                            this.logHelper.TaskActivityFailure(orchestrationInstance, scheduledEvent.Name, (TaskFailedEvent)responseEvent, e);
                        }

                        var result = new ActivityExecutionResult { ResponseEvent = responseEvent };
                        dispatchContext.SetProperty(result);
                    });

                    result = dispatchContext.GetProperty<ActivityExecutionResult>();
                }
                catch (TaskFailureException e)
                {
                    // These are normal task activity failures. They can come from Activity implementations or from middleware.
                    TraceHelper.TraceExceptionInstance(TraceEventType.Error, "TaskActivityDispatcher-ProcessTaskFailure", taskMessage.OrchestrationInstance, e);
                    string? details = this.IncludeDetails ? e.Details : null;
                    var failureEvent = new TaskFailedEvent(-1, scheduledEvent.EventId, e.Message, details, e.FailureDetails);

                    traceActivity?.SetStatus(ActivityStatusCode.Error, e.Message);

                    this.logHelper.TaskActivityFailure(orchestrationInstance, scheduledEvent.Name, failureEvent, e);
                    CorrelationTraceClient.Propagate(() => CorrelationTraceClient.TrackException(e));
                    result = new ActivityExecutionResult { ResponseEvent = failureEvent };
                }
                catch (Exception middlewareException) when (!Utils.IsFatal(middlewareException))
                {
                    traceActivity?.SetStatus(ActivityStatusCode.Error, middlewareException.Message);

                    // These are considered retriable
                    this.logHelper.TaskActivityDispatcherError(workItem, $"Unhandled exception in activity middleware pipeline: {middlewareException}");
                    throw;
                }

                HistoryEvent? eventToRespond = result?.ResponseEvent;

                if (eventToRespond is TaskCompletedEvent completedEvent)
                {
                    this.logHelper.TaskActivityCompleted(orchestrationInstance, scheduledEvent.Name, completedEvent);
                }
                else if (eventToRespond is null)
                {
                    // Default response if middleware prevents a response from being generated
                    eventToRespond = new TaskCompletedEvent(-1, scheduledEvent.EventId, null);
                }

                var responseTaskMessage = new TaskMessage
                {
                    Event = eventToRespond,
                    OrchestrationInstance = orchestrationInstance
                };

                // Stop the trace activity here to avoid including the completion time in the latency calculation
                traceActivity?.Stop();

                await this.orchestrationService.CompleteTaskActivityWorkItemAsync(workItem, responseTaskMessage);
            }
            catch (SessionAbortedException e)
            {
                // The activity aborted its execution
                this.logHelper.TaskActivityAborted(orchestrationInstance, scheduledEvent!, e.Message);
                TraceHelper.TraceInstance(TraceEventType.Warning, "TaskActivityDispatcher-ExecutionAborted", orchestrationInstance, "{0}: {1}", scheduledEvent?.Name ?? "", e.Message);
                await this.orchestrationService.AbandonTaskActivityWorkItemAsync(workItem);
            }
            finally
            {
                diagnosticActivity?.Stop(); // Ensure the activity is stopped here to prevent it from leaking out.
                if (renewTask != null)
                {
                    renewCancellationTokenSource.Cancel();
                    try
                    {
                        // wait the renewTask finish
                        await renewTask;
                    }
                    catch (OperationCanceledException)
                    {
                        // ignore
                    }
                }
            }
        }