in flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java [427:448]
private void setConsumerProps(Context ctx) {
kafkaProps.clear();
kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);
kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_DESERIALIZER);
//Defaults overridden based on config
kafkaProps.putAll(ctx.getSubProperties(KAFKA_CONSUMER_PREFIX));
//These always take precedence over config
kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
if (groupId != null) {
kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
}
kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, DEFAULT_AUTO_COMMIT);
// The default value of `ssl.endpoint.identification.algorithm`
// is changed to `https`, since kafka client 2.0+
// And because flume does not accept an empty string as property value,
// so we need to use an alternative custom property
// `ssl.disableTLSHostnameVerification` to check if enable fqdn check.
if (isSSLEnabled(kafkaProps) && "true".equalsIgnoreCase(kafkaProps.getProperty(SSL_DISABLE_FQDN_CHECK))) {
kafkaProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
}
KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
}