in pulsar/__init__.py [0:0]
def __init__(self, service_url,
             authentication=None,
             operation_timeout_seconds=30,
             io_threads=1,
             message_listener_threads=1,
             concurrent_lookup_requests=50000,
             log_conf_file_path=None,
             stats_interval_in_seconds=600,
             use_tls=False,
             tls_trust_certs_file_path=None,
             tls_allow_insecure_connection=False,
             tls_validate_hostname=False,
             logger=None,
             connection_timeout_ms=10000,
             listener_name=None,
             tls_private_key_file_path: Optional[str] = None,
             tls_certificate_file_path: Optional[str] = None,
             ):
    """
    Create a new Pulsar client instance.

    Every argument is type-checked with ``_check_type`` / ``_check_type_or_none``
    before the native client configuration is built.

    Parameters
    ----------

    service_url: str
        The Pulsar service url, e.g. pulsar://my-broker.com:6650/
    authentication: Authentication, optional
        Set the authentication provider to be used with the broker. Supported methods:

        * `AuthenticationTLS`
        * `AuthenticationToken`
        * `AuthenticationAthenz`
        * `AuthenticationOauth2`
    operation_timeout_seconds: int, default=30
        Set timeout on client operations (subscribe, create producer, close, unsubscribe).
    io_threads: int, default=1
        Set the number of IO threads to be used by the Pulsar client.
    message_listener_threads: int, default=1
        Set the number of threads to be used by the Pulsar client when delivering messages through
        message listener. The default is 1 thread per Pulsar client. If using more than 1 thread,
        messages for distinct ``message_listener`` callbacks will be delivered in different threads,
        however a single ``MessageListener`` will always be assigned to the same thread.
    concurrent_lookup_requests: int, default=50000
        Number of concurrent lookup-requests allowed on each broker connection to prevent overload
        on the broker.
    log_conf_file_path: str, optional
        This parameter is deprecated and has no effect. It's retained only for compatibility.
        Use `logger` to customize a logger.
    stats_interval_in_seconds: int, default=600
        Set the interval between each stats information update. Stats are printed and/or
        passed to the statistics listener at this interval. Set to 0 to disable stats collection.
    use_tls: bool, default=False
        Configure whether to use TLS encryption on the connection. This setting is deprecated.
        TLS will be automatically enabled if ``service_url`` starts with ``pulsar+ssl://`` or
        ``https://``.
    tls_trust_certs_file_path: str, optional
        Set the path to the trusted TLS certificate file. If empty, defaults to the bundle
        reported by ``certifi.where()``.
    tls_allow_insecure_connection: bool, default=False
        Configure whether the Pulsar client accepts untrusted TLS certificates from the broker.
    tls_validate_hostname: bool, default=False
        Configure whether the Pulsar client validates that the hostname of the endpoint
        matches the common name on the TLS certificate presented by the endpoint.
    logger: optional
        Set a logger for this Pulsar client. Accepted values are an instance of
        `logging.Logger`, a `pulsar.ConsoleLogger`, a `pulsar.FileLogger`, or None.

        It should be noted that if the Python logger is configured, during the termination of the Python
        interpreter, the Python logger will be unavailable and the default logger will be used for logging.
        To avoid strange behavior, you'd better delete all instances explicitly before exiting.

        .. code-block:: python

            import logging
            client = Client(service_url, logger=logging.getLogger('pulsar'))
            producer = client.create_producer(topic)
            # ...
            del producer
            del client

    connection_timeout_ms: int, default=10000
        Set timeout in milliseconds on TCP connections.
    listener_name: str, optional
        Listener name for lookup. Clients can use listenerName to choose one of the listeners as
        the service URL to create a connection to the broker as long as the network is accessible.
        ``advertisedListeners`` must be enabled in broker side.
    tls_private_key_file_path: str, optional
        The path to the TLS private key file.
    tls_certificate_file_path: str, optional
        The path to the TLS certificate file.

    Raises
    ------

    ValueError
        If ``logger`` is not None and is not a `logging.Logger`, `pulsar.ConsoleLogger`
        or `pulsar.FileLogger`.
    """
    # Validate argument types up front, before touching the native configuration.
    _check_type(str, service_url, 'service_url')
    _check_type_or_none(Authentication, authentication, 'authentication')
    _check_type(int, operation_timeout_seconds, 'operation_timeout_seconds')
    _check_type(int, connection_timeout_ms, 'connection_timeout_ms')
    _check_type(int, io_threads, 'io_threads')
    _check_type(int, message_listener_threads, 'message_listener_threads')
    _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
    _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
    _check_type(int, stats_interval_in_seconds, 'stats_interval_in_seconds')
    _check_type(bool, use_tls, 'use_tls')
    _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
    _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
    _check_type(bool, tls_validate_hostname, 'tls_validate_hostname')
    _check_type_or_none(str, listener_name, 'listener_name')
    _check_type_or_none(str, tls_private_key_file_path, 'tls_private_key_file_path')
    _check_type_or_none(str, tls_certificate_file_path, 'tls_certificate_file_path')

    conf = _pulsar.ClientConfiguration()
    if authentication:
        conf.authentication(authentication.auth)
    conf.operation_timeout_seconds(operation_timeout_seconds)
    conf.connection_timeout(connection_timeout_ms)
    conf.io_threads(io_threads)
    conf.message_listener_threads(message_listener_threads)
    conf.concurrent_lookup_requests(concurrent_lookup_requests)
    conf.stats_interval_in_seconds(stats_interval_in_seconds)

    # Pick the logging backend that matches the kind of logger object supplied.
    if isinstance(logger, logging.Logger):
        conf.set_logger(self._prepare_logger(logger))
    elif isinstance(logger, ConsoleLogger):
        conf.set_console_logger(logger.log_level)
    elif isinstance(logger, FileLogger):
        conf.set_file_logger(logger.log_level, logger.log_file)
    elif logger is not None:
        raise ValueError("Logger is expected to be either None, logging.Logger, pulsar.ConsoleLogger or pulsar.FileLogger")

    if listener_name:
        conf.listener_name(listener_name)

    # TLS is enabled by the deprecated flag or implied by the URL scheme.
    if use_tls or service_url.startswith(('pulsar+ssl://', 'https://')):
        conf.use_tls(True)
    if tls_trust_certs_file_path:
        conf.tls_trust_certs_file_path(tls_trust_certs_file_path)
    else:
        # Imported lazily so certifi is only loaded when no trust store is given.
        import certifi
        conf.tls_trust_certs_file_path(certifi.where())
    conf.tls_allow_insecure_connection(tls_allow_insecure_connection)
    conf.tls_validate_hostname(tls_validate_hostname)
    if tls_private_key_file_path is not None:
        conf.tls_private_key_file_path(tls_private_key_file_path)
    if tls_certificate_file_path is not None:
        conf.tls_certificate_file_path(tls_certificate_file_path)

    self._client = _pulsar.Client(service_url, conf)
    self._consumers = []