protected async Task OnProcessWorkItemAsync()

in src/DurableTask.Core/TaskEntityDispatcher.cs [199:409]


        protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem)
        {
            OrchestrationRuntimeState originalOrchestrationRuntimeState = workItem.OrchestrationRuntimeState;

            OrchestrationRuntimeState runtimeState = workItem.OrchestrationRuntimeState;
            runtimeState.AddEvent(new OrchestratorStartedEvent(-1));

            Task renewTask = null;
            using var renewCancellationTokenSource = new CancellationTokenSource();
            if (workItem.LockedUntilUtc < DateTime.MaxValue)
            {
                // start a task to run RenewUntil
                renewTask = Task.Factory.StartNew(
                    () => TaskOrchestrationDispatcher.RenewUntil(workItem, this.orchestrationService, this.logHelper, nameof(TaskEntityDispatcher), renewCancellationTokenSource.Token),
                    renewCancellationTokenSource.Token);
            }

            WorkItemEffects effects = new WorkItemEffects()
            {
                ActivityMessages = new List<TaskMessage>(),
                TimerMessages = new List<TaskMessage>(),
                InstanceMessages = new List<TaskMessage>(),
                taskIdCounter = 0,
                InstanceId = workItem.InstanceId,
                RuntimeState = runtimeState,
            };

            try
            {
                // Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch.
                if (!TaskOrchestrationDispatcher.ReconcileMessagesWithState(workItem, nameof(TaskEntityDispatcher), this.errorPropagationMode, this.logHelper))
                {
                    // TODO : mark an orchestration as faulted if there is data corruption
                    this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration");
                }
                else
                {

                    // we start with processing all the requests and figuring out which ones to execute now
                    // results can depend on whether the entity is locked, what the maximum batch size is,
                    // and whether the messages arrived out of order

                    this.DetermineWork(workItem.OrchestrationRuntimeState,
                         out SchedulerState schedulerState,
                         out Work workToDoNow);

                    if (workToDoNow.OperationCount > 0)
                    {
                        // execute the user-defined operations on this entity, via the middleware
                        var result = await this.ExecuteViaMiddlewareAsync(workToDoNow, runtimeState.OrchestrationInstance, schedulerState.EntityState);
                        var operationResults = result.Results!;

                        // if we encountered an error, record it as the result of the operations
                        // so that callers are notified that the operation did not succeed.
                        if (result.FailureDetails != null)
                        {
                            OperationResult errorResult = new OperationResult()
                            {
                                // for older SDKs only
                                Result = result.FailureDetails.ErrorMessage,
                                ErrorMessage = "entity dispatch failed",

                                // for newer SDKs only
                                FailureDetails = result.FailureDetails,
                            };

                            for (int i = operationResults.Count; i < workToDoNow.OperationCount; i++)
                            {
                                operationResults.Add(errorResult);
                            }
                        }

                        // go through all results
                        // for each operation that is not a signal, send a result message back to the calling orchestrator
                        for (int i = 0; i < result.Results!.Count; i++)
                        {
                            var req = workToDoNow.Operations[i];
                            if (!req.IsSignal)
                            {
                                this.SendResultMessage(effects, req, result.Results[i]);
                            }
                        }

                        if (result.Results.Count < workToDoNow.OperationCount)
                        {
                            // some requests were not processed (e.g. due to shutdown or timeout)
                            // in this case we just defer the work so it can be retried
                            var deferred = workToDoNow.RemoveDeferredWork(result.Results.Count);
                            schedulerState.PutBack(deferred);
                            workToDoNow.ToBeContinued(schedulerState);
                        }

                        // update the entity state based on the result
                        schedulerState.EntityState = result.EntityState;

                        // perform the actions
                        foreach (var action in result.Actions!)
                        {
                            switch (action)
                            {
                                case (SendSignalOperationAction sendSignalAction):
                                    this.SendSignalMessage(effects, schedulerState, sendSignalAction);
                                    break;
                                case (StartNewOrchestrationOperationAction startAction):
                                    this.ProcessSendStartMessage(effects, runtimeState, startAction);
                                    break;
                            }
                        }
                    }

                    // process the lock request, if any
                    if (workToDoNow.LockRequest != null)
                    {
                        this.ProcessLockRequest(effects, schedulerState, workToDoNow.LockRequest);
                    }

                    if (workToDoNow.ToBeRescheduled != null)
                    {
                        foreach (var request in workToDoNow.ToBeRescheduled)
                        {
                            // Reschedule all signals that were received before their time
                            this.SendScheduledSelfMessage(effects, request);
                        }
                    }

                    if (workToDoNow.SuspendAndContinue)
                    {
                        this.SendContinueSelfMessage(effects);
                    }

                    // this batch is complete. Since this is an entity, we now
                    // (always) start a new execution, as in continue-as-new

                    var serializedSchedulerState = this.SerializeSchedulerStateForNextExecution(schedulerState);
                    var nextExecutionStartedEvent = new ExecutionStartedEvent(-1, serializedSchedulerState)
                    {
                        OrchestrationInstance = new OrchestrationInstance
                        {
                            InstanceId = workItem.InstanceId,
                            ExecutionId = Guid.NewGuid().ToString("N")
                        },
                        Tags = runtimeState.Tags,
                        ParentInstance = runtimeState.ParentInstance,
                        Name = runtimeState.Name,
                        Version = runtimeState.Version
                    };
                    var entityStatus = new EntityStatus()
                    {
                        EntityExists = schedulerState.EntityExists,
                        BacklogQueueSize = schedulerState.Queue?.Count ?? 0,
                        LockedBy = schedulerState.LockedBy,
                    };
                    var serializedEntityStatus = JsonConvert.SerializeObject(entityStatus, Serializer.InternalSerializerSettings);

                    // create the new runtime state for the next execution
                    runtimeState = new OrchestrationRuntimeState();
                    runtimeState.Status = serializedEntityStatus;
                    runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
                    runtimeState.AddEvent(nextExecutionStartedEvent);
                    runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
                }
            }
            finally
            {
                if (renewTask != null)
                {
                    try
                    {
                        renewCancellationTokenSource.Cancel();
                        await renewTask;
                    }
                    catch (ObjectDisposedException)
                    {
                        // ignore
                    }
                    catch (OperationCanceledException)
                    {
                        // ignore
                    }
                }
            }

            OrchestrationState instanceState = (runtimeState.ExecutionStartedEvent != null) ?
                instanceState = Utils.BuildOrchestrationState(runtimeState) : null;      

            if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
            {
                // some backends expect the original runtime state object
                workItem.OrchestrationRuntimeState = originalOrchestrationRuntimeState;
            }
            else
            {
                workItem.OrchestrationRuntimeState = runtimeState;
            }

            await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
                workItem,
                runtimeState,
                effects.ActivityMessages,
                effects.InstanceMessages,
                effects.TimerMessages,
                null,
                instanceState);

            if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
            {
                workItem.OrchestrationRuntimeState = runtimeState;
            }

            return true;
        }