in src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs [464:612]
public async Task CallActivityAsync(DispatchMiddlewareContext dispatchContext, Func<Task> next)
{
    // Activity dispatch middleware. Reads the scheduled activity from the dispatch context,
    // runs the matching registered function through its executor, and stores an
    // ActivityExecutionResult (completed or failed) back on the dispatch context.
    // Activities whose name starts with "BuiltIn::" are handed to the next middleware instead.
    // Throws InvalidOperationException when no TaskScheduledEvent is present, and
    // SessionAbortedException when the executor throws or the host is stopping.
    TaskScheduledEvent? taskScheduled = dispatchContext.GetProperty<TaskScheduledEvent>();
    if (taskScheduled is null)
    {
        // Not expected to occur; there is no meaningful result we could hand back here.
        throw new InvalidOperationException($"An activity was scheduled but no {nameof(TaskScheduledEvent)} was found!");
    }

    // Names carrying the "BuiltIn::" prefix are not handled here; defer to the rest of the pipeline.
    bool isBuiltIn = taskScheduled.Name?.StartsWith("BuiltIn::", StringComparison.OrdinalIgnoreCase) == true;
    if (isBuiltIn)
    {
        await next();
        return;
    }

    var activityName = new FunctionName(taskScheduled.Name);

    OrchestrationInstance? orchestration = dispatchContext.GetProperty<OrchestrationInstance>();
    if (orchestration is null)
    {
        // Not expected to occur, and retrying would almost certainly not help, so fail the task.
        dispatchContext.SetProperty(CreateFailedResult(
            taskScheduled.EventId,
            $"Function {activityName} could not execute because instance ID metadata was missing!"));
        return;
    }

    if (!this.extension.TryGetActivityInfo(activityName, out RegisteredFunctionInfo? registration))
    {
        // Unknown activity name: fail the call with a message explaining that the name is invalid.
        string invalidNameMessage = this.extension.GetInvalidActivityFunctionMessage(activityName.Name);
        dispatchContext.SetProperty(CreateFailedResult(taskScheduled.EventId, invalidNameMessage));
        return;
    }

    string instanceId = orchestration.InstanceId;
    string? serializedInput = taskScheduled.Input;

    this.TraceHelper.FunctionStarting(
        this.Options.HubName,
        activityName.Name,
        instanceId,
        serializedInput,
        functionType: FunctionType.Activity,
        isReplay: false,
        taskEventId: taskScheduled.EventId);

    var activityContext = new DurableActivityContext(this.extension, instanceId, serializedInput, activityName.Name);
    var triggerData = new TriggeredFunctionData { TriggerValue = activityContext };

    FunctionResult executionResult;
    try
    {
        executionResult = await registration.Executor.TryExecuteAsync(
            triggerData,
            cancellationToken: this.HostLifetimeService.OnStopping);

        if (!executionResult.Succeeded)
        {
            // A host shutdown may show up as an invocation that completed in a failed state.
            // Throwing here routes that case into the abort handling below.
            this.HostLifetimeService.OnStopping.ThrowIfCancellationRequested();
        }
    }
    catch (Exception runtimeError)
    {
        string abortReason;
        if (this.HostLifetimeService.OnStopping.IsCancellationRequested)
        {
            abortReason = "The Functions/WebJobs runtime is shutting down!";
        }
        else
        {
            abortReason = $"Unhandled exception in the Functions/WebJobs runtime: {runtimeError}";
        }

        this.TraceHelper.FunctionAborted(
            this.Options.HubName,
            activityName.Name,
            instanceId,
            abortReason,
            functionType: FunctionType.Activity);

        // Abort the current execution so that the work item gets retried durably.
        throw new SessionAbortedException(abortReason);
    }

    ActivityExecutionResult outcome;
    if (!executionResult.Succeeded)
    {
        this.TraceHelper.FunctionFailed(
            this.Options.HubName,
            activityName.Name,
            instanceId,
            executionResult.Exception,
            FunctionType.Activity,
            isReplay: false,
            taskScheduled.EventId);

        outcome = new ActivityExecutionResult
        {
            ResponseEvent = new TaskFailedEvent(
                eventId: -1,
                taskScheduledId: taskScheduled.EventId,
                reason: $"Function '{activityName}' failed with an unhandled exception.",
                details: null,
                GetFailureDetails(executionResult.Exception, out bool parsedFromSerializedException)),
        };

        if (!parsedFromSerializedException)
        {
            // The worker runtime is only queried when the details were not parsed from a serialized exception.
            WorkerRuntimeType workerRuntime = this.extension.PlatformInformationService.GetWorkerRuntimeType();
            if (workerRuntime == WorkerRuntimeType.DotNetIsolated)
            {
                this.TraceHelper.ExtensionWarningEvent(
                    this.Options.HubName,
                    activityName.Name,
                    instanceId,
                    "Failure details not parsed from serialized exception details, worker failed to serialize exception");
            }
        }
    }
    else
    {
        string? serializedOutput = activityContext.GetSerializedOutput();

        this.TraceHelper.FunctionCompleted(
            this.Options.HubName,
            activityName.Name,
            instanceId,
            serializedOutput,
            continuedAsNew: false,
            FunctionType.Activity,
            isReplay: false,
            taskScheduled.EventId);

        outcome = new ActivityExecutionResult
        {
            ResponseEvent = new TaskCompletedEvent(
                eventId: -1,
                taskScheduledId: taskScheduled.EventId,
                result: serializedOutput),
        };
    }

    // Hand the activity outcome to the DTFx dispatch pipeline. Supplying it here bypasses the
    // default in-process execution, and the result is processed immediately.
    dispatchContext.SetProperty(outcome);

    // Builds a failed activity result that carries only a reason (no details payload).
    static ActivityExecutionResult CreateFailedResult(int scheduledId, string failureReason)
    {
        return new ActivityExecutionResult
        {
            ResponseEvent = new TaskFailedEvent(
                eventId: -1,
                taskScheduledId: scheduledId,
                reason: failureReason,
                details: null),
        };
    }
}