in aws_msk_iam_sasl_signer/MSKAuthTokenProvider.py [0:0]
def __construct_auth_token(region, aws_credentials):
"""
Private function that constructs the authorization token using IAM
Credentials.
Args: region (str): The AWS region where the cluster is located.
aws_credentials (dict): The credentials to be used to generate signed
url. Returns: str: A base64-encoded authorization token.
"""
# Validate credentials are not empty
if not aws_credentials.access_key or not aws_credentials.secret_key:
raise ValueError("AWS Credentials can not be empty")
# Extract endpoint URL
endpoint_url = ENDPOINT_URL_TEMPLATE.format(region)
# Set up resource path and query parameters
query_params = {ACTION_TYPE: ACTION_NAME}
# Create SigV4 instance
sig_v4 = SigV4QueryAuth(
aws_credentials, SIGNING_NAME, region,
expires=DEFAULT_TOKEN_EXPIRY_SECONDS
)
# Create request with url and parameters
request = AWSRequest(method="GET", url=endpoint_url, params=query_params)
# Add auth to the request and prepare the request
sig_v4.add_auth(request)
query_params = {USER_AGENT_KEY: __get_user_agent__()}
request.params = query_params
prepped = request.prepare()
# Get the signed url
signed_url = prepped.url
# Base 64 encode and remove the padding from the end
signed_url_bytes = signed_url.encode("utf-8")
base64_bytes = base64.urlsafe_b64encode(signed_url_bytes)
base64_encoded_signed_url = base64_bytes.decode("utf-8").rstrip("=")
return base64_encoded_signed_url, __get_expiration_time_ms(request)