internal static bool ReconcileMessagesWithState()

in src/DurableTask.Core/TaskOrchestrationDispatcher.cs [808:919]


        /// <summary>
        /// Walks <c>workItem.NewMessages</c> in order and appends the event of every accepted message to
        /// <c>workItem.OrchestrationRuntimeState</c>, logging and skipping the messages that have to be dropped.
        /// </summary>
        /// <param name="workItem">Work item that supplies the new messages and owns the runtime state they are added to.</param>
        /// <param name="dispatcher">Text used as the prefix of the trace event identifiers emitted by this method.</param>
        /// <param name="errorPropagationMode">Passed through unchanged to the trace helpers that emit failure activities.</param>
        /// <param name="logHelper">Receives the "processing" and "dropping" notifications for each message.</param>
        /// <returns>
        /// <c>true</c> once every message has been either added or dropped; <c>false</c> as soon as the runtime state
        /// reports itself as not valid, or holds exactly one event while the current message is not an
        /// execution-started event. Events added by earlier iterations remain in the runtime state in that case.
        /// </returns>
        /// <remarks>
        /// A message without an orchestration instance ID aborts processing: the method throws whatever
        /// <c>TraceHelper.TraceException</c> returns for the <see cref="InvalidOperationException"/> created here.
        /// </remarks>
        internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workItem, string dispatcher, ErrorPropagationMode errorPropagationMode, LogHelper logHelper)
        {
            foreach (TaskMessage message in workItem.NewMessages)
            {
                OrchestrationInstance instance = message.OrchestrationInstance;
                bool hasInstanceId = !string.IsNullOrWhiteSpace(instance?.InstanceId);
                if (!hasInstanceId)
                {
                    throw TraceHelper.TraceException(
                        TraceEventType.Error,
                        $"{dispatcher}-OrchestrationInstanceMissing",
                        new InvalidOperationException("Message does not contain any OrchestrationInstance information"));
                }

                var runtimeState = workItem.OrchestrationRuntimeState;

                // A corrupted orchestration history (partially deleted, etc.) cannot be reconciled.
                if (!runtimeState.IsValid)
                {
                    return false;
                }

                // A history holding a single event combined with a non-start message happens for:
                //   i)  responses to scheduled tasks arriving after the orchestration completed
                //   ii) responses for orchestrations that were explicitly deleted
                bool historyHasSingleEvent = runtimeState.Events.Count == 1;
                if (historyHasSingleEvent && message.Event.EventType != EventType.ExecutionStarted)
                {
                    return false;
                }

                logHelper.ProcessingOrchestrationMessage(workItem, message);
                TraceHelper.TraceInstance(
                    TraceEventType.Information,
                    $"{dispatcher}-ProcessEvent",
                    instance!,
                    "Processing new event with Id {0} and type {1}",
                    message.Event.EventId,
                    message.Event.EventType);

                bool isStartMessage = message.Event.EventType == EventType.ExecutionStarted;

                // A second start event for an already started execution is a duplicate: swallow it.
                if (isStartMessage && runtimeState.ExecutionStartedEvent != null)
                {
                    logHelper.DroppingOrchestrationMessage(workItem, message, "Duplicate start event");
                    TraceHelper.TraceInstance(
                        TraceEventType.Warning,
                        $"{dispatcher}-DuplicateStartEvent",
                        instance!,
                        "Duplicate start event.  Ignoring event with Id {0} and type {1} ",
                        message.Event.EventId,
                        message.Event.EventType);
                    continue;
                }

                // Non-start messages that carry an execution ID different from the current one
                // belong to a previous execution and are discarded.
                if (!isStartMessage
                    && !string.IsNullOrWhiteSpace(instance?.ExecutionId)
                    && !string.Equals(instance!.ExecutionId, runtimeState.OrchestrationInstance?.ExecutionId))
                {
                    logHelper.DroppingOrchestrationMessage(
                        workItem,
                        message,
                        $"ExecutionId of event ({instance.ExecutionId}) does not match current executionId");
                    TraceHelper.TraceInstance(
                        TraceEventType.Warning,
                        $"{dispatcher}-ExecutionIdMismatch",
                        instance,
                        "ExecutionId of event does not match current executionId.  Ignoring event with Id {0} and type {1} ",
                        message.Event.EventId,
                        message.Event.EventType);
                    continue;
                }

                // Timer and sub-orchestration spans are only published while an ambient activity exists.
                if (Activity.Current != null)
                {
                    switch (message.Event)
                    {
                        case TimerFiredEvent timerFired:
                        {
                            // The span is published right away: the helper creates the activity and disposes it immediately.
                            TraceHelper.EmitTraceActivityForTimer(
                                runtimeState.OrchestrationInstance,
                                runtimeState.Name,
                                message.Event.Timestamp,
                                timerFired);
                            break;
                        }

                        case SubOrchestrationInstanceCompletedEvent subCompleted:
                        {
                            SubOrchestrationInstanceCreatedEvent createdEvent = runtimeState.Events
                                .OfType<SubOrchestrationInstanceCreatedEvent>()
                                .FirstOrDefault(created => created.EventId == subCompleted.TaskScheduledId);

                            // The span is published right away: the helper creates the activity and disposes it immediately.
                            TraceHelper.EmitTraceActivityForSubOrchestrationCompleted(
                                runtimeState.OrchestrationInstance,
                                createdEvent);
                            break;
                        }

                        case SubOrchestrationInstanceFailedEvent subFailed:
                        {
                            SubOrchestrationInstanceCreatedEvent createdEvent = runtimeState.Events
                                .OfType<SubOrchestrationInstanceCreatedEvent>()
                                .FirstOrDefault(created => created.EventId == subFailed.TaskScheduledId);

                            // The span is published right away: the helper creates the activity and disposes it immediately.
                            TraceHelper.EmitTraceActivityForSubOrchestrationFailed(
                                runtimeState.OrchestrationInstance,
                                createdEvent,
                                subFailed,
                                errorPropagationMode);
                            break;
                        }
                    }
                }

                // Task results are matched to the last scheduled event with the same ID; unlike the
                // block above, this lookup and the helper call are not conditioned on Activity.Current.
                switch (message.Event)
                {
                    case TaskCompletedEvent taskCompleted:
                    {
                        TaskScheduledEvent scheduledEvent = runtimeState.Events
                            .OfType<TaskScheduledEvent>()
                            .LastOrDefault(scheduled => scheduled.EventId == taskCompleted.TaskScheduledId);
                        TraceHelper.EmitTraceActivityForTaskCompleted(
                            runtimeState.OrchestrationInstance,
                            scheduledEvent);
                        break;
                    }

                    case TaskFailedEvent taskFailed:
                    {
                        TaskScheduledEvent scheduledEvent = runtimeState.Events
                            .OfType<TaskScheduledEvent>()
                            .LastOrDefault(scheduled => scheduled.EventId == taskFailed.TaskScheduledId);
                        TraceHelper.EmitTraceActivityForTaskFailed(
                            runtimeState.OrchestrationInstance,
                            scheduledEvent,
                            taskFailed,
                            errorPropagationMode);
                        break;
                    }
                }

                runtimeState.AddEvent(message.Event);
            }

            return true;
        }