public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)

in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs [127:199]


        /// <summary>
        /// Builds the <see cref="ProducerConfig"/> for a producer entity, combining the values
        /// declared on the entity's attribute with the <see cref="KafkaOptions"/> read from
        /// the configuration.
        /// </summary>
        /// <param name="entity">Producer entity whose attribute supplies the settings.</param>
        /// <returns>The populated producer configuration.</returns>
        public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
        {
            var attribute = entity.Attribute;

            // Shorthand: resolve an attribute value through the configuration and name resolver.
            string Resolve(string setting) => this.config.ResolveSecureSetting(nameResolver, setting);

            // Resolves a file-location setting. When the file helper does not report a valid
            // path, the resolved setting value is passed through unchanged.
            string ResolveLocation(string setting)
            {
                var configured = Resolve(setting);
                return AzureFunctionsFileHelper.TryGetValidFilePath(configured, out var validPath)
                    ? validPath
                    : configured;
            }

            var certificateLocation = ResolveLocation(attribute.SslCertificateLocation);
            var caLocation = ResolveLocation(attribute.SslCaLocation);
            var keyLocation = ResolveLocation(attribute.SslKeyLocation);

            var options = this.config.Get<KafkaOptions>();

            var producerConfig = new ProducerConfig
            {
                BootstrapServers = Resolve(attribute.BrokerList),
                BatchNumMessages = attribute.BatchSize,
                EnableIdempotence = attribute.EnableIdempotence,
                MessageSendMaxRetries = attribute.MaxRetries,
                MessageTimeoutMs = attribute.MessageTimeoutMs,
                RequestTimeoutMs = attribute.RequestTimeoutMs,
                MessageMaxBytes = attribute.MaxMessageBytes,
                SaslPassword = Resolve(attribute.Password),
                SaslUsername = Resolve(attribute.Username),
                SslKeyLocation = keyLocation,
                SslKeyPassword = Resolve(attribute.SslKeyPassword),
                SslCertificateLocation = certificateLocation,
                SslCaLocation = caLocation,
                SslCaPem = ExtractCertificate(Resolve(attribute.SslCaPEM)),
                SslCertificatePem = ExtractCertificate(Resolve(attribute.SslCertificatePEM)),
                SslKeyPem = ExtractPrivateKey(Resolve(attribute.SslKeyPEM)),
                Debug = options?.LibkafkaDebug,
                MetadataMaxAgeMs = options?.MetadataMaxAgeMs,
                SocketKeepaliveEnable = options?.SocketKeepaliveEnable,
                LingerMs = attribute.LingerMs,
            };

            // A combined certificate-and-key PEM, when supplied, replaces the individually
            // configured certificate and key PEM values set above.
            if (!string.IsNullOrEmpty(attribute.SslCertificateandKeyPEM))
            {
                var combinedPem = Resolve(attribute.SslCertificateandKeyPEM);
                producerConfig.SslCertificatePem = ExtractCertificate(combinedPem);
                producerConfig.SslKeyPem = ExtractPrivateKey(combinedPem);
            }

            // SASL mechanism and security protocol are only assigned when explicitly set.
            var authenticationMode = attribute.AuthenticationMode;
            if (authenticationMode != BrokerAuthenticationMode.NotSet)
            {
                producerConfig.SaslMechanism = (SaslMechanism)authenticationMode;
            }

            var protocol = attribute.Protocol;
            if (protocol != BrokerProtocol.NotSet)
            {
                producerConfig.SecurityProtocol = (SecurityProtocol)protocol;
            }

            // OAuth bearer settings are applied only for the OAuthBearer authentication mode.
            if (authenticationMode == BrokerAuthenticationMode.OAuthBearer)
            {
                producerConfig.SaslOauthbearerMethod = (SaslOauthbearerMethod)attribute.OAuthBearerMethod;
                producerConfig.SaslOauthbearerClientId = Resolve(attribute.OAuthBearerClientId);
                producerConfig.SaslOauthbearerClientSecret = Resolve(attribute.OAuthBearerClientSecret);
                producerConfig.SaslOauthbearerScope = Resolve(attribute.OAuthBearerScope);
                producerConfig.SaslOauthbearerTokenEndpointUrl = Resolve(attribute.OAuthBearerTokenEndpointUrl);
                producerConfig.SaslOauthbearerExtensions = Resolve(attribute.OAuthBearerExtensions);
            }

            return producerConfig;
        }