public static ClientBuilder getClientBuilder()

in pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java [51:116]


    public static ClientBuilder getClientBuilder(Properties properties) {
        ClientBuilder clientBuilder = PulsarClient.builder();

        if (properties.containsKey(AUTHENTICATION_CLASS)) {
            String className = properties.getProperty(AUTHENTICATION_CLASS);
            try {
                if (properties.containsKey(AUTHENTICATION_PARAMS_STRING)) {
                    String authParamsString = (String) properties.get(AUTHENTICATION_PARAMS_STRING);
                    clientBuilder.authentication(className, authParamsString);
                } else if (properties.containsKey(AUTHENTICATION_PARAMS_MAP)) {
                    Map<String, String> authParams = (Map<String, String>) properties.get(AUTHENTICATION_PARAMS_MAP);
                    clientBuilder.authentication(className, authParams);
                } else {
                    @SuppressWarnings("unchecked")
                    Class<Authentication> clazz = (Class<Authentication>) Class.forName(className);
                    Authentication auth = clazz.newInstance();
                    clientBuilder.authentication(auth);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        clientBuilder.enableTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, "false")));
        clientBuilder.allowTlsInsecureConnection(
                Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
        clientBuilder.enableTlsHostnameVerification(
                Boolean.parseBoolean(properties.getProperty(TLS_HOSTNAME_VERIFICATION, "false")));

        if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
            clientBuilder.tlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
        }

        if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
            clientBuilder.operationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
                    TimeUnit.MILLISECONDS);
        }

        if (properties.containsKey(STATS_INTERVAL_SECONDS)) {
            clientBuilder.statsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)),
                    TimeUnit.SECONDS);
        }

        if (properties.containsKey(NUM_IO_THREADS)) {
            clientBuilder.ioThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
        }

        if (properties.containsKey(CONNECTIONS_PER_BROKER)) {
            clientBuilder.connectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
        }

        if (properties.containsKey(USE_TCP_NODELAY)) {
            clientBuilder.enableTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
        }

        if (properties.containsKey(CONCURRENT_LOOKUP_REQUESTS)) {
            clientBuilder.maxConcurrentLookupRequests(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
        }

        if (properties.containsKey(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)) {
            clientBuilder.maxNumberOfRejectedRequestPerConnection(
                    Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)));
        }

        return clientBuilder;
    }