in cli/aws_orbit/remote_files/cdk/lambda_sources/cognito_post_authentication/index.py [0:0]
def handler(event: Dict[str, Any], context: Optional[Dict[str, Any]]) -> Any:
    """Handle the Cognito post-authentication event for a user.

    Resolves which teams the authenticated user belongs to and asynchronously
    invokes the ``orbit-{orbit_env}-post-auth-k8s-manage`` Lambda with the
    user's identity and the per-team namespaces expected for that user.

    Group names come from the ``custom:groups`` user attribute when the
    identity provider supplies it; otherwise they are fetched from Cognito.
    Each group name is matched against the entries returned by
    ``get_auth_group_from_ssm()`` to obtain the team names.

    Args:
        event: Trigger event. Reads ``userName``, ``userPoolId`` and
            ``request.userAttributes`` (``preferred_username``, ``email``,
            ``custom:groups``).
        context: Lambda context; not used.

    Returns:
        The incoming ``event``, unmodified.

    Raises:
        KeyError: If ``event`` has no ``request`` / ``userAttributes`` entry.
        Exception: Whatever ``validate_email`` raises for a rejected address
            (its behavior is defined outside this block).
    """
    logger.info("Entering POSTAUTH - index.py and the event is: ")
    logger.info(json.dumps(event))
    cognito_client = boto3.client("cognito-idp")
    lambda_client = boto3.client("lambda")

    user_attributes = event["request"]["userAttributes"]

    # A preferred_username, when present, takes precedence over the pool username.
    user_name = cast(str, event.get("userName"))
    if "preferred_username" in user_attributes:
        user_name = user_attributes["preferred_username"]
    user_email = cast(str, user_attributes.get("email", "invalid_email"))
    validate_email(user_email)
    user_pool_id = cast(str, event.get("userPoolId"))

    groups_from_provider = None
    user_groups_info = None
    # if the groups are provided by the provider, use them
    if "custom:groups" in user_attributes:
        # The attribute arrives as a bracketed, ", "-separated string such as
        # "[group-a, group-b]". The separator is kept as ", " on purpose so
        # that names which themselves contain bare commas are not broken up.
        groups_from_provider = str(user_attributes["custom:groups"]).strip("][").split(", ")
        logger.info(f"Found groups from provider: {groups_from_provider}")
    else:
        logger.info("Did not find groups from provider, fetching from Cognito")
        # NOTE(review): this API is paginated (Limit / NextToken); only the
        # first page is read here, so a user in many groups may lose some.
        user_groups_info = cognito_client.admin_list_groups_for_user(Username=user_name, UserPoolId=user_pool_id)

    team_info = get_auth_group_from_ssm()

    # Collect the candidate group names from whichever source was used.
    candidate_groups = []
    if groups_from_provider:
        logger.info("Groups_from_provider populated, matching to teams")
        candidate_groups = groups_from_provider
    elif user_groups_info:
        logger.info("User_group_info populated, matching to teams")
        env_prefix = f"{orbit_env}-"
        for group in user_groups_info.get("Groups") or []:
            group_name = group.get("GroupName")
            if env_prefix in group_name:
                # maxsplit=1 keeps everything after the first prefix, so a
                # name that contains the prefix again is not truncated.
                group_name = group_name.split(env_prefix, 1)[1]
            candidate_groups.append(group_name)

    matched_teams = []
    for group_name in candidate_groups:
        if not group_name:
            # An empty name (e.g. from "[]") must never match a team; if a
            # team's entry were a plain string, `"" in entry` is always True.
            continue
        for team_name in team_info:
            if group_name in team_info[team_name]:
                matched_teams.append(team_name)
    # De-duplicate while preserving first-seen order, for both sources.
    user_groups = list(dict.fromkeys(matched_teams))

    logger.info("Authenticated successfully:")
    logger.info(f"userName: {user_name}, userPoolId: {user_pool_id}, userGroups: {user_groups}")

    expected_user_namespaces = {user_group: user_group + "-" + user_name for user_group in user_groups}
    payload = {
        "user_name": user_name,
        "user_email": user_email,
        "user_pool_id": user_pool_id,
        "expected_user_namespaces": expected_user_namespaces,
    }
    logger.info(f"Produced Payload = {payload}")

    # InvocationType="Event" makes this an asynchronous invocation; the
    # response is not inspected.
    lambda_client.invoke(
        FunctionName=f"orbit-{orbit_env}-post-auth-k8s-manage", InvocationType="Event", Payload=json.dumps(payload)
    )
    return event