in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarClientKafkaConfig.java [125:168]
/**
 * Creates a {@link PulsarAdminBuilder} configured from the supplied properties.
 *
 * <p>The admin HTTP service URL is taken from the {@code ADMIN_SERVICE_URL} property when
 * present, otherwise from {@code serviceUrl}. When {@code AUTHENTICATION_CLASS} is set,
 * authentication is configured from (in order of precedence) {@code AUTHENTICATION_PARAMS_STRING},
 * {@code AUTHENTICATION_PARAMS_MAP}, or by instantiating the class through its no-arg constructor.
 * TLS options and the timeout are applied afterwards.
 *
 * @param serviceUrl fallback admin service URL used when {@code ADMIN_SERVICE_URL} is not set
 *                   (or when {@code properties} is {@code null})
 * @param properties configuration overrides; may be {@code null}, in which case only
 *                   {@code serviceUrl} is applied to the builder
 * @return the configured builder (not yet built)
 * @throws RuntimeException if the authentication plugin cannot be loaded, instantiated or configured
 * @throws NumberFormatException if {@code OPERATION_TIMEOUT_MS} is present but not a valid integer
 */
public static PulsarAdminBuilder getAdminBuilder(String serviceUrl, Properties properties) {
    PulsarAdminBuilder adminBuilder = PulsarAdmin.builder();
    if (properties == null) {
        // No overrides to read, but the caller-supplied URL must still be honoured;
        // otherwise the returned builder has no service URL at all.
        adminBuilder.serviceHttpUrl(serviceUrl);
        return adminBuilder;
    }

    adminBuilder.serviceHttpUrl(properties.getProperty(ADMIN_SERVICE_URL, serviceUrl));

    if (properties.containsKey(AUTHENTICATION_CLASS)) {
        String className = properties.getProperty(AUTHENTICATION_CLASS);
        try {
            if (properties.containsKey(AUTHENTICATION_PARAMS_STRING)) {
                String authParamsString = (String) properties.get(AUTHENTICATION_PARAMS_STRING);
                adminBuilder.authentication(className, authParamsString);
            } else if (properties.containsKey(AUTHENTICATION_PARAMS_MAP)) {
                // Properties stores raw Objects; the element types of the map cannot be
                // verified here, so the cast is trusted as in the caller's contract.
                @SuppressWarnings("unchecked")
                Map<String, String> authParams =
                        (Map<String, String>) properties.get(AUTHENTICATION_PARAMS_MAP);
                adminBuilder.authentication(className, authParams);
            } else {
                // asSubclass rejects a non-Authentication class before it is instantiated.
                Class<? extends Authentication> clazz =
                        Class.forName(className).asSubclass(Authentication.class);
                // Class.newInstance() is deprecated; go through the no-arg constructor instead.
                Authentication auth = clazz.getDeclaredConstructor().newInstance();
                adminBuilder.authentication(auth);
            }
        } catch (Exception e) {
            // Only the class name is reported: the auth params may contain secrets.
            throw new RuntimeException("Failed to configure authentication using class " + className, e);
        }
    }

    adminBuilder.allowTlsInsecureConnection(
            Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
    adminBuilder.enableTlsHostnameVerification(
            Boolean.parseBoolean(properties.getProperty(TLS_HOSTNAME_VERIFICATION, "false")));

    if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
        adminBuilder.tlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
    }

    if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
        // The OPERATION_TIMEOUT_MS value is applied as the admin client's connection timeout.
        adminBuilder.connectionTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
                TimeUnit.MILLISECONDS);
    }

    return adminBuilder;
}