public void activateOptions()

in log4j-appender/src/main/java/org/apache/kafka/log4jappender/KafkaLog4jAppender.java [277:339]


    public void activateOptions() {
        // check for config parameter validity
        Properties props = new Properties();
        if (brokerList != null)
            props.put(BOOTSTRAP_SERVERS_CONFIG, brokerList);
        if (props.isEmpty())
            throw new ConfigException("The bootstrap servers property should be specified");
        if (topic == null)
            throw new ConfigException("Topic must be specified by the Kafka log4j appender");
        if (compressionType != null)
            props.put(COMPRESSION_TYPE_CONFIG, compressionType);

        props.put(ACKS_CONFIG, Integer.toString(requiredNumAcks));
        props.put(RETRIES_CONFIG, retries);
        props.put(DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);
        props.put(LINGER_MS_CONFIG, lingerMs);
        props.put(BATCH_SIZE_CONFIG, batchSize);

        if (securityProtocol != null) {
            props.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
        }

        if (securityProtocol != null && (securityProtocol.contains("SSL") || securityProtocol.contains("SASL"))) {
            if (sslEngineFactoryClass != null) {
                props.put(SSL_ENGINE_FACTORY_CLASS_CONFIG, sslEngineFactoryClass);
            }
        }

        if (securityProtocol != null && securityProtocol.contains("SSL") && sslTruststoreLocation != null && sslTruststorePassword != null) {
            props.put(SSL_TRUSTSTORE_LOCATION_CONFIG, sslTruststoreLocation);
            props.put(SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword);

            if (sslKeystoreType != null && sslKeystoreLocation != null &&
                    sslKeystorePassword != null) {
                props.put(SSL_KEYSTORE_TYPE_CONFIG, sslKeystoreType);
                props.put(SSL_KEYSTORE_LOCATION_CONFIG, sslKeystoreLocation);
                props.put(SSL_KEYSTORE_PASSWORD_CONFIG, sslKeystorePassword);
            }
        }

        if (securityProtocol != null && securityProtocol.contains("SASL") && saslKerberosServiceName != null && clientJaasConfPath != null) {
            props.put(SASL_KERBEROS_SERVICE_NAME, saslKerberosServiceName);
            System.setProperty("java.security.auth.login.config", clientJaasConfPath);
        }
        if (kerb5ConfPath != null) {
            System.setProperty("java.security.krb5.conf", kerb5ConfPath);
        }
        if (saslMechanism != null) {
            props.put(SASL_MECHANISM, saslMechanism);
        }
        if (clientJaasConf != null) {
            props.put(SASL_JAAS_CONFIG, clientJaasConf);
        }
        if (maxBlockMs != null) {
            props.put(MAX_BLOCK_MS_CONFIG, maxBlockMs);
        }

        props.put(KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        this.producer = getKafkaProducer(props);
        LogLog.debug("Kafka producer connected to " + brokerList);
        LogLog.debug("Logging for topic: " + topic);
    }