in scripts/Authorizer/lambda_function.py [0:0]
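def _basic_credentials(event):
    """Return ``(username, password)`` from the request's Basic auth header, or ``None``.

    ``None`` means the request cannot be authenticated from its headers: the
    ``Authorization`` header is missing or empty, its scheme is not ``Basic``,
    the credential part is not ASCII / valid base64, or the decoded value has no
    ``user:password`` separator. Nothing derived from the header value is
    logged, so credentials never reach CloudWatch.

    Args:
        event: API Gateway request event; ``event['headers']`` may be absent or ``None``.
    """
    headers = event.get('headers') or {}
    # Header names are case-insensitive; match on the lowercased key.
    header_value = next(
        (v for k, v in headers.items() if k.lower() == 'authorization'), None
    )
    if not header_value:
        log.debug("No Authorization header present")
        return None
    parts = header_value.split()
    # Expect exactly "Basic <base64(username:password)>".
    if len(parts) != 2 or parts[0].lower() != 'basic':
        log.debug("Authorization header is not HTTP Basic")
        return None
    try:
        # binascii.Error and Unicode{En,De}codeError are all ValueError subclasses.
        decoded = base64.b64decode(parts[1].encode('ascii')).decode('ascii')
    except ValueError:
        log.debug("Authorization header credentials are not decodable")
        return None
    # partition (not split) keeps a password that itself contains ':' intact.
    username, separator, password = decoded.partition(':')
    if not separator or not username:
        log.debug("Decoded credentials lack a 'user:password' form")
        return None
    return username, password


def lambda_handler(event, context):
    """API Gateway Lambda authorizer: HTTP Basic credentials checked against Cognito.

    Builds an ``AuthPolicy`` for the requested method/path and allows it only
    when ``initiate_auth`` (``USER_PASSWORD_AUTH``) succeeds for the supplied
    username/password. A missing or malformed header, or any Cognito failure,
    yields a deny policy rather than an unhandled exception (which API Gateway
    would surface as a 500 instead of a 403).

    Args:
        event: API Gateway authorizer event (``methodArn``, ``requestContext``,
            ``headers``, ``path``).
        context: Lambda context (unused).

    Returns:
        The policy document produced by ``AuthPolicy.build()``.
    """
    global client
    if client is None:
        client = boto3.client('cognito-idp')

    request_context = event['requestContext']
    method = request_context['httpMethod']
    path = event['path']
    # Log routing information only: the raw event carries the Authorization header.
    log.debug("Authorizing %s %s (methodArn=%s)", method, path, event['methodArn'])

    # methodArn: arn:aws:execute-api:<region>:<account>:<restApiId>/<stage>/<verb>/<resource>
    arn_parts = event['methodArn'].split(':')
    api_gateway_parts = arn_parts[5].split('/')
    # principalId is the caller's AWS account id, as in the original; the
    # authenticated username is not carried into the policy.
    policy = AuthPolicy(request_context['accountId'], arn_parts[4])
    policy.restApiId = api_gateway_parts[0]
    policy.region = arn_parts[3]
    policy.stage = api_gateway_parts[1]

    credentials = _basic_credentials(event)
    if credentials is None:
        policy.denyMethod(method, path)
        return policy.build()
    username, password = credentials
    log.debug("username: %s", username)

    try:
        client.initiate_auth(
            ClientId=CLIENT_ID,
            AuthFlow='USER_PASSWORD_AUTH',
            AuthParameters={
                'USERNAME': username,
                'PASSWORD': password
            }
        )
    except Exception as exc:
        # Fail closed on any failure (bad credentials, unknown user, throttling,
        # network); log only the exception class so no Cognito message is echoed.
        log.warning("Cognito authentication failed for %s: %s", username, type(exc).__name__)
        policy.denyMethod(method, path)
        return policy.build()

    policy.allowMethod(method, path)
    return policy.build()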