in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/KafkaListener.cs [178:272]
private ConsumerConfig GetConsumerConfiguration()
{
ConsumerConfig conf = new ConsumerConfig()
{
// enable auto-commit
EnableAutoCommit = true,
// disable auto storing read offsets since we need to store them after calling the trigger function
EnableAutoOffsetStore = false,
// Interval in which commits stored in memory will be saved
AutoCommitIntervalMs = this.options.AutoCommitIntervalMs,
// Librdkafka debug options
Debug = this.options.LibkafkaDebug,
// start from earliest if no checkpoint has been committed
AutoOffsetReset = this.options.AutoOffsetReset,
// Secure communication/authentication
SaslMechanism = this.listenerConfiguration.SaslMechanism,
SaslUsername = this.listenerConfiguration.SaslUsername,
SaslPassword = this.listenerConfiguration.SaslPassword,
SecurityProtocol = this.listenerConfiguration.SecurityProtocol,
SslCaLocation = this.listenerConfiguration.SslCaLocation,
SslCertificateLocation = this.listenerConfiguration.SslCertificateLocation,
SslKeyLocation = this.listenerConfiguration.SslKeyLocation,
SslKeyPassword = this.listenerConfiguration.SslKeyPassword,
SslCaPem = this.listenerConfiguration.SslCaPEM,
SslCertificatePem = this.listenerConfiguration.SslCertificatePEM,
SslKeyPem = this.listenerConfiguration.SslKeyPEM,
// OAuthBearer config
SaslOauthbearerMethod = this.listenerConfiguration.SaslOAuthBearerMethod,
SaslOauthbearerClientId = this.listenerConfiguration.SaslOAuthBearerClientId,
SaslOauthbearerClientSecret = this.listenerConfiguration.SaslOAuthBearerClientSecret,
SaslOauthbearerScope = this.listenerConfiguration.SaslOAuthBearerScope,
SaslOauthbearerTokenEndpointUrl = this.listenerConfiguration.SaslOAuthBearerTokenEndpointUrl,
SaslOauthbearerExtensions = this.listenerConfiguration.SaslOAuthBearerExtensions,
// Values from host configuration
StatisticsIntervalMs = this.options.StatisticsIntervalMs,
ReconnectBackoffMs = this.options.ReconnectBackoffMs,
ReconnectBackoffMaxMs = this.options.ReconnectBackoffMaxMs,
SessionTimeoutMs = this.options.SessionTimeoutMs,
MaxPollIntervalMs = this.options.MaxPollIntervalMs,
QueuedMinMessages = this.options.QueuedMinMessages,
QueuedMaxMessagesKbytes = this.options.QueuedMaxMessagesKbytes,
MaxPartitionFetchBytes = this.options.MaxPartitionFetchBytes,
FetchMaxBytes = this.options.FetchMaxBytes,
MetadataMaxAgeMs = this.options.MetadataMaxAgeMs,
SocketKeepaliveEnable = this.options.SocketKeepaliveEnable
};
if (string.IsNullOrEmpty(this.listenerConfiguration.EventHubConnectionString))
{
// Setup native kafka configuration
conf.BootstrapServers = this.listenerConfiguration.BrokerList;
conf.GroupId = this.listenerConfiguration.ConsumerGroup;
if (!string.IsNullOrWhiteSpace(conf.SslCaLocation))
{
if (AzureFunctionsFileHelper.TryGetValidFilePath(conf.SslCaLocation, out var resolvedSslCaLocation))
{
this.logger.LogDebug("Found SslCaLocation in {filePath}", resolvedSslCaLocation);
conf.SslCaLocation = resolvedSslCaLocation;
}
else
{
this.logger.LogWarning("Could not find valid file path for SslCaLocation {filePath}", conf.SslCaLocation);
}
}
}
else
{
// Setup eventhubs kafka head configuration
var ehBrokerList = this.listenerConfiguration.BrokerList;
if (!ehBrokerList.Contains(EventHubsBrokerListDns))
{
ehBrokerList = $"{this.listenerConfiguration.BrokerList}{EventHubsBrokerListDns}:{EventHubsBrokerListPort}";
}
var consumerGroupToUse = string.IsNullOrEmpty(this.listenerConfiguration.ConsumerGroup) ? "$Default" : this.listenerConfiguration.ConsumerGroup;
conf.BootstrapServers = ehBrokerList;
conf.SecurityProtocol = SecurityProtocol.SaslSsl;
conf.SaslMechanism = SaslMechanism.Plain;
conf.SaslUsername = EventHubsSaslUsername;
conf.SaslPassword = this.listenerConfiguration.EventHubConnectionString;
conf.SslCaLocation= this.EnsureValidEventHubsCertificateLocation(this.listenerConfiguration.SslCaLocation);
conf.GroupId = consumerGroupToUse;
conf.BrokerVersionFallback = EventHubsBrokerVersionFallback;
}
return conf;
}