in source/idea/idea-administrator/resources/lambda_functions/idea_custom_resource_self_signed_certificate/handler.py [0:0]
def handle_create_or_update(event: dict, context):
resource_properties = event.get('ResourceProperties', {})
domain_name = resource_properties.get('domain_name')
certificate_name = resource_properties.get('certificate_name')
create_acm_certificate = resource_properties.get('create_acm_certificate', False)
kms_key_id = resource_properties.get('kms_key_id', None)
tags = resource_properties.get('tags', {})
client = HttpClient()
try:
common_tags = []
for key, value in tags.items():
common_tags.append({
'Key': key,
'Value': value
})
certificate_secret_name = f'{certificate_name}-certificate'
private_key_secret_name = f'{certificate_name}-private-key'
certificate_content = None
private_key_content = None
certificate_secret_arn = None
private_key_secret_arn = None
secretsmanager_client = boto3.client('secretsmanager')
list_secrets_result = secretsmanager_client.list_secrets(
Filters=[
{
'Key': 'tag-key',
'Values': ['res:SecretName']
},
{
'Key': 'tag-value',
'Values': [certificate_secret_name, private_key_secret_name]
}
]
)
secret_list = list_secrets_result.get('SecretList', [])
for secret in secret_list:
name = secret.get('Name')
arn = secret.get('ARN')
get_secret_result = secretsmanager_client.get_secret_value(
SecretId=arn
)
secret_string = get_secret_result.get('SecretString')
if name == certificate_secret_name:
certificate_content = secret_string
certificate_secret_arn = arn
logger.info(f'found: {name}')
elif name == private_key_secret_name:
logger.info(f'found: {name}')
private_key_content = secret_string
private_key_secret_arn = arn
if certificate_content is None and private_key_content is None:
one_day = datetime.timedelta(1, 0, 0)
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend())
public_key = private_key.public_key()
subject = x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, 'US'),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, 'California'),
x509.NameAttribute(NameOID.LOCALITY_NAME, 'Sunnyvale'),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, certificate_name),
x509.NameAttribute(NameOID.COMMON_NAME, domain_name)
])
certificate = x509.CertificateBuilder() \
.subject_name(subject) \
.issuer_name(subject) \
.not_valid_before(datetime.datetime.today() - one_day) \
.not_valid_after(datetime.datetime.today() + (one_day * 3650)) \
.serial_number(x509.random_serial_number()) \
.public_key(public_key) \
.add_extension(
x509.SubjectAlternativeName([
x509.DNSName(domain_name)
]), critical=False) \
.add_extension(x509.BasicConstraints(ca=False, path_length=None), critical=True)\
.sign(private_key=private_key, algorithm=hashes.SHA256(), backend=default_backend())
certificate_content = certificate.public_bytes(serialization.Encoding.PEM).decode("utf-8")
private_key_content = private_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.TraditionalOpenSSL,
serialization.NoEncryption()
).decode("utf-8")
# create certificate secret
certificate_secret_tags = list(common_tags)
certificate_secret_tags.append({
'Key': 'res:SecretName',
'Value': certificate_secret_name
})
create_secret_request = {
'Name': f'{certificate_secret_name}',
'Description': f'Self-Signed certificate for domain name: {domain_name}',
'SecretString': certificate_content,
'Tags': certificate_secret_tags
}
if kms_key_id is not None:
create_secret_request['KmsKeyId'] = kms_key_id
create_certificate_secret_result = secretsmanager_client.create_secret(**create_secret_request)
certificate_secret_arn = create_certificate_secret_result.get('ARN')
# create private key secret
private_key_secret_tags = list(common_tags)
private_key_secret_tags.append({
'Key': 'res:SecretName',
'Value': private_key_secret_name
})
create_secret_request = {
'Name': f'{private_key_secret_name}',
'Description': f'Self-Signed certificate private key for domain name: {domain_name}',
'SecretString': private_key_content,
'Tags': private_key_secret_tags
}
if kms_key_id is not None:
create_secret_request['KmsKeyId'] = kms_key_id
create_private_key_secret_result = secretsmanager_client.create_secret(**create_secret_request)
private_key_secret_arn = create_private_key_secret_result.get('ARN')
acm_certificate_arn = None
if create_acm_certificate:
acm_client = boto3.client('acm')
result = acm_client.list_certificates(CertificateStatuses=['ISSUED'])
certificate_summary_list = result.get('CertificateSummaryList', [])
for cert in certificate_summary_list:
if domain_name == cert.get('DomainName'):
acm_certificate_arn = cert.get('CertificateArn')
if acm_certificate_arn is None:
response = acm_client.import_certificate(
Certificate=certificate_content,
PrivateKey=private_key_content,
Tags=common_tags
)
acm_certificate_arn = response.get('CertificateArn')
client.send_cfn_response(CfnResponse(
context=context,
event=event,
status=CfnResponseStatus.SUCCESS,
data={
'certificate_secret_arn': certificate_secret_arn,
'private_key_secret_arn': private_key_secret_arn,
'acm_certificate_arn': acm_certificate_arn
},
physical_resource_id=certificate_name
))
except Exception as e:
error_message = f'failed to create certificate: {certificate_name} - {e}'
logger.exception(error_message)
client.send_cfn_response(CfnResponse(
context=context,
event=event,
status=CfnResponseStatus.FAILED,
data={},
physical_resource_id=certificate_name,
reason=error_message
))
finally:
client.destroy()