in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs [96:131]
/// <summary>
/// Creates the Kafka consumer for this listener and routes the client's error,
/// partition assignment/revocation and log callbacks to <c>this.logger</c>.
/// </summary>
/// <remarks>
/// Custom key/value deserializers are only registered when the corresponding
/// property is non-null; otherwise the builder's own defaults are left in place.
/// </remarks>
/// <returns>The consumer produced by the configured builder.</returns>
private IConsumer<TKey, TValue> CreateConsumer()
{
    // Runs before the builder is created; presumably prepares the native librdkafka
    // library for loading (confirm against AzureFunctionsFileHelper).
    AzureFunctionsFileHelper.InitializeLibrdKafka(this.logger);

    var builder = this.CreateConsumerBuilder(GetConsumerConfiguration());

    // Constant message templates are used throughout so that client/broker supplied
    // text is always passed as an argument and never treated as the template itself.
    builder.SetErrorHandler((_, e) =>
        {
            this.logger.LogError("{Reason}", e.Reason);
        })
        .SetPartitionsAssignedHandler((_, e) =>
        {
            this.logger.LogInformation("Assigned partitions: [{Partitions}]", string.Join(", ", e));
        })
        .SetPartitionsRevokedHandler((_, e) =>
        {
            this.logger.LogInformation("Revoked partitions: [{Partitions}]", string.Join(", ", e));
        });

    if (ValueDeserializer != null)
    {
        builder.SetValueDeserializer(ValueDeserializer);
    }

    if (KeyDeserializer != null)
    {
        builder.SetKeyDeserializer(KeyDeserializer);
    }

    builder.SetLogHandler((_, m) =>
    {
        // The original dereferenced m for the level but used m?.Message for the text;
        // guard once so a null message object can never throw inside the callback.
        if (m == null)
        {
            return;
        }

        var level = (LogLevel)m.LevelAs(LogLevelType.MicrosoftExtensionsLogging);

        // Coalesce to an empty string so a null message still renders as "Libkafka: ".
        this.logger.Log(level, "Libkafka: {Message}", m.Message ?? string.Empty);
    });

    return builder.Build();
}