extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs

// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ;

internal sealed class RabbitMQListener : IListener, IScaleMonitor<RabbitMQTriggerMetrics>
{
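    // Lifecycle states of the listener, stored in the listenerState field and updated via Interlocked operations.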
private const int ListenerNotStarted = 0;
private const int ListenerStarting = 1;
private const int ListenerStarted = 2;
private const int ListenerStopping = 3;
private const int ListenerStopped = 4;
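
    // Message header used to count how many times a failed message has been republished for retry.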
    private const string RequeueCountHeaderName = "x-ms-rabbitmq-requeuecount";

private readonly IModel channel;
private readonly ITriggeredFunctionExecutor executor;
private readonly ILogger logger;
private readonly string queueName;
private readonly ushort prefetchCount;
private readonly string logDetails;
private readonly IDrainModeManager drainModeManager;
private readonly CancellationTokenSource listenerCancellationTokenSource;
private int listenerState = ListenerNotStarted;
    private string consumerTag;

public RabbitMQListener(
IModel channel,
ITriggeredFunctionExecutor executor,
ILogger logger,
string functionId,
string queueName,
ushort prefetchCount,
IDrainModeManager drainModeManager)
{
this.channel = channel ?? throw new ArgumentNullException(nameof(channel));
this.executor = executor ?? throw new ArgumentNullException(nameof(executor));
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
this.queueName = !string.IsNullOrWhiteSpace(queueName) ? queueName : throw new ArgumentNullException(nameof(queueName));
this.prefetchCount = prefetchCount;
this.drainModeManager = drainModeManager;
this.listenerCancellationTokenSource = new CancellationTokenSource();
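        // functionId is not stored; it is only needed to build the scale-monitor descriptor and log details below.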
_ = !string.IsNullOrWhiteSpace(functionId) ? true : throw new ArgumentNullException(nameof(functionId));
        // Do not convert the scale-monitor ID to a lower-case string since RabbitMQ queue names are case-sensitive.
this.Descriptor = new ScaleMonitorDescriptor($"{functionId}-RabbitMQTrigger-{queueName}", functionId);
this.logDetails = $"function: '{functionId}', queue: '{queueName}'";
}
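
    // Identifies this scale monitor to the runtime scale controller.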
public ScaleMonitorDescriptor Descriptor { get; }
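
    // Called when the host cancels the listener; blocks until StopAsync() completes.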
public void Cancel()
{
this.StopAsync(CancellationToken.None).Wait();
    }

public void Dispose()
{
// Nothing to dispose.
    }

public Task StartAsync(CancellationToken cancellationToken)
{
int previousState = Interlocked.CompareExchange(ref this.listenerState, ListenerStarting, ListenerNotStarted);
        // It is possible that the WebJobs SDK invokes the StartAsync() method more than once, if there are other
        // trigger listeners registered and some of them have failed to start.
if (previousState != ListenerNotStarted)
{
throw new InvalidOperationException("The listener is either starting or has already started.");
}
        // The RabbitMQ server (v3.11.2 as of this writing) only supports a prefetch size of zero (i.e. no specific limit).
// See: https://github.com/rabbitmq/rabbitmq-server/blob/v3.11.2/deps/rabbit/src/rabbit_channel.erl#L1543.
// See: https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.prefetch-size for protocol specification.
this.channel.BasicQos(prefetchSize: 0, this.prefetchCount, global: false);
        // We should use AsyncEventingBasicConsumer to create the consumer since our handler method is async. Using
        // EventingBasicConsumer led to issue: https://github.com/Azure/azure-functions-rabbitmq-extension/issues/211.
var consumer = new AsyncEventingBasicConsumer(this.channel);
consumer.Received += ReceivedHandler;
this.consumerTag = this.channel.BasicConsume(this.queueName, autoAck: false, consumer);
this.listenerState = ListenerStarted;
this.logger.LogDebug($"Started RabbitMQ trigger listener for {this.logDetails}.");
return Task.CompletedTask;
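
        // Local function invoked by the consumer for every message delivered on the channel.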
async Task ReceivedHandler(object model, BasicDeliverEventArgs args)
{
using Activity activity = RabbitMQActivitySource.StartActivity(args.BasicProperties);
var input = new TriggeredFunctionData() { TriggerValue = args };
FunctionResult result = await this.executor.TryExecuteAsync(input, this.listenerCancellationTokenSource.Token).ConfigureAwait(false);
if (!result.Succeeded)
{
                // Retry by republishing a copy of the message to the queue if the triggered function failed to run.
args.BasicProperties.Headers ??= new Dictionary<string, object>();
args.BasicProperties.Headers.TryGetValue(RequeueCountHeaderName, out object headerValue);
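                // Convert.ToInt32(null) returns 0, so a message without the header gets a requeue count of 1.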
int requeueCount = Convert.ToInt32(headerValue, CultureInfo.InvariantCulture) + 1;
if (requeueCount >= 5)
{
// Discard or 'dead-letter' the message. See: https://www.rabbitmq.com/dlx.html.
this.logger.LogDebug($"Rejecting message since the requeue count exceeded for {this.logDetails}.");
this.channel.BasicReject(args.DeliveryTag, requeue: false);
return;
}
this.logger.LogDebug($"Republishing message for {this.logDetails}.");
args.BasicProperties.Headers[RequeueCountHeaderName] = requeueCount;
// We cannot call BasicReject() on the message with requeue = true since that would not enable a fixed
// number of retry attempts. See: https://stackoverflow.com/q/23158310.
this.channel.BasicPublish(exchange: string.Empty, routingKey: this.queueName, args.BasicProperties, args.Body);
}
            // Acknowledge the original message only after the copy (in the failure case) has been republished.
this.channel.BasicAck(args.DeliveryTag, multiple: false);
}
    }

public Task StopAsync(CancellationToken cancellationToken)
{
int previousState = Interlocked.CompareExchange(ref this.listenerState, ListenerStopping, ListenerStarted);
if (previousState == ListenerStarted)
{
// TODO: Close RabbitMQ connection along with the channel.
this.channel.BasicCancel(this.consumerTag);
this.channel.Close();
if (!this.drainModeManager.IsDrainModeEnabled)
{
this.listenerCancellationTokenSource.Cancel();
}
this.listenerState = ListenerStopped;
this.logger.LogDebug($"Stopped RabbitMQ trigger listener for {this.logDetails}.");
}
return Task.CompletedTask;
}
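
    // Explicit interface implementation that forwards to the strongly-typed GetMetricsAsync() overload below.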
async Task<ScaleMetrics> IScaleMonitor.GetMetricsAsync()
{
return await this.GetMetricsAsync().ConfigureAwait(false);
}
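
    // Queries the queue passively (without creating or modifying it) to read the current message count.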
public Task<RabbitMQTriggerMetrics> GetMetricsAsync()
{
QueueDeclareOk queueInfo = this.channel.QueueDeclarePassive(this.queueName);
var metrics = new RabbitMQTriggerMetrics
{
MessageCount = queueInfo.MessageCount,
Timestamp = DateTime.UtcNow,
};
return Task.FromResult(metrics);
    }

ScaleStatus IScaleMonitor.GetScaleStatus(ScaleStatusContext context)
{
return this.GetScaleStatusCore(context.WorkerCount, context.Metrics?.Cast<RabbitMQTriggerMetrics>().ToArray());
    }

public ScaleStatus GetScaleStatus(ScaleStatusContext<RabbitMQTriggerMetrics> context)
{
return this.GetScaleStatusCore(context.WorkerCount, context.Metrics?.ToArray());
    }

/// <summary>
    /// Returns a scale recommendation, i.e. whether to scale the host application in or out. The recommendation is
    /// based on both the latest metrics and the trend of increase or decrease in the number of messages in the
    /// Ready state in the queue. The calculations attempt to keep the number of workers at a minimum while ensuring
    /// that the message count per worker stays under the maximum limit.
/// </summary>
/// <param name="workerCount">The current worker count for the host application.</param>
/// <param name="metrics">The collection of metrics samples to make the scale decision.</param>
private ScaleStatus GetScaleStatusCore(int workerCount, RabbitMQTriggerMetrics[] metrics)
{
        // We require a minimum of 5 samples to estimate the trend in the message count with reasonable reliability.
        // These samples roughly cover the past 40 seconds.
int minSamplesForScaling = 5;
// TODO: Make this value configurable.
        // Upper limit on the number of messages to be maintained per worker.
int maxMessagesPerWorker = 1000;
var status = new ScaleStatus
{
Vote = ScaleVote.None,
};
// Do not make a scale decision unless we have enough samples.
if (metrics == null || metrics.Length < minSamplesForScaling)
{
this.logger.LogInformation($"Requesting no-scaling: Insufficient metrics for making scale decision for {this.logDetails}.");
return status;
}
// Consider only the most recent batch of samples in the rest of the method.
var latestMetrics = new RabbitMQTriggerMetrics[minSamplesForScaling];
Array.Copy(metrics, metrics.Length - minSamplesForScaling, latestMetrics, 0, minSamplesForScaling);
metrics = latestMetrics;
string counts = string.Join(", ", metrics.Select(metric => metric.MessageCount));
this.logger.LogInformation($"Message counts: [{counts}], worker count: {workerCount}, maximum messages per worker: {maxMessagesPerWorker}.");
// Add worker if the count of messages per worker exceeds the maximum limit.
long lastMessageCount = metrics.Last().MessageCount;
if (lastMessageCount > workerCount * maxMessagesPerWorker)
{
status.Vote = ScaleVote.ScaleOut;
this.logger.LogInformation($"Requesting scale-out: Found too many messages for {this.logDetails} relative to the number of workers.");
return status;
}
// Check if there is a continuous increase or decrease in the count of messages.
bool isIncreasing = true;
bool isDecreasing = true;
for (int index = 0; index < metrics.Length - 1; index++)
{
isIncreasing = isIncreasing && metrics[index].MessageCount < metrics[index + 1].MessageCount;
isDecreasing = isDecreasing && (metrics[index + 1].MessageCount == 0 || metrics[index].MessageCount > metrics[index + 1].MessageCount);
}
if (isIncreasing)
{
// Scale out only if the expected count of messages would exceed the combined limit after 30 seconds.
DateTime referenceTime = metrics[metrics.Length - 1].Timestamp - TimeSpan.FromSeconds(30);
RabbitMQTriggerMetrics referenceMetric = metrics.First(metric => metric.Timestamp > referenceTime);
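            // current + (current - reference) linearly extrapolates the message count roughly 30 seconds into the future.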
long expectedMessageCount = (2 * metrics[metrics.Length - 1].MessageCount) - referenceMetric.MessageCount;
if (expectedMessageCount > workerCount * maxMessagesPerWorker)
{
status.Vote = ScaleVote.ScaleOut;
this.logger.LogInformation($"Requesting scale-out: Found the messages for {this.logDetails} to be continuously increasing" +
" and may exceed the maximum limit set for the workers.");
return status;
}
else
{
this.logger.LogDebug($"Avoiding scale-out: Found the messages for {this.logDetails} to be increasing" +
" but they may not exceed the maximum limit set for the workers.");
}
}
if (isDecreasing)
{
            // Scale in only if the count of messages will not exceed the combined limit after the scale-in operation.
if (lastMessageCount <= (workerCount - 1) * maxMessagesPerWorker)
{
status.Vote = ScaleVote.ScaleIn;
this.logger.LogInformation($"Requesting scale-in: Found {this.logDetails} to be either idle or the messages to be continuously decreasing.");
return status;
}
else
{
this.logger.LogDebug($"Avoiding scale-in: Found the messages for {this.logDetails} to be decreasing" +
" but they are high enough to require all existing workers for processing.");
}
}
this.logger.LogInformation($"Requesting no-scaling: Found {this.logDetails} to not require scaling.");
return status;
}
}