async Task LockNextTaskOrchestrationWorkItemAsync()

in src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs [697:882]


        /// <summary>
        /// Waits for the next orchestration session from the session manager and converts its current
        /// message batch into a <see cref="TaskOrchestrationWorkItem"/>.
        /// </summary>
        /// <param name="entitiesOnly">
        /// Forwarded unchanged to the session manager's <c>GetNextSessionAsync</c> call
        /// (presumably restricts the fetch to entity sessions - confirm against the session manager).
        /// </param>
        /// <param name="cancellationToken">
        /// Caller-supplied token. It is linked with this service's shutdown token, and the linked token is
        /// what gets passed to the session manager.
        /// </param>
        /// <returns>
        /// The work item to process, or <c>null</c> when: no session was returned; the session's control queue
        /// has been released; every message in the batch was out of order; the instance is not executable
        /// (its messages are discarded or deferred); or an <see cref="OperationCanceledException"/> was caught.
        /// </returns>
        /// <remarks>
        /// Any other exception is logged via <c>OrchestrationProcessingFailure</c> and rethrown.
        /// </remarks>
        async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(bool entitiesOnly, CancellationToken cancellationToken)
        {
            // The returned activity ID was never read by this method, so only the call itself is kept.
            StartNewLogicalTraceScope(useExisting: true);

            await this.EnsureTaskHubAsync();

            using (var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, this.shutdownSource.Token))
            {
                // Declared outside the try block so the catch handlers can see how far we got.
                OrchestrationSession session = null;
                TaskOrchestrationWorkItem orchestrationWorkItem = null;

                try
                {
                    // This call will block until the next session is ready
                    session = await this.orchestrationSessionManager.GetNextSessionAsync(entitiesOnly, linkedCts.Token);
                    if (session == null)
                    {
                        return null;
                    }

                    // Make sure we still own the partition. If not, abandon the session.
                    if (session.ControlQueue.IsReleased)
                    {
                        await this.AbandonAndReleaseSessionAsync(session);
                        return null;
                    }

                    session.StartNewLogicalTraceScope();

                    // Allocated lazily: most batches contain no out-of-order messages.
                    List<MessageData> outOfOrderMessages = null;

                    foreach (MessageData message in session.CurrentMessageBatch)
                    {
                        if (session.IsOutOfOrderMessage(message))
                        {
                            if (outOfOrderMessages == null)
                            {
                                outOfOrderMessages = new List<MessageData>();
                            }

                            // This can happen if a lease change occurs and a new node receives a message for an 
                            // orchestration that has not yet checkpointed its history. We abandon such messages
                            // so that they can be reprocessed after the history checkpoint has completed.
                            this.settings.Logger.ReceivedOutOfOrderMessage(
                                this.azureStorageClient.QueueAccountName,
                                this.settings.TaskHubName,
                                session.Instance.InstanceId,
                                session.Instance.ExecutionId,
                                session.ControlQueue.Name,
                                message.TaskMessage.Event.EventType.ToString(),
                                Utils.GetTaskEventId(message.TaskMessage.Event),
                                message.OriginalQueueMessage.MessageId,
                                message.Episode.GetValueOrDefault(-1),
                                session.LastCheckpointTime);
                            outOfOrderMessages.Add(message);
                        }
                        else
                        {
                            session.TraceProcessingMessage(
                                message,
                                isExtendedSession: false,
                                partitionId: session.ControlQueue.Name);
                        }
                    }

                    if (outOfOrderMessages?.Count > 0)
                    {
                        // This will also remove the messages from the current batch.
                        await this.AbandonMessagesAsync(session, outOfOrderMessages);
                    }

                    if (session.CurrentMessageBatch.Count == 0)
                    {
                        // All messages were removed. Release the work item.
                        await this.AbandonAndReleaseSessionAsync(session);
                        return null;
                    }

                    // Create or restore Correlation TraceContext
                    TraceContextBase currentRequestTraceContext = null;
                    CorrelationTraceClient.Propagate(
                        () =>
                        {
                            var isReplaying = session.RuntimeState.ExecutionStartedEvent?.IsPlayed ?? false;
                            TraceContextBase parentTraceContext = GetParentTraceContext(session);
                            currentRequestTraceContext = GetRequestTraceContext(isReplaying, parentTraceContext);
                        });

                    orchestrationWorkItem = new TaskOrchestrationWorkItem
                    {
                        InstanceId = session.Instance.InstanceId,
                        // The lock is only as long as the earliest next-visible time among the batch's queue messages.
                        LockedUntilUtc = session.CurrentMessageBatch.Min(msg => msg.OriginalQueueMessage.NextVisibleOn.Value.UtcDateTime),
                        NewMessages = session.CurrentMessageBatch.Select(m => m.TaskMessage).ToList(),
                        OrchestrationRuntimeState = session.RuntimeState,
                        // The session is only attached to the work item when extended sessions are enabled.
                        Session = this.settings.ExtendedSessionsEnabled ? session : null,
                        TraceContext = currentRequestTraceContext,
                    };

                    if (!this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, this.settings.AllowReplayingTerminalInstances, out string warningMessage))
                    {
                        await this.DiscardNonExecutableWorkItemAsync(session, orchestrationWorkItem, warningMessage);
                        return null;
                    }

                    return orchestrationWorkItem;
                }
                catch (OperationCanceledException)
                {
                    if (session != null)
                    {
                        // host is shutting down - release any queued messages
                        await this.AbandonAndReleaseSessionAsync(session);
                    }

                    return null;
                }
                catch (Exception e)
                {
                    this.settings.Logger.OrchestrationProcessingFailure(
                        this.azureStorageClient.QueueAccountName,
                        this.settings.TaskHubName,
                        session?.Instance.InstanceId ?? string.Empty,
                        session?.Instance.ExecutionId ?? string.Empty,
                        e.ToString());

                    // NOTE(review): if the failure happened after the session was obtained but before the
                    // work item was created, nothing in this handler releases the session - confirm that
                    // this case is cleaned up elsewhere.
                    if (orchestrationWorkItem != null)
                    {
                        // The work-item needs to be released so that it can be retried later.
                        await this.ReleaseTaskOrchestrationWorkItemAsync(orchestrationWorkItem);
                    }

                    throw;
                }
            }
        }

        /// <summary>
        /// Handles a message batch whose instance is not executable: messages are split into those to delete
        /// and those to defer, the discard is logged, the discarded messages are deleted, and the work item
        /// is released.
        /// </summary>
        /// <param name="session">The session that owns the current message batch.</param>
        /// <param name="orchestrationWorkItem">The work item built from <paramref name="session"/>; it is released at the end.</param>
        /// <param name="warningMessage">The explanation produced by <c>IsExecutableInstance</c>; it is included in the log entry.</param>
        async Task DiscardNonExecutableWorkItemAsync(
            OrchestrationSession session,
            TaskOrchestrationWorkItem orchestrationWorkItem,
            string warningMessage)
        {
            // If all messages belong to the same execution ID, then all of them need to be discarded.
            // However, it's also possible to have messages for *any* execution ID batched together with messages
            // to a *specific* (non-executable) execution ID. Those messages should *not* be discarded since
            // they might be consumable by another orchestration with the same instance id but different execution ID.
            var messagesToDiscard = new List<MessageData>();
            var messagesToAbandon = new List<MessageData>();
            foreach (MessageData msg in session.CurrentMessageBatch)
            {
                if (msg.TaskMessage.OrchestrationInstance.ExecutionId == session.Instance.ExecutionId)
                {
                    messagesToDiscard.Add(msg);
                }
                else
                {
                    messagesToAbandon.Add(msg);
                }
            }

            // If no messages have a matching execution ID, then delete all of them. This means all the
            // messages are external (external events, termination, etc.) and were sent to an instance that
            // doesn't exist or is no longer in a running state.
            if (messagesToDiscard.Count == 0)
            {
                messagesToDiscard.AddRange(messagesToAbandon);
                messagesToAbandon.Clear();
            }

            // Add all abandoned messages to the deferred list. These messages will not be deleted right now.
            // If they can be matched with another orchestration, then great. Otherwise they will be deleted
            // the next time they are picked up.
            messagesToAbandon.ForEach(session.DeferMessage);

            // Comma-separated event types of the discarded messages. string.Join yields the same text as
            // appending "type," per message and trimming the last comma, but cannot throw on an empty list.
            string discardedEventTypes = string.Join(
                ",",
                messagesToDiscard.Select(msg => msg.TaskMessage.Event.EventType.ToString()));

            this.settings.Logger.DiscardingWorkItem(
                this.azureStorageClient.QueueAccountName,
                this.settings.TaskHubName,
                session.Instance.InstanceId,
                session.Instance.ExecutionId,
                orchestrationWorkItem.NewMessages.Count,
                session.RuntimeState.Events.Count,
                discardedEventTypes,
                warningMessage);

            // The instance has already completed or never existed. Delete this message batch.
            await this.DeleteMessageBatchAsync(session, messagesToDiscard);
            await this.ReleaseTaskOrchestrationWorkItemAsync(orchestrationWorkItem);
        }