Onboarding/AWS/GrantAccessToEksClusters.py (257 lines of code) (raw):

import os import subprocess import argparse import boto3 import botocore import sys from tqdm import tqdm from Utils import * from time import time from typing import Tuple def get_roles_details(session: boto3.Session, roles: List[str]) -> List[Dict[str, Dict]]: iam_client = session.client("iam") roles_details = list() skipped_roles = list() for role in roles: role_name = role.split("/")[-1] if is_role_arn(role) else role try: details = iam_client.get_role(RoleName=role_name) roles_details.append(details) except iam_client.exceptions.NoSuchEntityException: skipped_roles.append(role_name) except Exception as ex: sys.exit(str(ex)) if not roles_details: sys.exit("You didn't provide any valid roles to onboard. Please make sure you provide valid roles names or ARNs") for role in skipped_roles: print_warning(f"Role {role} doesn't exist, skipping...") return roles_details def fileter_unsupported_roles(roles_details: List[Dict[str, Dict]]) -> List[Dict[str, Dict]]: supported_roles = [] unsupported_roles = [] for role in roles_details: trust_policy_document = role["Role"]["AssumeRolePolicyDocument"] conditions = check_required_conditions(trust_policy_document) if any(conditions.values()): unsupported_roles.append((role, conditions)) else: supported_roles.append(role) if not supported_roles: sys.exit("All of the provided roles are not supported. Please make sure the roles you provided doesn't require MFA, ExternalId, or SourceIdentity") for role, conditions in unsupported_roles: role_arn = role['Role']['Arn'] warnings = [f"Role {role_arn} doesn't allow sts:AssumeRole action. Skipping..." if condition == "sts:AssumeRole" else f"Role {role_arn} requires {condition}, which is currently not supported. Skipping..." for condition, required in conditions.items() if required] print_warning("\n".join(warnings)) return supported_roles def remove_unavailable_regions(session: boto3.Session, regions_input: Set[str]) -> Set[str]: all_available_regions = set(get_all_available_regions(session)) available_regions = regions_input.intersection(all_available_regions) if not available_regions: sys.exit(f"All of the provided regions are unavailable to you") elif len(available_regions) != len(regions_input): print_warning(f"Some of the provided regions are unavailable: {regions_input - available_regions}. Skipping...") return available_regions def parse_file_parameter(file_name: str) -> List[str]: if not os.path.exists(file_name): sys.exit(f"File '{file_name}' does not exist") if not os.access(file_name, os.R_OK): sys.exit(f"The file {file_name} has no read permissions") with open(file_name, 'r') as file: values = [line.strip() for line in file.readlines()] if not values: sys.exit(f"The file {file_name} is empty") return values def set_credentials(credentials: Dict[str, str]) -> None: os.environ["AWS_ACCESS_KEY_ID"] = credentials['AccessKeyId'] os.environ["AWS_SECRET_ACCESS_KEY"] = credentials['SecretAccessKey'] os.environ["AWS_SESSION_TOKEN"] = credentials['SessionToken'] def get_original_credentials() -> Dict[str, str]: return {"AccessKeyId": os.environ.get("AWS_ACCESS_KEY_ID", ""), "SecretAccessKey": os.environ.get('AWS_SECRET_ACCESS_KEY', ""), "SessionToken": os.environ.get('AWS_SESSION_TOKEN', ""), "DefaultRegion": os.environ.get('AWS_DEFAULT_REGION', "")} def validate_output_file(output_file: str) -> bool: return output_file == "" or not os.path.exists(output_file) or os.access(output_file, os.W_OK) def parse_arguments() -> argparse.Namespace: parser = argparse.ArgumentParser(description=f"Granting access to MDC to query EKS clusters." f"\nThe script adds MDC role {DEFAULT_ROLE_ARN_TO_MAP.format('<account id>')} to the aws-auth of specified EKS clusters.") regions_group = parser.add_mutually_exclusive_group(required=True) regions_group.add_argument("--regions", nargs='+', help="List of AWS regions, separated by space", type=str) regions_group.add_argument("--all-regions", help="Run for all available AWS regions (if there is a region with no EKS clusters, the script would skip it)", action="store_true") regions_group.add_argument("--regions-file", help="A path to a txt file contains a list of regions separated by new lines", type=str) roles_group = parser.add_mutually_exclusive_group(required=True) roles_group.add_argument("--roles", nargs='+', help="List of IAM roles names (or ARNs), separated by space", type=str) roles_group.add_argument("--roles-file", help="A path to a txt file contains a list of IAM roles names (or ARNs) separated by new lines", type=str) clusters_group = parser.add_mutually_exclusive_group(required=True) clusters_group.add_argument("--clusters", nargs='+', help="List of EKS clusters name (that deployed in the provided regions), separated by space", type=str) clusters_group.add_argument("--all-clusters", help="Run for all EKS clusters in the provided regions", action="store_true") clusters_group.add_argument("--clusters-file", help="A path to a txt file contains a list of EKS clusters name separated by new lines", type=str) parser.add_argument("--profile", help=f"AWS profile name (default: {DEFAULT_PROFILE_NAME})", type=str, default=DEFAULT_PROFILE_NAME, required=False) parser.add_argument("--output-file", help=f"A path to a txt file which will contain the script summary (In addition to showing the summary in the console)\n" f"Please note: if the file does not exist, hte script would create it in the specified location", default="", type=str, required=False) parser.add_argument("--role-arn", help=f"The role arn to map to system:masters group in aws-auth ConfigMap (default: {DEFAULT_ROLE_ARN_TO_MAP.format('account')})", type=str, default=DEFAULT_ROLE_ARN_TO_MAP, required=False) args = parser.parse_args() return args def get_all_available_regions(session: boto3.Session) -> List[str]: try: ec2_client = session.client('ec2') return [region['RegionName'] for region in ec2_client.describe_regions()['Regions']] except Exception as ex: sys.exit(str(ex)) def get_role_credentials(session: boto3.Session, role_to_assume: str) -> Dict[str, str]: session_name = f"GrantAccessToEksClusters-{int(time())}" try: sts_client = session.client("sts") tqdm.write(f"Assuming role {role_to_assume}") return sts_client.assume_role(RoleArn=role_to_assume, RoleSessionName=session_name)["Credentials"] except Exception as ex: sys.exit(str(ex)) def create_iamidentitymapping(cluster_name: str, region: str, role_to_map: str) -> int: command = [ 'eksctl', 'create', 'iamidentitymapping', '--cluster', cluster_name, '--region', region, '--arn', role_to_map, '--group', 'system:masters', '--no-duplicate-arns' ] try: result = subprocess.run(command, capture_output=True, text=True) return result.returncode except subprocess.CalledProcessError as ex: sys.exit(f"Could not create IAM identity mapping.\n{ex.stderr}") def get_all_eks_clusters(eks_client) -> List[str]: try: response = eks_client.list_clusters() return response.get('clusters', []) except Exception as ex: sys.exit(str(ex)) def update_clusters(clusters: List[str], region: str, pbar: tqdm, role_to_map: str) -> List[str]: clusters_to_retry = [] for cluster in clusters: return_code = create_iamidentitymapping(cluster, region, role_to_map) if return_code != 0: clusters_to_retry.append(cluster) else: pbar.update(1) return clusters_to_retry def get_clusters_to_onboard(session: boto3.Session, region: str, requested_clusters: Set[str], is_all_clusters: bool) -> Set[str]: eks_client = session.client("eks", region_name=region) clusters_in_region = get_all_eks_clusters(eks_client) if not clusters_in_region: print_warning(f"You don't have any EKS clusters deployed in {region} region. Skipping...") return set() if is_all_clusters: return set(clusters_in_region) clusters_to_onboard = set(clusters_in_region).intersection(requested_clusters) if not clusters_to_onboard: print(f"The requested EKS clusters are not deployed in {region} region. Skipping...") return clusters_to_onboard def get_account_id(session: boto3.Session) -> str: sts_client = session.client("sts") try: return sts_client.get_caller_identity().get("Account") except Exception as ex: sys.exit(str(ex)) def update_aws_auth_for_region(session: boto3.Session, clusters: Set[str], region: str, roles: List[str], role_to_map: str) -> List[str]: print(f"Attempting to create iamidentitymapping for {len(clusters)} EKS clusters in {region} region") with tqdm(clusters, total=len(clusters), bar_format=CUSTOM_BAR_FORMAT, desc=f"Progress in {region}", unit=" cluster", ncols=100, file=sys.stdout, colour="GREEN") as pbar: for role in roles: role_credentials = get_role_credentials(session, role) set_credentials(role_credentials) clusters = update_clusters(clusters, region, pbar, role_to_map) if not clusters: break return clusters def update_aws_auth_for_all_regions(session: boto3.Session, args: argparse.Namespace) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]: print(f"Attempting to grant MDC access to {' '.join(args.regions)} regions") successful_clusters_map = dict() failed_clusters_map = dict() for region in args.regions: clusters_to_onboard = get_clusters_to_onboard(session, region, args.clusters, args.all_clusters) if not clusters_to_onboard: continue left_clusters = update_aws_auth_for_region(session, clusters_to_onboard, region, args.roles, args.role_arn) successful_clusters = list(set(clusters_to_onboard) - set(left_clusters)) if successful_clusters: successful_clusters_map[region] = successful_clusters if left_clusters: failed_clusters_map[region] = left_clusters print_region_summary_message(region, clusters_to_onboard, left_clusters) return failed_clusters_map, successful_clusters_map def get_session(profile: str, credentials: Dict[str, str]) -> boto3.Session: if profile == DEFAULT_PROFILE_NAME and not env_credentials_is_empty(credentials): default_region = credentials["DefaultRegion"] if credentials["DefaultRegion"] != "" else DEFAULT_REGION print("Starting session...") try: return boto3.Session(aws_access_key_id=credentials["AccessKeyId"], aws_secret_access_key=credentials["SecretAccessKey"], region_name=default_region) except Exception as ex: sys.exit(str(ex)) try: print(f"Starting session for profile {profile}...") return boto3.Session(profile_name=profile, region_name=DEFAULT_REGION) except botocore.exceptions.ProfileNotFound: if profile == DEFAULT_PROFILE_NAME: sys.exit(f"You didn't configure a {DEFAULT_PROFILE_NAME} profile\n" f"Either configure a default profile (see ReadMe) or set environment variables with the account credentials you wish to onboard.") sys.exit(f"The provided profile ({profile}) could not be found") except Exception as ex: print(DEFAULT_REGION) def init_roles(session: boto3.Session, args: argparse.Namespace) -> List[str]: roles = args.roles if args.roles_file: roles = parse_file_parameter(args.roles_file) roles_details = get_roles_details(session, roles) roles_details = fileter_unsupported_roles(roles_details) return convert_to_role_arns(roles_details) def init_clusters(args: argparse.Namespace) -> List[str]: if args.all_clusters: return [] clusters = args.clusters if args.clusters_file: clusters = parse_file_parameter(args.clusters_file) return list(remove_duplicates(clusters, "clusters names")) def init_regions(session: boto3.Session, args: argparse.Namespace) -> List[str]: if args.all_regions: return get_all_available_regions(session) regions = args.regions if args.regions_file: regions = parse_file_parameter(args.regions_file) regions = remove_duplicates(regions, "regions") return list(remove_unavailable_regions(session, regions)) def init_output_file(args: argparse.Namespace) -> str: if not validate_output_file(args.output_file): print_warning(f"The file {args.output_file} has no write permissions. The summary would not be written to the file") return "" return args.output_file def init_role_arn(session: boto3.Session, args: argparse.Namespace) -> str: role_to_map = args.role_arn.format(get_account_id(session)) print(f"The script will create iamidentitymapping between {role_to_map} to system:masters group") return role_to_map def init(original_credentials: Dict[str, str]) -> Tuple[boto3.Session, argparse.Namespace]: args = parse_arguments() print_title() session = get_session(args.profile, original_credentials) # Those function exit when error occurs: args.clusters = init_clusters(args) args.roles = init_roles(session, args) args.regions = init_regions(session, args) args.role_arn = init_role_arn(session, args) # Those function doesn't exist when error occurs: args.output_file = init_output_file(args) if not args.all_clusters and len(args.regions) > len(args.clusters): print_warning("You entered more regions than EKS clusters names. Will skip regions without the specified clusters.") return session, args def run(): if not check_eksctl(): sys.exit("Please make sure you have eksctl installed and added to the PATH") # Save original Env variables to revert to. original_credentials = get_original_credentials() session, args = init(original_credentials) failed_clusters_map, successful_clusters_map = update_aws_auth_for_all_regions(session, args) show_summary(failed_clusters_map, successful_clusters_map, args.all_clusters, args.output_file) # Reverting to original Env variables. set_credentials(original_credentials) if __name__ == '__main__': run()