def check_iam_permissions()

in MWAA/verify_env/verify_env.py [0:0]


def _iam_sim_kms_via_service_context(service_prefix):
    '''builds the ContextEntries list setting kms:viaservice to <service_prefix>.<REGION>.amazonaws.com'''
    return [
        {
            'ContextKeyName': 'kms:viaservice',
            'ContextKeyValues': [
                service_prefix + '.' + REGION + '.amazonaws.com'
            ],
            'ContextKeyType': 'string'
        }
    ]


def _iam_sim_cases(input_env, account_id, has_customer_key):
    '''
    yields one (action_names, resource_arns, context_entries) tuple per policy simulation to run

    context_entries is None when the simulation must be called without ContextEntries.
    This is a generator on purpose: each case is only built right before it is simulated,
    so lookups into input_env happen in the same order as the simulations themselves.
    '''
    # NOTE(review): the ARNs built below hard-code the "aws" partition - confirm whether
    # environments in other partitions need to be supported.
    yield ["airflow:PublishMetrics"], [input_env['Arn']], None

    # this action should be denied; the reporting step treats a denial of it as a success
    yield (
        ["s3:ListAllMyBuckets"],
        [input_env['SourceBucketArn'], input_env['SourceBucketArn'] + '/'],
        None,
    )
    yield (
        ["s3:GetObject*", "s3:GetBucket*", "s3:List*"],
        [input_env['SourceBucketArn'], input_env['SourceBucketArn'] + '/'],
        None,
    )
    yield (
        [
            "logs:CreateLogStream",
            "logs:CreateLogGroup",
            "logs:PutLogEvents",
            "logs:GetLogEvents",
            "logs:GetLogGroupFields",
        ],
        ["arn:aws:logs:" + REGION + ":" + account_id + ":log-group:airflow-" + ENV_NAME + "-*"],
        None,
    )
    yield ["logs:DescribeLogGroups"], ["*"], None
    yield ["cloudwatch:PutMetricData"], ["*"], None
    yield (
        [
            "sqs:ChangeMessageVisibility",
            "sqs:DeleteMessage",
            "sqs:GetQueueAttributes",
            "sqs:GetQueueUrl",
            "sqs:ReceiveMessage",
            "sqs:SendMessage",
        ],
        ["arn:aws:sqs:" + REGION + ":*:airflow-celery-*"],
        None,
    )

    key_usage_actions = ["kms:Decrypt", "kms:DescribeKey", "kms:Encrypt"]
    if has_customer_key:
        # the environment names its own key: check it through both s3 and sqs
        for actions in (["kms:GenerateDataKey*"], key_usage_actions):
            for service_prefix in ('s3', 'sqs'):
                yield (
                    list(actions),
                    [input_env['KmsKey']],
                    _iam_sim_kms_via_service_context(service_prefix),
                )
    else:
        # tests that the role allows kms actions, used through the sqs service, on keys
        # that are not in this account (111122223333 is the account id used for that check)
        for actions in (key_usage_actions, ["kms:GenerateDataKey*"]):
            yield (
                list(actions),
                ["arn:aws:kms:*:111122223333:key/*"],
                _iam_sim_kms_via_service_context('sqs'),
            )


def _iam_sim_print_result(eval_result):
    '''prints the outcome of a single simulation result, flagging whether it is the expected one'''
    action = eval_result['EvalActionName']
    allowed = eval_result['EvalDecision'] == 'allowed'
    # s3:ListAllMyBuckets is the one action that is expected to be denied
    expected_denied = action == "s3:ListAllMyBuckets"
    if not allowed and expected_denied:
        print("Action:", action, "is blocked successfully on resource",
              eval_result['EvalResourceName'], '✅')
    elif not allowed:
        print("Action:", action, "is not allowed on resource",
              eval_result['EvalResourceName'])
        print("failed with", eval_result['EvalDecision'], "🚫")
    elif expected_denied:
        print("Action:", action, "is not blocked successfully on resource",
              eval_result['EvalResourceName'], '🚫')
    else:
        print("Action:", action, "is allowed on resource",
              eval_result['EvalResourceName'], '✅')


def check_iam_permissions(input_env, iam_client):
    '''
    uses iam simulation to check permissions of the role assigned to the environment

    input_env: environment description dict; the keys read here are 'ExecutionRoleArn',
        'Arn', 'SourceBucketArn' and, when present, 'KmsKey'
    iam_client: client exposing list_attached_role_policies, get_policy,
        get_policy_version and simulate_custom_policy (a boto3 IAM client is assumed)

    The attached (default version) and inline policies of the execution role are collected
    and simulated against a fixed set of actions; the kms checks depend on whether
    input_env contains 'KmsKey'. Results are only printed, nothing is returned.
    '''
    print('### Checking the IAM execution role', input_env['ExecutionRoleArn'], 'using iam policy simulation')
    account_id = get_account_id(input_env)
    # the role name is the last path segment of the role arn
    role_name = input_env['ExecutionRoleArn'].split("/")[-1]

    policy_list = []
    attached_policies = iam_client.list_attached_role_policies(RoleName=role_name)['AttachedPolicies']
    for policy in attached_policies:
        policy_arn = policy['PolicyArn']
        # only the default version of a managed policy is in effect, so simulate that one
        policy_version = iam_client.get_policy(PolicyArn=policy_arn)['Policy']['DefaultVersionId']
        policy_doc = iam_client.get_policy_version(PolicyArn=policy_arn,
                                                   VersionId=policy_version)['PolicyVersion']['Document']
        policy_list.append(json.dumps(policy_doc))
    # Add inline policies
    policy_list.extend(get_inline_policies(iam_client, role_name))

    has_customer_key = "KmsKey" in input_env
    if has_customer_key:
        print('Found Customer managed CMK')
    else:
        print('Using AWS CMK')

    eval_results = []
    for action_names, resource_arns, context_entries in _iam_sim_cases(input_env, account_id, has_customer_key):
        request = {
            'PolicyInputList': policy_list,
            'ActionNames': action_names,
            'ResourceArns': resource_arns,
        }
        # ContextEntries is left out entirely when there is no context to pass
        if context_entries is not None:
            request['ContextEntries'] = context_entries
        eval_results.extend(iam_client.simulate_custom_policy(**request)['EvaluationResults'])

    for eval_result in eval_results:
        _iam_sim_print_result(eval_result)

    print('If the policy is denied you can investigate more at ')
    print("https://policysim.aws.amazon.com/home/index.jsp?#roles/" + role_name)
    print("")
    print('These simulations are based off of the sample policies here ')
    print('https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-create-role.html#mwaa-create-role-json\n')