private async Task EntityMiddleware()

in src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs [727:1026]


        /// <summary>
        /// Dispatch middleware that processes work items belonging to durable entities.
        /// </summary>
        /// <remarks>
        /// Work items whose <see cref="TaskOrchestration"/> is not a <see cref="TaskEntityShim"/> are passed
        /// straight through to <paramref name="next"/>. For entity work items the method proceeds in phases:
        /// <list type="number">
        /// <item><description>Walk the history once: rehydrate on ExecutionStarted, and classify every EventRaised
        /// event as a request, a lock release, or a continue message.</description></item>
        /// <item><description>Unless the entity is suspended, move queued requests into the shim's operation batch,
        /// stopping at a lock request or at the configured maximum batch size.</description></item>
        /// <item><description>Run the batch and call <paramref name="next"/>: through the functions executor (3a) when
        /// the batch is non-empty and the host is not stopping, otherwise directly (3b).</description></item>
        /// <item><description>Call <c>AbortOnInternalError</c>, then run the context's deferred tasks.</description></item>
        /// </list>
        /// Exceptions raised while assembling or executing the batch are recorded on the entity context via
        /// <c>CaptureInternalError</c> instead of being thrown at the point of failure.
        /// </remarks>
        /// <param name="dispatchContext">The dispatch context from which the <see cref="TaskOrchestration"/> and
        /// <see cref="OrchestrationRuntimeState"/> properties are read.</param>
        /// <param name="next">The next delegate in the middleware pipeline. According to the inline comments it runs
        /// the DTFx orchestration that persists effects, sends the outbox, and continues as new.</param>
        /// <returns>A task that completes when the work item has been processed.</returns>
        /// <exception cref="SessionAbortedException">Thrown directly when the function executor reports
        /// <c>FunctionsRuntimeError</c> or <c>FunctionsHostStoppingError</c>. The call to <c>AbortOnInternalError</c>
        /// is also expected to throw it when an internal error was captured (implementation not visible here).</exception>
        private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, Func<Task> next)
        {
            var entityShim = dispatchContext.GetProperty<TaskOrchestration>() as TaskEntityShim;
            if (entityShim == null)
            {
                // This is not an entity - skip.
                await next();
                return;
            }

            // Copy identity, history and raw input from the runtime state onto the entity context.
            OrchestrationRuntimeState runtimeState = dispatchContext.GetProperty<OrchestrationRuntimeState>();
            DurableEntityContext entityContext = (DurableEntityContext)entityShim.Context;
            entityContext.InstanceId = runtimeState.OrchestrationInstance.InstanceId;
            entityContext.ExecutionId = runtimeState.OrchestrationInstance.ExecutionId;
            entityContext.History = runtimeState.Events;
            entityContext.RawInput = runtimeState.Input;

            // Allocated lazily; collects requests sent by the current lock holder so they can be
            // put back at the front of the queue once the whole history has been scanned.
            Queue<RequestMessage> lockHolderMessages = null;

            try
            {
                entityShim.AddTraceFlag('1'); // add a bread crumb for the entity batch tracing

                // 1. First time through the history
                // we count events, add any under-lock op to the batch, and process lock releases
                foreach (HistoryEvent e in runtimeState.Events)
                {
                    // Only ExecutionStarted and EventRaised are handled; all other event types are ignored.
                    switch (e.EventType)
                    {
                        case EventType.ExecutionStarted:
                            entityShim.Rehydrate(runtimeState.Input);
                            break;

                        case EventType.EventRaised:
                            EventRaisedEvent eventRaisedEvent = (EventRaisedEvent)e;

                            this.TraceHelper.DeliveringEntityMessage(
                                entityContext.InstanceId,
                                entityContext.ExecutionId,
                                e.EventId,
                                eventRaisedEvent.Name,
                                eventRaisedEvent.Input);

                            // Counted for every raised event, whichever kind of message it turns out to be.
                            entityShim.NumberEventsToReceive++;

                            if (EntityMessageEventNames.IsRequestMessage(eventRaisedEvent.Name))
                            {
                                // we are receiving an operation request or a lock request
                                var requestMessage = this.MessageDataConverter.Deserialize<RequestMessage>(eventRaisedEvent.Input);

                                // The messages that become deliverable as a result of receiving this one (possibly none).
                                IEnumerable<RequestMessage> deliverNow;

                                if (requestMessage.ScheduledTime.HasValue)
                                {
                                    // A message arriving more than 100 ms before its scheduled time counts as "too early".
                                    if ((requestMessage.ScheduledTime.Value - DateTime.UtcNow) > TimeSpan.FromMilliseconds(100))
                                    {
                                        // message was delivered too early. This can happen if the durability provider imposes
                                        // a limit on the delay. We handle this by rescheduling the message instead of processing it.
                                        deliverNow = Array.Empty<RequestMessage>();
                                        entityShim.AddMessageToBeRescheduled(requestMessage);
                                    }
                                    else
                                    {
                                        // the message is scheduled to be delivered immediately.
                                        // There are no FIFO guarantees for scheduled messages, so we skip the message sorter.
                                        deliverNow = new RequestMessage[] { requestMessage };
                                    }
                                }
                                else
                                {
                                    // run this through the message sorter to help with reordering and duplicate filtering
                                    deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, this.MessageReorderWindow);
                                }

                                foreach (var message in deliverNow)
                                {
                                    // Requests whose parent instance currently holds the lock are collected separately
                                    // (see PutBack after the loop); everything else goes to the back of the state queue.
                                    if (entityContext.State.LockedBy != null
                                        && entityContext.State.LockedBy == message.ParentInstanceId)
                                    {
                                        if (lockHolderMessages == null)
                                        {
                                            lockHolderMessages = new Queue<RequestMessage>();
                                        }

                                        lockHolderMessages.Enqueue(message);
                                    }
                                    else
                                    {
                                        entityContext.State.Enqueue(message);
                                    }
                                }
                            }
                            else if (EntityMessageEventNames.IsReleaseMessage(eventRaisedEvent.Name))
                            {
                                // we are receiving a lock release
                                var message = this.MessageDataConverter.Deserialize<ReleaseMessage>(eventRaisedEvent.Input);

                                // A release is honored only when it comes from the instance recorded as the lock holder;
                                // any other release message is silently ignored.
                                if (entityContext.State.LockedBy == message.ParentInstanceId)
                                {
                                    this.TraceHelper.EntityLockReleased(
                                        entityContext.HubName,
                                        entityContext.Name,
                                        entityContext.InstanceId,
                                        message.ParentInstanceId,
                                        message.LockRequestId,
                                        isReplay: false);

                                    entityContext.State.LockedBy = null;
                                }
                            }
                            else
                            {
                                // this is a continue message.
                                // Resumes processing of previously queued operations, if any.
                                entityContext.State.Suspended = false;
                                entityShim.AddTraceFlag(EntityTraceFlags.Resumed);
                            }

                            break;
                    }
                }

                // lock holder messages go to the front of the queue
                if (lockHolderMessages != null)
                {
                    entityContext.State.PutBack(lockHolderMessages);
                }

                // mitigation for ICM358210295 : if an entity has been in suspended state for at least 10 seconds, resume
                // (suspended state is never meant to last long, it is needed just so the history gets persisted to storage)
                // Note: if ExecutionStartedEvent is null the lifted nullable comparison evaluates to false,
                // so the mitigation does not fire in that case.
                if (entityContext.State.Suspended
                    && runtimeState.ExecutionStartedEvent?.Timestamp < DateTime.UtcNow - TimeSpan.FromSeconds(10))
                {
                    entityContext.State.Suspended = false;
                    entityShim.AddTraceFlag(EntityTraceFlags.MitigationResumed);
                }

                if (!entityContext.State.Suspended)
                {
                    entityShim.AddTraceFlag('2');

                    // 2. We add as many requests from the queue to the batch as possible,
                    // stopping at lock requests or when the maximum batch size is reached
                    while (entityContext.State.MayDequeue())
                    {
                        // The size check precedes the dequeue, so the request that would exceed the limit stays queued.
                        if (entityShim.OperationBatch.Count == this.Options.MaxEntityOperationBatchSize)
                        {
                            // we have reached the maximum batch size already
                            // insert a delay after this batch to ensure write back
                            entityShim.AddTraceFlag(EntityTraceFlags.BatchSizeLimit);
                            entityShim.ToBeContinuedWithDelay();
                            break;
                        }

                        var request = entityContext.State.Dequeue();

                        if (request.IsLockRequest)
                        {
                            // A lock request is handed to the shim and ends the batch; no further requests are dequeued.
                            entityShim.AddLockRequestToBatch(request);
                            break;
                        }
                        else
                        {
                            entityShim.AddOperationToBatch(request);
                        }
                    }
                }
            }
            catch (Exception e)
            {
                // Failures while assembling the batch are recorded rather than thrown; the recorded error is
                // checked before batch execution below and handed to AbortOnInternalError at the end.
                entityContext.CaptureInternalError(e, entityShim);
            }

            WrappedFunctionResult result;

            // The functions executor is involved only when there are operations to run and the host is not
            // shutting down; otherwise the direct-execution path (3b) is taken.
            if (entityShim.OperationBatch.Count > 0 && !this.HostLifetimeService.OnStopping.IsCancellationRequested)
            {
                // 3a. (function execution) Start the functions invocation pipeline (billing, logging, bindings, and timeout tracking).
                result = await FunctionExecutionHelper.ExecuteFunctionInOrchestrationMiddleware(
                    entityShim.GetFunctionInfo().Executor,
                    new TriggeredFunctionData
                    {
                        TriggerValue = entityShim.Context,
#pragma warning disable CS0618 // Approved for use by this extension
                        InvokeHandler = async userCodeInvoker =>
                            {
                                // Record that the executor actually invoked this handler.
                                entityContext.ExecutorCalledBack = true;

                                entityShim.SetFunctionInvocationCallback(userCodeInvoker);

                                this.TraceHelper.FunctionStarting(
                                    entityContext.HubName,
                                    entityContext.Name,
                                    entityContext.InstanceId,
                                    runtimeState.Input,
                                    FunctionType.Entity,
                                    isReplay: false);

                                entityShim.AddTraceFlag('3');

                                // 3. Run all the operations in the batch
                                // (skipped when batch assembly already recorded an internal error; next() below still runs)
                                if (entityContext.InternalError == null)
                                {
                                    try
                                    {
                                        await entityShim.ExecuteBatch(this.HostLifetimeService.OnStopping);
                                    }
                                    catch (Exception e)
                                    {
                                        entityContext.CaptureInternalError(e, entityShim);
                                    }
                                }

                                entityShim.AddTraceFlag('4');

                                // 4. Run the DTFx orchestration to persist the effects,
                                // send the outbox, and continue as new
                                // (not wrapped in try/catch here: an exception from next() propagates out of this handler)
                                await next();

                                // 5. If there were internal or application errors, trace them for DF
                                if (entityContext.ErrorsPresent(out string description, out string sanitizedError))
                                {
                                    this.TraceHelper.FunctionFailed(
                                        entityContext.HubName,
                                        entityContext.Name,
                                        entityContext.InstanceId,
                                        description,
                                        sanitizedReason: sanitizedError,
                                        functionType: FunctionType.Entity,
                                        isReplay: false);
                                }
                                else
                                {
                                    this.TraceHelper.FunctionCompleted(
                                        entityContext.HubName,
                                        entityContext.Name,
                                        entityContext.InstanceId,
                                        entityContext.State.EntityState,
                                        continuedAsNew: true,
                                        functionType: FunctionType.Entity,
                                        isReplay: false);
                                }

                                // 6. If there were internal or application errors, also rethrow them here so the functions host gets to see them
                                entityContext.ThrowInternalExceptionIfAny();
                                entityContext.ThrowApplicationExceptionsIfAny();
                            },
#pragma warning restore CS0618
                    },
                    entityShim,
                    entityContext,
                    this.HostLifetimeService.OnStopping);

                if (result.ExecutionStatus == WrappedFunctionResult.FunctionResultStatus.FunctionTimeoutError)
                {
                    // NOTE(review): presumably TimeoutTask completes once the shim has finished its
                    // timeout handling - confirm against TaskEntityShim.
                    await entityShim.TimeoutTask;
                }

                if (result.ExecutionStatus == WrappedFunctionResult.FunctionResultStatus.FunctionsRuntimeError
                    || result.ExecutionStatus == WrappedFunctionResult.FunctionResultStatus.FunctionsHostStoppingError)
                {
                    this.TraceHelper.FunctionAborted(
                      this.Options.HubName,
                      entityContext.FunctionName,
                      entityContext.InstanceId,
                      $"An internal error occurred while attempting to execute this function. The execution will be aborted and retried. Details: {result.Exception}",
                      functionType: FunctionType.Entity);

                    // This will abort the execution and cause the message to go back onto the queue for re-processing
                    throw new SessionAbortedException(
                        $"An internal error occurred while attempting to execute '{entityContext.FunctionName}'.",
                        result.Exception);
                }
            }
            else
            {
                entityShim.AddTraceFlag(EntityTraceFlags.DirectExecution);

                // 3b. (direct execution) We do not need to call into user code because we are not going to run any operations.
                // In this case we can execute without involving the functions runtime.
                // If batch assembly already recorded an internal error, neither ExecuteBatch nor next() is called.
                if (entityContext.InternalError == null)
                {
                    try
                    {
                        // Reached with an empty operation batch or with the host stopping (token already signalled).
                        // Unlike path 3a, a failure of next() is also captured as an internal error here.
                        await entityShim.ExecuteBatch(this.HostLifetimeService.OnStopping);
                        await next();
                    }
                    catch (Exception e)
                    {
                        entityContext.CaptureInternalError(e, entityShim);
                    }
                }
            }

            // If there were internal errors, throw a SessionAbortedException
            // here so DTFx can abort the batch and back off the work item
            entityContext.AbortOnInternalError(entityShim.TraceFlags);

            // Only reached when the call above returned normally (i.e. it did not throw).
            await entityContext.RunDeferredTasks();
        }