private async Task ExecuteOutOfProcBatch()

in src/WebJobs.Extensions.DurableTask/Listener/TaskEntityShim.cs [562:708]


        private async Task ExecuteOutOfProcBatch()
        {
            OutOfProcResult outOfProcResult = null;

            Task invokeTask = await Task.WhenAny(this.FunctionInvocationCallback(), this.TimeoutTask);

            if (invokeTask == this.TimeoutTask)
            {
                var timeoutException = await this.TimeoutTask;

                // for now, we treat this as if ALL operations in the batch failed with a timeout exception.
                // in the future, we may want to catch timeouts out-of-proc and return results for the completed operations.
                outOfProcResult = new OutOfProcResult()
                {
                    EntityExists = this.context.State.EntityExists,
                    EntityState = this.context.State.EntityState,
                    Results = this.OperationBatch.Select(mbox => new OutOfProcResult.OperationResult()
                    {
                        IsError = true,
                        DurationInMilliseconds = 0,
                        Result = timeoutException.Message,
                    }).ToList(),
                    Signals = new List<OutOfProcResult.Signal>(),
                };
            }
            else if (invokeTask is Task<object> resultTask)
            {
                try
                {
                    var outOfProcResults = await resultTask;
                    var jObj = outOfProcResults as JObject;
                    outOfProcResult = jObj.ToObject<OutOfProcResult>();
                }
                catch (Exception e)
                {
                    throw new ArgumentException("Out of proc orchestrators must return a valid JSON schema.", e);
                }
            }
            else
            {
                throw new InvalidOperationException("The WebJobs runtime returned a invocation task that does not support return values!");
            }

            // determine what happened to the entity state
            bool stateWasCreated = !this.context.State.EntityExists && outOfProcResult.EntityExists;
            bool stateWasDeleted = this.context.State.EntityExists && !outOfProcResult.EntityExists;

            // update the state
            this.context.State.EntityExists = outOfProcResult.EntityExists;
            this.context.State.EntityState = outOfProcResult.EntityState;

            // emit trace if state was created
            if (stateWasCreated)
            {
                this.Config.TraceHelper.EntityStateCreated(
                        this.context.HubName,
                        this.context.Name,
                        this.context.InstanceId,
                        "", // operation name left blank because it is a batch
                        "", // operation id left blank because it is a batch
                        isReplay: false);
            }

            // for each operation, emit trace and send response message (if not a signal)
            for (int i = 0; i < this.OperationBatch.Count; i++)
            {
                var request = this.OperationBatch[i];
                var result = outOfProcResult.Results[i];

                if (!result.IsError)
                {
                    this.Config.TraceHelper.OperationCompleted(
                        this.context.HubName,
                        this.context.Name,
                        this.context.InstanceId,
                        request.Id.ToString(),
                        request.Operation,
                        request.Input,
                        result.Result,
                        result.DurationInMilliseconds,
                        isReplay: false);
                }
                else
                {
                    this.context.CaptureApplicationError(new OperationErrorException(
                        $"Error in operation '{request.Operation}': {result}"));

                    this.Config.TraceHelper.OperationFailed(
                        this.context.HubName,
                        this.context.Name,
                        this.context.InstanceId,
                        request.Id.ToString(),
                        request.Operation,
                        request.Input,
                        result.Result,
                        result.DurationInMilliseconds,
                        isReplay: false);
                }

                if (!request.IsSignal)
                {
                    var target = new OrchestrationInstance()
                    {
                        InstanceId = request.ParentInstanceId,
                        ExecutionId = request.ParentExecutionId,
                    };
                    var responseMessage = new ResponseMessage()
                    {
                        Result = result.Result,
                        ExceptionType = result.IsError ? "Error" : null,
                    };
                    this.context.SendResponseMessage(target, request.Id, responseMessage, !result.IsError);
                }
            }

            this.BatchPosition = this.OperationBatch.Count;

            // send signal messages
            foreach (var signal in outOfProcResult.Signals)
            {
                var request = new RequestMessage()
                {
                    ParentInstanceId = this.context.InstanceId,
                    ParentExecutionId = null, // for entities, message sorter persists across executions
                    Id = Guid.NewGuid(),
                    IsSignal = true,
                    Operation = signal.Name,
                    Input = signal.Input,
                };
                var target = new OrchestrationInstance()
                {
                    InstanceId = EntityId.GetSchedulerIdFromEntityId(signal.Target),
                };
                this.context.SendOperationMessage(target, request);
            }

            if (stateWasDeleted)
            {
                this.Config.TraceHelper.EntityStateDeleted(
                        this.context.HubName,
                        this.context.Name,
                        this.context.InstanceId,
                        "", // operation name left blank because it is a batch
                        "", // operation id left blank because it is a batch
                        isReplay: false);
            }
        }