in lemur/plugins/lemur_acme/acme_handlers.py [0:0]
def setup_acme_client_no_retry(self, authority):
if not authority.options:
raise InvalidAuthority("Invalid authority. Options not set")
options = {}
for option in json.loads(authority.options):
options[option["name"]] = option.get("value")
email = options.get("email", current_app.config.get("ACME_EMAIL"))
tel = options.get("telephone", current_app.config.get("ACME_TEL"))
directory_url = options.get(
"acme_url", current_app.config.get("ACME_DIRECTORY_URL")
)
existing_key = options.get(
"acme_private_key", current_app.config.get("ACME_PRIVATE_KEY")
)
existing_regr = options.get("acme_regr", current_app.config.get("ACME_REGR"))
eab_kid = options.get("eab_kid", None)
eab_hmac_key = options.get("eab_hmac_key", None)
if existing_key and existing_regr:
current_app.logger.debug("Reusing existing ACME account")
# Reuse the same account for each certificate issuance
# existing_key might be encrypted
if not is_json(existing_key):
# decrypt the private key, if not already in plaintext (json format)
existing_key = data_decrypt(existing_key)
key = jose.JWK.json_loads(existing_key)
regr = messages.RegistrationResource.json_loads(existing_regr)
current_app.logger.debug(
f"Connecting with directory at {directory_url}"
)
net = ClientNetwork(key, account=regr, alg=key_to_alg(key))
directory = ClientV2.get_directory(directory_url, net)
client = ClientV2(directory, net=net)
return client, {}
else:
# Create an account for each certificate issuance
key = jose.JWKRSA(key=generate_private_key("RSA2048"))
current_app.logger.debug("Creating a new ACME account")
current_app.logger.debug(
f"Connecting with directory at {directory_url}"
)
net = ClientNetwork(key, account=None, timeout=3600, alg=key_to_alg(key))
directory = ClientV2.get_directory(directory_url, net)
client = ClientV2(directory, net=net)
if eab_kid and eab_hmac_key:
# external account binding (eab_kid and eab_hmac_key could be potentially single use to establish
# long-term credentials)
eab = messages.ExternalAccountBinding.from_data(account_public_key=key.public_key(),
kid=eab_kid,
hmac_key=eab_hmac_key,
directory=client.directory)
registration = client.new_account(
messages.NewRegistration.from_data(email=email, external_account_binding=eab,
terms_of_service_agreed=True)
)
else:
registration = client.new_account(
messages.NewRegistration.from_data(email=email, terms_of_service_agreed=True)
)
# if store_account is checked, add the private_key and registration resources to the options
if options['store_account']:
new_options = json.loads(authority.options)
# the key returned by fields_to_partial_json is missing the key type, so we add it manually
key_dict = key.fields_to_partial_json()
key_dict["kty"] = "RSA"
acme_private_key = {
"name": "acme_private_key",
"value": data_encrypt(json.dumps(key_dict))
}
new_options.append(acme_private_key)
acme_regr = {
"name": "acme_regr",
"value": json.dumps({"body": {}, "uri": registration.uri})
}
new_options.append(acme_regr)
authorities_service.update_options(authority.id, options=json.dumps(new_options))
current_app.logger.debug(f"Connected: {registration.uri}")
return client, registration