in flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java [239:259]
/**
 * Rebuilds {@code consumerProps} from scratch.
 *
 * <p>Values are applied in layers: built-in defaults first, then every sub-property of
 * {@code ctx} found under {@code KAFKA_CONSUMER_PREFIX}, then a small set of values that
 * are written last so that they win over anything supplied through configuration.
 * Finally the SSL hostname-verification switch is evaluated and the properties are handed
 * to {@code KafkaSSLUtil.addGlobalSSLParameters}.
 *
 * @param ctx              context whose sub-properties under {@code KAFKA_CONSUMER_PREFIX}
 *                         override the built-in defaults
 * @param bootStrapServers value stored under the consumer's bootstrap-servers key
 */
private void setConsumerProps(Context ctx, String bootStrapServers) {
  consumerProps.clear();

  // Layer 1: baseline defaults. Each of these may be replaced by layer 2.
  consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, DEFAULT_AUTO_OFFSET_RESET);
  consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_DESERIAIZER);
  consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);

  // Layer 2: whatever the configuration supplies under the consumer prefix.
  consumerProps.putAll(ctx.getSubProperties(KAFKA_CONSUMER_PREFIX));

  // Layer 3: written after the configuration, so these cannot be overridden by it.
  consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE);
  consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);

  // Since kafka client 2.0+ the default of `ssl.endpoint.identification.algorithm` is
  // `https`. Flume does not accept an empty string as a property value, so the custom
  // property `ssl.disableTLSHostnameVerification` is used instead to decide whether the
  // fqdn check is switched off (by storing an empty algorithm here).
  if (isSSLEnabled(consumerProps)) {
    String disableFqdnCheck = consumerProps.getProperty(SSL_DISABLE_FQDN_CHECK);
    if (Boolean.parseBoolean(disableFqdnCheck)) {
      consumerProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
    }
  }

  KafkaSSLUtil.addGlobalSSLParameters(consumerProps);
}