in src/WebJobs.Extensions.DurableTask/Listener/TaskEntityShim.cs [314:407]
/// <summary>
/// Executes the current batch: first the queued operations (if any), then the
/// pending lock request (if any), and finally records a host shutdown if one was signaled.
/// </summary>
/// <param name="onHostStopping">Cancellation token that is checked to detect that the host is stopping.</param>
/// <returns>A task that completes when the batch has been executed or its remainder has been put back.</returns>
public async Task ExecuteBatch(CancellationToken onHostStopping)
{
    this.eventsReceived = this.NumberEventsToReceive;

    // Stage 1: the operations. An empty batch skips this stage entirely.
    if (this.operationBatch.Count > 0)
    {
        if (this.GetFunctionInfo().IsOutOfProc)
        {
            // out-of-proc: the whole batch is handled by a single function call
            await this.ExecuteOutOfProcBatch();
        }
        else
        {
            // in-proc: the function is called once per operation
            await RunOperationsOneByOne();
        }
    }

    // Stage 2: the lock request, which always comes after the operations.
    // It is null here when there was none, or when it was put back together
    // with the unprocessed operations because the batch was cut short.
    if (this.lockRequest != null)
    {
        this.ProcessLockRequest(this.lockRequest);
        this.lockRequest = null;
    }

    // Stage 3: on host shutdown, take note that processing should be suspended.
    if (onHostStopping.IsCancellationRequested)
    {
        this.AddTraceFlag(EntityTraceFlags.HostShutdown);
        this.ToBeContinuedWithDelay();
    }

    // Walks the batch via BatchPosition, stopping early when an interrupt condition holds.
    async Task RunOperationsOneByOne()
    {
        TimeSpan timeBudget = TimeSpan.FromMinutes(3);
        Stopwatch elapsed = Stopwatch.StartNew();

        this.BatchPosition = 0;
        while (this.BatchPosition < this.operationBatch.Count)
        {
            // before each operation, decide whether the rest of the batch should be skipped
            if (MustStopEarly(elapsed, timeBudget))
            {
                HandBackUnprocessedRequests();
                return;
            }

            await this.ProcessOperationRequestAsync(this.operationBatch[this.BatchPosition]);
            this.BatchPosition++;
        }
    }

    // Returns true if the remaining operations should not be executed now;
    // adds the matching trace flag for the timeout and elapsed-time cases.
    bool MustStopEarly(Stopwatch clock, TimeSpan budget)
    {
        if (onHostStopping.IsCancellationRequested)
        {
            // host is shutting down (the HostShutdown flag is added at the end of ExecuteBatch)
            return true;
        }

        if (this.TimeoutTask.IsCompleted)
        {
            // the timeout task has completed
            this.AddTraceFlag(EntityTraceFlags.TimedOut);
            return true;
        }

        if (clock.Elapsed > budget)
        {
            // a significant amount of time has already been spent on this batch
            this.AddTraceFlag(EntityTraceFlags.SignificantTimeElapsed);
            return true;
        }

        return false;
    }

    // Puts every request from BatchPosition onward, followed by the lock request (if any),
    // back into the state's queue and asks for a delayed continuation.
    void HandBackUnprocessedRequests()
    {
        var pending = new Queue<RequestMessage>();

        int index = this.BatchPosition;
        while (index < this.operationBatch.Count)
        {
            pending.Enqueue(this.operationBatch[index]);
            index++;
        }

        if (this.lockRequest != null)
        {
            // the lock request stays last, and is cleared so that it is not processed in this batch
            pending.Enqueue(this.lockRequest);
            this.lockRequest = null;
        }

        this.context.State.PutBack(pending);
        this.ToBeContinuedWithDelay();
    }
}