in lib/custom_resources/create_and_attach_scp.py [0:0]
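def create_or_attach_scps(org_session, bucket_name, attach_scp=False):
    """Create/update SCPs from a bucket, or attach them to their OUs.

    The layout under ``scp/`` in *bucket_name* is described by
    ``scp/metadata.json``: its ``ou-ids`` list holds one entry per OU with
    ``id`` (the OU id), ``dir_name`` (the sub-folder holding that OU's
    policy files) and ``scps`` (the policy file names inside that folder).
    A policy is named ``<FileStem>-<DirName>`` (both title-cased) so that a
    re-run finds and updates the existing policy instead of duplicating it.

    Args:
        org_session: Organizations client exposing ``update_policy``,
            ``create_policy`` and ``attach_policy`` (boto3-style).
        bucket_name: Bucket holding ``scp/metadata.json`` and the policies.
        attach_scp: When false, every policy file is created or, if a policy
            of the same name already exists, updated in place. When true the
            policy files are not read; each named policy is attached to its
            OU and must already exist in the organization.

    Raises:
        KeyError: In attach mode, when a policy named by the metadata does
            not exist in the organization.
        botocore.exceptions.ClientError: Any Organizations error other than
            the two attach-time conditions that are logged and skipped
            (see ``_attach_policy``).
    """
    LOGGER.info("Bucket name : %s", bucket_name)
    dir_root = "scp/"
    metadata = get_dict_from_key(bucket_name, dir_root + "metadata.json")
    # Fetch the name -> policy-id mapping once: the Organizations list API is
    # rate-limited, and the mapping is kept current locally as policies are
    # created below, so re-fetching per file is unnecessary.
    current_policies = get_current_policies(org_session)

    for ou in metadata["ou-ids"]:
        ou_id = ou["id"]
        dir_name = ou["dir_name"]
        ou_name = dir_name.title()
        for file_name in ou["scps"]:
            scp_name = Path(file_name).stem.title()
            policy_name = "{}-{}".format(scp_name, ou_name)
            if attach_scp:
                _attach_policy(org_session, current_policies, policy_name, ou_id)
            else:
                policy_desc = "{} SCP for {}".format(scp_name, ou_name)
                scp_key = "".join([dir_root, dir_name, "/", file_name])
                policy_body = get_policy(bucket_name, scp_key)
                _create_or_update_policy(
                    org_session, current_policies, policy_name, policy_desc, policy_body
                )


def _create_or_update_policy(org_session, current_policies, policy_name, policy_desc, policy_body):
    """Update the policy called *policy_name* if it exists, else create it.

    *current_policies* (name -> policy id) is updated in place with the id of
    a newly created policy so that later iterations can see it.
    """
    policy_id = current_policies.get(policy_name)
    if policy_id is not None:
        org_session.update_policy(
            PolicyId=policy_id,
            Name=policy_name,
            Description=policy_desc,
            Content=policy_body,
        )
        return
    response = org_session.create_policy(
        Name=policy_name,
        Description=policy_desc,
        Content=policy_body,
        Type="SERVICE_CONTROL_POLICY",
    )
    current_policies[policy_name] = response["Policy"]["PolicySummary"]["Id"]


def _attach_policy(org_session, current_policies, policy_name, ou_id):
    """Attach the existing policy *policy_name* to the OU *ou_id*.

    A ``DuplicatePolicyAttachmentException`` means the policy is already in
    force on the OU and is logged at INFO. A ``ConstraintViolationException``
    means the policy could NOT be attached (so the SCP is not enforced on that
    OU); it is skipped but logged at WARNING so the gap is visible. Any other
    ``ClientError`` is re-raised unchanged.

    Raises:
        KeyError: When *policy_name* is not present in *current_policies*.
    """
    try:
        policy_id = current_policies[policy_name]
    except KeyError:
        raise KeyError(
            "SCP '{}' does not exist in the organization; create it before attaching".format(policy_name)
        )
    LOGGER.info("Within the delivery of the SCPs - Attach to the OU")
    try:
        org_session.attach_policy(PolicyId=policy_id, TargetId=ou_id)
    except ClientError as ce:
        error = ce.response["Error"]
        code = error["Code"]
        if code == "DuplicatePolicyAttachmentException":
            LOGGER.info(error["Message"])
            LOGGER.info("Skipping the exception since the policy is existing or can't be attached")
        elif code == "ConstraintViolationException":
            LOGGER.warning(error["Message"])
            LOGGER.warning(
                "Skipping the exception since the policy is existing or can't be attached (policy %s, target %s)",
                policy_name,
                ou_id,
            )
        else:
            # Re-raise the original ClientError rather than wrapping it in
            # BaseException, which would slip past callers' ``except Exception``.
            raise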