in src/WebJobs.Extensions.DurableTask/Listener/TaskEntityShim.cs [192:312]
public override async Task<string> Execute(OrchestrationContext innerContext, string serializedInput)
{
// Adding "Tags" to activity allows using App Insights to query current state of entities
var activity = Activity.Current;
OrchestrationRuntimeStatus status = OrchestrationRuntimeStatus.Running;
DurableTaskExtension.TagActivityWithOrchestrationStatus(status, this.context.InstanceId, true);
try
{
if (this.operationBatch.Count == 0
&& this.lockRequest == null
&& (this.toBeRescheduled == null || this.toBeRescheduled.Count == 0)
&& !this.suspendAndContinueWithDelay)
{
this.AddTraceFlag(EntityTraceFlags.WaitForEvents);
// we are idle after a ContinueAsNew - the batch is empty.
// Wait for more messages to get here (via extended sessions)
await this.doneProcessingMessages.Task;
}
if (!this.messageDataConverter.IsDefault)
{
innerContext.MessageDataConverter = this.messageDataConverter;
}
if (!this.errorDataConverter.IsDefault)
{
innerContext.ErrorDataConverter = this.errorDataConverter;
}
if (this.NumberEventsToReceive > 0)
{
await this.doneProcessingMessages.Task;
}
// Commit the effects of this batch, if
// we have not already run into an internal error
// (in which case we will abort the batch instead of committing it)
if (this.context.InternalError != null)
{
this.Config.TraceHelper.EntityBatchFailed(
this.context.HubName,
this.context.Name,
this.context.InstanceId,
this.entityTraceInfo.TraceFlags,
this.context.InternalError.SourceException);
}
else
{
bool writeBackSuccessful = true;
ResponseMessage serializationErrorMessage = null;
if (this.RollbackFailedOperations)
{
// the state has already been written back, since it is
// done right after each operation.
}
else
{
// we are writing back the state here, after executing
// the entire batch of operations.
writeBackSuccessful = this.context.TryWriteback(out serializationErrorMessage, out var _);
}
// Reschedule all signals that were received before their time
this.context.RescheduleMessages(innerContext, this.toBeRescheduled);
// Send all buffered outgoing messages
this.context.SendOutbox(innerContext, writeBackSuccessful, serializationErrorMessage);
// send a continue signal
if (this.suspendAndContinueWithDelay)
{
this.AddTraceFlag(EntityTraceFlags.Suspended);
this.context.SendContinue(innerContext);
this.suspendAndContinueWithDelay = false;
this.context.State.Suspended = true;
}
if (this.Config.UseImplicitEntityDeletion && this.context.State.IsEmpty)
{
// this entity scheduler is idle and the entity is deleted, so the instance and history can be removed from storage
// we convey this to the durability provider by issuing a continue-as-new with null input
innerContext.ContinueAsNew(null);
}
else
{
// we persist the state of the entity scheduler and entity by issuing a continue-as-new
var jstate = JToken.FromObject(this.context.State);
innerContext.ContinueAsNew(jstate);
}
// trace the completion of this entity execution batch
this.Config.TraceHelper.EntityBatchCompleted(
this.context.HubName,
this.context.Name,
this.context.InstanceId,
this.eventsReceived,
this.OperationBatch.Count,
this.BatchPosition,
this.context.State.MessageSorter.NumMessages,
this.context.State.Queue?.Count ?? 0,
this.context.State.UserStateSize,
this.context.State.MessageSorter.NumSources,
this.context.State.MessageSorter.NumDestinations,
this.context.State.LockedBy,
this.context.State.Suspended,
this.entityTraceInfo.TraceFlags);
}
}
catch (Exception e)
{
// we must catch unexpected exceptions here, otherwise entity goes into permanent failed state
// for example, there can be an OOM thrown during serialization https://github.com/Azure/azure-functions-durable-extension/issues/2166
this.context.CaptureInternalError(e, this);
}
// The return value is not used.
return string.Empty;
}