in src/WebJobs.Extensions.DurableTask/OutOfProcMiddleware.cs [292:456]
public async Task CallEntityAsync(DispatchMiddlewareContext dispatchContext, Func<Task> next)
{
EntityBatchRequest? batchRequest = dispatchContext.GetProperty<EntityBatchRequest>();
if (batchRequest == null)
{
// This should never happen, and there's no good response we can return if it does.
throw new InvalidOperationException($"An entity was scheduled but no {nameof(EntityBatchRequest)} was found!");
}
if (batchRequest.InstanceId == null)
{
// This should never happen, and there's no good response we can return if it does.
throw new InvalidOperationException($"An entity was scheduled but InstanceId is null!");
}
EntityId entityId = EntityId.GetEntityIdFromSchedulerId(batchRequest.InstanceId);
FunctionName functionName = new FunctionName(entityId.EntityName);
RegisteredFunctionInfo functionInfo = this.extension.GetEntityInfo(functionName);
void SetErrorResult(FailureDetails failureDetails)
{
// Returns a result with no operation results and no state change,
// and with failure details that explain what error was encountered.
dispatchContext.SetProperty(new EntityBatchResult()
{
Actions = new List<OperationAction>(),
Results = new List<OperationResult>(),
EntityState = batchRequest!.EntityState,
FailureDetails = failureDetails,
});
}
if (functionInfo == null)
{
SetErrorResult(new FailureDetails(
errorType: "EntityFunctionNotFound",
errorMessage: this.extension.GetInvalidEntityFunctionMessage(functionName.Name),
stackTrace: null,
innerFailure: null,
isNonRetriable: true));
return;
}
this.TraceHelper.FunctionStarting(
this.Options.HubName,
functionName.Name,
batchRequest.InstanceId,
batchRequest.EntityState,
functionType: FunctionType.Entity,
isReplay: false);
var context = new RemoteEntityContext(batchRequest);
var input = new TriggeredFunctionData
{
TriggerValue = context,
#pragma warning disable CS0618 // Type or member is obsolete (not intended for general public use)
InvokeHandler = async functionInvoker =>
{
// Invoke the function and look for a return value. Trigger return values are an undocumented feature that we depend on.
Task invokeTask = functionInvoker();
if (invokeTask is not Task<object> invokeTaskWithResult)
{
// This should never happen
throw new InvalidOperationException("The internal function invoker returned a task that does not support return values!");
}
// The return value is expected to be a base64 string containing the protobuf-encoding of the batch result.
string? triggerReturnValue = (await invokeTaskWithResult) as string;
if (string.IsNullOrEmpty(triggerReturnValue))
{
throw new InvalidOperationException(
"The function invocation resulted in a null response. This means that either the entity function was implemented " +
"incorrectly, the Durable Task language SDK was implemented incorrectly, or that the destination language worker is not " +
"sending the function result back to the host.");
}
byte[] triggerReturnValueBytes = Convert.FromBase64String(triggerReturnValue);
P.EntityBatchResult response = P.EntityBatchResult.Parser.ParseFrom(triggerReturnValueBytes);
context.Result = response.ToEntityBatchResult();
context.ThrowIfFailed();
#pragma warning restore CS0618 // Type or member is obsolete (not intended for general public use)
},
};
FunctionResult functionResult;
try
{
functionResult = await functionInfo.Executor.TryExecuteAsync(
input,
cancellationToken: this.HostLifetimeService.OnStopping);
if (!functionResult.Succeeded)
{
// Shutdown can surface as a completed invocation in a failed state.
// Re-throw so we can abort this invocation.
this.HostLifetimeService.OnStopping.ThrowIfCancellationRequested();
}
}
catch (Exception hostRuntimeException)
{
string reason = this.HostLifetimeService.OnStopping.IsCancellationRequested ?
"The Functions/WebJobs runtime is shutting down!" :
$"Unhandled exception in the Functions/WebJobs runtime: {hostRuntimeException}";
this.TraceHelper.FunctionAborted(
this.Options.HubName,
functionName.Name,
batchRequest.InstanceId,
reason,
functionType: FunctionType.Entity);
// This will abort the current execution and force an durable retry
throw new SessionAbortedException(reason);
}
if (!functionResult.Succeeded)
{
this.TraceHelper.FunctionFailed(
this.Options.HubName,
functionName.Name,
batchRequest.InstanceId,
functionResult.Exception,
FunctionType.Entity,
isReplay: false);
if (context.Result != null)
{
// Send the results of the entity batch execution back to the DTFx dispatch pipeline.
// This is important so we can propagate the individual failure details of each failed operation back to the
// calling orchestrator. Also, even though the function execution was reported as a failure,
// it may not be a "total failure", i.e. some of the operations in the batch may have succeeded and updated
// the entity state.
dispatchContext.SetProperty(context.Result);
}
else
{
SetErrorResult(new FailureDetails(
errorType: "FunctionInvocationFailed",
errorMessage: $"Invocation of function '{functionName}' failed with an exception.",
stackTrace: null,
innerFailure: new FailureDetails(functionResult.Exception),
isNonRetriable: true));
}
return;
}
EntityBatchResult batchResult = context.Result
?? throw new InvalidOperationException($"The entity function executed successfully but {nameof(context.Result)} is still null!");
this.TraceHelper.FunctionCompleted(
this.Options.HubName,
functionName.Name,
batchRequest.InstanceId,
batchRequest.EntityState,
batchResult.EntityState != null,
FunctionType.Entity,
isReplay: false);
// Send the results of the entity batch execution back to the DTFx dispatch pipeline.
dispatchContext.SetProperty(batchResult);
}