extension/WebJobs.Extensions.RabbitMQ/Bindings/RabbitMQAsyncCollector.cs (42 lines of code) (raw):
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
namespace Microsoft.Azure.WebJobs.Extensions.RabbitMQ;
/// <summary>
/// <see cref="IAsyncCollector{T}"/> implementation that stages raw message bodies in the
/// publish batch exposed by the context's service and publishes that batch on flush.
/// </summary>
internal class RabbitMQAsyncCollector : IAsyncCollector<ReadOnlyMemory<byte>>
{
    private readonly RabbitMQContext context;
    private readonly ILogger logger;

    /// <summary>
    /// Initializes a new instance of the <see cref="RabbitMQAsyncCollector"/> class.
    /// </summary>
    /// <param name="context">Binding context; both it and its <c>Service</c> must be non-null.</param>
    /// <param name="logger">Logger used for debug tracing.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="logger"/> or <paramref name="context"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="ArgumentException"><c>context.Service</c> is <c>null</c>.</exception>
    public RabbitMQAsyncCollector(RabbitMQContext context, ILogger logger)
    {
        // Arguments are validated in this order: logger, context, context.Service.
        if (logger is null)
        {
            throw new ArgumentNullException(nameof(logger));
        }

        if (context is null)
        {
            throw new ArgumentNullException(nameof(context));
        }

        if (context.Service is null)
        {
            throw new ArgumentException("Value cannot be null. Parameter name: context.Service");
        }

        this.logger = logger;
        this.context = context;
    }

    /// <summary>
    /// Appends <paramref name="message"/> to the service's pending publish batch.
    /// Nothing is published until the batch is flushed.
    /// </summary>
    /// <param name="message">Message body to enqueue.</param>
    /// <param name="cancellationToken">Not observed by this implementation.</param>
    /// <returns>An already-completed task; the work is done synchronously.</returns>
    public Task AddAsync(ReadOnlyMemory<byte> message, CancellationToken cancellationToken = default)
    {
        this.logger.LogDebug("Adding message to batch for publishing...");

        var service = this.context.Service;

        // The batch is only touched while holding the service's publish-batch lock.
        lock (service.PublishBatchLock)
        {
            // Empty exchange name with the resolved queue name as the routing key.
            service.BasicPublishBatch.Add(
                exchange: string.Empty,
                routingKey: this.context.ResolvedAttribute.QueueName,
                mandatory: false,
                properties: null,
                body: message);
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Publishes every message added so far by delegating to <see cref="PublishAsync"/>.
    /// </summary>
    /// <param name="cancellationToken">Not observed by this implementation.</param>
    /// <returns>The task returned by <see cref="PublishAsync"/>.</returns>
    public Task FlushAsync(CancellationToken cancellationToken = default) => this.PublishAsync();

    /// <summary>
    /// Publishes the pending batch and then asks the service to reset it, both under the
    /// service's publish-batch lock.
    /// </summary>
    /// <returns>An already-completed task; the work is done synchronously.</returns>
    internal Task PublishAsync()
    {
        this.logger.LogDebug("Publishing messages to queue.");

        var service = this.context.Service;

        lock (service.PublishBatchLock)
        {
            // Reset runs only after Publish returns normally; if Publish throws, the batch is left as-is.
            service.BasicPublishBatch.Publish();
            service.ResetPublishBatch();
        }

        return Task.CompletedTask;
    }
}