private void setConsumerProps()

in flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java [427:448]


  private void setConsumerProps(Context ctx) {
    kafkaProps.clear();
    kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, DEFAULT_KEY_DESERIALIZER);
    kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_DESERIALIZER);
    //Defaults overridden based on config
    kafkaProps.putAll(ctx.getSubProperties(KAFKA_CONSUMER_PREFIX));
    //These always take precedence over config
    kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    if (groupId != null) {
      kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    }
    kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, DEFAULT_AUTO_COMMIT);
    //  The default value of `ssl.endpoint.identification.algorithm`
    //  is changed to `https`, since kafka client 2.0+
    //  And because flume does not accept an empty string as property value,
    //  so we need to use an alternative custom property
    //  `ssl.disableTLSHostnameVerification` to check if enable fqdn check.
    if (isSSLEnabled(kafkaProps) && "true".equalsIgnoreCase(kafkaProps.getProperty(SSL_DISABLE_FQDN_CHECK))) {
      kafkaProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
    }
    KafkaSSLUtil.addGlobalSSLParameters(kafkaProps);
  }