public Task StartAsync()

in extension/WebJobs.Extensions.RabbitMQ/Trigger/RabbitMQListener.cs [79:141]


    public Task StartAsync(CancellationToken cancellationToken)
    {
        int previousState = Interlocked.CompareExchange(ref this.listenerState, ListenerStarting, ListenerNotStarted);

        // It is possible that the WebJobs SDKS invokes StartAsync() method more than once, if there are other trigger
        // listeners registered and some of them have failed to start.
        if (previousState != ListenerNotStarted)
        {
            throw new InvalidOperationException("The listener is either starting or has already started.");
        }

        // The RabbitMQ server (v3.11.2 as of latest) only has support for prefetch size of zero (no specific limit).
        // See: https://github.com/rabbitmq/rabbitmq-server/blob/v3.11.2/deps/rabbit/src/rabbit_channel.erl#L1543.
        // See: https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.qos.prefetch-size for protocol specification.
        this.channel.BasicQos(prefetchSize: 0, this.prefetchCount, global: false);

        // We should use AsyncEventingBasicConsumer to create the consumer since our handler method is async. Using
        // EventingBasicConsumer led to issue: https://github.com/Azure/azure-functions-rabbitmq-extension/issues/211).
        var consumer = new AsyncEventingBasicConsumer(this.channel);
        consumer.Received += ReceivedHandler;

        this.consumerTag = this.channel.BasicConsume(this.queueName, autoAck: false, consumer);

        this.listenerState = ListenerStarted;
        this.logger.LogDebug($"Started RabbitMQ trigger listener for {this.logDetails}.");

        return Task.CompletedTask;

        async Task ReceivedHandler(object model, BasicDeliverEventArgs args)
        {
            using Activity activity = RabbitMQActivitySource.StartActivity(args.BasicProperties);

            var input = new TriggeredFunctionData() { TriggerValue = args };

            FunctionResult result = await this.executor.TryExecuteAsync(input, this.listenerCancellationTokenSource.Token).ConfigureAwait(false);

            if (!result.Succeeded)
            {
                // Retry by republishing a copy of message to the queue if the triggered function failed to run.
                args.BasicProperties.Headers ??= new Dictionary<string, object>();
                args.BasicProperties.Headers.TryGetValue(RequeueCountHeaderName, out object headerValue);
                int requeueCount = Convert.ToInt32(headerValue, CultureInfo.InvariantCulture) + 1;

                if (requeueCount >= 5)
                {
                    // Discard or 'dead-letter' the message. See: https://www.rabbitmq.com/dlx.html.
                    this.logger.LogDebug($"Rejecting message since the requeue count exceeded for {this.logDetails}.");
                    this.channel.BasicReject(args.DeliveryTag, requeue: false);
                    return;
                }

                this.logger.LogDebug($"Republishing message for {this.logDetails}.");
                args.BasicProperties.Headers[RequeueCountHeaderName] = requeueCount;

                // We cannot call BasicReject() on the message with requeue = true since that would not enable a fixed
                // number of retry attempts. See: https://stackoverflow.com/q/23158310.
                this.channel.BasicPublish(exchange: string.Empty, routingKey: this.queueName, args.BasicProperties, args.Body);
            }

            // Acknowledge the existing message only after the message (in case of failure) is re-published.
            this.channel.BasicAck(args.DeliveryTag, multiple: false);
        }
    }