# Excerpt from source/code/builders/cross_account_role_builder.py
def compress_template(source_template):
    """Return a copy of a CloudFormation template with its conditional IAM policies merged.

    Every ``AWS::IAM::Policy`` resource that carries a ``Condition`` is removed
    and its actions (taken from the first statement of its policy document) are
    redistributed: all actions that are enabled by exactly the same set of
    conditions end up together in one generated policy. A generated policy is
    attached to the first ``AWS::IAM::Role`` resource of the template, allows
    its actions on resource ``"*"`` and is guarded by

    * the original condition, when only one condition enables its actions, or
    * a generated ``Fn::Or`` condition (nested when more than 10 conditions
      have to be combined) otherwise.

    Generated resources and conditions are named with the module level
    ``POLICY_NAME`` and ``CONDITION_NAME`` format strings and are numbered
    from 0. Actions and conditions are emitted in sorted order so the same
    input always produces the same output.

    Policies without a ``Condition`` are left in the template untouched.

    :param source_template: template as a dictionary; it is not modified
    :return: deep copy of the template with rebuilt "Resources" and "Conditions"
    :raises IndexError: if the template has no ``AWS::IAM::Role`` resource
    """

    # Fn::Or takes at least 2 and at most 10 conditions
    max_or_operands = 10

    def as_action_list(actions):
        # IAM allows "Action" to be a single string as well as a list of strings;
        # iterating a bare string would yield its characters
        if isinstance(actions, (list, tuple, set, frozenset)):
            return list(actions)
        return [actions]

    def build_policy(actions, role_name, number):
        # policy resource that allows the actions on all resources for the role
        return {
            "Type": "AWS::IAM::Policy",
            "Properties": {
                "PolicyName": POLICY_NAME.format(number),
                "PolicyDocument": {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Action": sorted(actions),
                            "Resource": "*",
                            "Effect": "Allow"
                        }
                    ]
                },
                "Roles": [
                    {
                        "Ref": role_name
                    }
                ]
            }
        }

    def condition_groups(condition_list):
        # splits the names, in order, in groups that each fit in a single Fn::Or
        remaining = list(condition_list)
        while remaining:
            if len(remaining) == max_or_operands + 1:
                # a full group would leave a single name behind, which can not
                # be used as an Fn::Or on its own, so split 9 + 2 instead
                size = max_or_operands - 1
            else:
                size = max_or_operands
            yield remaining[:size]
            remaining = remaining[size:]

    def group_actions_by_conditions(conditional_policies):
        # maps every action to the set of conditions that enable it
        actions_conditions = OrderedDict()
        for resource in conditional_policies.values():
            statement = resource["Properties"]["PolicyDocument"]["Statement"][0]
            for action in as_action_list(statement["Action"]):
                actions_conditions.setdefault(action, set()).add(resource["Condition"])

        # combines the actions that are enabled by exactly the same conditions
        grouped = OrderedDict()
        for action, action_conditions in actions_conditions.items():
            grouped.setdefault(frozenset(action_conditions), set()).add(action)

        # largest condition sets first, sort is stable so ties keep their order
        return sorted(grouped.items(), key=lambda item: len(item[0]), reverse=True)

    template = copy.deepcopy(source_template)

    resources = OrderedDict()
    resources.update(template.get("Resources", {}))
    conditions = OrderedDict()
    conditions.update(template.get("Conditions", {}))

    # only policies that have a condition can be merged
    policies = OrderedDict(
        (name, resource) for name, resource in resources.items()
        if resource["Type"] == "AWS::IAM::Policy" and "Condition" in resource)
    ops_automator_role = [r for r in resources if resources[r]["Type"] == "AWS::IAM::Role"][0]

    compressed_conditions_actions = group_actions_by_conditions(policies)

    # the merged policies replace the original ones
    for name in policies:
        del resources[name]

    condition_number = 0
    for policy_number, (policy_conditions, actions) in enumerate(compressed_conditions_actions):
        policy = build_policy(actions, ops_automator_role, policy_number)

        # reduce the condition names level by level until one name is left, a
        # single original condition is used as is without generating an Fn::Or
        level = sorted(policy_conditions)
        while len(level) > 1:
            next_level = []
            for group in condition_groups(level):
                condition_name = CONDITION_NAME.format(condition_number)
                conditions[condition_name] = {
                    "Fn::Or": [{"Condition": c} for c in group]
                }
                condition_number += 1
                next_level.append(condition_name)
            level = next_level
        policy["Condition"] = level[0]

        resources[POLICY_NAME.format(policy_number)] = policy

    template["Resources"] = resources
    template["Conditions"] = conditions
    return template