in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/Scaler/KafkaScalerProvider.cs [49:94]
private ConsumerConfig GetConsumerConfiguration(KafkaMetaData kafkaMetaData, IConfiguration config, INameResolver nameResolver)
{
var adminConfig = new ConsumerConfig() {
GroupId = config.ResolveSecureSetting(nameResolver, kafkaMetaData.ConsumerGroup),
BootstrapServers = config.ResolveSecureSetting(nameResolver, kafkaMetaData.BrokerList),
};
if (kafkaMetaData.AuthenticationMode != BrokerAuthenticationMode.NotSet ||
kafkaMetaData.Protocol != BrokerProtocol.NotSet)
{
adminConfig.SaslPassword = config.ResolveSecureSetting(nameResolver, kafkaMetaData.Password);
adminConfig.SaslUsername = config.ResolveSecureSetting(nameResolver, kafkaMetaData.Username);
adminConfig.SslKeyPassword = config.ResolveSecureSetting(nameResolver, kafkaMetaData.SslKeyPassword);
adminConfig.SslCertificatePem = config.ResolveSecureSetting(nameResolver, kafkaMetaData.SslCertificatePEM);
adminConfig.SslCaPem = ExtractCertificate(config.ResolveSecureSetting(nameResolver, kafkaMetaData.SslCaPEM));
adminConfig.SslKeyPem = config.ResolveSecureSetting(nameResolver, kafkaMetaData.SslKeyPEM);
if (!string.IsNullOrEmpty(kafkaMetaData.SslCertificateandKeyPEM))
{
adminConfig.SslCertificatePem = ExtractCertificate(kafkaMetaData.SslCertificateandKeyPEM);
adminConfig.SslKeyPem = ExtractPrivateKey(kafkaMetaData.SslCertificateandKeyPEM);
}
if (kafkaMetaData.AuthenticationMode != BrokerAuthenticationMode.NotSet)
{
adminConfig.SaslMechanism = (SaslMechanism)kafkaMetaData.AuthenticationMode;
}
if (kafkaMetaData.Protocol != BrokerProtocol.NotSet)
{
adminConfig.SecurityProtocol = (SecurityProtocol)kafkaMetaData.Protocol;
}
if (kafkaMetaData.AuthenticationMode == BrokerAuthenticationMode.OAuthBearer)
{
adminConfig.SaslOauthbearerMethod = (SaslOauthbearerMethod)kafkaMetaData.OAuthBearerMethod;
adminConfig.SaslOauthbearerClientId = config.ResolveSecureSetting(nameResolver, kafkaMetaData.OAuthBearerClientId);
adminConfig.SaslOauthbearerClientSecret = config.ResolveSecureSetting(nameResolver, kafkaMetaData.OAuthBearerClientSecret);
adminConfig.SaslOauthbearerScope = config.ResolveSecureSetting(nameResolver, kafkaMetaData.OAuthBearerScope);
adminConfig.SaslOauthbearerTokenEndpointUrl = config.ResolveSecureSetting(nameResolver, kafkaMetaData.OAuthBearerTokenEndpointUrl);
adminConfig.SaslOauthbearerExtensions = config.ResolveSecureSetting(nameResolver, kafkaMetaData.OAuthBearerExtensions);
}
}
return adminConfig;
}