public async Task CallOrchestratorAsync()

in src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs [48:284]


        /// <summary>
        /// Orchestration dispatch middleware that runs the orchestrator function through its registered
        /// function executor and publishes the resulting <see cref="OrchestratorExecutionResult"/> on the
        /// dispatch context.
        /// </summary>
        /// <param name="dispatchContext">
        /// The dispatch context. The orchestration runtime state, the orchestration instance and (optionally)
        /// the entity parameters are read from it, and the execution result is written back to it.
        /// </param>
        /// <param name="next">The next middleware in the pipeline. It is never invoked by this method.</param>
        /// <returns>A task that completes once the result has been stored on <paramref name="dispatchContext"/>.</returns>
        /// <exception cref="SessionAbortedException">
        /// Thrown when the executor call throws, when the host is stopping, or when the invocation failed with a
        /// platform-level error (timeout, aborted session, worker process exit), so that the current execution
        /// is aborted instead of being recorded as an orchestration failure.
        /// </exception>
        public async Task CallOrchestratorAsync(DispatchMiddlewareContext dispatchContext, Func<Task> next)
        {
            OrchestrationRuntimeState? runtimeState = dispatchContext.GetProperty<OrchestrationRuntimeState>();
            if (runtimeState == null)
            {
                // This should never happen, but it's almost certainly non-retryable if it does.
                dispatchContext.SetProperty(OrchestratorExecutionResult.ForFailure(
                    message: "Orchestration runtime state was missing!",
                    details: null));
                return;
            }

            OrchestrationInstance? instance = dispatchContext.GetProperty<OrchestrationInstance>();
            if (instance == null)
            {
                // This should never happen, but it's almost certainly non-retryable if it does.
                dispatchContext.SetProperty(OrchestratorExecutionResult.ForFailure(
                    message: "Instance ID metadata was missing!",
                    details: null));
                return;
            }

            FunctionName functionName = new FunctionName(runtimeState.Name);
            RegisteredFunctionInfo? function = this.extension.GetOrchestratorInfo(functionName);
            if (function == null)
            {
                // Fail the orchestration with an error explaining that the function name is invalid.
                string errorMessage = this.extension.GetInvalidOrchestratorFunctionMessage(functionName.Name);
                dispatchContext.SetProperty(OrchestratorExecutionResult.ForFailure(errorMessage, details: null));
                return;
            }

            ExecutionStartedEvent? startEvent = runtimeState.ExecutionStartedEvent;
            if (startEvent == null)
            {
                // This should never happen, but it's almost certainly non-retryable if it does.
                dispatchContext.SetProperty(OrchestratorExecutionResult.ForFailure(
                    message: "ExecutionStartedEvent was missing from runtime state!",
                    details: null));
                return;
            }

            // Optional: may be null, in which case null is handed to the context below.
            TaskOrchestrationEntityParameters? entityParameters = dispatchContext.GetProperty<TaskOrchestrationEntityParameters>();

            // The execution is treated as a replay whenever the runtime state already contains past events.
            bool isReplaying = runtimeState.PastEvents.Any();

            this.TraceHelper.FunctionStarting(
                this.Options.HubName,
                functionName.Name,
                instance.InstanceId,
                startEvent.Input,
                FunctionType.Orchestrator,
                isReplaying);

            // One-time logging/notifications for when the orchestration first starts.
            if (!isReplaying)
            {
                DurableTaskExtension.TagActivityWithOrchestrationStatus(OrchestrationRuntimeStatus.Running, instance.InstanceId);
                await this.LifeCycleNotificationHelper.OrchestratorStartingAsync(
                    this.Options.HubName,
                    functionName.Name,
                    instance.InstanceId,
                    isReplay: false);
            }

            var context = new RemoteOrchestratorContext(runtimeState, entityParameters);

            var input = new TriggeredFunctionData
            {
                TriggerValue = context,
#pragma warning disable CS0618 // Type or member is obsolete (not intended for general public use)
                InvokeHandler = async functionInvoker =>
                {
                    // Invoke the function and look for a return value. Trigger return values are an undocumented feature that we depend on.
                    Task invokeTask = functionInvoker();
                    if (invokeTask is not Task<object> invokeTaskWithResult)
                    {
                        // This should never happen
                        throw new InvalidOperationException("The internal function invoker returned a task that does not support return values!");
                    }

                    // The return value is expected to be a base64-encoded, protobuf-serialized OrchestratorResponse.
                    string? triggerReturnValue = (await invokeTaskWithResult) as string;
                    if (string.IsNullOrEmpty(triggerReturnValue))
                    {
                        throw new InvalidOperationException(
                            "The function invocation resulted in a null response. This means that either the orchestrator function was implemented " +
                            "incorrectly, the Durable Task language SDK was implemented incorrectly, or that the destination language worker is not " +
                            "sending the function result back to the host.");
                    }

                    byte[] triggerReturnValueBytes = Convert.FromBase64String(triggerReturnValue);
                    P.OrchestratorResponse response = P.OrchestratorResponse.Parser.ParseFrom(triggerReturnValueBytes);

                    // SetResult may throw if a platform-level error is encountered (like an out of memory exception).
                    context.SetResult(
                        response.Actions.Select(ProtobufUtils.ToOrchestratorAction),
                        response.CustomStatus);

                    // Here we throw if the orchestrator completed with an application-level error. When we do this,
                    // the function's result type will be of type `OrchestrationFailureException` which is reserved
                    // for application-level errors that do not need to be re-tried.
                    context.ThrowIfFailed();
                },
#pragma warning restore CS0618 // Type or member is obsolete (not intended for general public use)
            };

            FunctionResult functionResult;
            try
            {
                functionResult = await function.Executor.TryExecuteAsync(
                    input,
                    cancellationToken: this.HostLifetimeService.OnStopping);
                if (!functionResult.Succeeded)
                {
                    // Shutdown can surface as a completed invocation in a failed state.
                    // Re-throw so we can abort this invocation.
                    this.HostLifetimeService.OnStopping.ThrowIfCancellationRequested();
                }

                // we abort the invocation on "platform level errors" such as:
                // - a timeout
                // - an out of memory exception
                // - a worker process exit
                if (functionResult.Exception is Exception platformException
                    && (platformException is Host.FunctionTimeoutException
                        || platformException.InnerException is SessionAbortedException // see RemoteOrchestratorContext (TrySetResultInternal) for details on OOM-handling
                        || (platformException.InnerException?.GetType().ToString().Contains("WorkerProcessExitException") ?? false)))
                {
                    // TODO: the `WorkerProcessExitException` type is not exposed in our dependencies, it's part of WebJobs.Host.Script.
                    // Should we add that dependency or should it be exposed in WebJobs.Host?

                    // Re-throw via ExceptionDispatchInfo rather than `throw platformException;` so the stack trace
                    // recorded where the failure originally happened is preserved. The catch block below formats
                    // this exception into the abort reason, so the original trace is what ends up in the logs.
                    System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(platformException).Throw();
                }
            }
            catch (Exception hostRuntimeException)
            {
                string reason = this.HostLifetimeService.OnStopping.IsCancellationRequested ?
                    "The Functions/WebJobs runtime is shutting down!" :
                    $"Unhandled exception in the Functions/WebJobs runtime: {hostRuntimeException}";

                this.TraceHelper.FunctionAborted(
                    this.Options.HubName,
                    functionName.Name,
                    instance.InstanceId,
                    reason,
                    functionType: FunctionType.Orchestrator);

                // This will abort the current execution and force a durable retry
                throw new SessionAbortedException(reason);
            }

            OrchestratorExecutionResult orchestratorResult;
            if (functionResult.Succeeded)
            {
                orchestratorResult = context.GetResult();

                if (context.OrchestratorCompleted)
                {
                    this.TraceHelper.FunctionCompleted(
                        this.Options.HubName,
                        functionName.Name,
                        instance.InstanceId,
                        context.SerializedOutput,
                        context.ContinuedAsNew,
                        FunctionType.Orchestrator,
                        isReplay: false);

                    DurableTaskExtension.TagActivityWithOrchestrationStatus(
                        OrchestrationRuntimeStatus.Completed,
                        instance.InstanceId);

                    await this.LifeCycleNotificationHelper.OrchestratorCompletedAsync(
                        this.Options.HubName,
                        functionName.Name,
                        instance.InstanceId,
                        context.ContinuedAsNew,
                        isReplay: false);
                }
                else
                {
                    // The invocation succeeded but the orchestrator has not completed yet.
                    this.TraceHelper.FunctionAwaited(
                        this.Options.HubName,
                        functionName.Name,
                        FunctionType.Orchestrator,
                        instance.InstanceId,
                        isReplay: false);
                }
            }
            else if (context.TryGetOrchestrationErrorDetails(out Exception? exception))
            {
                // the function failed because the orchestrator failed.

                orchestratorResult = context.GetResult();

                this.TraceHelper.FunctionFailed(
                     this.Options.HubName,
                     functionName.Name,
                     instance.InstanceId,
                     exception,
                     FunctionType.Orchestrator,
                     isReplay: false);

                await this.LifeCycleNotificationHelper.OrchestratorFailedAsync(
                    this.Options.HubName,
                    functionName.Name,
                    instance.InstanceId,
                    exception?.Message ?? string.Empty,
                    isReplay: false);
            }
            else
            {
                // the function failed for some other reason
                string exceptionDetails = functionResult.Exception?.ToString() ?? "Framework-internal message: exception details could not be extracted";

                this.TraceHelper.FunctionFailed(
                    this.Options.HubName,
                    functionName.Name,
                    instance.InstanceId,
                    functionResult.Exception,
                    FunctionType.Orchestrator,
                    isReplay: false);

                await this.LifeCycleNotificationHelper.OrchestratorFailedAsync(
                    this.Options.HubName,
                    functionName.Name,
                    instance.InstanceId,
                    exceptionDetails,
                    isReplay: false);

                orchestratorResult = OrchestratorExecutionResult.ForFailure(
                    message: $"Function '{functionName}' failed with an unhandled exception.",
                    functionResult.Exception ?? new Exception($"Function '{functionName}' failed with an unknown unhandled exception"));
            }

            // Send the result of the orchestrator function to the DTFx dispatch pipeline.
            // This allows us to bypass the default, in-process execution and process the given results immediately.
            dispatchContext.SetProperty(orchestratorResult);
        }