in src/DurableTask.AzureStorage/OrchestrationSessionManager.cs [397:500]
internal void AddMessageToPendingOrchestration(
ControlQueue controlQueue,
IEnumerable<MessageData> queueMessages,
Guid traceActivityId,
CancellationToken cancellationToken)
{
// Conditions to consider:
// 1. Do we need to create a new orchestration session or does one already exist?
// 2. Do we already have a copy of this message?
// 3. Do we need to add messages to a currently executing orchestration?
lock (this.messageAndSessionLock)
{
var existingSessionMessages = new Dictionary<OrchestrationSession, List<MessageData>>();
foreach (MessageData data in queueMessages)
{
// The instanceID identifies the orchestration across replays and ContinueAsNew generations.
// The executionID identifies a generation of an orchestration instance, doesn't change across replays.
string instanceId = data.TaskMessage.OrchestrationInstance.InstanceId;
string executionId = data.TaskMessage.OrchestrationInstance.ExecutionId;
// If the target orchestration is already in memory, we can potentially add the message to the session directly
// rather than adding it to the pending list. This behavior applies primarily when extended sessions are enabled.
// We can't do this for ExecutionStarted messages - those must *always* go to the pending list since they are for
// creating entirely new orchestration instances.
if (data.TaskMessage.Event.EventType != EventType.ExecutionStarted &&
this.activeOrchestrationSessions.TryGetValue(instanceId, out OrchestrationSession session))
{
// A null executionId value means that this is a management operation, like RaiseEvent or Terminate, which
// should be delivered to the current session.
if (executionId == null || session.Instance.ExecutionId == executionId)
{
List<MessageData> pendingMessages;
if (!existingSessionMessages.TryGetValue(session, out pendingMessages))
{
pendingMessages = new List<MessageData>();
existingSessionMessages.Add(session, pendingMessages);
}
pendingMessages.Add(data);
continue;
}
// Looks like this message is for another generation of the active orchestration. Let it fall
// into the pending list below. If it's a message for an older generation, it will be eventually
// discarded after we discover that we have no state associated with its execution ID. This is
// most common in scenarios involving durable timers and ContinueAsNew. Otherwise, this message
// will be processed after the current session unloads.
}
PendingMessageBatch? targetBatch = null; // batch for the current instanceID-executionID pair
// Unless the message is an ExecutionStarted event, we attempt to assign the current message to an
// existing batch by walking backwards through the list of batches until we find one with a matching InstanceID.
// This is assumed to be more efficient than walking forward if most messages arrive in the queue in groups.
LinkedListNode<PendingMessageBatch> node = this.pendingOrchestrationMessageBatches.Last;
while (node != null && data.TaskMessage.Event.EventType != EventType.ExecutionStarted)
{
PendingMessageBatch batch = node.Value;
if (batch.OrchestrationInstanceId == instanceId)
{
if (executionId == null || batch.OrchestrationExecutionId == executionId)
{
targetBatch = batch;
break;
}
else if (batch.OrchestrationExecutionId == null)
{
targetBatch = batch;
batch.OrchestrationExecutionId = executionId;
break;
}
}
node = node.Previous;
}
// If there is no batch for this instanceID-executionID pair, create one
if (targetBatch == null)
{
targetBatch = new PendingMessageBatch(controlQueue, instanceId, executionId);
node = this.pendingOrchestrationMessageBatches.AddLast(targetBatch);
// Before the batch of messages can be processed, we need to download the latest execution state.
// This is done beforehand in the background as a performance optimization.
Task.Run(() => this.ScheduleOrchestrationStatePrefetch(node, traceActivityId, cancellationToken));
}
// New messages are added; duplicate messages are replaced
targetBatch.Messages.AddOrReplace(data);
}
// The session might be waiting for more messages. If it is, signal them.
foreach (var pair in existingSessionMessages)
{
OrchestrationSession session = pair.Key;
List<MessageData> newMessages = pair.Value;
// New messages are added; duplicate messages are replaced
session.AddOrReplaceMessages(newMessages);
}
}
}