in pulsar/__init__.py [0:0]
def __init__(self, service_url,
authentication=None,
operation_timeout_seconds=30,
io_threads=1,
message_listener_threads=1,
concurrent_lookup_requests=50000,
log_conf_file_path=None,
use_tls=False,
tls_trust_certs_file_path=None,
tls_allow_insecure_connection=False,
tls_validate_hostname=False,
logger=None,
connection_timeout_ms=10000,
listener_name=None
):
"""
Create a new Pulsar client instance.
Parameters
----------
service_url: str
The Pulsar service url eg: pulsar://my-broker.com:6650/
authentication: Authentication, optional
Set the authentication provider to be used with the broker. Supported methods:
* `AuthenticationTLS`
* `AuthenticationToken`
* `AuthenticationAthenz`
* `AuthenticationOauth2`
operation_timeout_seconds: int, default=30
Set timeout on client operations (subscribe, create producer, close, unsubscribe).
io_threads: int, default=1
Set the number of IO threads to be used by the Pulsar client.
message_listener_threads: int, default=1
Set the number of threads to be used by the Pulsar client when delivering messages through
message listener. The default is 1 thread per Pulsar client. If using more than 1 thread,
messages for distinct ``message_listener``s will be delivered in different threads, however a
single ``MessageListener`` will always be assigned to the same thread.
concurrent_lookup_requests: int, default=50000
Number of concurrent lookup-requests allowed on each broker connection to prevent overload
on the broker.
log_conf_file_path: str, optional
Initialize log4cxx from a configuration file.
use_tls: bool, default=False
Configure whether to use TLS encryption on the connection. This setting is deprecated.
TLS will be automatically enabled if the ``serviceUrl`` is set to ``pulsar+ssl://`` or ``https://``
tls_trust_certs_file_path: str, optional
Set the path to the trusted TLS certificate file. If empty defaults to certifi.
tls_allow_insecure_connection: bool, default=False
Configure whether the Pulsar client accepts untrusted TLS certificates from the broker.
tls_validate_hostname: bool, default=False
Configure whether the Pulsar client validates that the hostname of the endpoint,
matches the common name on the TLS certificate presented by the endpoint.
logger: optional
Set a Python logger for this Pulsar client. Should be an instance of `logging.Logger`.
connection_timeout_ms: int, default=10000
Set timeout in milliseconds on TCP connections.
listener_name: str, optional
Listener name for lookup. Clients can use listenerName to choose one of the listeners as
the service URL to create a connection to the broker as long as the network is accessible.
``advertisedListeners`` must be enabled in broker side.
"""
_check_type(str, service_url, 'service_url')
_check_type_or_none(Authentication, authentication, 'authentication')
_check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
_check_type(int, connection_timeout_ms, 'connection_timeout_ms')
_check_type(int, io_threads, 'io_threads')
_check_type(int, message_listener_threads, 'message_listener_threads')
_check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
_check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
_check_type(bool, use_tls, 'use_tls')
_check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
_check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
_check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
_check_type_or_none(str, listener_name, 'listener_name')
conf = _pulsar.ClientConfiguration()
if authentication:
conf.authentication(authentication.auth)
conf.operation_timeout_seconds(operation_timeout_seconds)
conf.connection_timeout(connection_timeout_ms)
conf.io_threads(io_threads)
conf.message_listener_threads(message_listener_threads)
conf.concurrent_lookup_requests(concurrent_lookup_requests)
if log_conf_file_path:
conf.log_conf_file_path(log_conf_file_path)
if isinstance(logger, logging.Logger):
conf.set_logger(self._prepare_logger(logger))
elif isinstance(logger, ConsoleLogger):
conf.set_console_logger(logger.log_level)
elif isinstance(logger, FileLogger):
conf.set_file_logger(logger.log_level, logger.log_file)
elif logger is not None:
raise ValueError("Logger is expected to be either None, logger.Logger, pulsar.ConsoleLogger or pulsar.FileLogger")
if listener_name:
conf.listener_name(listener_name)
if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
conf.use_tls(True)
if tls_trust_certs_file_path:
conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
else:
import certifi
conf.tls_trust_certs_file_path(certifi.where())
conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
conf.tls_validate_hostname(tls_validate_hostname)
self._client = _pulsar.Client(service_url, conf)
self._consumers = []