public static PulsarClient createClient()

in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java [104:187]


    /**
     * Creates a {@link PulsarClient} configured from the given {@link PulsarConfiguration}.
     *
     * <p>The request timeout is loaded into the builder first, the authentication returned by
     * {@code createAuthentication} is installed next, and every remaining option is then handed
     * to the matching {@link ClientBuilder} setter in a fixed order.
     *
     * @param configuration the connector configuration the client options are read from.
     * @return the client produced by {@link ClientBuilder#build()}.
     * @throws PulsarClientException declared for failures raised while assembling or building
     *     the client.
     */
    public static PulsarClient createClient(PulsarConfiguration configuration)
            throws PulsarClientException {
        ClientBuilder clientBuilder = PulsarClient.builder();

        // ClientBuilder exposes no dedicated setter for requestTimeoutMs, so the value goes
        // through the low level loadConf() call. Keep this ahead of every other builder call.
        Integer requestTimeout = configuration.get(PULSAR_REQUEST_TIMEOUT_MS);
        clientBuilder.loadConf(singletonMap("requestTimeoutMs", requestTimeout));

        // The authentication instance is derived from the same configuration.
        clientBuilder.authentication(createAuthentication(configuration));

        // Service endpoint, timeouts, threading and connection handling.
        // NOTE(review): useOption presumably skips options that are not set - confirm.
        configuration.useOption(PULSAR_SERVICE_URL, clientBuilder::serviceUrl);
        configuration.useOption(PULSAR_LISTENER_NAME, clientBuilder::listenerName);
        configuration.useOption(
                PULSAR_OPERATION_TIMEOUT_MS,
                millis -> clientBuilder.operationTimeout(millis, MILLISECONDS));
        configuration.useOption(
                PULSAR_LOOKUP_TIMEOUT_MS,
                millis -> clientBuilder.lookupTimeout(millis, MILLISECONDS));
        configuration.useOption(PULSAR_NUM_IO_THREADS, clientBuilder::ioThreads);
        configuration.useOption(PULSAR_NUM_LISTENER_THREADS, clientBuilder::listenerThreads);
        configuration.useOption(
                PULSAR_CONNECTIONS_PER_BROKER, clientBuilder::connectionsPerBroker);
        configuration.useOption(
                PULSAR_CONNECTION_MAX_IDLE_SECONDS, clientBuilder::connectionMaxIdleSeconds);
        configuration.useOption(PULSAR_USE_TCP_NO_DELAY, clientBuilder::enableTcpNoDelay);

        // TLS material: PEM files, key store / trust store, ciphers and protocols.
        configuration.useOption(PULSAR_TLS_KEY_FILE_PATH, clientBuilder::tlsKeyFilePath);
        configuration.useOption(
                PULSAR_TLS_CERTIFICATE_FILE_PATH, clientBuilder::tlsCertificateFilePath);
        configuration.useOption(
                PULSAR_TLS_TRUST_CERTS_FILE_PATH, clientBuilder::tlsTrustCertsFilePath);
        configuration.useOption(
                PULSAR_TLS_ALLOW_INSECURE_CONNECTION, clientBuilder::allowTlsInsecureConnection);
        configuration.useOption(
                PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE,
                clientBuilder::enableTlsHostnameVerification);
        configuration.useOption(PULSAR_USE_KEY_STORE_TLS, clientBuilder::useKeyStoreTls);
        configuration.useOption(PULSAR_SSL_PROVIDER, clientBuilder::sslProvider);
        configuration.useOption(PULSAR_TLS_KEY_STORE_TYPE, clientBuilder::tlsKeyStoreType);
        configuration.useOption(PULSAR_TLS_KEY_STORE_PATH, clientBuilder::tlsKeyStorePath);
        configuration.useOption(
                PULSAR_TLS_KEY_STORE_PASSWORD, clientBuilder::tlsKeyStorePassword);
        configuration.useOption(PULSAR_TLS_TRUST_STORE_TYPE, clientBuilder::tlsTrustStoreType);
        configuration.useOption(PULSAR_TLS_TRUST_STORE_PATH, clientBuilder::tlsTrustStorePath);
        configuration.useOption(
                PULSAR_TLS_TRUST_STORE_PASSWORD, clientBuilder::tlsTrustStorePassword);
        // The configured values are copied into a TreeSet before reaching the builder.
        configuration.useOption(PULSAR_TLS_CIPHERS, TreeSet::new, clientBuilder::tlsCiphers);
        configuration.useOption(PULSAR_TLS_PROTOCOLS, TreeSet::new, clientBuilder::tlsProtocols);

        // Memory, statistics, lookup limits, keep-alive and backoff tuning.
        configuration.useOption(
                PULSAR_MEMORY_LIMIT_BYTES, size -> clientBuilder.memoryLimit(size, BYTES));
        configuration.useOption(
                PULSAR_STATS_INTERVAL_SECONDS,
                seconds -> clientBuilder.statsInterval(seconds, SECONDS));
        configuration.useOption(
                PULSAR_CONCURRENT_LOOKUP_REQUEST, clientBuilder::maxConcurrentLookupRequests);
        configuration.useOption(PULSAR_MAX_LOOKUP_REQUEST, clientBuilder::maxLookupRequests);
        configuration.useOption(PULSAR_MAX_LOOKUP_REDIRECTS, clientBuilder::maxLookupRedirects);
        configuration.useOption(
                PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION,
                clientBuilder::maxNumberOfRejectedRequestPerConnection);
        configuration.useOption(
                PULSAR_KEEP_ALIVE_INTERVAL_SECONDS,
                seconds -> clientBuilder.keepAliveInterval(seconds, SECONDS));
        configuration.useOption(
                PULSAR_CONNECTION_TIMEOUT_MS,
                millis -> clientBuilder.connectionTimeout(millis, MILLISECONDS));
        configuration.useOption(
                PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS,
                nanos -> clientBuilder.startingBackoffInterval(nanos, NANOSECONDS));
        configuration.useOption(
                PULSAR_MAX_BACKOFF_INTERVAL_NANOS,
                nanos -> clientBuilder.maxBackoffInterval(nanos, NANOSECONDS));
        configuration.useOption(PULSAR_ENABLE_BUSY_WAIT, clientBuilder::enableBusyWait);

        // The proxy URL and its protocol are passed together, and only when a proxy service
        // URL is present in the configuration.
        if (configuration.contains(PULSAR_PROXY_SERVICE_URL)) {
            clientBuilder.proxyServiceUrl(
                    configuration.get(PULSAR_PROXY_SERVICE_URL),
                    configuration.get(PULSAR_PROXY_PROTOCOL));
        }

        configuration.useOption(PULSAR_ENABLE_TRANSACTION, clientBuilder::enableTransaction);

        // Address-style options are delegated to bindAddress, which supplies host and port.
        bindAddress(
                configuration,
                PULSAR_DNS_LOOKUP_BIND_ADDRESS,
                true,
                clientBuilder::dnsLookupBind);
        bindAddress(
                configuration,
                PULSAR_SOCKS5_PROXY_ADDRESS,
                false,
                (proxyHost, proxyPort) -> {
                    InetSocketAddress socksProxy = new InetSocketAddress(proxyHost, proxyPort);
                    clientBuilder.socks5ProxyAddress(socksProxy);
                    // Credentials are applied only from inside this callback, i.e. together
                    // with the SOCKS5 proxy address.
                    configuration.useOption(
                            PULSAR_SOCKS5_PROXY_USERNAME, clientBuilder::socks5ProxyUsername);
                    configuration.useOption(
                            PULSAR_SOCKS5_PROXY_PASSWORD, clientBuilder::socks5ProxyPassword);
                });

        return clientBuilder.build();
    }