in lemur/certificates/cli.py [0:0]
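def rotate(endpoint_name, source, new_certificate_name, old_certificate_name, message, commit):
    """
    Rotates an endpoint and reissues it if it has not already been replaced. If it has
    been replaced, will use the replacement certificate for the rotation.

    The mode is chosen from which arguments resolve, in this order:

    1. ``endpoint`` and ``new_cert`` resolve: rotate that single endpoint to
       ``new_cert``. ``old_certificate_name`` is not compared against the
       endpoint's current certificate in this mode.
    2. ``old_cert`` and ``new_cert`` resolve: rotate every endpoint attached to
       ``old_cert`` to ``new_cert``.
    3. Otherwise: rotate every endpoint from
       ``endpoint_service.get_all_pending_rotation()`` to the first entry of its
       certificate's ``replaced`` list.

    NOTE(review): the fall-through means that an ``endpoint_name`` which resolves to
    a falsy value (rather than raising inside the validator) ends up in mode 3, i.e.
    a broad rotation; confirm the validators raise on an unknown name.

    :param endpoint_name: name of the endpoint to rotate, or None.
    :param source: optional source used to disambiguate ``endpoint_name``.
    :param new_certificate_name: name of the certificate to rotate to, or None.
    :param old_certificate_name: name of the certificate being replaced, or None.
    :param message: forwarded unchanged to ``request_rotation``.
    :param commit: forwarded unchanged to ``request_rotation``; a warning is echoed
        when True.

    Every exception is passed to ``capture_exception`` and swallowed, so the process
    still exits normally after a failed rotation; the outcome is reported through the
    ``status`` tag of the ``endpoint_rotation_job`` metric and the echoed output.
    """
    if commit:
        click.echo("[!] Running in COMMIT mode.")

    click.echo("[+] Starting endpoint rotation.")

    status = FAILURE_METRIC_STATUS
    # Bound before the try so the metric at the end can always report the last
    # endpoint that was resolved (None when the failure happened earlier). The
    # previous globals().get("endpoint") lookup never saw this local at all.
    endpoint = None
    log_data = {
        "function": f"{__name__}.{sys._getframe().f_code.co_name}",
    }

    try:
        old_cert = validate_certificate(old_certificate_name)
        new_cert = validate_certificate(new_certificate_name)

        if source:
            endpoint = validate_endpoint_from_source(endpoint_name, source)
        else:
            try:
                endpoint = validate_endpoint(endpoint_name)
            except MultipleResultsFound:
                click.echo(f"[!] Multiple endpoints found with name {endpoint_name}, try narrowing the search down to an endpoint from a specific source by re-running this command with the --source flag.")
                log_data["message"] = "Multiple endpoints found with same name, unable to perform rotation"
                log_data["duplicated_endpoint_name"] = endpoint_name
                current_app.logger.info(log_data)
                raise

        if endpoint and new_cert:
            # Mode 1: one explicit endpoint to one explicit certificate.
            click.echo(
                f"[+] Rotating endpoint: {endpoint.name} to certificate {new_cert.name}"
            )
            log_data["message"] = "Rotating one endpoint"
            log_data["endpoint"] = endpoint.dnsname
            log_data["certificate"] = new_cert.name
            request_rotation(endpoint, new_cert, message, commit)
            current_app.logger.info(log_data)

        elif old_cert and new_cert:
            # Mode 2: every endpoint currently on old_cert moves to new_cert.
            click.echo(f"[+] Rotating all endpoints from {old_cert.name} to {new_cert.name}")
            log_data["certificate"] = new_cert.name
            log_data["certificate_old"] = old_cert.name
            log_data["message"] = "Rotating endpoint from old to new cert"
            for endpoint in old_cert.endpoints:
                click.echo(f"[+] Rotating {endpoint.name}")
                log_data["endpoint"] = endpoint.dnsname
                request_rotation(endpoint, new_cert, message, commit)
                current_app.logger.info(log_data)

        else:
            # Mode 3: no usable certificate/endpoint pair was given. Rotate every
            # endpoint whose certificate has already been replaced.
            click.echo("[+] Rotating all endpoints that have new certificates available")
            for endpoint in endpoint_service.get_all_pending_rotation():
                replaced = endpoint.certificate.replaced
                if not replaced:
                    # get_all_pending_rotation() is expected to yield only endpoints
                    # with a replacement; skip instead of raising IndexError if not.
                    click.echo(f"[!] Skipping {endpoint.name}: no replacement certificate found")
                    continue

                log_data["message"] = "Rotating endpoint from old to new cert"
                if len(replaced) > 1:
                    log_data["message"] = f"Multiple replacement certificates found, going with the first one out of " \
                                          f"{len(replaced)}"

                replacement = replaced[0]
                log_data["endpoint"] = endpoint.dnsname
                log_data["certificate"] = replacement.name
                click.echo(
                    f"[+] Rotating {endpoint.name} to {replacement.name}"
                )
                request_rotation(endpoint, replacement, message, commit)
                current_app.logger.info(log_data)

        status = SUCCESS_METRIC_STATUS
        click.echo("[+] Done!")

    except Exception as e:
        # Only the exception type goes to the terminal; the full details are left
        # to capture_exception so the operator still sees that the run failed.
        click.echo(f"[!] Endpoint rotation failed: {type(e).__name__}")
        capture_exception(
            extra={
                "old_certificate_name": str(old_certificate_name),
                "new_certificate_name": str(new_certificate_name),
                "endpoint_name": str(endpoint_name),
                "message": str(message),
            }
        )

    metrics.send(
        "endpoint_rotation_job",
        "counter",
        1,
        metric_tags={
            "status": status,
            "old_certificate_name": str(old_certificate_name),
            "new_certificate_name": str(new_certificate_name),
            "endpoint_name": str(endpoint_name),
            "message": str(message),
            # dnsname of the last endpoint resolved in this run, "None" if none was.
            "endpoint": str(getattr(endpoint, "dnsname", None)),
        },
    )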