in usecase-4/usecase-4-step-6.py [0:0]
def main():
"""
####################################################
# Creating a server cert for a flask web server #
####################################################
"""
try:
acm_pca_client = boto3.client('acm-pca')
ddb_client = boto3.client('dynamodb')
current_directory_path = os.path.dirname(os.path.realpath(__file__)) + '/'
print("This step will take about 2 minutes to complete\n")
###########################################################################################
# Getting subordinate_pca_arn stored in dynamo DB #
# The subordinate pca arn created in part 1 is required for issuing certificates #
###########################################################################################
response = ddb_client.get_item(
TableName='shared_variables_crypto_builders',
Key={
'shared_variables': {
'N': '1000',
},
'session': {
'N': '1000',
},
},
)
subordinate_pca_arn = response['Item']['subordinate_pca_arn']['S']
###########################################################################################
# create a key pair for the webserver endpoint #
# The private key portion of the key pair is required for signing the webserver CSR #
# The private key used here is for demonstration purposes, the best practice #
# is to store private keys on an HSM #
###########################################################################################
csr_webserver_privkey = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
csr_webserver_privkey_pem = csr_webserver_privkey.private_bytes(encoding=serialization.Encoding.PEM,\
format=serialization.PrivateFormat.PKCS8,\
encryption_algorithm=NoEncryption())
response = ddb_client.update_item(
ExpressionAttributeNames={
'#wep': 'webserver_endpoint_privatekey',
},
ExpressionAttributeValues={
':b': {
'B': csr_webserver_privkey_pem,
},
},
Key={
'shared_variables': {
'N': '1000',
},
'session': {
'N': '1000',
},
},
ReturnValues='ALL_NEW',
TableName='shared_variables_crypto_builders',
UpdateExpression='SET #wep = :b',
)
###########################################################################################
# createa a csr for the webserver endpoint 127.0.0.1 #
# #
# A simple flask webserver is the endpoint here #
# #
# We are not using domain names in this example because obtaining a domain name takes #
# time, therefore we are using 127.0.0.1 as the common name within the CSR subject #
# This is purely for demonstration purposes #
###########################################################################################
endpoint_serial_number = random.randint(1, 100000)
subject_name_csr = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, '127.0.0.1'),
x509.NameAttribute(NameOID.COUNTRY_NAME, 'US'),
x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, 'Nevada'),
x509.NameAttribute(NameOID.LOCALITY_NAME, 'Las Vegas'),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, 'customer'),
x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, 'customerdept'),
x509.NameAttribute(NameOID.SERIAL_NUMBER, str(str(endpoint_serial_number)))
])
# ca=False for non ca Certs
basic_contraints = x509.BasicConstraints(ca=False, path_length=None)
csr = (
x509.CertificateSigningRequestBuilder()
.subject_name(subject_name_csr)
.add_extension(basic_contraints, True)
.sign(csr_webserver_privkey, hashes.SHA256(), default_backend())
)
csr_pem = csr.public_bytes(encoding=serialization.Encoding.PEM)
############################################################################################
# #
# ACM issues a certificate for the webserver endpoint that's valid for 6 months #
# or 180 days #
# #
# Currently waiters are not available for ACM,therefore sleeping for 30 seconds so that # #
# certificate is issued. The reason for the sleep is that issuing the certificate #
# takes a few seconds #
# #
############################################################################################
response = acm_pca_client.issue_certificate(
CertificateAuthorityArn=subordinate_pca_arn,
Csr=csr_pem,
SigningAlgorithm='SHA256WITHRSA',
Validity={
'Value': 180,
'Type': 'DAYS'
},
IdempotencyToken='dp-workshop-subordinate'
)
cert_arn = response['CertificateArn']
time.sleep(30)
##########################################################################################
# 1.The web server endpoint cert file is called webserver_cert.pem #
# #
# 2.The certificate chain file is called webserver_cert_chain.pem #
# #
# 3.The pem key file webserver_privkey.pem contains the private key for the webserver #
# private keys shoud not be stored in files ,best practice is to store it on an HSM #
##########################################################################################
response = acm_pca_client.get_certificate(
CertificateAuthorityArn=subordinate_pca_arn,
CertificateArn=cert_arn
)
current_directory_path = os.path.dirname(os.path.realpath(__file__)) + '/'
webserver_cert_path = current_directory_path + 'webserver_cert.pem'
webserver_cert_chain_path = current_directory_path + 'webserver_cert_chain.pem'
webserver_privkey_path = current_directory_path + 'webserver_privkey.pem'
textfilecert = open(webserver_cert_path, 'wb')
textfilecert.write(response['Certificate'].encode('utf_8'))
textfilecert.close()
textfilecertchain = open(webserver_cert_chain_path, 'wb')
textfilecertchain.write(response['CertificateChain'].encode('utf_8'))
textfilecertchain.close()
textfilecertchain = open(webserver_privkey_path, 'wb')
textfilecertchain.write(csr_webserver_privkey_pem)
textfilecertchain.close()
print("Successfully created server certificate webserver_cert.pem for the flask web server\n")
print("Successfully created chain of trust for the flask web server")
print("\nStep-6 has been successfully completed \n")
except:
print("Unexpected error:", sys.exc_info()[0])
raise
else:
exit(0)