in sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/src/Listeners/ServiceBusListener.cs [438:625]
private async Task RunBatchReceiveLoopAsync(CancellationTokenSource cancellationTokenSource)
{
var cancellationToken = cancellationTokenSource.Token;
ServiceBusClient sessionClient = null;
ServiceBusReceiver receiver = null;
if (_isSessionsEnabled)
{
sessionClient = _client.Value;
}
else
{
receiver = _batchReceiver.Value;
}
// The batch receive loop below only executes functions at a concurrency level of 1,
// so we don't need to do anything special when DynamicConcurrency is enabled. If in
// the future we make this loop concurrent, we'll have to check with ConcurrencyManager.
while (true)
{
try
{
if (cancellationToken.IsCancellationRequested)
{
_logger.LogInformation($"Message processing has been stopped or cancelled ({_details.Value})");
return;
}
if (_isSessionsEnabled && (receiver == null || receiver.IsClosed))
{
try
{
receiver = await sessionClient.AcceptNextSessionAsync(
_entityPath,
new ServiceBusSessionReceiverOptions
{
PrefetchCount = _serviceBusOptions.PrefetchCount
},
cancellationToken).ConfigureAwait(false);
// Processing messages from a new session, create a new message cache for that session
_cachedMessagesManager = new ServiceBusMessageManager(
maxBatchSize: _maxMessageBatchSize,
minBatchSize: _serviceBusOptions.MinMessageBatchSize);
}
catch (ServiceBusException ex)
when (ex.Reason == ServiceBusFailureReason.ServiceTimeout)
{
// it's expected if the entity is empty, try next time
continue;
}
}
// For non-session receiver, we just fall back to the operation timeout.
TimeSpan? maxWaitTime = _isSessionsEnabled ? _serviceBusOptions.SessionIdleTimeout : null;
var messages = await receiver.ReceiveMessagesAsync(
_maxMessageBatchSize,
maxWaitTime,
cancellationTokenSource.Token).ConfigureAwait(false);
if (messages.Count > 0)
{
var messageActions = _isSessionsEnabled
? new ServiceBusSessionMessageActions((ServiceBusSessionReceiver)receiver)
: new ServiceBusMessageActions(receiver);
foreach (var message in messages)
{
_messagingProvider.ActionsCache.TryAdd(message.LockToken, (message, messageActions));
if (_isSessionsEnabled)
{
_messagingProvider.SessionActionsCache.TryAdd(message.SessionId, (ServiceBusSessionMessageActions)messageActions);
}
}
var receiveActions = new ServiceBusReceiveActions(receiver);
ServiceBusReceivedMessage[] messagesArray = _supportMinBatchSize ? Array.Empty<ServiceBusReceivedMessage>() : messages.ToArray();
if (_supportMinBatchSize)
{
var acquiredSemaphore = false;
try
{
if (!_cachedMessagesGuard.Wait(0, cancellationTokenSource.Token))
{
await _cachedMessagesGuard.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false);
}
acquiredSemaphore = true;
// Get set of messages to use for batch here.
messagesArray = _cachedMessagesManager.GetBatchofMessagesWithCached(messages.ToArray());
}
finally
{
if (acquiredSemaphore)
{
_cachedMessagesGuard.Release();
}
}
}
if (messagesArray.Length > 0 || !_supportMinBatchSize)
{
CancelExistingMonitoringTasks();
ServiceBusTriggerInput input = ServiceBusTriggerInput.CreateBatch(
messagesArray,
messageActions,
receiveActions,
_client.Value);
await TriggerWithMessagesInternal(messagesArray, input, receiver, receiveActions, messageActions, cancellationToken).ConfigureAwait(false);
}
if (_supportMinBatchSize && _cachedMessagesManager.HasCachedMessages)
{
if (_backgroundCacheMonitoringCts == null)
{
var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_backgroundCacheMonitoringCts = linkedCts;
_backgroundCacheMonitoringTask = MonitorCache(linkedCts.Token);
}
_monitoringCycleReceiver = receiver;
_monitoringCycleMessageActions = messageActions;
_monitoringCycleReceiveActions = receiveActions;
}
}
else
{
// Close the session and release the session lock after draining all messages for the accepted session.
if (_isSessionsEnabled)
{
CancelExistingMonitoringTasks();
var messageActions = new ServiceBusSessionMessageActions((ServiceBusSessionReceiver)receiver);
var receiveActions = new ServiceBusReceiveActions(receiver);
// We know we will not get any more messages from this session, so send what is left.
await DispatchRemainingMessages(receiver, messageActions, receiveActions, cancellationToken).ConfigureAwait(false);
// Use CancellationToken.None to attempt to close the receiver even when shutting down
await receiver.CloseAsync(CancellationToken.None).ConfigureAwait(false);
}
}
}
catch (ObjectDisposedException)
{
// Ignore as we are stopping the host
}
catch (OperationCanceledException)
when(cancellationToken.IsCancellationRequested)
{
// Ignore as we are stopping the host
_logger.LogInformation($"Message processing has been stopped or cancelled ({_details.Value})");
}
catch (Exception ex)
{
var errorMessage = $"An unhandled exception occurred in the message batch receive loop ({_details.Value}).";
if (_supportMinBatchSize)
{
errorMessage += " If a minimum batch size was set, any cached session messages will be abandoned.";
}
// Log another exception
_logger.LogError(ex, errorMessage);
if (_isSessionsEnabled && receiver != null)
{
// Attempt to close the session and release session lock to accept a new session on the next loop iteration
// Cancel any monitoring tasks since we are closing the receiver. The next batch loop iteration will drop any
// remaining messages in the cache.
CancelExistingMonitoringTasks();
try
{
// Use CancellationToken.None to attempt to close the receiver even when shutting down
await receiver.CloseAsync(CancellationToken.None).ConfigureAwait(false);
}
catch
{
// Best effort
receiver = null;
}
}
}
}
}