private IConsumer<TKey, TValue> CreateConsumer()

in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs [96:131]


        /// <summary>
        /// Creates the Kafka consumer for this listener, wiring its error,
        /// partition-assignment, partition-revocation and log callbacks to the logger.
        /// </summary>
        /// <remarks>
        /// Key and value deserializers are applied to the builder only when they are
        /// non-null; otherwise the builder is left untouched in that respect.
        /// </remarks>
        /// <returns>The consumer returned by the configured builder's <c>Build()</c> call.</returns>
        private IConsumer<TKey, TValue> CreateConsumer()
        {
            // Done first, before the consumer builder is created.
            AzureFunctionsFileHelper.InitializeLibrdKafka(this.logger);

            var consumerConfig = GetConsumerConfiguration();
            var builder = this.CreateConsumerBuilder(consumerConfig);

            // Consumer-level errors: log the reported reason as an error.
            builder.SetErrorHandler((_, error) => this.logger.LogError(error.Reason));

            // Partition assignment changes are logged for information only.
            builder.SetPartitionsAssignedHandler((_, assigned) =>
                this.logger.LogInformation($"Assigned partitions: [{string.Join(", ", assigned)}]"));
            builder.SetPartitionsRevokedHandler((_, revoked) =>
                this.logger.LogInformation($"Revoked partitions: [{string.Join(", ", revoked)}]"));

            var valueDeserializer = ValueDeserializer;
            if (valueDeserializer != null)
            {
                builder.SetValueDeserializer(valueDeserializer);
            }

            var keyDeserializer = KeyDeserializer;
            if (keyDeserializer != null)
            {
                builder.SetKeyDeserializer(keyDeserializer);
            }

            // Forward client log messages to the logger, converting the message level
            // through LevelAs(LogLevelType.MicrosoftExtensionsLogging).
            builder.SetLogHandler((_, message) =>
            {
                var level = (LogLevel)message.LevelAs(LogLevelType.MicrosoftExtensionsLogging);
                this.logger.Log(level, $"Libkafka: {message?.Message}");
            });

            return builder.Build();
        }