in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarClientKafkaConfig.java [54:123]
/**
 * Creates a Pulsar {@link ClientBuilder} and applies the client-level settings found in the
 * given properties.
 *
 * <p>Authentication is configured only when {@code AUTHENTICATION_CLASS} is present. The
 * plugin parameters are taken from {@code AUTHENTICATION_PARAMS_STRING} if present, otherwise
 * from {@code AUTHENTICATION_PARAMS_MAP}; when neither is present the plugin class is
 * instantiated through its no-argument constructor.
 *
 * <p>The three TLS flags ({@code USE_TLS}, {@code TLS_ALLOW_INSECURE_CONNECTION},
 * {@code TLS_HOSTNAME_VERIFICATION}) are always applied and default to {@code false}. Every
 * other setting is applied only when its key is present in {@code properties}.
 *
 * @param properties the client configuration; may be {@code null}, in which case an
 *                   unconfigured builder is returned
 * @return a builder carrying the settings read from {@code properties}
 * @throws RuntimeException if the authentication plugin cannot be loaded, instantiated or
 *                          configured; the original failure is preserved as the cause
 * @throws NumberFormatException if a numeric setting is present but is not a parsable integer
 */
public static ClientBuilder getClientBuilder(Properties properties) {
    ClientBuilder clientBuilder = PulsarClient.builder();
    if (properties == null) {
        return clientBuilder;
    }

    if (properties.containsKey(AUTHENTICATION_CLASS)) {
        String className = properties.getProperty(AUTHENTICATION_CLASS);
        try {
            if (properties.containsKey(AUTHENTICATION_PARAMS_STRING)) {
                String authParamsString = (String) properties.get(AUTHENTICATION_PARAMS_STRING);
                clientBuilder.authentication(className, authParamsString);
            } else if (properties.containsKey(AUTHENTICATION_PARAMS_MAP)) {
                // Properties stores raw Objects, so the element types of the map cannot be
                // verified here; the caller is responsible for supplying a Map<String, String>.
                @SuppressWarnings("unchecked")
                Map<String, String> authParams =
                        (Map<String, String>) properties.get(AUTHENTICATION_PARAMS_MAP);
                clientBuilder.authentication(className, authParams);
            } else {
                // asSubclass() rejects a class that is not an Authentication before any
                // constructor runs, and getDeclaredConstructor().newInstance() replaces the
                // deprecated Class.newInstance(), which bypasses checked-exception checking.
                Class<? extends Authentication> authClass =
                        Class.forName(className).asSubclass(Authentication.class);
                Authentication auth = authClass.getDeclaredConstructor().newInstance();
                clientBuilder.authentication(auth);
            }
        } catch (Exception e) {
            // Only the class name is reported: the auth parameters may contain secrets.
            throw new RuntimeException("Failed to configure authentication plugin: " + className, e);
        }
    }

    // TLS flags are always set explicitly; an absent key means "false".
    clientBuilder.enableTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, "false")));
    clientBuilder.allowTlsInsecureConnection(
            Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
    clientBuilder.enableTlsHostnameVerification(
            Boolean.parseBoolean(properties.getProperty(TLS_HOSTNAME_VERIFICATION, "false")));

    // The remaining settings are applied only when the key is present; otherwise the builder
    // is left untouched for that setting.
    if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
        clientBuilder.tlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
    }

    if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
        clientBuilder.operationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
                TimeUnit.MILLISECONDS);
    }

    if (properties.containsKey(STATS_INTERVAL_SECONDS)) {
        clientBuilder.statsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)),
                TimeUnit.SECONDS);
    }

    if (properties.containsKey(NUM_IO_THREADS)) {
        clientBuilder.ioThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
    }

    if (properties.containsKey(CONNECTIONS_PER_BROKER)) {
        clientBuilder.connectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
    }

    if (properties.containsKey(USE_TCP_NODELAY)) {
        clientBuilder.enableTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
    }

    if (properties.containsKey(CONCURRENT_LOOKUP_REQUESTS)) {
        clientBuilder.maxConcurrentLookupRequests(
                Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
    }

    if (properties.containsKey(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)) {
        clientBuilder.maxNumberOfRejectedRequestPerConnection(
                Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)));
    }

    return clientBuilder;
}