in awsiot/mqtt5_client_builder.py [0:0]
def _builder(
tls_ctx_options,
use_websockets=False,
websocket_handshake_transform=None,
use_custom_authorizer=False,
**kwargs):
username = _get(kwargs, 'username', '')
if _get(kwargs, 'enable_metrics_collection', True):
username += _get_metrics_str(username)
client_options = _get(kwargs, 'client_options')
if client_options is None:
client_options = awscrt.mqtt5.ClientOptions(
host_name=_get(kwargs, 'endpoint')
)
if client_options.connect_options is None:
client_options.connect_options = _get(kwargs, 'connect_options', awscrt.mqtt5.ConnectPacket())
# Client Options
if client_options.port is None:
client_options.port = _get(kwargs, 'port')
if client_options.bootstrap is None:
client_options.bootstrap = _get(kwargs, 'client_bootstrap')
if client_options.socket_options is None:
client_options.socket_options = _get(kwargs, 'socket_options')
if client_options.http_proxy_options is None:
client_options.http_proxy_options = kwargs.get(
'http_proxy_options', kwargs.get(
'websocket_proxy_options', None))
if client_options.session_behavior is None:
client_options.session_behavior = _get(kwargs, 'session_behavior')
if client_options.extended_validation_and_flow_control_options is None:
client_options.extended_validation_and_flow_control_options = _get(
kwargs,
'extended_validation_and_flow_control_options',
default=awscrt.mqtt5.ExtendedValidationAndFlowControlOptions.AWS_IOT_CORE_DEFAULTS)
if client_options.offline_queue_behavior is None:
client_options.offline_queue_behavior = _get(kwargs, 'offline_queue_behavior')
if client_options.retry_jitter_mode is None:
client_options.retry_jitter_mode = _get(kwargs, 'retry_jitter_mode')
if client_options.min_reconnect_delay_ms is None:
client_options.min_reconnect_delay_ms = _get(kwargs, 'min_reconnect_delay_ms')
if client_options.max_reconnect_delay_ms is None:
client_options.max_reconnect_delay_ms = _get(kwargs, 'max_reconnect_delay_ms')
if client_options.min_connected_time_to_reset_reconnect_delay_ms is None:
client_options.min_connected_time_to_reset_reconnect_delay_ms = _get(
kwargs, 'min_connected_time_to_reset_reconnect_delay_ms')
if client_options.ping_timeout_ms is None:
client_options.ping_timeout_ms = _get(kwargs, 'ping_timeout_ms')
if client_options.connack_timeout_ms is None:
client_options.connack_timeout_ms = _get(kwargs, 'connack_timeout_ms')
if client_options.ack_timeout_sec is None:
client_options.ack_timeout_sec = _get(kwargs, 'ack_timeout_sec')
if client_options.websocket_handshake_transform is None:
client_options.websocket_handshake_transform = websocket_handshake_transform
if client_options.topic_aliasing_options is None:
client_options.topic_aliasing_options = _get(kwargs, 'topic_aliasing_options')
# Connect Options
if client_options.connect_options.client_id is None:
client_options.connect_options.client_id = _get(kwargs, 'client_id')
if client_options.connect_options.keep_alive_interval_sec is None:
client_options.connect_options.keep_alive_interval_sec = _get(
kwargs, 'keep_alive_interval_sec', DEFAULT_KEEP_ALIVE)
client_options.connect_options.username = username
if client_options.connect_options.password is None:
client_options.connect_options.password = _get(kwargs, 'password')
if client_options.connect_options.session_expiry_interval_sec is None:
client_options.connect_options.session_expiry_interval_sec = _get(kwargs, 'session_expiry_interval_sec')
if client_options.connect_options.request_response_information is None:
client_options.connect_options.request_response_information = _get(kwargs, 'request_response_information')
if client_options.connect_options.request_problem_information is None:
client_options.connect_options.request_problem_information = _get(kwargs, 'request_problem_information')
if client_options.connect_options.receive_maximum is None:
client_options.connect_options.receive_maximum = _get(kwargs, 'receive_maximum')
if client_options.connect_options.maximum_packet_size is None:
client_options.connect_options.maximum_packet_size = _get(kwargs, 'maximum_packet_size')
if client_options.connect_options.will_delay_interval_sec is None:
client_options.connect_options.will_delay_interval_sec = _get(kwargs, 'will_delay_interval_sec')
if client_options.connect_options.will is None:
client_options.connect_options.will = _get(kwargs, 'will')
if client_options.connect_options.user_properties is None:
client_options.connect_options.user_properties = _get(kwargs, 'user_properties')
# Callbacks
if client_options.on_publish_callback_fn is None:
client_options.on_publish_callback_fn = _get(kwargs, 'on_publish_received')
if client_options.on_lifecycle_event_stopped_fn is None:
client_options.on_lifecycle_event_stopped_fn = _get(kwargs, 'on_lifecycle_stopped')
if client_options.on_lifecycle_event_attempting_connect_fn is None:
client_options.on_lifecycle_event_attempting_connect_fn = _get(kwargs, 'on_lifecycle_attempting_connect')
if client_options.on_lifecycle_event_connection_success_fn is None:
client_options.on_lifecycle_event_connection_success_fn = _get(kwargs, 'on_lifecycle_connection_success')
if client_options.on_lifecycle_event_connection_failure_fn is None:
client_options.on_lifecycle_event_connection_failure_fn = _get(kwargs, 'on_lifecycle_connection_failure')
if client_options.on_lifecycle_event_disconnection_fn is None:
client_options.on_lifecycle_event_disconnection_fn = _get(kwargs, 'on_lifecycle_disconnection')
ca_bytes = _get(kwargs, 'ca_bytes')
ca_filepath = _get(kwargs, 'ca_filepath')
ca_dirpath = _get(kwargs, 'ca_dirpath')
if ca_bytes:
tls_ctx_options.override_default_trust_store(ca_bytes)
elif ca_filepath or ca_dirpath:
tls_ctx_options.override_default_trust_store_from_path(ca_dirpath, ca_filepath)
if client_options.port is None:
# prefer 443, even for direct MQTT connections, since it's less likely to be blocked by firewalls
if use_websockets or awscrt.io.is_alpn_available():
client_options.port = DEFAULT_WEBSOCKET_MQTT_PORT
else:
client_options.port = DEFAULT_DIRECT_MQTT_PORT
if client_options.port == 443 and awscrt.io.is_alpn_available() and use_custom_authorizer is False:
tls_ctx_options.alpn_list = ['http/1.1'] if use_websockets else ['x-amzn-mqtt-ca']
tls_ctx = awscrt.io.ClientTlsContext(tls_ctx_options)
client_options.tls_ctx = tls_ctx
client = awscrt.mqtt5.Client(client_options=client_options)
return client