in src/WebJobs.Extensions.DurableTask/DurableTaskExtension.cs [727:1026]
private async Task EntityMiddleware(DispatchMiddlewareContext dispatchContext, Func<Task> next)
{
var entityShim = dispatchContext.GetProperty<TaskOrchestration>() as TaskEntityShim;
if (entityShim == null)
{
// This is not an entity - skip.
await next();
return;
}
OrchestrationRuntimeState runtimeState = dispatchContext.GetProperty<OrchestrationRuntimeState>();
DurableEntityContext entityContext = (DurableEntityContext)entityShim.Context;
entityContext.InstanceId = runtimeState.OrchestrationInstance.InstanceId;
entityContext.ExecutionId = runtimeState.OrchestrationInstance.ExecutionId;
entityContext.History = runtimeState.Events;
entityContext.RawInput = runtimeState.Input;
Queue<RequestMessage> lockHolderMessages = null;
try
{
entityShim.AddTraceFlag('1'); // add a bread crumb for the entity batch tracing
// 1. First time through the history
// we count events, add any under-lock op to the batch, and process lock releases
foreach (HistoryEvent e in runtimeState.Events)
{
switch (e.EventType)
{
case EventType.ExecutionStarted:
entityShim.Rehydrate(runtimeState.Input);
break;
case EventType.EventRaised:
EventRaisedEvent eventRaisedEvent = (EventRaisedEvent)e;
this.TraceHelper.DeliveringEntityMessage(
entityContext.InstanceId,
entityContext.ExecutionId,
e.EventId,
eventRaisedEvent.Name,
eventRaisedEvent.Input);
entityShim.NumberEventsToReceive++;
if (EntityMessageEventNames.IsRequestMessage(eventRaisedEvent.Name))
{
// we are receiving an operation request or a lock request
var requestMessage = this.MessageDataConverter.Deserialize<RequestMessage>(eventRaisedEvent.Input);
IEnumerable<RequestMessage> deliverNow;
if (requestMessage.ScheduledTime.HasValue)
{
if ((requestMessage.ScheduledTime.Value - DateTime.UtcNow) > TimeSpan.FromMilliseconds(100))
{
// message was delivered too early. This can happen if the durability provider imposes
// a limit on the delay. We handle this by rescheduling the message instead of processing it.
deliverNow = Array.Empty<RequestMessage>();
entityShim.AddMessageToBeRescheduled(requestMessage);
}
else
{
// the message is scheduled to be delivered immediately.
// There are no FIFO guarantees for scheduled messages, so we skip the message sorter.
deliverNow = new RequestMessage[] { requestMessage };
}
}
else
{
// run this through the message sorter to help with reordering and duplicate filtering
deliverNow = entityContext.State.MessageSorter.ReceiveInOrder(requestMessage, this.MessageReorderWindow);
}
foreach (var message in deliverNow)
{
if (entityContext.State.LockedBy != null
&& entityContext.State.LockedBy == message.ParentInstanceId)
{
if (lockHolderMessages == null)
{
lockHolderMessages = new Queue<RequestMessage>();
}
lockHolderMessages.Enqueue(message);
}
else
{
entityContext.State.Enqueue(message);
}
}
}
else if (EntityMessageEventNames.IsReleaseMessage(eventRaisedEvent.Name))
{
// we are receiving a lock release
var message = this.MessageDataConverter.Deserialize<ReleaseMessage>(eventRaisedEvent.Input);
if (entityContext.State.LockedBy == message.ParentInstanceId)
{
this.TraceHelper.EntityLockReleased(
entityContext.HubName,
entityContext.Name,
entityContext.InstanceId,
message.ParentInstanceId,
message.LockRequestId,
isReplay: false);
entityContext.State.LockedBy = null;
}
}
else
{
// this is a continue message.
// Resumes processing of previously queued operations, if any.
entityContext.State.Suspended = false;
entityShim.AddTraceFlag(EntityTraceFlags.Resumed);
}
break;
}
}
// lock holder messages go to the front of the queue
if (lockHolderMessages != null)
{
entityContext.State.PutBack(lockHolderMessages);
}
// mitigation for ICM358210295 : if an entity has been in suspended state for at least 10 seconds, resume
// (suspended state is never meant to last long, it is needed just so the history gets persisted to storage)
if (entityContext.State.Suspended
&& runtimeState.ExecutionStartedEvent?.Timestamp < DateTime.UtcNow - TimeSpan.FromSeconds(10))
{
entityContext.State.Suspended = false;
entityShim.AddTraceFlag(EntityTraceFlags.MitigationResumed);
}
if (!entityContext.State.Suspended)
{
entityShim.AddTraceFlag('2');
// 2. We add as many requests from the queue to the batch as possible,
// stopping at lock requests or when the maximum batch size is reached
while (entityContext.State.MayDequeue())
{
if (entityShim.OperationBatch.Count == this.Options.MaxEntityOperationBatchSize)
{
// we have reached the maximum batch size already
// insert a delay after this batch to ensure write back
entityShim.AddTraceFlag(EntityTraceFlags.BatchSizeLimit);
entityShim.ToBeContinuedWithDelay();
break;
}
var request = entityContext.State.Dequeue();
if (request.IsLockRequest)
{
entityShim.AddLockRequestToBatch(request);
break;
}
else
{
entityShim.AddOperationToBatch(request);
}
}
}
}
catch (Exception e)
{
entityContext.CaptureInternalError(e, entityShim);
}
WrappedFunctionResult result;
if (entityShim.OperationBatch.Count > 0 && !this.HostLifetimeService.OnStopping.IsCancellationRequested)
{
// 3a. (function execution) Start the functions invocation pipeline (billing, logging, bindings, and timeout tracking).
result = await FunctionExecutionHelper.ExecuteFunctionInOrchestrationMiddleware(
entityShim.GetFunctionInfo().Executor,
new TriggeredFunctionData
{
TriggerValue = entityShim.Context,
#pragma warning disable CS0618 // Approved for use by this extension
InvokeHandler = async userCodeInvoker =>
{
entityContext.ExecutorCalledBack = true;
entityShim.SetFunctionInvocationCallback(userCodeInvoker);
this.TraceHelper.FunctionStarting(
entityContext.HubName,
entityContext.Name,
entityContext.InstanceId,
runtimeState.Input,
FunctionType.Entity,
isReplay: false);
entityShim.AddTraceFlag('3');
// 3. Run all the operations in the batch
if (entityContext.InternalError == null)
{
try
{
await entityShim.ExecuteBatch(this.HostLifetimeService.OnStopping);
}
catch (Exception e)
{
entityContext.CaptureInternalError(e, entityShim);
}
}
entityShim.AddTraceFlag('4');
// 4. Run the DTFx orchestration to persist the effects,
// send the outbox, and continue as new
await next();
// 5. If there were internal or application errors, trace them for DF
if (entityContext.ErrorsPresent(out string description, out string sanitizedError))
{
this.TraceHelper.FunctionFailed(
entityContext.HubName,
entityContext.Name,
entityContext.InstanceId,
description,
sanitizedReason: sanitizedError,
functionType: FunctionType.Entity,
isReplay: false);
}
else
{
this.TraceHelper.FunctionCompleted(
entityContext.HubName,
entityContext.Name,
entityContext.InstanceId,
entityContext.State.EntityState,
continuedAsNew: true,
functionType: FunctionType.Entity,
isReplay: false);
}
// 6. If there were internal or application errors, also rethrow them here so the functions host gets to see them
entityContext.ThrowInternalExceptionIfAny();
entityContext.ThrowApplicationExceptionsIfAny();
},
#pragma warning restore CS0618
},
entityShim,
entityContext,
this.HostLifetimeService.OnStopping);
if (result.ExecutionStatus == WrappedFunctionResult.FunctionResultStatus.FunctionTimeoutError)
{
await entityShim.TimeoutTask;
}
if (result.ExecutionStatus == WrappedFunctionResult.FunctionResultStatus.FunctionsRuntimeError
|| result.ExecutionStatus == WrappedFunctionResult.FunctionResultStatus.FunctionsHostStoppingError)
{
this.TraceHelper.FunctionAborted(
this.Options.HubName,
entityContext.FunctionName,
entityContext.InstanceId,
$"An internal error occurred while attempting to execute this function. The execution will be aborted and retried. Details: {result.Exception}",
functionType: FunctionType.Entity);
// This will abort the execution and cause the message to go back onto the queue for re-processing
throw new SessionAbortedException(
$"An internal error occurred while attempting to execute '{entityContext.FunctionName}'.",
result.Exception);
}
}
else
{
entityShim.AddTraceFlag(EntityTraceFlags.DirectExecution);
// 3b. (direct execution) We do not need to call into user code because we are not going to run any operations.
// In this case we can execute without involving the functions runtime.
if (entityContext.InternalError == null)
{
try
{
await entityShim.ExecuteBatch(this.HostLifetimeService.OnStopping);
await next();
}
catch (Exception e)
{
entityContext.CaptureInternalError(e, entityShim);
}
}
}
// If there were internal errors, throw a SessionAbortedException
// here so DTFx can abort the batch and back off the work item
entityContext.AbortOnInternalError(entityShim.TraceFlags);
await entityContext.RunDeferredTasks();
}