in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs [96:148]
private KafkaListenerConfiguration CreateConsumerConfiguration(KafkaTriggerAttribute attribute)
{
var consumerConfig = new KafkaListenerConfiguration()
{
BrokerList = this.config.ResolveSecureSetting(nameResolver, attribute.BrokerList),
ConsumerGroup = this.config.ResolveSecureSetting(nameResolver, attribute.ConsumerGroup),
Topic = this.config.ResolveSecureSetting(nameResolver, attribute.Topic),
EventHubConnectionString = this.config.ResolveSecureSetting(nameResolver, attribute.EventHubConnectionString),
LagThreshold = attribute.LagThreshold
};
if (attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet ||
attribute.Protocol != BrokerProtocol.NotSet)
{
consumerConfig.SaslPassword = this.config.ResolveSecureSetting(nameResolver, attribute.Password);
consumerConfig.SaslUsername = this.config.ResolveSecureSetting(nameResolver, attribute.Username);
consumerConfig.SslKeyLocation = GetValidFilePath(attribute.SslKeyLocation);
consumerConfig.SslKeyPassword = this.config.ResolveSecureSetting(nameResolver, attribute.SslKeyPassword);
consumerConfig.SslCertificateLocation = GetValidFilePath(attribute.SslCertificateLocation);
consumerConfig.SslCaLocation = GetValidFilePath(attribute.SslCaLocation);
consumerConfig.SslCaPEM = ExtractCertificate(this.config.ResolveSecureSetting(nameResolver, attribute.SslCaPEM));
consumerConfig.SslCertificatePEM = ExtractCertificate(this.config.ResolveSecureSetting(nameResolver, attribute.SslCertificatePEM));
consumerConfig.SslKeyPEM = ExtractPrivateKey(this.config.ResolveSecureSetting(nameResolver, attribute.SslKeyPEM));
consumerConfig.SslCertificateandKeyPEM = this.config.ResolveSecureSetting(nameResolver, attribute.SslCertificateandKeyPEM);
if (!string.IsNullOrEmpty(consumerConfig.SslCertificateandKeyPEM)) {
consumerConfig.SslCertificatePEM = ExtractCertificate(consumerConfig.SslCertificateandKeyPEM);
consumerConfig.SslKeyPEM = ExtractPrivateKey(consumerConfig.SslCertificateandKeyPEM);
}
if (attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet)
{
consumerConfig.SaslMechanism = (SaslMechanism)attribute.AuthenticationMode;
}
if (attribute.Protocol != BrokerProtocol.NotSet)
{
consumerConfig.SecurityProtocol = (SecurityProtocol)attribute.Protocol;
}
if (attribute.AuthenticationMode == BrokerAuthenticationMode.OAuthBearer)
{
consumerConfig.SaslOAuthBearerMethod = (SaslOauthbearerMethod)attribute.OAuthBearerMethod;
consumerConfig.SaslOAuthBearerClientId = this.config.ResolveSecureSetting(nameResolver, attribute.OAuthBearerClientId);
consumerConfig.SaslOAuthBearerClientSecret = this.config.ResolveSecureSetting(nameResolver, attribute.OAuthBearerClientSecret);
consumerConfig.SaslOAuthBearerScope = this.config.ResolveSecureSetting(nameResolver, attribute.OAuthBearerScope);
consumerConfig.SaslOAuthBearerTokenEndpointUrl = this.config.ResolveSecureSetting(nameResolver, attribute.OAuthBearerTokenEndpointUrl);
consumerConfig.SaslOAuthBearerExtensions = this.config.ResolveSecureSetting(nameResolver, attribute.OAuthBearerExtensions);
}
}
return consumerConfig;
}