public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync()

in src/DurableTask.ServiceBus/ServiceBusOrchestrationService.cs [533:611]


        /// <summary>
        /// Accepts the next available orchestration session, receives its pending messages and
        /// builds the work item that describes them.
        /// </summary>
        /// <param name="receiveTimeout">Timeout passed to the session accept call.</param>
        /// <param name="cancellationToken">Cancellation token supplied by the caller; this implementation does not consult it.</param>
        /// <returns>
        /// The work item for the locked session, or <c>null</c> when no session was accepted or the
        /// accepted session yielded no messages.
        /// </returns>
        /// <exception cref="OrchestrationFrameworkException">
        /// Thrown when the accepted session id is already present in the tracked session list.
        /// </exception>
        public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(TimeSpan receiveTimeout, CancellationToken cancellationToken)
        {
            var session = await this.orchestratorSessionClient.AcceptMessageSessionAsync(receiveTimeout);
            if (session == null)
            {
                return null;
            }

            this.ServiceStats.OrchestrationDispatcherStats.SessionsReceived.Increment();

            // TODO : Here and elsewhere, consider standard retry block instead of our own hand rolled version
            var receivedBatch = await Utils.ExecuteWithRetries(() => session.ReceiveAsync(this.Settings.PrefetchCount),
                session.SessionId, "Receive Session Message Batch", this.Settings.MaxRetries, this.Settings.IntervalBetweenRetriesSecs);

            // The receive call may hand back null (or nothing) when the session has no deliverable
            // messages; normalise to an empty list so the checks below are uniform.
            IList<Message> newMessages = receivedBatch?.Cast<Message>().ToList() ?? new List<Message>();

            if (newMessages.Count == 0)
            {
                // Without messages there is no work item to build, and the Max() aggregations below
                // would throw on an empty sequence. Release the session so it does not stay locked
                // until the lock expires, and report "no work" the same way as a missing session.
                TraceHelper.TraceSession(
                    TraceEventType.Warning,
                    "ServiceBusOrchestrationService-LockNextTaskOrchestrationWorkItem-NoMessages",
                    session.SessionId,
                    GetFormattedLog("Session was accepted but no messages were received; releasing the session."));

                await session.CloseAsync();
                return null;
            }

            this.ServiceStats.OrchestrationDispatcherStats.MessagesReceived.Increment(newMessages.Count);

            string messageIds = string.Join(",", newMessages.Select(m => m.MessageId));
            var maxLatency = newMessages.Max(message => message.DeliveryLatency());
            TraceHelper.TraceSession(
                TraceEventType.Information,
                "ServiceBusOrchestrationService-LockNextTaskOrchestrationWorkItem-MessageToProcess",
                session.SessionId,
                GetFormattedLog(
                    $"{newMessages.Count} new messages to process: {messageIds}, max latency: {maxLatency}ms"));

            ServiceBusUtils.CheckAndLogDeliveryCount(session.SessionId, newMessages, this.Settings.MaxTaskOrchestrationDeliveryCount);

            // Deserialize all message payloads concurrently.
            IList<TaskMessage> newTaskMessages = await Task.WhenAll(
                newMessages.Select(async message => await ServiceBusUtils.GetObjectFromBrokeredMessageAsync<TaskMessage>(message, this.BlobStore)));

            OrchestrationRuntimeState runtimeState = await GetSessionStateAsync(session, this.BlobStore);

            long maxSequenceNumber = newMessages.Max(message => message.SystemProperties.SequenceNumber);

            // Index the received messages by lock token and remember them with the session.
            Dictionary<string, Message> lockTokens = newMessages.ToDictionary(m => m.SystemProperties.LockToken.ToString(), m => m);
            var sessionState = new ServiceBusOrchestrationSession
            {
                Session = session,
                LockTokens = lockTokens,
                SequenceNumber = maxSequenceNumber
            };

            if (!this.orchestrationSessions.TryAdd(session.SessionId, sessionState))
            {
                string error = $"Duplicate orchestration session id '{session.SessionId}', id already exists in session list.";
                TraceHelper.Trace(TraceEventType.Error, "ServiceBusOrchestrationService-LockNextTaskOrchestrationWorkItem-DuplicateSessionId", error);
                throw new OrchestrationFrameworkException(error);
            }

            if (this.InstanceStore != null)
            {
                try
                {
                    // Only the first ExecutionStartedEvent in the batch (if any) is written to the instance store.
                    ExecutionStartedEvent executionStartedEvent = newTaskMessages
                        .Select(m => m.Event)
                        .OfType<ExecutionStartedEvent>()
                        .FirstOrDefault();

                    if (executionStartedEvent != null)
                    {
                        await UpdateInstanceStoreAsync(executionStartedEvent, maxSequenceNumber);
                    }
                }
                catch (Exception exception)
                {
                    // Undo the session registration made above before surfacing the failure.
                    this.orchestrationSessions.TryRemove(session.SessionId, out ServiceBusOrchestrationSession _);

                    string error = $"Exception while updating instance store. Session id: {session.SessionId}";
                    TraceHelper.TraceException(TraceEventType.Error, "ServiceBusOrchestrationService-LockNextTaskOrchestrationWorkItem-ErrorUpdatingInstanceStore", exception, error);

                    throw;
                }
            }

            return new TaskOrchestrationWorkItem
            {
                InstanceId = session.SessionId,
                LockedUntilUtc = session.LockedUntilUtc,
                NewMessages = newTaskMessages.ToList(),
                OrchestrationRuntimeState = runtimeState
            };
        }