in src/DurableTask.Core/TaskEntityDispatcher.cs [199:409]
protected async Task<bool> OnProcessWorkItemAsync(TaskOrchestrationWorkItem workItem)
{
OrchestrationRuntimeState originalOrchestrationRuntimeState = workItem.OrchestrationRuntimeState;
OrchestrationRuntimeState runtimeState = workItem.OrchestrationRuntimeState;
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
Task renewTask = null;
using var renewCancellationTokenSource = new CancellationTokenSource();
if (workItem.LockedUntilUtc < DateTime.MaxValue)
{
// start a task to run RenewUntil
renewTask = Task.Factory.StartNew(
() => TaskOrchestrationDispatcher.RenewUntil(workItem, this.orchestrationService, this.logHelper, nameof(TaskEntityDispatcher), renewCancellationTokenSource.Token),
renewCancellationTokenSource.Token);
}
WorkItemEffects effects = new WorkItemEffects()
{
ActivityMessages = new List<TaskMessage>(),
TimerMessages = new List<TaskMessage>(),
InstanceMessages = new List<TaskMessage>(),
taskIdCounter = 0,
InstanceId = workItem.InstanceId,
RuntimeState = runtimeState,
};
try
{
// Assumes that: if the batch contains a new "ExecutionStarted" event, it is the first message in the batch.
if (!TaskOrchestrationDispatcher.ReconcileMessagesWithState(workItem, nameof(TaskEntityDispatcher), this.errorPropagationMode, this.logHelper))
{
// TODO : mark an orchestration as faulted if there is data corruption
this.logHelper.DroppingOrchestrationWorkItem(workItem, "Received work-item for an invalid orchestration");
}
else
{
// we start with processing all the requests and figuring out which ones to execute now
// results can depend on whether the entity is locked, what the maximum batch size is,
// and whether the messages arrived out of order
this.DetermineWork(workItem.OrchestrationRuntimeState,
out SchedulerState schedulerState,
out Work workToDoNow);
if (workToDoNow.OperationCount > 0)
{
// execute the user-defined operations on this entity, via the middleware
var result = await this.ExecuteViaMiddlewareAsync(workToDoNow, runtimeState.OrchestrationInstance, schedulerState.EntityState);
var operationResults = result.Results!;
// if we encountered an error, record it as the result of the operations
// so that callers are notified that the operation did not succeed.
if (result.FailureDetails != null)
{
OperationResult errorResult = new OperationResult()
{
// for older SDKs only
Result = result.FailureDetails.ErrorMessage,
ErrorMessage = "entity dispatch failed",
// for newer SDKs only
FailureDetails = result.FailureDetails,
};
for (int i = operationResults.Count; i < workToDoNow.OperationCount; i++)
{
operationResults.Add(errorResult);
}
}
// go through all results
// for each operation that is not a signal, send a result message back to the calling orchestrator
for (int i = 0; i < result.Results!.Count; i++)
{
var req = workToDoNow.Operations[i];
if (!req.IsSignal)
{
this.SendResultMessage(effects, req, result.Results[i]);
}
}
if (result.Results.Count < workToDoNow.OperationCount)
{
// some requests were not processed (e.g. due to shutdown or timeout)
// in this case we just defer the work so it can be retried
var deferred = workToDoNow.RemoveDeferredWork(result.Results.Count);
schedulerState.PutBack(deferred);
workToDoNow.ToBeContinued(schedulerState);
}
// update the entity state based on the result
schedulerState.EntityState = result.EntityState;
// perform the actions
foreach (var action in result.Actions!)
{
switch (action)
{
case (SendSignalOperationAction sendSignalAction):
this.SendSignalMessage(effects, schedulerState, sendSignalAction);
break;
case (StartNewOrchestrationOperationAction startAction):
this.ProcessSendStartMessage(effects, runtimeState, startAction);
break;
}
}
}
// process the lock request, if any
if (workToDoNow.LockRequest != null)
{
this.ProcessLockRequest(effects, schedulerState, workToDoNow.LockRequest);
}
if (workToDoNow.ToBeRescheduled != null)
{
foreach (var request in workToDoNow.ToBeRescheduled)
{
// Reschedule all signals that were received before their time
this.SendScheduledSelfMessage(effects, request);
}
}
if (workToDoNow.SuspendAndContinue)
{
this.SendContinueSelfMessage(effects);
}
// this batch is complete. Since this is an entity, we now
// (always) start a new execution, as in continue-as-new
var serializedSchedulerState = this.SerializeSchedulerStateForNextExecution(schedulerState);
var nextExecutionStartedEvent = new ExecutionStartedEvent(-1, serializedSchedulerState)
{
OrchestrationInstance = new OrchestrationInstance
{
InstanceId = workItem.InstanceId,
ExecutionId = Guid.NewGuid().ToString("N")
},
Tags = runtimeState.Tags,
ParentInstance = runtimeState.ParentInstance,
Name = runtimeState.Name,
Version = runtimeState.Version
};
var entityStatus = new EntityStatus()
{
EntityExists = schedulerState.EntityExists,
BacklogQueueSize = schedulerState.Queue?.Count ?? 0,
LockedBy = schedulerState.LockedBy,
};
var serializedEntityStatus = JsonConvert.SerializeObject(entityStatus, Serializer.InternalSerializerSettings);
// create the new runtime state for the next execution
runtimeState = new OrchestrationRuntimeState();
runtimeState.Status = serializedEntityStatus;
runtimeState.AddEvent(new OrchestratorStartedEvent(-1));
runtimeState.AddEvent(nextExecutionStartedEvent);
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
}
}
finally
{
if (renewTask != null)
{
try
{
renewCancellationTokenSource.Cancel();
await renewTask;
}
catch (ObjectDisposedException)
{
// ignore
}
catch (OperationCanceledException)
{
// ignore
}
}
}
OrchestrationState instanceState = (runtimeState.ExecutionStartedEvent != null) ?
instanceState = Utils.BuildOrchestrationState(runtimeState) : null;
if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
{
// some backends expect the original runtime state object
workItem.OrchestrationRuntimeState = originalOrchestrationRuntimeState;
}
else
{
workItem.OrchestrationRuntimeState = runtimeState;
}
await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync(
workItem,
runtimeState,
effects.ActivityMessages,
effects.InstanceMessages,
effects.TimerMessages,
null,
instanceState);
if (workItem.RestoreOriginalRuntimeStateDuringCompletion)
{
workItem.OrchestrationRuntimeState = runtimeState;
}
return true;
}