private async Task RunBatchReceiveLoopAsync()

in sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs [438:625]


        private async Task RunBatchReceiveLoopAsync(CancellationTokenSource cancellationTokenSource)
        {
            var cancellationToken = cancellationTokenSource.Token;
            ServiceBusClient sessionClient = null;
            ServiceBusReceiver receiver = null;
            if (_isSessionsEnabled)
            {
                sessionClient = _client.Value;
            }
            else
            {
                receiver = _batchReceiver.Value;
            }

            // The batch receive loop below only executes functions at a concurrency level of 1,
            // so we don't need to do anything special when DynamicConcurrency is enabled. If in
            // the future we make this loop concurrent, we'll have to check with ConcurrencyManager.
            while (true)
            {
                try
                {
                    if (cancellationToken.IsCancellationRequested)
                    {
                        _logger.LogInformation($"Message processing has been stopped or cancelled ({_details.Value})");
                        return;
                    }

                    if (_isSessionsEnabled && (receiver == null || receiver.IsClosed))
                    {
                        try
                        {
                            receiver = await sessionClient.AcceptNextSessionAsync(
                                _entityPath,
                                new ServiceBusSessionReceiverOptions
                                {
                                    PrefetchCount = _serviceBusOptions.PrefetchCount
                                },
                                cancellationToken).ConfigureAwait(false);

                            // Processing messages from a new session, create a new message cache for that session
                             _cachedMessagesManager = new ServiceBusMessageManager(
                                    maxBatchSize: _maxMessageBatchSize,
                                    minBatchSize: _serviceBusOptions.MinMessageBatchSize);
                        }
                        catch (ServiceBusException ex)
                            when (ex.Reason == ServiceBusFailureReason.ServiceTimeout)
                        {
                            // it's expected if the entity is empty, try next time
                            continue;
                        }
                    }

                    // For non-session receiver, we just fall back to the operation timeout.
                    TimeSpan? maxWaitTime = _isSessionsEnabled ? _serviceBusOptions.SessionIdleTimeout : null;

                    var messages = await receiver.ReceiveMessagesAsync(
                        _maxMessageBatchSize,
                        maxWaitTime,
                        cancellationTokenSource.Token).ConfigureAwait(false);

                    if (messages.Count > 0)
                    {
                        var messageActions = _isSessionsEnabled
                            ? new ServiceBusSessionMessageActions((ServiceBusSessionReceiver)receiver)
                            : new ServiceBusMessageActions(receiver);

                        foreach (var message in messages)
                        {
                            _messagingProvider.ActionsCache.TryAdd(message.LockToken, (message, messageActions));
                            if (_isSessionsEnabled)
                            {
                                _messagingProvider.SessionActionsCache.TryAdd(message.SessionId, (ServiceBusSessionMessageActions)messageActions);
                            }
                        }

                        var receiveActions = new ServiceBusReceiveActions(receiver);

                        ServiceBusReceivedMessage[] messagesArray = _supportMinBatchSize ? Array.Empty<ServiceBusReceivedMessage>() : messages.ToArray();

                        if (_supportMinBatchSize)
                        {
                            var acquiredSemaphore = false;
                            try
                            {
                                if (!_cachedMessagesGuard.Wait(0, cancellationTokenSource.Token))
                                {
                                    await _cachedMessagesGuard.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false);
                                }
                                acquiredSemaphore = true;

                                // Get set of messages to use for batch here.
                                messagesArray = _cachedMessagesManager.GetBatchofMessagesWithCached(messages.ToArray());
                            }
                            finally
                            {
                                if (acquiredSemaphore)
                                {
                                    _cachedMessagesGuard.Release();
                                }
                            }
                        }

                        if (messagesArray.Length > 0 || !_supportMinBatchSize)
                        {
                            CancelExistingMonitoringTasks();

                            ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateBatch(
                                messagesArray,
                                messageActions,
                                receiveActions,
                                _client.Value);

                            await TriggerWithMessagesInternal(messagesArray, input, receiver, receiveActions, messageActions, cancellationToken).ConfigureAwait(false);
                        }

                        if (_supportMinBatchSize && _cachedMessagesManager.HasCachedMessages)
                        {
                            if (_backgroundCacheMonitoringCts == null)
                            {
                                var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
                                _backgroundCacheMonitoringCts = linkedCts;
                                _backgroundCacheMonitoringTask = MonitorCache(linkedCts.Token);
                            }
                            _monitoringCycleReceiver = receiver;
                            _monitoringCycleMessageActions = messageActions;
                            _monitoringCycleReceiveActions = receiveActions;
                        }
                    }
                    else
                    {
                        // Close the session and release the session lock after draining all messages for the accepted session.
                        if (_isSessionsEnabled)
                        {
                            CancelExistingMonitoringTasks();

                            var messageActions = new ServiceBusSessionMessageActions((ServiceBusSessionReceiver)receiver);
                            var receiveActions = new ServiceBusReceiveActions(receiver);

                            // We know we will not get any more messages from this session, so send what is left.
                            await DispatchRemainingMessages(receiver, messageActions, receiveActions, cancellationToken).ConfigureAwait(false);

                            // Use CancellationToken.None to attempt to close the receiver even when shutting down
                            await receiver.CloseAsync(CancellationToken.None).ConfigureAwait(false);
                        }
                    }
                }
                catch (ObjectDisposedException)
                {
                    // Ignore as we are stopping the host
                }
                catch (OperationCanceledException)
                    when(cancellationToken.IsCancellationRequested)
                {
                    // Ignore as we are stopping the host
                    _logger.LogInformation($"Message processing has been stopped or cancelled ({_details.Value})");
                }
                catch (Exception ex)
                {
                    var errorMessage = $"An unhandled exception occurred in the message batch receive loop ({_details.Value}).";

                    if (_supportMinBatchSize)
                    {
                        errorMessage += " If a minimum batch size was set, any cached session messages will be abandoned.";
                    }
                    // Log another exception
                    _logger.LogError(ex, errorMessage);

                    if (_isSessionsEnabled && receiver != null)
                    {
                        // Attempt to close the session and release session lock to accept a new session on the next loop iteration

                        // Cancel any monitoring tasks since we are closing the receiver. The next batch loop iteration will drop any
                        // remaining messages in the cache.
                        CancelExistingMonitoringTasks();
                        try
                        {
                            // Use CancellationToken.None to attempt to close the receiver even when shutting down
                            await receiver.CloseAsync(CancellationToken.None).ConfigureAwait(false);
                        }
                        catch
                        {
                            // Best effort
                            receiver = null;
                        }
                    }
                }
            }
        }