in MWAA/verify_env/verify_env.py [0:0]
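def check_iam_permissions(input_env, iam_client):
    '''uses iam simulation to check permissions of the role assigned to the environment

    Pulls every managed policy (default version) plus the inline policies of the
    environment's execution role, feeds them to ``simulate_custom_policy`` for the
    actions MWAA needs, and prints one verdict line per (action, resource) pair.

    Args:
        input_env: the environment description; reads ``ExecutionRoleArn``,
            ``Arn``, ``SourceBucketArn`` and optionally ``KmsKey``.
        iam_client: a boto3 IAM client.

    Returns:
        None -- results are printed only.

    NOTE(review): custom-policy simulation evaluates only the identity policies
    passed in ``PolicyInputList``; the role's permissions boundary and any
    resource-based policy (KMS key policy, bucket policy) are not supplied here,
    so an "allowed" verdict is necessary but not sufficient for the real call to
    succeed at runtime.
    '''
    role_arn = input_env['ExecutionRoleArn']
    role_name = role_arn.split("/")[-1]
    print('### Checking the IAM execution role', role_arn, 'using iam policy simulation')
    account_id = get_account_id(input_env)

    # Collect the document of every attached managed policy (default version only),
    # then the inline policies, as JSON strings for PolicyInputList.
    policy_list = _attached_policy_documents(iam_client, role_name)
    # Add inline policies
    policy_list.extend(get_inline_policies(iam_client, role_name))
    if not policy_list:
        # simulate_custom_policy requires at least one policy document; a role
        # with no policies at all cannot pass any of the checks below anyway.
        print('No managed or inline policies found on role', role_name, '🚫')
        return

    eval_results = _common_checks(iam_client, policy_list, input_env, account_id)
    if "KmsKey" in input_env:
        print('Found Customer managed CMK')
        eval_results.extend(_customer_cmk_checks(iam_client, policy_list, input_env['KmsKey']))
    else:
        print('Using AWS CMK')
        eval_results.extend(_aws_managed_cmk_checks(iam_client, policy_list))

    _print_eval_results(eval_results)
    print('If the policy is denied you can investigate more at ')
    print("https://policysim.aws.amazon.com/home/index.jsp?#roles/" + role_name)
    print("")
    print('These simulations are based off of the sample policies here ')
    print('https://docs.aws.amazon.com/mwaa/latest/userguide/mwaa-create-role.html#mwaa-create-role-json\n')


def _attached_policy_documents(iam_client, role_name):
    '''returns the default-version document of each managed policy attached to role_name, JSON-encoded'''
    policies = iam_client.list_attached_role_policies(RoleName=role_name)['AttachedPolicies']
    documents = []
    for policy in policies:
        policy_arn = policy['PolicyArn']
        # Only the default version is what IAM actually enforces.
        policy_version = iam_client.get_policy(PolicyArn=policy_arn)['Policy']['DefaultVersionId']
        policy_doc = iam_client.get_policy_version(
            PolicyArn=policy_arn, VersionId=policy_version
        )['PolicyVersion']['Document']
        documents.append(json.dumps(policy_doc))
    return documents


def _simulate(iam_client, policy_list, actions, resources, via_service=None):
    '''runs one custom-policy simulation and returns its EvaluationResults

    via_service, when given, is the service prefix ("s3" or "sqs") used to build
    a kms:viaservice context entry for the current REGION.
    '''
    kwargs = {
        'PolicyInputList': policy_list,
        'ActionNames': actions,
        'ResourceArns': resources,
    }
    if via_service is not None:
        kwargs['ContextEntries'] = [
            {
                'ContextKeyName': 'kms:viaservice',
                'ContextKeyValues': [via_service + '.' + REGION + '.amazonaws.com'],
                'ContextKeyType': 'string',
            }
        ]
    return iam_client.simulate_custom_policy(**kwargs)['EvaluationResults']


def _common_checks(iam_client, policy_list, input_env, account_id):
    '''simulations that apply regardless of the CMK type, in report order'''
    bucket_arn = input_env['SourceBucketArn']
    bucket_resources = [bucket_arn, bucket_arn + '/']
    results = []
    results += _simulate(iam_client, policy_list, ["airflow:PublishMetrics"], [input_env['Arn']])
    # this next test should be denied
    results += _simulate(iam_client, policy_list, ["s3:ListAllMyBuckets"], bucket_resources)
    results += _simulate(
        iam_client, policy_list, ["s3:GetObject*", "s3:GetBucket*", "s3:List*"], bucket_resources
    )
    results += _simulate(
        iam_client,
        policy_list,
        [
            "logs:CreateLogStream",
            "logs:CreateLogGroup",
            "logs:PutLogEvents",
            "logs:GetLogEvents",
            "logs:GetLogGroupFields",
        ],
        ["arn:aws:logs:" + REGION + ":" + account_id + ":log-group:airflow-" + ENV_NAME + "-*"],
    )
    results += _simulate(iam_client, policy_list, ["logs:DescribeLogGroups"], ["*"])
    results += _simulate(iam_client, policy_list, ["cloudwatch:PutMetricData"], ["*"])
    results += _simulate(
        iam_client,
        policy_list,
        [
            "sqs:ChangeMessageVisibility",
            "sqs:DeleteMessage",
            "sqs:GetQueueAttributes",
            "sqs:GetQueueUrl",
            "sqs:ReceiveMessage",
            "sqs:SendMessage",
        ],
        ["arn:aws:sqs:" + REGION + ":*:airflow-celery-*"],
    )
    return results


def _customer_cmk_checks(iam_client, policy_list, kms_key):
    '''KMS simulations against the environment's own customer managed key, via S3 and SQS'''
    results = []
    for via_service in ('s3', 'sqs'):
        results += _simulate(iam_client, policy_list, ["kms:GenerateDataKey*"], [kms_key], via_service)
    for via_service in ('s3', 'sqs'):
        results += _simulate(
            iam_client, policy_list, ["kms:Decrypt", "kms:DescribeKey", "kms:Encrypt"], [kms_key], via_service
        )
    return results


def _aws_managed_cmk_checks(iam_client, policy_list):
    '''KMS simulations for the AWS managed key case

    tests role to allow any kms all for resources not in this account and that are from the sqs service
    '''
    # Example (non-local) account id from the MWAA sample policy; the role must be
    # able to reach keys outside its own account when called via SQS.
    foreign_keys = ["arn:aws:kms:*:111122223333:key/*"]
    results = []
    results += _simulate(
        iam_client, policy_list, ["kms:Decrypt", "kms:DescribeKey", "kms:Encrypt"], foreign_keys, 'sqs'
    )
    results += _simulate(iam_client, policy_list, ["kms:GenerateDataKey*"], foreign_keys, 'sqs')
    return results


def _print_eval_results(eval_results):
    '''prints one verdict per evaluation result; s3:ListAllAllMyBuckets is expected to be denied'''
    for eval_result in eval_results:
        action = eval_result['EvalActionName']
        resource = eval_result['EvalResourceName']
        allowed = eval_result['EvalDecision'] == 'allowed'
        if action == "s3:ListAllMyBuckets":
            # Inverted expectation: this action must NOT be granted.
            if allowed:
                print("Action:", action, "is not blocked successfully on resource", resource, '🚫')
            else:
                print("Action:", action, "is blocked successfully on resource", resource, '✅')
        elif allowed:
            print("Action:", action, "is allowed on resource", resource, '✅')
        else:
            print("Action:", action, "is not allowed on resource", resource)
            print("failed with", eval_result['EvalDecision'], "🚫")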