def disable_rotation_of_duplicate_certificates()

in lemur/certificates/cli.py [0:0]


def disable_rotation_of_duplicate_certificates(commit):
    """Run the "disable auto-rotate for duplicate certificates" task.

    Authority names are read from the
    ``AUTHORITY_TO_DISABLE_ROTATE_OF_DUPLICATE_CERTIFICATES`` config entry and
    resolved with ``get_authority_by_name``. Candidate certificates for the
    resolved authorities are fetched with
    ``list_recent_valid_certs_issued_by_authority`` and each one is handed to
    ``process_duplicates``. Certificates from unsuccessful calls are recorded
    and reported through a ``disable_rotation_duplicates`` counter metric.
    A summary is logged at the end.

    The task returns early (after logging) when no authorities are configured
    or when none of the configured names resolves to an authority.

    :param commit: forwarded unchanged to ``process_duplicates``; when truthy
        a "COMMIT mode" banner is echoed first. Presumably controls whether
        changes are persisted -- confirm against ``process_duplicates``.
    :return: None
    """
    log_data = dict(
        function=f"{__name__}.{sys._getframe().f_code.co_name}",
        message="Disabling auto-rotate for duplicate certificates",
    )

    if commit:
        click.echo("[!] Running in COMMIT mode.")

    config = current_app.config

    authority_names = config.get("AUTHORITY_TO_DISABLE_ROTATE_OF_DUPLICATE_CERTIFICATES")
    if not authority_names:
        # Nothing configured: this is only worth a debug-level record.
        log_data["message"] = "Skipping task: No authorities configured"
        current_app.logger.debug(log_data)
        return

    log_data["authorities"] = authority_names
    days_since_issuance = config.get("DAYS_SINCE_ISSUANCE_DISABLE_ROTATE_OF_DUPLICATE_CERTIFICATES")
    log_data["days_since_issuance"] = f"{days_since_issuance} (Ignored if none)"

    # Split the configured names into resolved authority ids and unknown names.
    authority_ids = []
    unknown_names = []
    for name in authority_names:
        authority = get_authority_by_name(name)
        if not authority:
            unknown_names.append(name)
            continue
        authority_ids.append(authority.id)

    if unknown_names:
        log_data["warning"] = f"Non-existing authorities: {unknown_names}"
    if not authority_ids:
        # Names were configured but none of them resolved: log as an error.
        log_data["message"] = "Skipping task: No valid authorities configured"
        current_app.logger.error(log_data)
        return

    candidates = list_recent_valid_certs_issued_by_authority(authority_ids, days_since_issuance)

    log_data["duplicate_candidate_certs_count"] = len(candidates)
    current_app.logger.info(log_data)

    # These three lists are passed to process_duplicates on every call and
    # are only read back here for the summary counts.
    # NOTE(review): presumably process_duplicates appends to them -- confirm.
    skipped_certs = []
    rotation_disabled_certs = []
    unique_common_names = []
    # Filled in here, from the certificates of unsuccessful calls.
    failed_certs = []

    for candidate in candidates:
        success, duplicates = process_duplicates(
            candidate,
            days_since_issuance,
            skipped_certs,
            rotation_disabled_certs,
            unique_common_names,
            commit,
        )
        if success:
            continue

        for cert in duplicates:
            cert_name = cert.name
            failed_certs.append(cert_name)
            metrics.send(
                "disable_rotation_duplicates",
                "counter",
                1,
                metric_tags={"status": "failed", "certificate": cert_name},
            )

    # NOTE(review): the four counts below are presumably meant to account for
    # every candidate certificate -- confirm against process_duplicates.
    log_data.update(
        {
            "message": "Summary of task run",
            "unique_common_names_count": len(unique_common_names),
            "rotation_disabled_cert_count": len(rotation_disabled_certs),
            "certificate_with_no_change_count": len(skipped_certs),
            "failed_to_determine_if_duplicate_count": len(failed_certs),
        }
    )

    current_app.logger.info(log_data)