in src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs [607:719]
/// <summary>
/// Dispatch middleware for orchestrations. Copies identity, replay, history and input
/// information from the <see cref="OrchestrationRuntimeState"/> onto the orchestration
/// context, then runs the orchestrator either through the registered function's executor
/// or, when no function is registered, through a callback that throws an
/// <see cref="OrchestrationFailureException"/>. Afterwards it emits the "awaited" trace,
/// reschedules buffered external events when requested, and runs deferred tasks.
/// </summary>
/// <param name="dispatchContext">
/// Dispatch context from which the <see cref="TaskOrchestration"/> and
/// <see cref="OrchestrationRuntimeState"/> properties are read.
/// </param>
/// <param name="next">Delegate that invokes the next stage of the pipeline.</param>
/// <returns>A task that completes when this middleware stage has finished.</returns>
/// <exception cref="SessionAbortedException">
/// Thrown when the function execution result reports a functions runtime error,
/// a host-stopping error, or a function timeout error.
/// </exception>
private async Task OrchestrationMiddleware(DispatchMiddlewareContext dispatchContext, Func<Task> next)
{
    TaskOrchestrationShim shim = dispatchContext.GetProperty<TaskOrchestration>() as TaskOrchestrationShim;
    if (shim == null)
    {
        // This is not an orchestration - skip.
        await next();
        return;
    }

    DurableOrchestrationContext context = (DurableOrchestrationContext)shim.Context;

    // Populate the orchestration context from the current runtime state.
    OrchestrationRuntimeState orchestrationRuntimeState = dispatchContext.GetProperty<OrchestrationRuntimeState>();
    if (orchestrationRuntimeState.ParentInstance != null)
    {
        context.ParentInstanceId = orchestrationRuntimeState.ParentInstance.OrchestrationInstance.InstanceId;
    }

    context.InstanceId = orchestrationRuntimeState.OrchestrationInstance?.InstanceId;
    context.ExecutionId = orchestrationRuntimeState.OrchestrationInstance?.ExecutionId;
    context.IsReplaying = orchestrationRuntimeState.ExecutionStartedEvent.IsPlayed;
    context.History = orchestrationRuntimeState.Events;
    context.RawInput = orchestrationRuntimeState.Input;

    RegisteredFunctionInfo info = shim.GetFunctionInfo();
    if (info == null)
    {
        // No function info is available for this orchestrator: log a warning and let the shim
        // fail the orchestration by throwing from its invocation callback.
        string message = this.GetInvalidOrchestratorFunctionMessage(context.FunctionName);

        // Use context.InstanceId (assigned above with a null-conditional) instead of dereferencing
        // OrchestrationInstance directly, so that a null instance cannot turn this warning into a
        // NullReferenceException. The other trace calls in this method use the same value.
        this.TraceHelper.ExtensionWarningEvent(
            this.Options.HubName,
            orchestrationRuntimeState.Name,
            context.InstanceId,
            message);

        Func<Task<OrchestrationFailureException>> nonExistentException = () => throw new OrchestrationFailureException(message);
        shim.SetFunctionInvocationCallback(nonExistentException);
        await next();
    }
    else
    {
        // 1. Start the functions invocation pipeline (billing, logging, bindings, and timeout tracking).
        WrappedFunctionResult result = await FunctionExecutionHelper.ExecuteFunctionInOrchestrationMiddleware(
            info.Executor,
            new TriggeredFunctionData
            {
                TriggerValue = context,
#pragma warning disable CS0618 // Approved for use by this extension
                InvokeHandler = async userCodeInvoker =>
                {
                    // We yield control to ensure this code is executed asynchronously relative to WebJobs.
                    // This ensures WebJobs is able to correctly cancel the invocation in the case of a timeout.
                    await Task.Yield();

                    // Record that the executor invoked this handler.
                    context.ExecutorCalledBack = true;

                    // 2. Configure the shim with the inner invoker to execute the user code.
                    shim.SetFunctionInvocationCallback(userCodeInvoker);

                    // 3. Move to the next stage of the DTFx pipeline to trigger the orchestrator shim.
                    await next();

                    // 4. If an activity failed, indicate to the functions Host that this execution failed via an exception
                    if (context.IsCompleted && context.OrchestrationException != null)
                    {
                        context.OrchestrationException.Throw();
                    }
                },
#pragma warning restore CS0618
            },
            shim,
            context,
            this.HostLifetimeService.OnStopping);

        // These statuses are handled by aborting the session rather than by completing the orchestration.
        if (result.ExecutionStatus == WrappedFunctionResult.FunctionResultStatus.FunctionsRuntimeError
            || result.ExecutionStatus == WrappedFunctionResult.FunctionResultStatus.FunctionsHostStoppingError
            || result.ExecutionStatus == WrappedFunctionResult.FunctionResultStatus.FunctionTimeoutError)
        {
            this.TraceHelper.FunctionAborted(
                this.Options.HubName,
                context.FunctionName,
                context.InstanceId,
                $"An internal error occurred while attempting to execute this function. The execution will be aborted and retried. Details: {result.Exception}",
                functionType: FunctionType.Orchestrator);

            // This will abort the execution and cause the message to go back onto the queue for re-processing
            throw new SessionAbortedException(
                $"An internal error occurred while attempting to execute '{context.FunctionName}'.", result.Exception);
        }
    }

    // The orchestration has not completed and is not flagged as a long-running timer: trace it as awaited.
    if (!context.IsCompleted && !context.IsLongRunningTimer)
    {
        this.TraceHelper.FunctionAwaited(
            context.HubName,
            context.Name,
            FunctionType.Orchestrator,
            context.InstanceId,
            context.IsReplaying);
    }

    if (context.IsCompleted &&
        context.PreserveUnprocessedEvents)
    {
        // Reschedule any unprocessed external events so that they can be picked up
        // in the next iteration.
        context.RescheduleBufferedExternalEvents();
    }

    await context.RunDeferredTasks();
}