in src/DurableTask.Core/TaskOrchestrationExecutor.cs [112:196]
OrchestratorExecutionResult ExecuteCore(IEnumerable<HistoryEvent> pastEvents, IEnumerable<HistoryEvent> newEvents)
{
SynchronizationContext prevCtx = SynchronizationContext.Current;
try
{
SynchronizationContext syncCtx = new TaskOrchestrationSynchronizationContext(this.decisionScheduler);
SynchronizationContext.SetSynchronizationContext(syncCtx);
OrchestrationContext.IsOrchestratorThread = true;
try
{
void ProcessEvents(IEnumerable<HistoryEvent> events)
{
foreach (HistoryEvent historyEvent in events)
{
if (historyEvent.EventType == EventType.OrchestratorStarted)
{
var decisionStartedEvent = (OrchestratorStartedEvent)historyEvent;
this.context.CurrentUtcDateTime = decisionStartedEvent.Timestamp;
continue;
}
this.ProcessEvent(historyEvent);
historyEvent.IsPlayed = true;
}
}
// Replay the old history to rebuild the local state of the orchestration.
// TODO: Log a verbose message indicating that the replay has started (include event count?)
this.context.IsReplaying = true;
ProcessEvents(pastEvents);
// Play the newly arrived events to determine the next action to take.
// TODO: Log a verbose message indicating that new events are being processed (include event count?)
this.context.IsReplaying = false;
ProcessEvents(newEvents);
// check if workflow is completed after this replay
// TODO: Create a setting that allows orchestrations to complete when the orchestrator
// function completes, even if there are open tasks.
if (!this.context.HasOpenTasks)
{
if (this.result!.IsCompleted)
{
if (this.result.IsFaulted)
{
Exception? exception = this.result.Exception?.InnerExceptions.FirstOrDefault();
Debug.Assert(exception != null);
if (Utils.IsExecutionAborting(exception!))
{
// Let this exception propagate out to be handled by the dispatcher
ExceptionDispatchInfo.Capture(exception).Throw();
}
this.context.FailOrchestration(exception, this.orchestrationRuntimeState);
}
else
{
this.context.CompleteOrchestration(this.result.Result);
}
}
// TODO: It is an error if result is not completed when all OpenTasks are done.
// Throw an exception in that case.
}
}
catch (NonDeterministicOrchestrationException exception)
{
this.context.FailOrchestration(exception, this.orchestrationRuntimeState);
}
return new OrchestratorExecutionResult
{
Actions = this.context.OrchestratorActions,
CustomStatus = this.taskOrchestration.GetStatus(),
};
}
finally
{
SynchronizationContext.SetSynchronizationContext(prevCtx);
OrchestrationContext.IsOrchestratorThread = false;
}
}