def _with_custom_authorizer()

in awsiot/mqtt_connection_builder.py [0:0]


def _with_custom_authorizer(auth_username=None,
                            auth_authorizer_name=None,
                            auth_authorizer_signature=None,
                            auth_password=None,
                            auth_token_key_name=None,
                            auth_token_value=None,
                            use_websockets=False,
                            websockets_credentials_provider=None,
                            websockets_region=None,
                            **kwargs) -> awscrt.mqtt.Connection:
    """
    Helper function that contains the setup needed for custom authorizers.

    Assembles the MQTT username from the authorizer settings, stores the
    resulting username and password in ``kwargs``, prepares TLS options and
    forwards everything to ``_builder``.

    Args:
        auth_username: Base username. When ``None``, the ``username`` entry of
            ``kwargs`` (looked up through ``_get``) is used instead, if set.
        auth_authorizer_name: When not ``None``, added to the username under
            the ``x-amz-customauthorizer-name=`` key.
        auth_authorizer_signature: When not ``None``, added to the username
            under the ``x-amz-customauthorizer-signature=`` key. It is
            URL-quoted first unless it already contains a ``%`` character.
        auth_password: Stored as ``kwargs["password"]``. This replaces any
            ``password`` passed in ``kwargs``, even when it is ``None``.
        auth_token_key_name: Key under which ``auth_token_value`` is added to
            the username. Only used when ``auth_token_value`` is also given.
        auth_token_value: Token value added to the username. Only used when
            ``auth_token_key_name`` is also given.
        use_websockets: When falsy, ``kwargs["port"]`` is forced to 443 and
            the TLS ALPN list is set to ``["mqtt"]``. When truthy, a
            websocket handshake transform is passed to ``_builder``.
        websockets_credentials_provider: Accepted but not used by this
            function.
        websockets_region: Accepted but not used by this function.
        **kwargs: Remaining builder options; validated by
            ``_check_required_kwargs`` and forwarded to ``_builder``.

    Returns:
        Whatever ``_builder`` returns (annotated as
        ``awscrt.mqtt.Connection``).
    """

    _check_required_kwargs(**kwargs)

    # Start from the explicit username, falling back to the one in kwargs.
    # ``+=`` onto an empty string keeps the original's implicit requirement
    # that the value is a str.
    username_string = ""
    if auth_username is not None:
        username_string += auth_username
    else:
        existing_username = _get(kwargs, "username")
        if existing_username is not None:
            username_string += existing_username

    if auth_authorizer_name is not None:
        username_string = _add_to_username_parameter(
            username_string, auth_authorizer_name, "x-amz-customauthorizer-name=")

    if auth_authorizer_signature is not None:
        encoded_signature = auth_authorizer_signature
        # A '%' is taken as a sign that the signature is already URL-encoded,
        # so it is not quoted a second time.
        if "%" not in encoded_signature:
            encoded_signature = urllib.parse.quote(encoded_signature)

        username_string = _add_to_username_parameter(
            username_string, encoded_signature, "x-amz-customauthorizer-signature=")

    # The token is only added when both its key name and its value are given.
    if auth_token_key_name is not None and auth_token_value is not None:
        username_string = _add_to_username_parameter(
            username_string, auth_token_value, auth_token_key_name + "=")

    kwargs["username"] = username_string
    # NOTE: unlike the username, there is no fallback to a caller-supplied
    # kwargs["password"]; it is always replaced by auth_password.
    kwargs["password"] = auth_password

    tls_ctx_options = awscrt.io.TlsContextOptions()
    # Truthiness is used here so that this check agrees with the
    # websocket_handshake_transform selection in the _builder call below.
    if not use_websockets:
        kwargs["port"] = 443
        tls_ctx_options.alpn_list = ["mqtt"]

    def _sign_websocket_handshake_request(transform_args, **_unused_kwargs):
        # No changes are made to the handshake request here; transform_args
        # only needs to be told that the transform is done.
        try:
            transform_args.set_done()
        except Exception as e:
            transform_args.set_done(e)

    return _builder(tls_ctx_options=tls_ctx_options,
                    use_websockets=use_websockets,
                    use_custom_authorizer=True,
                    websocket_handshake_transform=_sign_websocket_handshake_request if use_websockets else None,
                    **kwargs)