in src/WebJobs.Extensions.DurableTask/Listener/TaskActivityShim.cs [49:132]
/// <summary>
/// Executes the activity function for the current orchestration instance and maps the
/// wrapped execution result onto either a serialized return value or an exception.
/// </summary>
/// <param name="context">Task context; only its orchestration instance ID is read here.</param>
/// <param name="rawInput">Raw input handed to the <see cref="DurableActivityContext"/> and to the start trace.</param>
/// <returns>The serialized output read from the activity context when the execution status is success.</returns>
/// <exception cref="SessionAbortedException">
/// The execution status is a functions-host-stopping error or a functions-runtime error.
/// </exception>
/// <exception cref="TaskFailureException">
/// The execution status is a user code error, a function timeout, or a status this method does not handle.
/// </exception>
public override async Task<string> RunAsync(TaskContext context, string rawInput)
{
    string instanceId = context.OrchestrationInstance.InstanceId;
    var inputContext = new DurableActivityContext(this.config, instanceId, rawInput, this.activityName);

    // TODO: Wire up the parent ID to improve dashboard logging.
    Guid? parentId = null;
    var triggerInput = new TriggeredFunctionData { ParentId = parentId, TriggerValue = inputContext };

    this.config.TraceHelper.FunctionStarting(
        this.config.Options.HubName,
        this.activityName,
        instanceId,
        rawInput,
        functionType: FunctionType.Activity,
        isReplay: false,
        taskEventId: this.taskEventId);

    WrappedFunctionResult result = await FunctionExecutionHelper.ExecuteActivityFunction(
        this.executor,
        triggerInput,
        this.hostServiceLifetime.OnStopping);

    switch (result.ExecutionStatus)
    {
        case WrappedFunctionResult.FunctionResultStatus.Success:
            string serializedOutput = inputContext.GetSerializedOutput();
            this.config.TraceHelper.FunctionCompleted(
                this.config.Options.HubName,
                this.activityName,
                instanceId,
                serializedOutput,
                continuedAsNew: false,
                functionType: FunctionType.Activity,
                isReplay: false,
                taskEventId: this.taskEventId);
            return serializedOutput;

        case WrappedFunctionResult.FunctionResultStatus.FunctionsHostStoppingError:
        case WrappedFunctionResult.FunctionResultStatus.FunctionsRuntimeError:
            this.config.TraceHelper.FunctionAborted(
                this.config.Options.HubName,
                this.activityName,
                instanceId,
                $"An internal error occurred while attempting to execute this function. The execution will be aborted and retried. Details: {result.Exception}",
                functionType: FunctionType.Activity);

            // This will abort the execution and cause the message to go back onto the queue for re-processing
            throw new SessionAbortedException(
                $"An internal error occurred while attempting to execute '{this.activityName}'.", result.Exception);

        case WrappedFunctionResult.FunctionResultStatus.UserCodeError:
        case WrappedFunctionResult.FunctionResultStatus.FunctionTimeoutError:
            // Flow the original activity function exception to the orchestration
            // without the outer FunctionInvocationException.
            Exception? exceptionToReport = StripFunctionInvocationException(result.Exception);
            if (OutOfProcExceptionHelpers.TryGetExceptionWithFriendlyMessage(
                exceptionToReport,
                out Exception friendlyMessageException))
            {
                exceptionToReport = friendlyMessageException;
            }

            // The stripped exception is nullable. Substitute a placeholder so that a missing
            // exception still surfaces as a TaskFailureException rather than a
            // NullReferenceException when the failure message is built below.
            exceptionToReport ??= new Exception("The function execution result did not include an exception.");

            this.config.TraceHelper.FunctionFailed(
                this.config.Options.HubName,
                this.activityName,
                instanceId,
                exceptionToReport,
                functionType: FunctionType.Activity,
                isReplay: false,
                taskEventId: this.taskEventId);

            throw new TaskFailureException(
                $"Activity function '{this.activityName}' failed: {exceptionToReport.Message}",
                Utils.SerializeCause(exceptionToReport, this.config.ErrorDataConverter));

        default:
            // we throw a TaskFailureException to ensure deserialization is possible.
            var innerException = new Exception($"{nameof(TaskActivityShim.RunAsync)} does not handle the function execution status {result.ExecutionStatus}.");

            // Emit a failure trace so the start trace above is paired with an outcome,
            // matching the user-code-error branch.
            this.config.TraceHelper.FunctionFailed(
                this.config.Options.HubName,
                this.activityName,
                instanceId,
                innerException,
                functionType: FunctionType.Activity,
                isReplay: false,
                taskEventId: this.taskEventId);

            // Use Message (not ToString()) so the text has the same shape as the
            // user-code-error branch and carries no "System.Exception: " prefix.
            throw new TaskFailureException(
                $"Activity function '{this.activityName}' failed: {innerException.Message}",
                Utils.SerializeCause(innerException, this.config.ErrorDataConverter));
    }
}