in log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java [277:339]
public void activateOptions() {
// check for config parameter validity
Properties props = new Properties();
if (brokerList != null)
props.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
if (props.isEmpty())
throw new ConfigException("The bootstrap servers property should be specified");
if (topic == null)
throw new ConfigException("Topic must be specified by the Kafka log4j appender");
if (compressionType != null)
props.put(COMPRESSION_TYPE_CONFIG, compressionType);
props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks));
props.put(RETRIES_CONFIG, retries);
props.put(DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
props.put(LINGER_MS_CONFIG, lingerMs);
props.put(BATCH_SIZE_CONFIG, batchSize);
if (securityProtocol != null) {
props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
}
if (securityProtocol != null && (securityProtocol.contains("SSL") || securityProtocol.contains("SASL"))) {
if (sslEngineFactoryClass != null) {
props.put(SSL_ENGINE_FACTORY_CLASS_CONFIG, sslEngineFactoryClass);
}
}
if (securityProtocol != null && securityProtocol.contains("SSL") && sslTruststoreLocation != null && sslTruststorePassword != null) {
props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);
props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword);
if (sslKeystoreType != null && sslKeystoreLocation != null &&
sslKeystorePassword != null) {
props.put(SSL_KEYSTORE_TYPE_CONFIG, sslKeystoreType);
props.put(SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation);
props.put(SSL_KEYSTORE_PASSWORD_CONFIG, sslKeystorePassword);
}
}
if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) {
props.put(SASL_KERBEROS_SERVICE_NAME, saslKerberosServiceName);
System.setProperty("java.security.auth.login.config", clientJaasConfPath);
}
if (kerb5ConfPath != null) {
System.setProperty("java.security.krb5.conf", kerb5ConfPath);
}
if (saslMechanism != null) {
props.put(SASL_MECHANISM, saslMechanism);
}
if (clientJaasConf != null) {
props.put(SASL_JAAS_CONFIG, clientJaasConf);
}
if (maxBlockMs != null) {
props.put(MAX_BLOCK_MS_CONFIG, maxBlockMs);
}
props.put(KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
props.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
this.producer = getKafkaProducer(props);
LogLog.debug("Kafka producer connected to " + brokerList);
LogLog.debug("Logging for topic: " + topic);
}