Onboarding/AWS/Utils.py (116 lines of code) (raw):
import pyfiglet
import re
from typing import List, Dict, Set
from shutil import which
GREEN = '\033[92m'
ORANGE = '\033[38;5;208m'
RED = "\033[91m"
RESET = '\033[0m' # Reset color to default
IAM_ROLE_ARN_PATTERN = r'^arn:aws:iam::\d{12}:role/[a-zA-Z_0-9+=,.@-]+$'
CUSTOM_BAR_FORMAT = "{desc} {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt}"
DEFAULT_ROLE_ARN_TO_MAP = "arn:aws:iam::{}:role/MDCContainersAgentlessDiscoveryK8sRole"
DEFAULT_REGION = "us-west-1"
DEFAULT_PROFILE_NAME = "default"
def print_colored_message(message: str, color: str) -> None:
print(color + message + RESET)
def print_warning(message: str) -> None:
print_colored_message(message, ORANGE)
def add_color(message: str, color: str, with_color: bool) -> str:
return color + message + RESET if with_color else message
def format_region_cluster_map(region_cluster_map: Dict[str, List[str]], color, with_color) -> List[str]:
messages = []
for region, clusters in region_cluster_map.items():
messages.append(add_color(f"{region}: {' '.join(clusters)}", color, with_color))
return messages
def format_successful_clusters(failed_clusters_map: Dict[str, List[str]], with_color: bool) -> List[str]:
success_message = add_color("Successfully granted MDC permission to the following clusters:", GREEN, with_color)
return [success_message] + format_region_cluster_map(failed_clusters_map, GREEN, with_color)
def format_failed_clusters(failed_clusters_map: Dict[str, List[str]], with_color: bool) -> List[str]:
failure_message = add_color("Failed to grant MDC permissions to the following clusters:", RED, with_color)
return [failure_message] + format_region_cluster_map(failed_clusters_map, RED, with_color)
def print_region_summary_message(region: str, clusters_to_update: Set[str], left_clusters: List[str]):
num_of_successfully_updated_clusters = len(clusters_to_update) - len(left_clusters)
if num_of_successfully_updated_clusters > 0:
print_colored_message(f"Successfully updated {num_of_successfully_updated_clusters} EKS clusters in {region} region", GREEN)
if len(left_clusters):
print_colored_message(f"Failed on {len(left_clusters)} Eks clusters, due to insufficient permissions.", RED)
def format_summary_messages(failed_clusters_map: Dict[str, List[str]], successful_clusters_map: Dict[str, List[str]], is_all_clusters: bool, with_color: bool) -> str:
if not failed_clusters_map and not successful_clusters_map:
message = "You didn't have any EKS clusters in all regions, exiting..." if is_all_clusters else \
"The EKS clusters you requested to onboard doesn't exist in all requested regions. Make sure you don't have spelling mistakes"
return add_color(message, RED, with_color)
if successful_clusters_map and not failed_clusters_map:
return add_color("Successfully granted MDC permission to all requested EKS clusters", GREEN, with_color)
summary_messages = []
if successful_clusters_map:
summary_messages += format_successful_clusters(successful_clusters_map, with_color)
else:
summary_messages.append(add_color("Failed to grant MDC permissions to all EKS clusters", RED, with_color))
if failed_clusters_map:
summary_messages += format_failed_clusters(failed_clusters_map, with_color)
return "\n".join(summary_messages)
def show_summary(failed_clusters_map: Dict[str, List[str]], successful_clusters_map: Dict[str, List[str]], is_all_clusters: bool, output_file: str) -> None:
print_summary(failed_clusters_map, successful_clusters_map, is_all_clusters)
write_summary_to_file(failed_clusters_map, successful_clusters_map, is_all_clusters, output_file)
def print_summary(failed_clusters_map: Dict[str, List[str]], successful_clusters_map: Dict[str, List[str]], is_all_clusters: bool) -> None:
summary = format_summary_messages(failed_clusters_map, successful_clusters_map, is_all_clusters, with_color=True)
print('\n' + pyfiglet.figlet_format("Summary", font="slant") + '\n')
print(summary)
def write_summary_to_file(failed_clusters_map: Dict[str, List[str]], successful_clusters_map: Dict[str, List[str]], is_all_clusters: bool, output_file: str) -> None:
if output_file == "":
return
summary = format_summary_messages(failed_clusters_map, successful_clusters_map, is_all_clusters, with_color=False)
with open(output_file, "w") as file:
file.write(summary)
def is_role_arn(role_arn: str) -> bool:
return bool(re.match(r"^arn:aws:[^:]*:[^:]*:[0-9]{12}:role/.*", role_arn))
def print_title() -> None:
print(pyfiglet.figlet_format("Powered By Microsoft"))
def check_assume_role_permitted(trust_policy_document: Dict[str, Dict]) -> bool:
for statement in trust_policy_document.get('Statement', []):
effect = statement.get('Effect', '')
if effect != 'Allow':
continue
actions = statement.get('Action', {})
if not isinstance(actions, list):
actions = [actions]
if "sts:AssumeRole" not in actions:
return False
return True
def check_condition(trust_policy_document: Dict[str, Dict], predicate) -> bool:
for statement in trust_policy_document.get('Statement', []):
effect = statement.get('Effect', '')
if effect != 'Allow':
continue
conditions = statement.get('Condition', {})
if predicate(conditions):
return True
return False
def is_mfa_required(conditions: Dict[str, Dict[str, str]]) -> bool:
condition_type, condition_key = 'Bool', 'aws:MultiFactorAuthPresent'
return conditions.get(condition_type, {}).get(condition_key, "") == "true"
def is_external_id_required(conditions: Dict[str, Dict[str, str]]) -> bool:
condition_type, value = 'StringEquals', 'sts:ExternalId'
return conditions.get(condition_type, {}).get(value, "")
def is_source_identity_required(conditions: Dict[str, Dict[str, str]]) -> bool:
condition_type, value = 'StringEquals', 'sts:SourceIdentity'
return conditions.get(condition_type, {}).get(value, "")
def check_required_conditions(trust_policy_document: Dict[str, Dict]):
conditions = {
"sts:AssumeRole": not check_assume_role_permitted(trust_policy_document),
"MFA": check_condition(trust_policy_document, is_mfa_required),
"ExternalId": check_condition(trust_policy_document, is_external_id_required),
"SourceIdentity": check_condition(trust_policy_document, is_source_identity_required)
}
return conditions
def convert_to_role_arns(roles_details: List[Dict[str, Dict]]) -> List[str]:
return [role_details["Role"]["Arn"] for role_details in roles_details]
def remove_duplicates(lst: List[str], msg: str) -> Set[str]:
no_dup = set(lst)
if len(no_dup) != len(lst):
print_warning(f"You entered duplicate {msg}, ignoring...")
return no_dup
def check_eksctl() -> bool:
return which("eksctl") is not None
def env_credentials_is_empty(credentials: Dict[str, str]):
return credentials["AccessKeyId"] == "" or credentials["SecretAccessKey"] == ""