in provisioning_lambda/lambda_function.py [0:0]
def provisioning_handler(event, context) -> dict:
    """Provision a device and return its certificate and wrapped private key.

    The handler performs these steps in order:

    1. Validates the environment (``check_env``) and parses the request
       (``get_request_body``), reading ``DSN`` and ``publicKey`` from it.
    2. Creates a CSR and private key, issues a certificate from the CA named
       by the ``CA_ARN`` environment variable, and fetches the CA certificate.
    3. Registers the IoT thing and records the device info.
    4. Generates a fresh X25519 key pair, derives a shared secret with the
       caller's public key, and encrypts the device private key with Fernet
       using that secret as the key.

    Args:
        event: Lambda invocation event, passed to ``get_request_body``.
        context: Lambda context object; not used by this handler.

    Returns:
        A dict with ``statusCode`` (int) and ``body`` (JSON-encoded string).
        On success the body holds ``certificatePem``, ``encryptedPrivateKey``
        and ``publicKey`` (base64 of the raw local X25519 public key).
        On failure the body holds a single ``msg`` field:

        * ``InvalidRequestException`` -> 400, message taken from the exception
        * ``EnvironmentException`` / ``ConfigError`` -> 500, generic description
        * ``IssueCertException`` / ``IotException`` -> 502
        * any other exception -> 500, ``Unknown error``

    No exception derived from ``Exception`` propagates to the caller; every
    failure is converted into a response dict.
    """
    resp = {'statusCode': HTTPStatus.OK.value, 'body': {}}
    try:
        check_env()
        body = get_request_body(event)
        dsn = body['DSN']
        # NOTE(review): exchange() below needs an X25519PublicKey object;
        # presumably get_request_body() has already deserialized this value
        # -- confirm against that helper.
        public_key = body['publicKey']
        ca_arn = os.environ['CA_ARN']
        config = get_config()

        # Issue the device certificate and register the device.
        csr, privatekey = create_csr_key(config['certificate']['subject'])
        acmpca = AcmPca()
        cert_arn, cert_pem = acmpca.issue_cert(ca_arn, csr, config['iot']['certValidity'])
        ca_cert_pem = acmpca.get_ca_certificate(ca_arn)
        thing_info = Iot().register_iot_thing(dsn, cert_pem, ca_cert_pem, config['iot'])
        record_device_info(dsn, cert_arn, thing_info)

        # Generate a per-request X25519 key pair; only the public half is
        # returned to the caller so it can derive the same shared secret.
        local_private_key = X25519PrivateKey.generate()
        local_public_key_str = base64.b64encode(
            local_private_key.public_key().public_bytes(
                encoding=serialization.Encoding.Raw,
                format=serialization.PublicFormat.Raw,
            )
        ).decode('ascii')

        # Derive the shared secret from our private key and the peer's public key.
        secret = local_private_key.exchange(public_key)

        # Encrypt the device private key with the shared secret. Fernet
        # documents its key as URL-safe base64; this encoding decodes to the
        # same key bytes as the standard alphabet, so the resulting token
        # stays compatible with the peer.
        fernet = Fernet(base64.urlsafe_b64encode(secret))
        encrypted_private_key = fernet.encrypt(privatekey.encode('utf-8')).decode('utf-8')

        resp['body'] = {
            'certificatePem': cert_pem,
            'encryptedPrivateKey': encrypted_private_key,
            'publicKey': local_public_key_str,
        }
    except InvalidRequestException as ire:
        logger.warning('Invalid request: %s', json.dumps(event))
        resp.update({'statusCode': HTTPStatus.BAD_REQUEST.value, 'body': {'msg': str(ire)}})
    except EnvironmentException as env_err:
        # Details are logged only; the client gets the generic description.
        logger.error('Environment error: %s', str(env_err))
        resp.update(
            {
                'statusCode': HTTPStatus.INTERNAL_SERVER_ERROR.value,
                'body': {'msg': HTTPStatus.INTERNAL_SERVER_ERROR.description},
            }
        )
    except ConfigError as ce:
        logger.error('Config error: %s', str(ce))
        resp.update(
            {
                'statusCode': HTTPStatus.INTERNAL_SERVER_ERROR.value,
                'body': {'msg': HTTPStatus.INTERNAL_SERVER_ERROR.description},
            }
        )
    except IssueCertException as ice:
        logger.error('acm-pca issue certificate fail: %s', str(ice))
        resp.update({'statusCode': HTTPStatus.BAD_GATEWAY.value, 'body': {'msg': 'Fail to issue certificate'}})
    except IotException as ie:
        logger.error('Fail to create IoT thing on AWS IoT: %s', str(ie))
        resp.update({'statusCode': HTTPStatus.BAD_GATEWAY.value, 'body': {'msg': 'Fail to create info on IoT Core'}})
    except Exception as e:
        # Top-level boundary: log with traceback so unexpected failures can
        # be diagnosed, but never leak details to the client.
        logger.exception('Unexpected exception: %s', str(e))
        resp.update({'statusCode': HTTPStatus.INTERNAL_SERVER_ERROR.value, 'body': {'msg': 'Unknown error'}})

    # The response body is returned as a JSON string rather than a dict.
    resp['body'] = json.dumps(resp['body'])
    return resp