in src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs [140:202]
/// <summary>
/// Decides whether <paramref name="message"/> is out of order, meaning that it cannot be handled
/// by the current session because none of the checks below is able to clear it.
/// </summary>
/// <param name="message">The dequeued message to evaluate.</param>
/// <returns>
/// <c>true</c> when the message is considered out of order; <c>false</c> when its event type is
/// never subject to ordering concerns or when one of the checks clears it.
/// </returns>
public bool IsOutOfOrderMessage(MessageData message)
{
    // Number of dequeues tolerated for a message whose target instance does not exist.
    const int MissingInstanceDequeueLimit = 5;

    HistoryEvent incomingEvent = message.TaskMessage.Event;
    EventType incomingType = incomingEvent.EventType;

    // Only a handful of message types can ever be considered out of order.
    bool orderingApplies;
    switch (incomingType)
    {
        case EventType.TaskCompleted:
        case EventType.TaskFailed:
        case EventType.SubOrchestrationInstanceCompleted:
        case EventType.SubOrchestrationInstanceFailed:
        case EventType.TimerFired:
            orderingApplies = true;
            break;

        case EventType.EventRaised:
            // A raised event qualifies only when an entity sent it to a non-entity instance.
            orderingApplies =
                Core.Common.Entities.IsEntityInstance(message.Sender.InstanceId) &&
                !Core.Common.Entities.IsEntityInstance(this.Instance.InstanceId);
            break;

        default:
            orderingApplies = false;
            break;
    }

    if (!orderingApplies)
    {
        return false;
    }

    // For the first five dequeues of a message addressed to a nonexistent instance we assume the
    // instance's history table simply has not been populated yet. Beyond that (~30 seconds), the
    // most likely explanation is a zombie event: the history for the message's orchestration is
    // gone, either because of an explicit PurgeHistory request or because a ContinueAsNew call
    // cleaned up the previous execution's history.
    if (this.IsNonexistantInstance() &&
        message.OriginalQueueMessage.DequeueCount > MissingInstanceDequeueLimit)
    {
        return false;
    }

    // LastCheckpointTime is the time at which the most recent history checkpoint completed, and a
    // checkpoint is written to the history table only *after* all queue messages are sent. When
    // that checkpoint completed later than the message's timestamp there is no out-of-order
    // concern. This reasoning only covers messages an orchestration sends to itself; the checks
    // that follow handle the other cases (activities, sub-orchestrations, etc.).
    // Orchestration checkpoint time information was added only after v1.6.4.
    if (this.LastCheckpointTime > incomingEvent.Timestamp)
    {
        return false;
    }

    // The message may be a response to a task. If so, it is in order as long as the history
    // already contains an event with the ID under which that task was scheduled.
    if (Utils.TryGetTaskScheduledId(incomingEvent, out int scheduledId) &&
        this.RuntimeState.Events.LastOrDefault(e => e.EventId == scheduledId) != null)
    {
        return false;
    }

    if (incomingType == EventType.EventRaised)
    {
        // This EventRaised message is a response to an EventSent message; its name carries the
        // request ID, which must match the request ID found in a recorded EventSent input.
        var requestId = ((EventRaisedEvent)incomingEvent).Name;
        if (requestId != null &&
            this.RuntimeState.Events.Any(e =>
                e.EventType == EventType.EventSent &&
                FindRequestId(((EventSentEvent)e).Input)?.ToString() == requestId))
        {
            return false;
        }
    }

    // Nothing cleared the message: it is out of order and cannot be handled by the current session.
    return true;
}