private async Task OrchestrationMiddleware()

in src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs [607:719]


        /// <summary>
        /// Dispatch middleware for orchestrations: copies the current runtime state onto the
        /// orchestration context, runs the orchestrator (through the function executor when the
        /// function is registered, otherwise through a callback that throws), and then performs
        /// the post-execution bookkeeping on the context.
        /// </summary>
        /// <param name="dispatchContext">Middleware context from which the orchestration shim and runtime state are read.</param>
        /// <param name="next">Delegate that invokes the next stage of the pipeline.</param>
        /// <returns>A task representing the middleware's work.</returns>
        /// <exception cref="SessionAbortedException">
        /// Thrown when the function result reports a runtime error, a host-stopping error, or a timeout error.
        /// </exception>
        private async Task OrchestrationMiddleware(DispatchMiddlewareContext dispatchContext, Func<Task> next)
        {
            if (!(dispatchContext.GetProperty<TaskOrchestration>() is TaskOrchestrationShim shim))
            {
                // Not an orchestration shim - hand off to the rest of the pipeline untouched.
                await next();
                return;
            }

            var context = (DurableOrchestrationContext)shim.Context;
            OrchestrationRuntimeState runtimeState = dispatchContext.GetProperty<OrchestrationRuntimeState>();

            // Only overwrite the parent instance ID when the runtime state actually has a parent.
            var parent = runtimeState.ParentInstance;
            if (parent != null)
            {
                context.ParentInstanceId = parent.OrchestrationInstance.InstanceId;
            }

            // Mirror the remaining runtime state onto the context.
            var instance = runtimeState.OrchestrationInstance;
            context.InstanceId = instance?.InstanceId;
            context.ExecutionId = instance?.ExecutionId;
            context.IsReplaying = runtimeState.ExecutionStartedEvent.IsPlayed;
            context.History = runtimeState.Events;
            context.RawInput = runtimeState.Input;

            RegisteredFunctionInfo functionInfo = shim.GetFunctionInfo();
            if (functionInfo != null)
            {
                var executor = functionInfo.Executor;
                var triggerData = new TriggeredFunctionData
                {
                    TriggerValue = context,

#pragma warning disable CS0618 // Approved for use by this extension
                    InvokeHandler = async userCodeInvoker =>
                    {
                        // Yield first so that the remainder runs asynchronously relative to WebJobs,
                        // which lets WebJobs cancel the invocation correctly if it times out.
                        await Task.Yield();
                        context.ExecutorCalledBack = true;

                        // Step 2: give the shim the inner invoker that executes the user code.
                        shim.SetFunctionInvocationCallback(userCodeInvoker);

                        // Step 3: continue down the DTFx pipeline, which triggers the orchestrator shim.
                        await next();

                        // Step 4: a completed orchestration with a recorded exception is reported to the
                        // Functions host as a failed execution by rethrowing that exception.
                        if (context.IsCompleted)
                        {
                            var failure = context.OrchestrationException;
                            if (failure != null)
                            {
                                failure.Throw();
                            }
                        }
                    },
#pragma warning restore CS0618
                };

                // Step 1: start the functions invocation pipeline (billing, logging, bindings, and timeout tracking).
                WrappedFunctionResult outcome = await FunctionExecutionHelper.ExecuteFunctionInOrchestrationMiddleware(
                    executor,
                    triggerData,
                    shim,
                    context,
                    this.HostLifetimeService.OnStopping);

                var status = outcome.ExecutionStatus;
                bool mustAbort =
                    status == WrappedFunctionResult.FunctionResultStatus.FunctionsRuntimeError
                    || status == WrappedFunctionResult.FunctionResultStatus.FunctionsHostStoppingError
                    || status == WrappedFunctionResult.FunctionResultStatus.FunctionTimeoutError;

                if (mustAbort)
                {
                    this.TraceHelper.FunctionAborted(
                        this.Options.HubName,
                        context.FunctionName,
                        context.InstanceId,
                        $"An internal error occurred while attempting to execute this function. The execution will be aborted and retried. Details: {outcome.Exception}",
                        functionType: FunctionType.Orchestrator);

                    // Aborting the session puts the message back onto the queue so it is processed again.
                    throw new SessionAbortedException(
                        $"An internal error occurred while attempting to execute '{context.FunctionName}'.", outcome.Exception);
                }
            }
            else
            {
                // No function is registered for this orchestrator: log a warning and install a
                // callback that throws, then let the pipeline invoke the shim.
                string warning = this.GetInvalidOrchestratorFunctionMessage(context.FunctionName);

                this.TraceHelper.ExtensionWarningEvent(
                    this.Options.HubName,
                    runtimeState.Name,
                    instance.InstanceId,
                    warning);

                Func<Task<OrchestrationFailureException>> failMissingFunction = () => throw new OrchestrationFailureException(warning);
                shim.SetFunctionInvocationCallback(failMissingFunction);
                await next();
            }

            // Trace "awaited" only for an orchestration that is neither completed nor on a long-running timer.
            if (!(context.IsCompleted || context.IsLongRunningTimer))
            {
                this.TraceHelper.FunctionAwaited(
                    context.HubName,
                    context.Name,
                    FunctionType.Orchestrator,
                    context.InstanceId,
                    context.IsReplaying);
            }

            if (context.IsCompleted && context.PreserveUnprocessedEvents)
            {
                // Put unprocessed external events back so the next iteration can pick them up.
                context.RescheduleBufferedExternalEvents();
            }

            await context.RunDeferredTasks();
        }