private ConsumerConfig GetConsumerConfiguration()

in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Listeners/Scaler/KafkaScalerProvider.cs [49:94]


        private ConsumerConfig GetConsumerConfiguration(KafkaMetaData kafkaMetaData, IConfiguration config, INameResolver nameResolver)
        {
            var adminConfig = new ConsumerConfig() {
                GroupId = config.ResolveSecureSetting(nameResolver, kafkaMetaData.ConsumerGroup),
                BootstrapServers = config.ResolveSecureSetting(nameResolver, kafkaMetaData.BrokerList),
            };

            if (kafkaMetaData.AuthenticationMode != BrokerAuthenticationMode.NotSet ||
                kafkaMetaData.Protocol != BrokerProtocol.NotSet)
            {
                adminConfig.SaslPassword = config.ResolveSecureSetting(nameResolver, kafkaMetaData.Password);
                adminConfig.SaslUsername = config.ResolveSecureSetting(nameResolver, kafkaMetaData.Username);
                adminConfig.SslKeyPassword = config.ResolveSecureSetting(nameResolver, kafkaMetaData.SslKeyPassword);
                adminConfig.SslCertificatePem = config.ResolveSecureSetting(nameResolver, kafkaMetaData.SslCertificatePEM);
                adminConfig.SslCaPem = ExtractCertificate(config.ResolveSecureSetting(nameResolver, kafkaMetaData.SslCaPEM));
                adminConfig.SslKeyPem = config.ResolveSecureSetting(nameResolver, kafkaMetaData.SslKeyPEM);

                if (!string.IsNullOrEmpty(kafkaMetaData.SslCertificateandKeyPEM))
                {
                    adminConfig.SslCertificatePem = ExtractCertificate(kafkaMetaData.SslCertificateandKeyPEM);
                    adminConfig.SslKeyPem = ExtractPrivateKey(kafkaMetaData.SslCertificateandKeyPEM);
                }

                if (kafkaMetaData.AuthenticationMode != BrokerAuthenticationMode.NotSet)
                {
                    adminConfig.SaslMechanism = (SaslMechanism)kafkaMetaData.AuthenticationMode;
                }

                if (kafkaMetaData.Protocol != BrokerProtocol.NotSet)
                {
                    adminConfig.SecurityProtocol = (SecurityProtocol)kafkaMetaData.Protocol;
                }

                if (kafkaMetaData.AuthenticationMode == BrokerAuthenticationMode.OAuthBearer)
                {
                    adminConfig.SaslOauthbearerMethod = (SaslOauthbearerMethod)kafkaMetaData.OAuthBearerMethod;
                    adminConfig.SaslOauthbearerClientId = config.ResolveSecureSetting(nameResolver, kafkaMetaData.OAuthBearerClientId);
                    adminConfig.SaslOauthbearerClientSecret = config.ResolveSecureSetting(nameResolver, kafkaMetaData.OAuthBearerClientSecret);
                    adminConfig.SaslOauthbearerScope = config.ResolveSecureSetting(nameResolver, kafkaMetaData.OAuthBearerScope);
                    adminConfig.SaslOauthbearerTokenEndpointUrl = config.ResolveSecureSetting(nameResolver, kafkaMetaData.OAuthBearerTokenEndpointUrl);
                    adminConfig.SaslOauthbearerExtensions = config.ResolveSecureSetting(nameResolver, kafkaMetaData.OAuthBearerExtensions);
                }
            }

            return adminConfig;
        }