internal void AddMessageToPendingOrchestration()

in src/DurableTask.AzureStorage/OrchestrationSessionManager.cs [397:500]


        internal void AddMessageToPendingOrchestration(
            ControlQueue controlQueue,
            IEnumerable<MessageData> queueMessages,
            Guid traceActivityId,
            CancellationToken cancellationToken)
        {
            // Conditions to consider:
            //  1. Do we need to create a new orchestration session or does one already exist?
            //  2. Do we already have a copy of this message?
            //  3. Do we need to add messages to a currently executing orchestration?
            lock (this.messageAndSessionLock)
            {
                var existingSessionMessages = new Dictionary<OrchestrationSession, List<MessageData>>();

                foreach (MessageData data in queueMessages)
                {
                    // The instanceID identifies the orchestration across replays and ContinueAsNew generations.
                    // The executionID identifies a generation of an orchestration instance, doesn't change across replays.
                    string instanceId = data.TaskMessage.OrchestrationInstance.InstanceId;
                    string executionId = data.TaskMessage.OrchestrationInstance.ExecutionId;

                    // If the target orchestration is already in memory, we can potentially add the message to the session directly
                    // rather than adding it to the pending list. This behavior applies primarily when extended sessions are enabled.
                    // We can't do this for ExecutionStarted messages - those must *always* go to the pending list since they are for
                    // creating entirely new orchestration instances.
                    if (data.TaskMessage.Event.EventType != EventType.ExecutionStarted &&
                        this.activeOrchestrationSessions.TryGetValue(instanceId, out OrchestrationSession session))
                    {
                        // A null executionId value means that this is a management operation, like RaiseEvent or Terminate, which
                        // should be delivered to the current session.
                        if (executionId == null || session.Instance.ExecutionId == executionId)
                        {
                            List<MessageData> pendingMessages;
                            if (!existingSessionMessages.TryGetValue(session, out pendingMessages))
                            {
                                pendingMessages = new List<MessageData>();
                                existingSessionMessages.Add(session, pendingMessages);
                            }

                            pendingMessages.Add(data);
                            continue;
                        }

                        // Looks like this message is for another generation of the active orchestration. Let it fall
                        // into the pending list below. If it's a message for an older generation, it will be eventually
                        // discarded after we discover that we have no state associated with its execution ID. This is
                        // most common in scenarios involving durable timers and ContinueAsNew. Otherwise, this message
                        // will be processed after the current session unloads.
                    }

                    PendingMessageBatch? targetBatch = null; // batch for the current instanceID-executionID pair

                    // Unless the message is an ExecutionStarted event, we attempt to assign the current message to an
                    // existing batch by walking backwards through the list of batches until we find one with a matching InstanceID.
                    // This is assumed to be more efficient than walking forward if most messages arrive in the queue in groups.
                    LinkedListNode<PendingMessageBatch> node = this.pendingOrchestrationMessageBatches.Last;
                    while (node != null && data.TaskMessage.Event.EventType != EventType.ExecutionStarted)
                    {
                        PendingMessageBatch batch = node.Value;

                        if (batch.OrchestrationInstanceId == instanceId)
                        {
                            if (executionId == null || batch.OrchestrationExecutionId == executionId)
                            {
                                targetBatch = batch;
                                break;
                            }
                            else if (batch.OrchestrationExecutionId == null)
                            {
                                targetBatch = batch;
                                batch.OrchestrationExecutionId = executionId;
                                break;
                            }
                        }

                        node = node.Previous;
                    }

                    // If there is no batch for this instanceID-executionID pair, create one
                    if (targetBatch == null)
                    {
                        targetBatch = new PendingMessageBatch(controlQueue, instanceId, executionId);
                        node = this.pendingOrchestrationMessageBatches.AddLast(targetBatch);

                        // Before the batch of messages can be processed, we need to download the latest execution state.
                        // This is done beforehand in the background as a performance optimization.
                        Task.Run(() => this.ScheduleOrchestrationStatePrefetch(node, traceActivityId, cancellationToken));
                    }

                    // New messages are added; duplicate messages are replaced
                    targetBatch.Messages.AddOrReplace(data);
                }

                // The session might be waiting for more messages. If it is, signal them.
                foreach (var pair in existingSessionMessages)
                {
                    OrchestrationSession session = pair.Key;
                    List<MessageData> newMessages = pair.Value;

                    // New messages are added; duplicate messages are replaced
                    session.AddOrReplaceMessages(newMessages);
                }
            }
        }