in lemur/plugins/lemur_acme/challenge_types.py [0:0]
def create_certificate(self, csr, issuer_options):
    """
    Creates an ACME certificate using the HTTP-01 challenge.

    A new order is created for the CSR. For every authorization of that order
    which is not already valid, the offered HTTP-01 challenge is collected, its
    token is deployed through ``self.deploy`` to the destination named by the
    authority's ``tokenDestination`` option, and the challenge is answered.
    The order is then polled and finalized with a 90 second deadline. Tokens
    that were deployed are removed again through ``self.cleanup``, both after
    a successful issuance and (best effort) when issuance fails.

    :param csr: certificate signing request; it is passed through
        ``csr_to_string`` before the order is created
    :param issuer_options: mapping holding the issuing authority under the key
        ``"authority"``; ``authority.options`` is read as a JSON encoded list
        of ``{"name": ..., "value": ...}`` entries
    :return: tuple ``(pem_certificate, pem_certificate_chain, None)``
    :raise Exception: if the CA offers no HTTP-01 challenge for a pending
        authorization, if no ``tokenDestination`` option is configured, if
        deploying or answering a challenge fails, or if the CA reports a
        validation error while polling/finalizing the order
    """
    self.acme = AcmeHandler()
    authority = issuer_options.get("authority")
    acme_client, _ = self.acme.setup_acme_client(authority)
    orderr = acme_client.new_order(csr_to_string(csr))

    http_challenges = []
    deployed_challenges = []
    validation_target = None
    all_pre_validated = True
    for authz in orderr.authorizations:
        if authz.body.status != STATUS_VALID:
            all_pre_validated = False
            # authz.body.challenges holds ChallengeBody objects; only HTTP-01 is supported here.
            http_challenges.extend(
                body for body in authz.body.challenges if isinstance(body.chall, challenges.HTTP01)
            )
        else:
            metrics.send("get_acme_challenges_already_valid", "counter", 1)
            log_data = {"message": "already validated, skipping", "hostname": authz.body.identifier.value}
            current_app.logger.info(log_data)

    if not all_pre_validated and not http_challenges:
        raise Exception(f'HTTP-01 challenge was not offered by the CA server at {orderr.uri}')

    try:
        if not all_pre_validated:
            # No early exit: if several tokenDestination options exist, the last one wins.
            for option in json.loads(authority.options):
                if option["name"] == "tokenDestination":
                    validation_target = option["value"]
            if validation_target is None:
                raise Exception('No token_destination configured for this authority. Cant complete HTTP-01 challenge')

            for challenge in http_challenges:
                try:
                    response = self.deploy(challenge, acme_client, validation_target)
                    # Record the path as soon as the token is deployed so it is
                    # cleaned up even if answering the challenge fails.
                    deployed_challenges.append(challenge.chall.path)
                    acme_client.answer_challenge(challenge, response)
                except Exception as e:
                    current_app.logger.error(e)
                    raise Exception(
                        'Failure while trying to deploy token to configure destination. See logs for more information'
                    ) from e
            current_app.logger.info("Uploaded HTTP-01 challenge tokens, trying to poll and finalize the order")

        try:
            # NOTE(review): naive local time is kept on purpose; presumably this is what the
            # ACME client compares the deadline against - confirm before making it tz-aware.
            deadline = datetime.now() + timedelta(seconds=90)
            orderr = acme_client.poll_authorizations(orderr, deadline)
            finalized_orderr = acme_client.finalize_order(orderr, deadline, fetch_alternative_chains=True)
        except errors.ValidationError as validation_error:
            error_message = "Validation error occurred, can\'t complete challenges. See logs for more information."
            for authz in validation_error.failed_authzrs:
                for failed_challenge in authz.body.challenges:
                    if failed_challenge.error:
                        error = failed_challenge.error
                        # A code that is not a key of ERROR_CODES must not turn the
                        # validation failure into a KeyError while building the message.
                        description = ERROR_CODES.get(error.code, "unknown error code")
                        error_message = f"ValidationError occurred of type: {error.typ}, " \
                                        f"with message: {description}, " \
                                        f"detail: {error.detail}"
                        current_app.logger.error(error_message)
            # The message of the last failed challenge that carried an error is raised.
            raise Exception(error_message) from validation_error

        pem_certificate, pem_certificate_chain = self.acme.extract_cert_and_chain(
            finalized_orderr.fullchain_pem,
            finalized_orderr.alternative_fullchains_pem,
        )

        # Cheap substring test on the raw options string before parsing the JSON.
        if "drop_last_cert_from_chain" in authority.options:
            for option in json.loads(authority.options):
                if option["name"] == "drop_last_cert_from_chain" and option["value"] is True:
                    # skipping the last element
                    pem_certificate_chain = drop_last_cert_from_chain(pem_certificate_chain)

        self.acme.log_remaining_validation(
            finalized_orderr.authorizations,
            acme_client.net.account.uri.replace('https://', ''),
        )
    except Exception:
        # Failure path: remove already deployed tokens so they are not left behind on
        # the destination. Best effort only - a cleanup problem is logged and must not
        # hide the error that actually aborted the issuance.
        for token_path in deployed_challenges:
            try:
                self.cleanup(token_path, validation_target)
            except Exception as cleanup_error:
                current_app.logger.error(cleanup_error)
        raise

    # Success path: a cleanup failure propagates to the caller.
    for token_path in deployed_challenges:
        self.cleanup(token_path, validation_target)

    # The third element of the result is always None for this challenge type.
    return pem_certificate, pem_certificate_chain, None