in src/WebJobs.Extensions.DurableTask/Listener/TaskEntityShim.cs [562:708]
/// <summary>
/// Runs the whole operation batch through a single function invocation and then applies the
/// returned <c>OutOfProcResult</c>: updates the entity state, emits traces for each operation,
/// sends response messages for non-signal requests, and forwards any signals that were produced.
/// </summary>
/// <returns>A task that completes once the batch result has been fully processed.</returns>
/// <exception cref="ArgumentException">
/// The invocation failed or did not return a JSON object convertible to <c>OutOfProcResult</c>,
/// or the result contains fewer operation results than the batch has operations.
/// </exception>
/// <exception cref="InvalidOperationException">
/// The invocation task is not a <c>Task&lt;object&gt;</c> and therefore cannot carry a return value.
/// </exception>
private async Task ExecuteOutOfProcBatch()
{
    OutOfProcResult outOfProcResult = null;

    // Race the function invocation against the timeout; whichever finishes first decides the path.
    Task invokeTask = await Task.WhenAny(this.FunctionInvocationCallback(), this.TimeoutTask);

    if (invokeTask == this.TimeoutTask)
    {
        var timeoutException = await this.TimeoutTask;

        // for now, we treat this as if ALL operations in the batch failed with a timeout exception.
        // in the future, we may want to catch timeouts out-of-proc and return results for the completed operations.
        outOfProcResult = new OutOfProcResult()
        {
            // The entity state is carried over unchanged because no operation is considered to have run.
            EntityExists = this.context.State.EntityExists,
            EntityState = this.context.State.EntityState,
            Results = this.OperationBatch.Select(mbox => new OutOfProcResult.OperationResult()
            {
                IsError = true,
                DurationInMilliseconds = 0,
                Result = timeoutException.Message,
            }).ToList(),
            Signals = new List<OutOfProcResult.Signal>(),
        };
    }
    else if (invokeTask is Task<object> resultTask)
    {
        try
        {
            var outOfProcResults = await resultTask;

            // If the returned object is not a JObject, jObj is null and the resulting
            // NullReferenceException is wrapped by the catch below, like any other failure here.
            var jObj = outOfProcResults as JObject;
            outOfProcResult = jObj.ToObject<OutOfProcResult>();
        }
        catch (Exception e)
        {
            throw new ArgumentException("Out of proc orchestrators must return a valid JSON schema.", e);
        }
    }
    else
    {
        throw new InvalidOperationException("The WebJobs runtime returned a invocation task that does not support return values!");
    }

    // Every operation in the batch is paired with a result by index further down. Validate that
    // pairing up front, before the entity state is modified or any message is sent, so that a
    // malformed result cannot leave the batch half-processed.
    if (outOfProcResult == null
        || outOfProcResult.Results == null
        || outOfProcResult.Results.Count < this.OperationBatch.Count)
    {
        throw new ArgumentException("The out-of-proc result must contain one result for each operation in the batch.");
    }

    // determine what happened to the entity state
    bool stateWasCreated = !this.context.State.EntityExists && outOfProcResult.EntityExists;
    bool stateWasDeleted = this.context.State.EntityExists && !outOfProcResult.EntityExists;

    // update the state
    this.context.State.EntityExists = outOfProcResult.EntityExists;
    this.context.State.EntityState = outOfProcResult.EntityState;

    // emit trace if state was created
    if (stateWasCreated)
    {
        this.Config.TraceHelper.EntityStateCreated(
            this.context.HubName,
            this.context.Name,
            this.context.InstanceId,
            "", // operation name left blank because it is a batch
            "", // operation id left blank because it is a batch
            isReplay: false);
    }

    // for each operation, emit trace and send response message (if not a signal)
    for (int i = 0; i < this.OperationBatch.Count; i++)
    {
        var request = this.OperationBatch[i];
        var result = outOfProcResult.Results[i];

        if (!result.IsError)
        {
            this.Config.TraceHelper.OperationCompleted(
                this.context.HubName,
                this.context.Name,
                this.context.InstanceId,
                request.Id.ToString(),
                request.Operation,
                request.Input,
                result.Result,
                result.DurationInMilliseconds,
                isReplay: false);
        }
        else
        {
            // Report the error payload itself (result.Result), not the OperationResult wrapper object.
            this.context.CaptureApplicationError(new OperationErrorException(
                $"Error in operation '{request.Operation}': {result.Result}"));

            this.Config.TraceHelper.OperationFailed(
                this.context.HubName,
                this.context.Name,
                this.context.InstanceId,
                request.Id.ToString(),
                request.Operation,
                request.Input,
                result.Result,
                result.DurationInMilliseconds,
                isReplay: false);
        }

        if (!request.IsSignal)
        {
            // The response goes back to the instance identified by the request's parent ids.
            var target = new OrchestrationInstance()
            {
                InstanceId = request.ParentInstanceId,
                ExecutionId = request.ParentExecutionId,
            };

            var responseMessage = new ResponseMessage()
            {
                Result = result.Result,
                ExceptionType = result.IsError ? "Error" : null,
            };

            this.context.SendResponseMessage(target, request.Id, responseMessage, !result.IsError);
        }
    }

    // The whole batch has been handled in one go.
    this.BatchPosition = this.OperationBatch.Count;

    // send signal messages
    // A result that omits the signal list is treated as having produced no signals.
    if (outOfProcResult.Signals != null)
    {
        foreach (var signal in outOfProcResult.Signals)
        {
            var request = new RequestMessage()
            {
                ParentInstanceId = this.context.InstanceId,
                ParentExecutionId = null, // for entities, message sorter persists across executions
                Id = Guid.NewGuid(),
                IsSignal = true,
                Operation = signal.Name,
                Input = signal.Input,
            };

            var target = new OrchestrationInstance()
            {
                InstanceId = EntityId.GetSchedulerIdFromEntityId(signal.Target),
            };

            this.context.SendOperationMessage(target, request);
        }
    }

    // emit trace if state was deleted
    if (stateWasDeleted)
    {
        this.Config.TraceHelper.EntityStateDeleted(
            this.context.HubName,
            this.context.Name,
            this.context.InstanceId,
            "", // operation name left blank because it is a batch
            "", // operation id left blank because it is a batch
            isReplay: false);
    }
}