internal class AzureServiceBusMessageBus()

in src/Relecloud.Messaging/ServiceBus/AzureServiceBusMessageBus.cs [13:90]


/// <summary>
/// <see cref="IMessageBus"/> implementation that creates message senders and message
/// processors on top of a <see cref="ServiceBusClient"/>.
/// </summary>
/// <param name="loggerFactory">
/// Factory used to create the typed loggers for this class and for the senders and
/// processors it creates.
/// </param>
/// <param name="serviceBusClient">
/// Client used to create the underlying Service Bus senders and processors.
/// </param>
internal class AzureServiceBusMessageBus(ILoggerFactory loggerFactory, ServiceBusClient serviceBusClient) : IMessageBus
{
    // This class uses a logger factory so that it can generate the typed loggers
    // needed by the message senders and processors it creates.
    readonly ILogger<AzureServiceBusMessageBus> logger = loggerFactory.CreateLogger<AzureServiceBusMessageBus>();

    /// <summary>
    /// Creates a sender that publishes messages of type <typeparamref name="T"/> to the given path.
    /// </summary>
    /// <typeparam name="T">Type of the messages the sender publishes.</typeparam>
    /// <param name="path">Queue or topic path to send to.</param>
    /// <returns>A message sender wrapping a Service Bus sender for <paramref name="path"/>.</returns>
    public IMessageSender<T> CreateMessageSender<T>(string path)
    {
        logger.LogDebug("Creating message sender for {Namespace}/{Path}.", serviceBusClient.FullyQualifiedNamespace, path);

        // Create an AzureServiceBusMessageSender for the given queue/topic path that can be used to publish messages
        var sender = serviceBusClient.CreateSender(path);
        return new AzureServiceBusMessageSender<T>(loggerFactory.CreateLogger<AzureServiceBusMessageSender<T>>(), sender);
    }

    /// <summary>
    /// Creates a processor for <paramref name="path"/>, wires up the message and error
    /// callbacks, starts it, and returns it wrapped as an <see cref="IMessageProcessor"/>.
    /// </summary>
    /// <typeparam name="T">Type each message body is deserialized to (from JSON).</typeparam>
    /// <param name="messageHandler">Invoked with each successfully deserialized message.</param>
    /// <param name="errorHandler">Optional callback invoked with the exception reported by the processor.</param>
    /// <param name="path">Queue path to receive messages from.</param>
    /// <param name="cancellationToken">Token passed to the call that starts the processor.</param>
    /// <returns>A wrapper around the started processor.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="messageHandler"/> is null.</exception>
    public async Task<IMessageProcessor> SubscribeAsync<T>(
        Func<T, CancellationToken, Task> messageHandler,
        Func<Exception, CancellationToken, Task>? errorHandler,
        string path,
        CancellationToken cancellationToken)
    {
        // Fail fast: a null handler would otherwise only surface as a NullReferenceException
        // for every received message, inside the processor callback.
        ArgumentNullException.ThrowIfNull(messageHandler);

        logger.LogDebug("Subscribing to messages from {Namespace}/{Path}.", serviceBusClient.FullyQualifiedNamespace, path);

        // Create a processor for the given queue that will process incoming messages
        var processor = serviceBusClient.CreateProcessor(path, new ServiceBusProcessorOptions
        {
            // Allow the messages to be auto-completed if processing finishes without failure
            AutoCompleteMessages = true,

            // PeekLock mode provides reliability in that unsettled messages will be redelivered on failure
            ReceiveMode = ServiceBusReceiveMode.PeekLock,

            // Containerized processors can scale at the container level and need not scale via the processor options
            MaxConcurrentCalls = 1,
            PrefetchCount = 0
        });

        // Called for each message received by the processor
        processor.ProcessMessageAsync += async args =>
        {
            logger.LogInformation("Processing message {MessageId} from {ServiceBusNamespace}/{Path}", args.Message.MessageId, args.FullyQualifiedNamespace, args.EntityPath);

            // Tracks whether control has reached the caller's handler, so that a JsonException
            // thrown by the handler itself is not mistaken for an invalid message body.
            var handlerInvoked = false;

            try
            {
                var message = args.Message.Body.ToObjectFromJson<T>();
                if (message is null)
                {
                    // A JSON "null" body deserializes to null; treat it as an invalid body
                    // rather than passing null to a handler that expects a T.
                    await DeadLetterInvalidBodyAsync<T>(args, exception: null);
                    return;
                }

                handlerInvoked = true;

                // Exceptions thrown by the handler are deliberately not caught here; they
                // propagate to the processor (see the ProcessErrorAsync callback below).
                await messageHandler(message, args.CancellationToken);
                logger.LogInformation("Successfully processed message {MessageId} from {ServiceBusNamespace}/{Path}", args.Message.MessageId, args.FullyQualifiedNamespace, args.EntityPath);
            }
            catch (JsonException ex) when (!handlerInvoked)
            {
                // Only deserialization failures land here; the body can never be processed,
                // so dead-letter it instead of letting it be redelivered.
                await DeadLetterInvalidBodyAsync<T>(args, ex);
            }
        };

        // Called when an unhandled exception occurs in the processor
        processor.ProcessErrorAsync += async args =>
        {
            logger.LogError(
                args.Exception,
                "Error processing message from {ServiceBusNamespace}/{Path}: {ErrorSource} - {Exception}",
                args.FullyQualifiedNamespace,
                args.EntityPath,
                args.ErrorSource,
                args.Exception);

            if (errorHandler != null)
            {
                await errorHandler(args.Exception, args.CancellationToken);
            }
        };

        try
        {
            await processor.StartProcessingAsync(cancellationToken);
        }
        catch
        {
            // The processor has not been handed to a wrapper yet, so nothing else can
            // dispose it if starting fails or is cancelled.
            await processor.DisposeAsync();
            throw;
        }

        return new AzureServiceBusMessageProcessor(loggerFactory.CreateLogger<AzureServiceBusMessageProcessor>(), processor);
    }

    // Logs an invalid-body error (with the deserialization exception, when there is one)
    // and moves the message to the dead-letter queue.
    private async Task DeadLetterInvalidBodyAsync<T>(ProcessMessageEventArgs args, JsonException? exception)
    {
        logger.LogError(exception, "Invalid message body; could not be deserialized to {Type}", typeof(T));
        await args.DeadLetterMessageAsync(args.Message, $"Invalid message body; could not be deserialized to {typeof(T)}", cancellationToken: args.CancellationToken);
    }
}