in src/WebJobs.Extensions.DurableTask/Listener/TaskEntityShim.cs [440:560]
/// <summary>
/// Processes a single operation request: invokes the function, records any failure in the
/// operation response, optionally writes back or rolls back changes, traces the outcome,
/// and sends a response message unless the request is a signal.
/// </summary>
/// <param name="request">The operation request to process.</param>
/// <returns>A task that completes when the operation has been processed.</returns>
private async Task ProcessOperationRequestAsync(RequestMessage request)
{
    // Install the request and a fresh response on the context for the duration of the operation.
    this.context.CurrentOperation = request;
    this.context.CurrentOperationResponse = new ResponseMessage();

    // Publish the context through the async-local static accessor used by application code.
    Entity.SetContext(this.context);

    // Remember where the outbox stood so that a failed operation can be rolled back to it.
    var outboxPositionAtStart = this.context.OutboxPosition;
    var timer = System.Diagnostics.Stopwatch.StartNew();
    Exception failure = null;

    try
    {
        Task invocation = this.FunctionInvocationCallback();
        if (invocation == null)
        {
            // Thrown inside the try block on purpose: it is recorded as the operation's failure.
            throw new InvalidOperationException("The WebJobs runtime returned a invocation task that is not awaitable!");
        }

        // Wait for whichever finishes first: the invocation or the timeout task.
        var winner = await Task.WhenAny(invocation, this.TimeoutTask);
        if (winner != this.TimeoutTask)
        {
            // Awaiting surfaces any exception of the invocation to the catch block below.
            await invocation;
        }
        else
        {
            // The timeout task's result is the exception to report for this operation.
            failure = await this.TimeoutTask;
        }
    }
    catch (Exception e)
    {
        failure = e;
    }

    timer.Stop();

    bool operationFailed = failure != null;
    if (operationFailed)
    {
        this.context.CaptureApplicationError(failure);

        // The exception has to travel back to the caller as part of the response.
        this.context.CurrentOperationResponse.SetExceptionResult(
            failure,
            this.context.CurrentOperation.Operation,
            this.errorDataConverter);
    }

    if (this.RollbackFailedOperations)
    {
        // The entity state is written back after each successful operation. If the
        // writeback fails, its error response replaces the current response and the
        // operation is treated as failed.
        if (!operationFailed
            && !this.context.TryWriteback(out var writebackErrorResponse, out failure, request.Operation, request.Id.ToString()))
        {
            this.context.CurrentOperationResponse = writebackErrorResponse;
            operationFailed = true;
        }

        if (operationFailed)
        {
            // Discard changes and do not send any signals.
            this.context.Rollback(outboxPositionAtStart);
        }
    }

    // Withdraw the context from the async-local static accessor used by application code.
    Entity.SetContext(null);

    // Take the response out of the context and reset the per-operation fields.
    var response = this.context.CurrentOperationResponse;
    this.context.CurrentOperation = null;
    this.context.CurrentOperationResponse = null;

    if (operationFailed)
    {
        this.Config.TraceHelper.OperationFailed(
            this.context.HubName,
            this.context.Name,
            this.context.InstanceId,
            request.Id.ToString(),
            request.Operation,
            this.context.RawInput,
            failure,
            timer.Elapsed.TotalMilliseconds,
            isReplay: false);
    }
    else
    {
        this.Config.TraceHelper.OperationCompleted(
            this.context.HubName,
            this.context.Name,
            this.context.InstanceId,
            request.Id.ToString(),
            request.Operation,
            this.context.RawInput,
            response.Result,
            timer.Elapsed.TotalMilliseconds,
            isReplay: false);
    }

    // No response message is sent for signals.
    if (request.IsSignal)
    {
        return;
    }

    // Address the response to the parent instance identified by the request.
    var target = new OrchestrationInstance
    {
        InstanceId = request.ParentInstanceId,
        ExecutionId = request.ParentExecutionId,
    };
    var jresponse = JToken.FromObject(response, this.messageDataConverter.JsonSerializer);
    this.context.SendResponseMessage(target, request.Id, jresponse, response.IsException);
}