in src/DurableTask.Core/TaskOrchestrationDispatcher.cs [808:919]
/// <summary>
/// Walks the new messages attached to a work item and appends each acceptable event to the
/// work item's orchestration runtime state. Duplicate start events and events whose execution ID
/// differs from the runtime state's current execution ID are logged and skipped.
/// </summary>
/// <param name="workItem">Supplies <c>NewMessages</c> and the <c>OrchestrationRuntimeState</c> that receives the events.</param>
/// <param name="dispatcher">Prefix used for every trace event type emitted by this method.</param>
/// <param name="errorPropagationMode">Forwarded to the trace helpers that publish failure activities.</param>
/// <param name="logHelper">Notified about every message that is processed or dropped.</param>
/// <returns>
/// <c>false</c> as soon as the runtime state reports itself invalid, or when the history holds exactly
/// one event and the current message is not an <c>ExecutionStarted</c> event (messages handled before
/// that point have already been appended); <c>true</c> once every message has been examined.
/// </returns>
/// <remarks>
/// When a message carries no instance ID, the exception returned by <c>TraceHelper.TraceException</c>
/// (built around an <see cref="InvalidOperationException"/>) is thrown.
/// </remarks>
internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workItem, string dispatcher, ErrorPropagationMode errorPropagationMode, LogHelper logHelper)
{
    foreach (TaskMessage incoming in workItem.NewMessages)
    {
        OrchestrationInstance instance = incoming.OrchestrationInstance;
        if (string.IsNullOrWhiteSpace(instance?.InstanceId))
        {
            var missingInstance = new InvalidOperationException("Message does not contain any OrchestrationInstance information");
            throw TraceHelper.TraceException(
                TraceEventType.Error,
                $"{dispatcher}-OrchestrationInstanceMissing",
                missingInstance);
        }

        // Re-read on every iteration so the local always mirrors what the work item currently exposes.
        var runtimeState = workItem.OrchestrationRuntimeState;
        if (!runtimeState.IsValid)
        {
            // The orchestration history is somehow corrupted (partially deleted, etc.).
            return false;
        }

        HistoryEvent newEvent = incoming.Event;
        if (runtimeState.Events.Count == 1 && newEvent.EventType != EventType.ExecutionStarted)
        {
            // Reached for either of:
            //   i)  responses for scheduled tasks arriving after the orchestration has completed
            //   ii) responses for orchestrations that were explicitly deleted
            return false;
        }

        logHelper.ProcessingOrchestrationMessage(workItem, incoming);
        TraceHelper.TraceInstance(
            TraceEventType.Information,
            $"{dispatcher}-ProcessEvent",
            instance!,
            "Processing new event with Id {0} and type {1}",
            newEvent.EventId,
            newEvent.EventType);

        bool isStartEvent = newEvent.EventType == EventType.ExecutionStarted;

        if (isStartEvent && runtimeState.ExecutionStartedEvent != null)
        {
            // A second execution-started event showed up; swallow the duplicate.
            logHelper.DroppingOrchestrationMessage(workItem, incoming, "Duplicate start event");
            TraceHelper.TraceInstance(
                TraceEventType.Warning,
                $"{dispatcher}-DuplicateStartEvent",
                instance!,
                "Duplicate start event. Ignoring event with Id {0} and type {1} ",
                newEvent.EventId,
                newEvent.EventType);
            continue;
        }

        if (!isStartEvent
            && !string.IsNullOrWhiteSpace(instance?.ExecutionId)
            && !string.Equals(instance!.ExecutionId, runtimeState.OrchestrationInstance?.ExecutionId))
        {
            // Events addressed to a previous execution are discarded.
            logHelper.DroppingOrchestrationMessage(
                workItem,
                incoming,
                $"ExecutionId of event ({instance.ExecutionId}) does not match current executionId");
            TraceHelper.TraceInstance(
                TraceEventType.Warning,
                $"{dispatcher}-ExecutionIdMismatch",
                instance,
                "ExecutionId of event does not match current executionId. Ignoring event with Id {0} and type {1} ",
                newEvent.EventId,
                newEvent.EventType);
            continue;
        }

        // Timer and sub-orchestration spans are only published while an ambient Activity exists.
        if (Activity.Current != null)
        {
            switch (newEvent)
            {
                case TimerFiredEvent timerFired:
                {
                    // The activity span for this timer is published right away: the activity is created and immediately disposed.
                    TraceHelper.EmitTraceActivityForTimer(
                        runtimeState.OrchestrationInstance,
                        runtimeState.Name,
                        newEvent.Timestamp,
                        timerFired);
                    break;
                }

                case SubOrchestrationInstanceCompletedEvent subCompleted:
                {
                    SubOrchestrationInstanceCreatedEvent createdEvent = runtimeState.Events
                        .OfType<SubOrchestrationInstanceCreatedEvent>()
                        .FirstOrDefault(candidate => candidate.EventId == subCompleted.TaskScheduledId);

                    // The activity span for this sub-orchestration is published right away: created and immediately disposed.
                    TraceHelper.EmitTraceActivityForSubOrchestrationCompleted(
                        runtimeState.OrchestrationInstance,
                        createdEvent);
                    break;
                }

                case SubOrchestrationInstanceFailedEvent subFailed:
                {
                    SubOrchestrationInstanceCreatedEvent createdEvent = runtimeState.Events
                        .OfType<SubOrchestrationInstanceCreatedEvent>()
                        .FirstOrDefault(candidate => candidate.EventId == subFailed.TaskScheduledId);

                    // The activity span for this sub-orchestration is published right away: created and immediately disposed.
                    TraceHelper.EmitTraceActivityForSubOrchestrationFailed(
                        runtimeState.OrchestrationInstance,
                        createdEvent,
                        subFailed,
                        errorPropagationMode);
                    break;
                }
            }
        }

        // Task completion/failure tracing is invoked regardless of Activity.Current; the most recent
        // matching TaskScheduledEvent in the history (or null when none matches) is handed to the helper.
        switch (newEvent)
        {
            case TaskCompletedEvent taskCompleted:
            {
                TaskScheduledEvent scheduledEvent = runtimeState.Events
                    .OfType<TaskScheduledEvent>()
                    .LastOrDefault(candidate => candidate.EventId == taskCompleted.TaskScheduledId);
                TraceHelper.EmitTraceActivityForTaskCompleted(
                    runtimeState.OrchestrationInstance,
                    scheduledEvent);
                break;
            }

            case TaskFailedEvent taskFailed:
            {
                TaskScheduledEvent scheduledEvent = runtimeState.Events
                    .OfType<TaskScheduledEvent>()
                    .LastOrDefault(candidate => candidate.EventId == taskFailed.TaskScheduledId);
                TraceHelper.EmitTraceActivityForTaskFailed(
                    runtimeState.OrchestrationInstance,
                    scheduledEvent,
                    taskFailed,
                    errorPropagationMode);
                break;
            }
        }

        runtimeState.AddEvent(newEvent);
    }

    return true;
}