in lemur/certificates/cli.py [0:0]
def disable_rotation_of_duplicate_certificates(commit):
    """Run the "disable auto-rotate for duplicate certificates" task.

    Steps performed here:

    1. Read the authority names from the
       ``AUTHORITY_TO_DISABLE_ROTATE_OF_DUPLICATE_CERTIFICATES`` config key and
       return early (debug log) when none are configured.
    2. Resolve every name with ``get_authority_by_name``; names that do not
       resolve are reported under the ``warning`` log key. Return early
       (error log) when no name resolves at all.
    3. Fetch candidate certificates with
       ``list_recent_valid_certs_issued_by_authority`` and hand each one to
       ``process_duplicates`` together with the running result lists.
    4. For every candidate that ``process_duplicates`` reports as
       unsuccessful, record the names of the returned certificates and emit a
       ``disable_rotation_duplicates`` counter metric tagged ``failed``.
    5. Log a summary with the sizes of the result lists.

    :param commit: forwarded unchanged to ``process_duplicates``; a truthy
        value also prints a "COMMIT mode" banner. Presumably it controls
        whether changes are persisted -- confirm in ``process_duplicates``.
    :return: None
    """
    log_data = {
        "function": f"{__name__}.{sys._getframe().f_code.co_name}",
        "message": "Disabling auto-rotate for duplicate certificates",
    }

    if commit:
        click.echo("[!] Running in COMMIT mode.")

    config = current_app.config
    logger = current_app.logger

    authority_names = config.get("AUTHORITY_TO_DISABLE_ROTATE_OF_DUPLICATE_CERTIFICATES")
    if not authority_names:
        log_data["message"] = "Skipping task: No authorities configured"
        logger.debug(log_data)
        return

    log_data["authorities"] = authority_names
    days_since_issuance = config.get("DAYS_SINCE_ISSUANCE_DISABLE_ROTATE_OF_DUPLICATE_CERTIFICATES")
    log_data["days_since_issuance"] = f"{days_since_issuance} (Ignored if none)"

    # Split the configured names into resolvable authority ids and unknown names.
    known_authority_ids = []
    unknown_authority_names = []
    for name in authority_names:
        resolved = get_authority_by_name(name)
        if not resolved:
            unknown_authority_names.append(name)
            continue
        known_authority_ids.append(resolved.id)

    if unknown_authority_names:
        log_data["warning"] = f"Non-existing authorities: {unknown_authority_names}"

    if not known_authority_ids:
        log_data["message"] = "Skipping task: No valid authorities configured"
        logger.error(log_data)
        return

    candidates = list_recent_valid_certs_issued_by_authority(known_authority_ids, days_since_issuance)
    log_data["duplicate_candidate_certs_count"] = len(candidates)
    logger.info(log_data)

    # The first three lists are passed to process_duplicates on every call and
    # their lengths are reported in the summary, so they are expected to be
    # filled in by that helper; failed_certs is filled in here.
    skipped_certs = []
    rotation_disabled_certs = []
    unique_common_names = []
    failed_certs = []

    for candidate in candidates:
        success, duplicates = process_duplicates(
            candidate,
            days_since_issuance,
            skipped_certs,
            rotation_disabled_certs,
            unique_common_names,
            commit,
        )
        if success:
            continue
        for cert in duplicates:
            cert_name = cert.name
            failed_certs.append(cert_name)
            metrics.send(
                "disable_rotation_duplicates",
                "counter",
                1,
                metric_tags={"status": "failed", "certificate": cert_name},
            )

    # NOTE(review): the counts below are meant to reconcile with each other.
    # The previous note listed rotation_disabled_cert_count twice and mentioned
    # a certs_with_serial_number_count that is not logged here. Presumably the
    # intended relation involves rotation_disabled_cert_count +
    # certificate_with_no_change_count + failed_to_determine_if_duplicate_count
    # -- TODO confirm against process_duplicates.
    log_data.update(
        {
            "message": "Summary of task run",
            "unique_common_names_count": len(unique_common_names),
            "rotation_disabled_cert_count": len(rotation_disabled_certs),
            "certificate_with_no_change_count": len(skipped_certs),
            "failed_to_determine_if_duplicate_count": len(failed_certs),
        }
    )
    logger.info(log_data)