in src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs [48:284]
/// <summary>
/// Dispatch middleware that runs an orchestrator function through the registered function executor
/// and stores the resulting <see cref="OrchestratorExecutionResult"/> on the dispatch context.
/// </summary>
/// <param name="dispatchContext">
/// The dispatch context. The orchestration runtime state, the orchestration instance and the optional
/// entity parameters are read from it, and the execution result is written back to it.
/// </param>
/// <param name="next">The next middleware in the pipeline. This method never invokes it.</param>
/// <returns>A task that completes once the execution result has been set on <paramref name="dispatchContext"/>.</returns>
/// <exception cref="SessionAbortedException">
/// Thrown when the invocation has to be aborted: the host is stopping, the executor itself threw, or the
/// function result carries a timeout, a <see cref="SessionAbortedException"/> or a worker process exit.
/// </exception>
public async Task CallOrchestratorAsync(DispatchMiddlewareContext dispatchContext, Func<Task> next)
{
    OrchestrationRuntimeState? runtimeState = dispatchContext.GetProperty<OrchestrationRuntimeState>();
    if (runtimeState == null)
    {
        // This should never happen, but it's almost certainly non-retryable if it does.
        dispatchContext.SetProperty(OrchestratorExecutionResult.ForFailure(
            message: "Orchestration runtime state was missing!",
            details: null));
        return;
    }

    OrchestrationInstance? instance = dispatchContext.GetProperty<OrchestrationInstance>();
    if (instance == null)
    {
        // This should never happen, but it's almost certainly non-retryable if it does.
        dispatchContext.SetProperty(OrchestratorExecutionResult.ForFailure(
            message: "Instance ID metadata was missing!",
            details: null));
        return;
    }

    FunctionName functionName = new FunctionName(runtimeState.Name);
    RegisteredFunctionInfo? function = this.extension.GetOrchestratorInfo(functionName);
    if (function == null)
    {
        // Fail the orchestration with an error explaining that the function name is invalid.
        string errorMessage = this.extension.GetInvalidOrchestratorFunctionMessage(functionName.Name);
        dispatchContext.SetProperty(OrchestratorExecutionResult.ForFailure(errorMessage, details: null));
        return;
    }

    ExecutionStartedEvent? startEvent = runtimeState.ExecutionStartedEvent;
    if (startEvent == null)
    {
        // This should never happen, but it's almost certainly non-retryable if it does.
        dispatchContext.SetProperty(OrchestratorExecutionResult.ForFailure(
            message: "ExecutionStartedEvent was missing from runtime state!",
            details: null));
        return;
    }

    TaskOrchestrationEntityParameters? entityParameters = dispatchContext.GetProperty<TaskOrchestrationEntityParameters>();

    // Any past event in the history means this is not the first execution of the instance.
    bool isReplaying = runtimeState.PastEvents.Any();

    this.TraceHelper.FunctionStarting(
        this.Options.HubName,
        functionName.Name,
        instance.InstanceId,
        startEvent.Input,
        FunctionType.Orchestrator,
        isReplaying);

    // One-time logging/notifications for when the orchestration first starts.
    if (!isReplaying)
    {
        DurableTaskExtension.TagActivityWithOrchestrationStatus(OrchestrationRuntimeStatus.Running, instance.InstanceId);
        await this.LifeCycleNotificationHelper.OrchestratorStartingAsync(
            this.Options.HubName,
            functionName.Name,
            instance.InstanceId,
            isReplay: false);
    }

    var context = new RemoteOrchestratorContext(runtimeState, entityParameters);

    var input = new TriggeredFunctionData
    {
        TriggerValue = context,
#pragma warning disable CS0618 // Type or member is obsolete (not intended for general public use)
        InvokeHandler = async functionInvoker =>
        {
            // Invoke the function and look for a return value. Trigger return values are an undocumented feature that we depend on.
            Task invokeTask = functionInvoker();
            if (invokeTask is not Task<object> invokeTaskWithResult)
            {
                // This should never happen
                throw new InvalidOperationException("The internal function invoker returned a task that does not support return values!");
            }

            // The return value is expected to be a base64-encoded, protobuf-serialized OrchestratorResponse.
            string? triggerReturnValue = (await invokeTaskWithResult) as string;
            if (string.IsNullOrEmpty(triggerReturnValue))
            {
                throw new InvalidOperationException(
                    "The function invocation resulted in a null response. This means that either the orchestrator function was implemented " +
                    "incorrectly, the Durable Task language SDK was implemented incorrectly, or that the destination language worker is not " +
                    "sending the function result back to the host.");
            }

            byte[] triggerReturnValueBytes = Convert.FromBase64String(triggerReturnValue);
            P.OrchestratorResponse response = P.OrchestratorResponse.Parser.ParseFrom(triggerReturnValueBytes);

            // SetResult may throw if a platform-level error is encountered (like an out of memory exception).
            context.SetResult(
                response.Actions.Select(ProtobufUtils.ToOrchestratorAction),
                response.CustomStatus);

            // Here we throw if the orchestrator completed with an application-level error. When we do this,
            // the function's result type will be of type `OrchestrationFailureException` which is reserved
            // for application-level errors that do not need to be re-tried.
            context.ThrowIfFailed();
        },
#pragma warning restore CS0618 // Type or member is obsolete (not intended for general public use)
    };

    FunctionResult functionResult;
    try
    {
        functionResult = await function.Executor.TryExecuteAsync(
            input,
            cancellationToken: this.HostLifetimeService.OnStopping);

        if (!functionResult.Succeeded)
        {
            // Shutdown can surface as a completed invocation in a failed state.
            // Re-throw so we can abort this invocation.
            this.HostLifetimeService.OnStopping.ThrowIfCancellationRequested();
        }

        // we abort the invocation on "platform level errors" such as:
        // - a timeout
        // - an out of memory exception
        // - a worker process exit
        if (functionResult.Exception is Host.FunctionTimeoutException
            || functionResult.Exception?.InnerException is SessionAbortedException // see RemoteOrchestratorContext for details on OOM-handling (NOTE(review): confirm the exact member that raises it)
            || (functionResult.Exception?.InnerException?.GetType().ToString().Contains("WorkerProcessExitException") ?? false))
        {
            // TODO: the `WorkerProcessExitException` type is not exposed in our dependencies, it's part of WebJobs.Host.Script.
            // Should we add that dependency or should it be exposed in WebJobs.Host?

            // Re-raise through ExceptionDispatchInfo so the stack trace already recorded on the exception is
            // preserved. A plain `throw functionResult.Exception;` would overwrite it, and the catch block
            // below formats this exception into the abort reason.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(functionResult.Exception).Throw();
        }
    }
    catch (Exception hostRuntimeException)
    {
        string reason = this.HostLifetimeService.OnStopping.IsCancellationRequested ?
            "The Functions/WebJobs runtime is shutting down!" :
            $"Unhandled exception in the Functions/WebJobs runtime: {hostRuntimeException}";

        this.TraceHelper.FunctionAborted(
            this.Options.HubName,
            functionName.Name,
            instance.InstanceId,
            reason,
            functionType: FunctionType.Orchestrator);

        // This will abort the current execution and force a durable retry
        throw new SessionAbortedException(reason);
    }

    OrchestratorExecutionResult orchestratorResult;
    if (functionResult.Succeeded)
    {
        orchestratorResult = context.GetResult();

        if (context.OrchestratorCompleted)
        {
            this.TraceHelper.FunctionCompleted(
                this.Options.HubName,
                functionName.Name,
                instance.InstanceId,
                context.SerializedOutput,
                context.ContinuedAsNew,
                FunctionType.Orchestrator,
                isReplay: false);

            DurableTaskExtension.TagActivityWithOrchestrationStatus(
                OrchestrationRuntimeStatus.Completed,
                instance.InstanceId);

            await this.LifeCycleNotificationHelper.OrchestratorCompletedAsync(
                this.Options.HubName,
                functionName.Name,
                instance.InstanceId,
                context.ContinuedAsNew,
                isReplay: false);
        }
        else
        {
            // The orchestrator has not completed yet.
            this.TraceHelper.FunctionAwaited(
                this.Options.HubName,
                functionName.Name,
                FunctionType.Orchestrator,
                instance.InstanceId,
                isReplay: false);
        }
    }
    else if (context.TryGetOrchestrationErrorDetails(out Exception? exception))
    {
        // the function failed because the orchestrator failed.
        orchestratorResult = context.GetResult();

        this.TraceHelper.FunctionFailed(
            this.Options.HubName,
            functionName.Name,
            instance.InstanceId,
            exception,
            FunctionType.Orchestrator,
            isReplay: false);

        await this.LifeCycleNotificationHelper.OrchestratorFailedAsync(
            this.Options.HubName,
            functionName.Name,
            instance.InstanceId,
            exception?.Message ?? string.Empty,
            isReplay: false);
    }
    else
    {
        // the function failed for some other reason
        string exceptionDetails = functionResult.Exception?.ToString() ?? "Framework-internal message: exception details could not be extracted";

        this.TraceHelper.FunctionFailed(
            this.Options.HubName,
            functionName.Name,
            instance.InstanceId,
            functionResult.Exception,
            FunctionType.Orchestrator,
            isReplay: false);

        await this.LifeCycleNotificationHelper.OrchestratorFailedAsync(
            this.Options.HubName,
            functionName.Name,
            instance.InstanceId,
            exceptionDetails,
            isReplay: false);

        orchestratorResult = OrchestratorExecutionResult.ForFailure(
            message: $"Function '{functionName}' failed with an unhandled exception.",
            functionResult.Exception ?? new Exception($"Function '{functionName}' failed with an unknown unhandled exception"));
    }

    // Send the result of the orchestrator function to the DTFx dispatch pipeline.
    // This allows us to bypass the default, in-process execution and process the given results immediately.
    dispatchContext.SetProperty(orchestratorResult);
}