in src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs [127:199]
/// <summary>
/// Builds the <see cref="ProducerConfig"/> for a producer entity from the values on its
/// attribute and from the <see cref="KafkaOptions"/> bound from configuration.
/// </summary>
/// <param name="entity">Producer entity whose <c>Attribute</c> supplies the broker list, batching, timeout, SASL, SSL and OAuth bearer settings.</param>
/// <returns>The populated producer configuration.</returns>
public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
{
    var attribute = entity.Attribute;

    // Resolves a file-location setting and, when the file helper reports a valid path
    // for it, uses that path; otherwise the resolved setting value is passed through as-is.
    string ResolveLocation(string locationSetting)
    {
        var resolvedSetting = this.config.ResolveSecureSetting(nameResolver, locationSetting);
        return AzureFunctionsFileHelper.TryGetValidFilePath(resolvedSetting, out var validPath)
            ? validPath
            : resolvedSetting;
    }

    var certificateLocation = ResolveLocation(attribute.SslCertificateLocation);
    var caLocation = ResolveLocation(attribute.SslCaLocation);
    var keyLocation = ResolveLocation(attribute.SslKeyLocation);

    var options = this.config.Get<KafkaOptions>();

    var producerConfig = new ProducerConfig();

    // Broker connection, batching and timeout settings taken from the attribute.
    producerConfig.BootstrapServers = this.config.ResolveSecureSetting(nameResolver, attribute.BrokerList);
    producerConfig.BatchNumMessages = attribute.BatchSize;
    producerConfig.EnableIdempotence = attribute.EnableIdempotence;
    producerConfig.MessageSendMaxRetries = attribute.MaxRetries;
    producerConfig.MessageTimeoutMs = attribute.MessageTimeoutMs;
    producerConfig.RequestTimeoutMs = attribute.RequestTimeoutMs;
    producerConfig.MessageMaxBytes = attribute.MaxMessageBytes;

    // SASL credentials.
    producerConfig.SaslPassword = this.config.ResolveSecureSetting(nameResolver, attribute.Password);
    producerConfig.SaslUsername = this.config.ResolveSecureSetting(nameResolver, attribute.Username);

    // SSL material: file locations first, then inline PEM values.
    producerConfig.SslKeyLocation = keyLocation;
    producerConfig.SslKeyPassword = this.config.ResolveSecureSetting(nameResolver, attribute.SslKeyPassword);
    producerConfig.SslCertificateLocation = certificateLocation;
    producerConfig.SslCaLocation = caLocation;
    producerConfig.SslCaPem = ExtractCertificate(this.config.ResolveSecureSetting(nameResolver, attribute.SslCaPEM));
    producerConfig.SslCertificatePem = ExtractCertificate(this.config.ResolveSecureSetting(nameResolver, attribute.SslCertificatePEM));
    producerConfig.SslKeyPem = ExtractPrivateKey(this.config.ResolveSecureSetting(nameResolver, attribute.SslKeyPEM));

    // Values sourced from KafkaOptions; each stays null when no options instance is bound.
    producerConfig.Debug = options?.LibkafkaDebug;
    producerConfig.MetadataMaxAgeMs = options?.MetadataMaxAgeMs;
    producerConfig.SocketKeepaliveEnable = options?.SocketKeepaliveEnable;
    producerConfig.LingerMs = attribute.LingerMs;

    // A combined certificate-and-key PEM, when specified on the attribute, replaces the
    // SslCertificatePem and SslKeyPem values assigned above.
    if (!string.IsNullOrEmpty(attribute.SslCertificateandKeyPEM))
    {
        var combinedPem = this.config.ResolveSecureSetting(nameResolver, attribute.SslCertificateandKeyPEM);
        producerConfig.SslCertificatePem = ExtractCertificate(combinedPem);
        producerConfig.SslKeyPem = ExtractPrivateKey(combinedPem);
    }

    // SASL mechanism and security protocol are only set when explicitly chosen on the attribute.
    var authenticationMode = attribute.AuthenticationMode;
    if (authenticationMode != BrokerAuthenticationMode.NotSet)
    {
        producerConfig.SaslMechanism = (SaslMechanism)authenticationMode;
    }

    if (attribute.Protocol != BrokerProtocol.NotSet)
    {
        producerConfig.SecurityProtocol = (SecurityProtocol)attribute.Protocol;
    }

    // OAuth bearer settings are applied only for the OAuthBearer authentication mode.
    if (authenticationMode == BrokerAuthenticationMode.OAuthBearer)
    {
        producerConfig.SaslOauthbearerMethod = (SaslOauthbearerMethod)attribute.OAuthBearerMethod;
        producerConfig.SaslOauthbearerClientId = this.config.ResolveSecureSetting(nameResolver, attribute.OAuthBearerClientId);
        producerConfig.SaslOauthbearerClientSecret = this.config.ResolveSecureSetting(nameResolver, attribute.OAuthBearerClientSecret);
        producerConfig.SaslOauthbearerScope = this.config.ResolveSecureSetting(nameResolver, attribute.OAuthBearerScope);
        producerConfig.SaslOauthbearerTokenEndpointUrl = this.config.ResolveSecureSetting(nameResolver, attribute.OAuthBearerTokenEndpointUrl);
        producerConfig.SaslOauthbearerExtensions = this.config.ResolveSecureSetting(nameResolver, attribute.OAuthBearerExtensions);
    }

    return producerConfig;
}