in awsiot/mqtt_connection_builder.py [0:0]
def _with_custom_authorizer(auth_username=None,
                            auth_authorizer_name=None,
                            auth_authorizer_signature=None,
                            auth_password=None,
                            auth_token_key_name=None,
                            auth_token_value=None,
                            use_websockets=False,
                            websockets_credentials_provider=None,
                            websockets_region=None,
                            **kwargs) -> awscrt.mqtt.Connection:
    """
    Helper function that contains the setup needed for custom authorizers.

    Builds the MQTT username string from the authorizer settings, stores it
    (and the password) in ``kwargs``, prepares TLS options, and hands
    everything to ``_builder``.

    Args:
        auth_username: Base of the username string. When None, the
            "username" entry of ``kwargs`` is used instead (if it is set).
        auth_authorizer_name: When not None, added to the username under
            the "x-amz-customauthorizer-name=" key.
        auth_authorizer_signature: When not None, added to the username
            under the "x-amz-customauthorizer-signature=" key. A value
            without any "%" character is URL-quoted first; a value that
            contains "%" is passed through unchanged.
        auth_password: Stored as ``kwargs["password"]`` (None is stored
            as-is).
        auth_token_key_name: Key under which ``auth_token_value`` is added
            to the username. Only applied when both it and
            ``auth_token_value`` are not None.
        auth_token_value: Value added to the username under
            ``auth_token_key_name``.
        use_websockets: When falsy, ``kwargs["port"]`` is overwritten with
            443 and the TLS ALPN list is set to ``["mqtt"]``. When truthy,
            a websocket handshake transform is passed to ``_builder``.
        websockets_credentials_provider: Accepted for interface
            compatibility; not read by this function.
        websockets_region: Accepted for interface compatibility; not read
            by this function.
        **kwargs: Validated by ``_check_required_kwargs`` and forwarded to
            ``_builder`` after "username", "password" (and possibly "port")
            have been overwritten.

    Returns:
        Whatever ``_builder`` returns (annotated as
        ``awscrt.mqtt.Connection``).
    """
    _check_required_kwargs(**kwargs)

    # Start from the explicit username, falling back to the one in kwargs.
    username_string = ""
    if auth_username is not None:
        username_string += auth_username
    else:
        fallback_username = _get(kwargs, "username")
        if fallback_username is not None:
            username_string += fallback_username

    if auth_authorizer_name is not None:
        username_string = _add_to_username_parameter(
            username_string, auth_authorizer_name, "x-amz-customauthorizer-name=")

    if auth_authorizer_signature is not None:
        encoded_signature = auth_authorizer_signature
        # A "%" is taken to mean the caller already URL-encoded the
        # signature, so it is only quoted when no "%" is present.
        if "%" not in encoded_signature:
            encoded_signature = urllib.parse.quote(encoded_signature)
        username_string = _add_to_username_parameter(
            username_string, encoded_signature, "x-amz-customauthorizer-signature=")

    # The token pair is only meaningful when both halves are supplied.
    if auth_token_key_name is not None and auth_token_value is not None:
        username_string = _add_to_username_parameter(
            username_string, auth_token_value, auth_token_key_name + "=")

    kwargs["username"] = username_string
    kwargs["password"] = auth_password

    tls_ctx_options = awscrt.io.TlsContextOptions()
    # Use truthiness here so this branch agrees with the transform selection
    # below: any falsy value (including None) means "direct MQTT".
    if not use_websockets:
        kwargs["port"] = 443
        tls_ctx_options.alpn_list = ["mqtt"]

    def _sign_websocket_handshake_request(transform_args, **kwargs):
        # transform_args need to know when transform is done.
        # No modification of the handshake request happens here; the
        # transform only signals completion (or the failure, if any).
        try:
            transform_args.set_done()
        except Exception as e:
            transform_args.set_done(e)

    return _builder(tls_ctx_options=tls_ctx_options,
                    use_websockets=use_websockets,
                    use_custom_authorizer=True,
                    websocket_handshake_transform=_sign_websocket_handshake_request if use_websockets else None,
                    **kwargs)