in flume-kafka-sink/src/main/java/org/apache/flume/sink/kafka/KafkaSink.java [412:429]
private void setProducerProps(Context context, String bootStrapServers) {
kafkaProps.clear();
kafkaProps.put(ProducerConfig.ACKS_CONFIG, KafkaSinkConstants.DEFAULT_ACKS);
//Defaults overridden based on config
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaSinkConstants.DEFAULT_KEY_SERIALIZER);
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaSinkConstants.DEFAULT_VALUE_SERIAIZER);
kafkaProps.putAll(context.getSubProperties(KafkaSinkConstants.KAFKA_PRODUCER_PREFIX));
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
// The default value of `ssl.endpoint.identification.algorithm`
// is changed to `https`, since kafka client 2.0+
// And because flume does not accept an empty string as property value,
// so we need to use an alternative custom property
// `ssl.disableTLSHostnameVerification` to check if enable fqdn check.
if (isSSLEnabled(kafkaProps) && "true".equalsIgnoreCase(kafkaProps.getProperty(SSL_DISABLE_FQDN_CHECK))) {
kafkaProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
}
KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
}