in extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs [79:141]
public Task StartAsync(CancellationToken cancellationToken)
{
int previousState = Interlocked.CompareExchange(ref this.listenerState, ListenerStarting, ListenerNotStarted);
// It is possible that the WebJobs SDKS invokes StartAsync() method more than once, if there are other trigger
// listeners registered and some of them have failed to start.
if (previousState != ListenerNotStarted)
{
throw new InvalidOperationException("The listener is either starting or has already started.");
}
// The RabbitMQ server (v3.11.2 as of latest) only has support for prefetch size of zero (no specific limit).
// See: https://github.com/rabbitmq/rabbitmq-server/blob/v3.11.2/deps/rabbit/src/rabbit_channel.erl#L1543.
// See: https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.prefetch-size for protocol specification.
this.channel.BasicQos(prefetchSize: 0, this.prefetchCount, global: false);
// We should use AsyncEventingBasicConsumer to create the consumer since our handler method is async. Using
// EventingBasicConsumer led to issue: https://github.com/Azure/azure-functions-rabbitmq-extension/issues/211).
var consumer = new AsyncEventingBasicConsumer(this.channel);
consumer.Received += ReceivedHandler;
this.consumerTag = this.channel.BasicConsume(this.queueName, autoAck: false, consumer);
this.listenerState = ListenerStarted;
this.logger.LogDebug($"Started RabbitMQ trigger listener for {this.logDetails}.");
return Task.CompletedTask;
async Task ReceivedHandler(object model, BasicDeliverEventArgs args)
{
using Activity activity = RabbitMQActivitySource.StartActivity(args.BasicProperties);
var input = new TriggeredFunctionData() { TriggerValue = args };
FunctionResult result = await this.executor.TryExecuteAsync(input, this.listenerCancellationTokenSource.Token).ConfigureAwait(false);
if (!result.Succeeded)
{
// Retry by republishing a copy of message to the queue if the triggered function failed to run.
args.BasicProperties.Headers ??= new Dictionary<string, object>();
args.BasicProperties.Headers.TryGetValue(RequeueCountHeaderName, out object headerValue);
int requeueCount = Convert.ToInt32(headerValue, CultureInfo.InvariantCulture) + 1;
if (requeueCount >= 5)
{
// Discard or 'dead-letter' the message. See: https://www.rabbitmq.com/dlx.html.
this.logger.LogDebug($"Rejecting message since the requeue count exceeded for {this.logDetails}.");
this.channel.BasicReject(args.DeliveryTag, requeue: false);
return;
}
this.logger.LogDebug($"Republishing message for {this.logDetails}.");
args.BasicProperties.Headers[RequeueCountHeaderName] = requeueCount;
// We cannot call BasicReject() on the message with requeue = true since that would not enable a fixed
// number of retry attempts. See: https://stackoverflow.com/q/23158310.
this.channel.BasicPublish(exchange: string.Empty, routingKey: this.queueName, args.BasicProperties, args.Body);
}
// Acknowledge the existing message only after the message (in case of failure) is re-published.
this.channel.BasicAck(args.DeliveryTag, multiple: false);
}
}