in source/config_deployer.py [0:0]
def config_deployer(event):
try:
s3 = S3(logger)
# set variables
source_bucket_name = event.get('BucketConfig', {}) \
.get('SourceBucketName')
key_name = event.get('BucketConfig', {}).get('SourceS3Key')
destination_bucket_name = event.get('BucketConfig', {}) \
.get('DestinationBucketName')
input_zip_file_name = key_name.split("/")[-1] if "/" in key_name \
else key_name
output_zip_file_name = event.get('BucketConfig', {}) \
.get('DestinationS3Key')
alias_name = event.get('KMSConfig', {}).get('KMSKeyAlias')
policy = event.get('KMSConfig', {}).get('KMSKeyPolicy')
flag_value = event.get('MetricsFlag')
base_path = '/tmp/custom_control_tower'
input_file_path = base_path + "/" + input_zip_file_name
extract_path = base_path + "/" + 'extract'
output_path = base_path + "/" + 'out'
exclude_j2_files = []
# Search for existing KMS key alias
key_id = find_alias(alias_name)
# if alias name not found in the list, create a new alias with
# new target key
if not key_id:
key_id = create_cmk_with_alias(alias_name, policy)
logger.info('Key ID created: {}'.format(key_id))
kms.enable_key_rotation(key_id)
logger.info('Automatic key rotation enabled.')
else:
logger.info('Key ID: {} found attached with alias: {}'
.format(key_id, alias_name))
logger.info('Updating KMS key policy')
update_key_policy(key_id, policy)
kms.enable_key_rotation(key_id)
# Encrypt configuration bucket
s3.put_bucket_encryption(destination_bucket_name, key_id)
# Download the file from Solutions S3 bucket
make_dir(base_path, logger)
s3.download_file(source_bucket_name, key_name, input_file_path)
# Unzip the config zip file
unzip_function(input_zip_file_name, base_path, extract_path)
# Find and replace the variable in Manifest file
for item in event.get('FindReplace'):
f = item.get('FileName')
parameters = item.get('Parameters')
exclude_j2_files.append(f)
filename, file_extension = os.path.splitext(f)
destination_file_path = extract_path + "/" + filename \
if file_extension == '.j2' else extract_path + "/" + f
find_replace(extract_path, f, destination_file_path, parameters)
# Zip the contents
exclude = ['zip'] + exclude_j2_files
make_dir(output_path, logger)
zip_function(output_zip_file_name, extract_path, output_path, exclude)
# Upload the file in the customer S3 bucket
local_file = output_path + "/" + output_zip_file_name
remote_file = output_zip_file_name
s3.upload_file(destination_bucket_name, local_file, remote_file)
# create SSM parameters to send anonymous data if opted in
put_ssm_parameter('/org/primary/metrics_flag', flag_value)
put_ssm_parameter('/org/primary/customer_uuid', str(uuid4()))
return None
except Exception as e:
logger.log_general_exception(
__file__.split('/')[-1], inspect.stack()[0][3], e)
raise