in awsiot/mqtt_connection_builder.py [0:0]
def _builder(
tls_ctx_options,
use_websockets=False,
websocket_handshake_transform=None,
use_custom_authorizer=False,
**kwargs):
ca_bytes = _get(kwargs, 'ca_bytes')
ca_filepath = _get(kwargs, 'ca_filepath')
ca_dirpath = _get(kwargs, 'ca_dirpath')
if ca_bytes:
tls_ctx_options.override_default_trust_store(ca_bytes)
elif ca_filepath or ca_dirpath:
tls_ctx_options.override_default_trust_store_from_path(ca_dirpath, ca_filepath)
port = _get(kwargs, 'port')
if port is None:
# prefer 443, even for direct MQTT connections, since it's less likely to be blocked by firewalls
if use_websockets or awscrt.io.is_alpn_available():
port = 443
else:
port = 8883
if port == 443 and awscrt.io.is_alpn_available() and use_custom_authorizer is False:
tls_ctx_options.alpn_list = ['http/1.1'] if use_websockets else ['x-amzn-mqtt-ca']
socket_options = awscrt.io.SocketOptions()
socket_options.connect_timeout_ms = _get(kwargs, 'tcp_connect_timeout_ms', 5000)
# These have been inconsistent between keepalive/keep_alive. Resolve both for now to ease transition.
socket_options.keep_alive = \
_get(kwargs, 'tcp_keep_alive', _get(kwargs, 'tcp_keepalive', False))
socket_options.keep_alive_timeout_secs = \
_get(kwargs, 'tcp_keep_alive_timeout_secs', _get(kwargs, 'tcp_keepalive_timeout_secs', 0))
socket_options.keep_alive_interval_secs = \
_get(kwargs, 'tcp_keep_alive_interval_secs', _get(kwargs, 'tcp_keepalive_interval_secs', 0))
socket_options.keep_alive_max_probes = \
_get(kwargs, 'tcp_keep_alive_max_probes', _get(kwargs, 'tcp_keepalive_max_probes', 0))
username = _get(kwargs, 'username', '')
if _get(kwargs, 'enable_metrics_collection', True):
username += _get_metrics_str(username)
if username == "":
username = None
client_bootstrap = _get(kwargs, 'client_bootstrap')
if client_bootstrap is None:
client_bootstrap = awscrt.io.ClientBootstrap.get_or_create_static_default()
tls_ctx = awscrt.io.ClientTlsContext(tls_ctx_options)
mqtt_client = awscrt.mqtt.Client(client_bootstrap, tls_ctx)
proxy_options = kwargs.get('http_proxy_options', kwargs.get('websocket_proxy_options', None))
return awscrt.mqtt.Connection(
client=mqtt_client,
on_connection_interrupted=_get(kwargs, 'on_connection_interrupted'),
on_connection_resumed=_get(kwargs, 'on_connection_resumed'),
client_id=_get(kwargs, 'client_id'),
host_name=_get(kwargs, 'endpoint'),
port=port,
clean_session=_get(kwargs, 'clean_session', False),
reconnect_min_timeout_secs=_get(kwargs, 'reconnect_min_timeout_secs', 5),
reconnect_max_timeout_secs=_get(kwargs, 'reconnect_max_timeout_secs', 60),
keep_alive_secs=_get(kwargs, 'keep_alive_secs', 1200),
ping_timeout_ms=_get(kwargs, 'ping_timeout_ms', 3000),
protocol_operation_timeout_ms=_get(kwargs, 'protocol_operation_timeout_ms', 0),
will=_get(kwargs, 'will'),
username=username,
password=_get(kwargs, 'password'),
socket_options=socket_options,
use_websockets=use_websockets,
websocket_handshake_transform=websocket_handshake_transform,
proxy_options=proxy_options,
on_connection_success=_get(kwargs, 'on_connection_success'),
on_connection_failure=_get(kwargs, 'on_connection_failure'),
on_connection_closed=_get(kwargs, 'on_connection_closed'),
)