private void setProducerProps()

in flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java [216:233]


  /**
   * Rebuilds {@code producerProps} from scratch.
   *
   * <p>The properties are assembled in layers: built-in defaults first, then every sub-property
   * found under {@code KAFKA_PRODUCER_PREFIX} in the context, and finally the bootstrap servers,
   * which are written after the overlay so the supplied argument always wins.
   *
   * @param ctx context whose {@code KAFKA_PRODUCER_PREFIX} sub-properties override the defaults
   * @param bootStrapServers value stored under {@code ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}
   */
  private void setProducerProps(Context ctx, String bootStrapServers) {
    producerProps.clear();

    // Layer 1: built-in defaults (distinct keys, so their relative order is irrelevant).
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_VALUE_SERIAIZER);
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KEY_SERIALIZER);
    producerProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_ACKS);

    // Layer 2: whatever the configuration provides replaces the defaults above.
    producerProps.putAll(ctx.getSubProperties(KAFKA_PRODUCER_PREFIX));

    // Layer 3: the bootstrap servers argument overrides any value from the overlay.
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);

    // Since Kafka client 2.0 `ssl.endpoint.identification.algorithm` defaults to `https`.
    // Flume does not accept an empty string as a property value, so hostname (FQDN)
    // verification cannot be switched off by configuring that key directly. The custom
    // property `ssl.disableTLSHostnameVerification` is used as the opt-out switch instead,
    // and the empty algorithm is written here on the user's behalf.
    if (isSSLEnabled(producerProps)) {
      String disableFqdnCheck = producerProps.getProperty(SSL_DISABLE_FQDN_CHECK);
      // parseBoolean is true only for a non-null, case-insensitive "true".
      if (Boolean.parseBoolean(disableFqdnCheck)) {
        producerProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
      }
    }

    // NOTE(review): presumably fills in globally configured SSL settings — confirm in
    // KafkaSSLUtil. It runs last, after all channel-level properties are in place.
    KafkaSSLUtil.addGlobalSSLParameters(producerProps);
  }