def _builder()

in awsiot/mqtt5_client_builder.py [0:0]


def _builder(
        tls_ctx_options,
        use_websockets=False,
        websocket_handshake_transform=None,
        use_custom_authorizer=False,
        **kwargs):

    username = _get(kwargs, 'username', '')
    if _get(kwargs, 'enable_metrics_collection', True):
        username += _get_metrics_str(username)

    client_options = _get(kwargs, 'client_options')
    if client_options is None:
        client_options = awscrt.mqtt5.ClientOptions(
            host_name=_get(kwargs, 'endpoint')
        )
    if client_options.connect_options is None:
        client_options.connect_options = _get(kwargs, 'connect_options', awscrt.mqtt5.ConnectPacket())

    # Client Options
    if client_options.port is None:
        client_options.port = _get(kwargs, 'port')
    if client_options.bootstrap is None:
        client_options.bootstrap = _get(kwargs, 'client_bootstrap')
    if client_options.socket_options is None:
        client_options.socket_options = _get(kwargs, 'socket_options')
    if client_options.http_proxy_options is None:
        client_options.http_proxy_options = kwargs.get(
            'http_proxy_options', kwargs.get(
                'websocket_proxy_options', None))
    if client_options.session_behavior is None:
        client_options.session_behavior = _get(kwargs, 'session_behavior')
    if client_options.extended_validation_and_flow_control_options is None:
        client_options.extended_validation_and_flow_control_options = _get(
            kwargs,
            'extended_validation_and_flow_control_options',
            default=awscrt.mqtt5.ExtendedValidationAndFlowControlOptions.AWS_IOT_CORE_DEFAULTS)
    if client_options.offline_queue_behavior is None:
        client_options.offline_queue_behavior = _get(kwargs, 'offline_queue_behavior')
    if client_options.retry_jitter_mode is None:
        client_options.retry_jitter_mode = _get(kwargs, 'retry_jitter_mode')
    if client_options.min_reconnect_delay_ms is None:
        client_options.min_reconnect_delay_ms = _get(kwargs, 'min_reconnect_delay_ms')
    if client_options.max_reconnect_delay_ms is None:
        client_options.max_reconnect_delay_ms = _get(kwargs, 'max_reconnect_delay_ms')
    if client_options.min_connected_time_to_reset_reconnect_delay_ms is None:
        client_options.min_connected_time_to_reset_reconnect_delay_ms = _get(
            kwargs, 'min_connected_time_to_reset_reconnect_delay_ms')
    if client_options.ping_timeout_ms is None:
        client_options.ping_timeout_ms = _get(kwargs, 'ping_timeout_ms')
    if client_options.connack_timeout_ms is None:
        client_options.connack_timeout_ms = _get(kwargs, 'connack_timeout_ms')
    if client_options.ack_timeout_sec is None:
        client_options.ack_timeout_sec = _get(kwargs, 'ack_timeout_sec')
    if client_options.websocket_handshake_transform is None:
        client_options.websocket_handshake_transform = websocket_handshake_transform
    if client_options.topic_aliasing_options is None:
        client_options.topic_aliasing_options = _get(kwargs, 'topic_aliasing_options')

    # Connect Options
    if client_options.connect_options.client_id is None:
        client_options.connect_options.client_id = _get(kwargs, 'client_id')
    if client_options.connect_options.keep_alive_interval_sec is None:
        client_options.connect_options.keep_alive_interval_sec = _get(
            kwargs, 'keep_alive_interval_sec', DEFAULT_KEEP_ALIVE)
    client_options.connect_options.username = username
    if client_options.connect_options.password is None:
        client_options.connect_options.password = _get(kwargs, 'password')
    if client_options.connect_options.session_expiry_interval_sec is None:
        client_options.connect_options.session_expiry_interval_sec = _get(kwargs, 'session_expiry_interval_sec')
    if client_options.connect_options.request_response_information is None:
        client_options.connect_options.request_response_information = _get(kwargs, 'request_response_information')
    if client_options.connect_options.request_problem_information is None:
        client_options.connect_options.request_problem_information = _get(kwargs, 'request_problem_information')
    if client_options.connect_options.receive_maximum is None:
        client_options.connect_options.receive_maximum = _get(kwargs, 'receive_maximum')
    if client_options.connect_options.maximum_packet_size is None:
        client_options.connect_options.maximum_packet_size = _get(kwargs, 'maximum_packet_size')
    if client_options.connect_options.will_delay_interval_sec is None:
        client_options.connect_options.will_delay_interval_sec = _get(kwargs, 'will_delay_interval_sec')
    if client_options.connect_options.will is None:
        client_options.connect_options.will = _get(kwargs, 'will')
    if client_options.connect_options.user_properties is None:
        client_options.connect_options.user_properties = _get(kwargs, 'user_properties')

    # Callbacks
    if client_options.on_publish_callback_fn is None:
        client_options.on_publish_callback_fn = _get(kwargs, 'on_publish_received')
    if client_options.on_lifecycle_event_stopped_fn is None:
        client_options.on_lifecycle_event_stopped_fn = _get(kwargs, 'on_lifecycle_stopped')
    if client_options.on_lifecycle_event_attempting_connect_fn is None:
        client_options.on_lifecycle_event_attempting_connect_fn = _get(kwargs, 'on_lifecycle_attempting_connect')
    if client_options.on_lifecycle_event_connection_success_fn is None:
        client_options.on_lifecycle_event_connection_success_fn = _get(kwargs, 'on_lifecycle_connection_success')
    if client_options.on_lifecycle_event_connection_failure_fn is None:
        client_options.on_lifecycle_event_connection_failure_fn = _get(kwargs, 'on_lifecycle_connection_failure')
    if client_options.on_lifecycle_event_disconnection_fn is None:
        client_options.on_lifecycle_event_disconnection_fn = _get(kwargs, 'on_lifecycle_disconnection')

    ca_bytes = _get(kwargs, 'ca_bytes')
    ca_filepath = _get(kwargs, 'ca_filepath')
    ca_dirpath = _get(kwargs, 'ca_dirpath')
    if ca_bytes:
        tls_ctx_options.override_default_trust_store(ca_bytes)
    elif ca_filepath or ca_dirpath:
        tls_ctx_options.override_default_trust_store_from_path(ca_dirpath, ca_filepath)

    if client_options.port is None:
        # prefer 443, even for direct MQTT connections, since it's less likely to be blocked by firewalls
        if use_websockets or awscrt.io.is_alpn_available():
            client_options.port = DEFAULT_WEBSOCKET_MQTT_PORT
        else:
            client_options.port = DEFAULT_DIRECT_MQTT_PORT

    if client_options.port == 443 and awscrt.io.is_alpn_available() and use_custom_authorizer is False:
        tls_ctx_options.alpn_list = ['http/1.1'] if use_websockets else ['x-amzn-mqtt-ca']

    tls_ctx = awscrt.io.ClientTlsContext(tls_ctx_options)
    client_options.tls_ctx = tls_ctx
    client = awscrt.mqtt5.Client(client_options=client_options)

    return client