in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarClientFactory.java [104:187]
public static PulsarClient createClient(PulsarConfiguration configuration)
        throws PulsarClientException {
    // Builds a PulsarClient by copying every supported option from the given configuration
    // onto a fresh ClientBuilder. Options are forwarded through configuration.useOption(...),
    // which hands the option value to the supplied setter.
    ClientBuilder clientBuilder = PulsarClient.builder();

    // ClientBuilder exposes no dedicated setter for requestTimeoutMs, so it goes through the
    // low-level loadConf(...) entry point. This is deliberately the first call on the builder,
    // ahead of every explicit setter below.
    Integer requestTimeout = configuration.get(PULSAR_REQUEST_TIMEOUT_MS);
    clientBuilder.loadConf(singletonMap("requestTimeoutMs", requestTimeout));

    // Authentication instance derived from the same configuration.
    clientBuilder.authentication(createAuthentication(configuration));

    // --- Service endpoint, timeouts, threading and connection pool ---
    configuration.useOption(PULSAR_SERVICE_URL, clientBuilder::serviceUrl);
    configuration.useOption(PULSAR_LISTENER_NAME, clientBuilder::listenerName);
    configuration.useOption(
            PULSAR_OPERATION_TIMEOUT_MS,
            millis -> clientBuilder.operationTimeout(millis, MILLISECONDS));
    configuration.useOption(
            PULSAR_LOOKUP_TIMEOUT_MS,
            millis -> clientBuilder.lookupTimeout(millis, MILLISECONDS));
    configuration.useOption(PULSAR_NUM_IO_THREADS, clientBuilder::ioThreads);
    configuration.useOption(PULSAR_NUM_LISTENER_THREADS, clientBuilder::listenerThreads);
    configuration.useOption(PULSAR_CONNECTIONS_PER_BROKER, clientBuilder::connectionsPerBroker);
    configuration.useOption(
            PULSAR_CONNECTION_MAX_IDLE_SECONDS, clientBuilder::connectionMaxIdleSeconds);
    configuration.useOption(PULSAR_USE_TCP_NO_DELAY, clientBuilder::enableTcpNoDelay);

    // --- TLS: PEM files, verification switches, key store / trust store ---
    configuration.useOption(PULSAR_TLS_KEY_FILE_PATH, clientBuilder::tlsKeyFilePath);
    configuration.useOption(
            PULSAR_TLS_CERTIFICATE_FILE_PATH, clientBuilder::tlsCertificateFilePath);
    configuration.useOption(
            PULSAR_TLS_TRUST_CERTS_FILE_PATH, clientBuilder::tlsTrustCertsFilePath);
    configuration.useOption(
            PULSAR_TLS_ALLOW_INSECURE_CONNECTION, clientBuilder::allowTlsInsecureConnection);
    configuration.useOption(
            PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE,
            clientBuilder::enableTlsHostnameVerification);
    configuration.useOption(PULSAR_USE_KEY_STORE_TLS, clientBuilder::useKeyStoreTls);
    configuration.useOption(PULSAR_SSL_PROVIDER, clientBuilder::sslProvider);
    configuration.useOption(PULSAR_TLS_KEY_STORE_TYPE, clientBuilder::tlsKeyStoreType);
    configuration.useOption(PULSAR_TLS_KEY_STORE_PATH, clientBuilder::tlsKeyStorePath);
    configuration.useOption(PULSAR_TLS_KEY_STORE_PASSWORD, clientBuilder::tlsKeyStorePassword);
    configuration.useOption(PULSAR_TLS_TRUST_STORE_TYPE, clientBuilder::tlsTrustStoreType);
    configuration.useOption(PULSAR_TLS_TRUST_STORE_PATH, clientBuilder::tlsTrustStorePath);
    configuration.useOption(
            PULSAR_TLS_TRUST_STORE_PASSWORD, clientBuilder::tlsTrustStorePassword);
    // The configured cipher/protocol collections are copied into a TreeSet before being
    // passed to the builder.
    configuration.useOption(PULSAR_TLS_CIPHERS, TreeSet::new, clientBuilder::tlsCiphers);
    configuration.useOption(PULSAR_TLS_PROTOCOLS, TreeSet::new, clientBuilder::tlsProtocols);

    // --- Memory, statistics, lookup limits, keep-alive and backoff ---
    configuration.useOption(
            PULSAR_MEMORY_LIMIT_BYTES, limit -> clientBuilder.memoryLimit(limit, BYTES));
    configuration.useOption(
            PULSAR_STATS_INTERVAL_SECONDS,
            seconds -> clientBuilder.statsInterval(seconds, SECONDS));
    configuration.useOption(
            PULSAR_CONCURRENT_LOOKUP_REQUEST, clientBuilder::maxConcurrentLookupRequests);
    configuration.useOption(PULSAR_MAX_LOOKUP_REQUEST, clientBuilder::maxLookupRequests);
    configuration.useOption(PULSAR_MAX_LOOKUP_REDIRECTS, clientBuilder::maxLookupRedirects);
    configuration.useOption(
            PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION,
            clientBuilder::maxNumberOfRejectedRequestPerConnection);
    configuration.useOption(
            PULSAR_KEEP_ALIVE_INTERVAL_SECONDS,
            seconds -> clientBuilder.keepAliveInterval(seconds, SECONDS));
    configuration.useOption(
            PULSAR_CONNECTION_TIMEOUT_MS,
            millis -> clientBuilder.connectionTimeout(millis, MILLISECONDS));
    configuration.useOption(
            PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS,
            nanos -> clientBuilder.startingBackoffInterval(nanos, NANOSECONDS));
    configuration.useOption(
            PULSAR_MAX_BACKOFF_INTERVAL_NANOS,
            nanos -> clientBuilder.maxBackoffInterval(nanos, NANOSECONDS));
    configuration.useOption(PULSAR_ENABLE_BUSY_WAIT, clientBuilder::enableBusyWait);

    // --- Proxy: URL and protocol are handed to the builder together, and only when a proxy
    // service URL is present in the configuration ---
    if (configuration.contains(PULSAR_PROXY_SERVICE_URL)) {
        String proxyUrl = configuration.get(PULSAR_PROXY_SERVICE_URL);
        ProxyProtocol protocol = configuration.get(PULSAR_PROXY_PROTOCOL);
        clientBuilder.proxyServiceUrl(proxyUrl, protocol);
    }

    configuration.useOption(PULSAR_ENABLE_TRANSACTION, clientBuilder::enableTransaction);

    // --- Address bindings, resolved through the bindAddress helper ---
    bindAddress(
            configuration, PULSAR_DNS_LOOKUP_BIND_ADDRESS, true, clientBuilder::dnsLookupBind);
    bindAddress(
            configuration,
            PULSAR_SOCKS5_PROXY_ADDRESS,
            false,
            (proxyHost, proxyPort) -> {
                // SOCKS5 credentials are applied only from within this address callback.
                clientBuilder.socks5ProxyAddress(new InetSocketAddress(proxyHost, proxyPort));
                configuration.useOption(
                        PULSAR_SOCKS5_PROXY_USERNAME, clientBuilder::socks5ProxyUsername);
                configuration.useOption(
                        PULSAR_SOCKS5_PROXY_PASSWORD, clientBuilder::socks5ProxyPassword);
            });

    return clientBuilder.build();
}