def publish_ctlogs()

in moz_kinto_publisher/main.py


def publish_ctlogs(*, args, rw_client):
    # Copy CT log metadata from Google's v3 log_list to our Kinto collection.
    # This will notify reviewers who can then manually enroll the log in CRLite.
    #
    # Schema for our ct-logs kinto collection:
    #   {
    #       "admissible": boolean
    #       "crlite_enrolled": boolean,
    #       "description": string,
    #       "key": string,
    #       "logID": string,
    #       "mmd": integer,
    #       "operator": string
    #       "url": string
    #   }
    #
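    # An illustrative record in that shape (the values below are invented for
    # the sake of example, not taken from the real log list):
    #   {
    #       "admissible": true,
    #       "crlite_enrolled": false,
    #       "description": "Example CT Log 2025",
    #       "key": "<base64 DER-encoded SubjectPublicKeyInfo>",
    #       "logID": "<base64 SHA-256 hash of 'key'>",
    #       "mmd": 86400,
    #       "operator": "Example Operator",
    #       "url": "https://ct.example.com/2025/"
    #   }
    #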

    log_list_json = requests.get(
        "https://www.gstatic.com/ct/log_list/v3/log_list.json"
    ).json()

    # The "state" of a log determines whether its SCTs are admissible in policy checks.
    # We largely follow Chrome's behavior defined here:
    #    https://googlechrome.github.io/CertificateTransparency/log_states.html,
    # except we are not enforcing the restrictions on "retired" logs.
    admissible_states = ["qualified", "usable", "readonly", "retired"]
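    # In the v3 log_list schema, each log's "state" is (at the time of writing) an
    # object with a single key naming the state, e.g. {"usable": {"timestamp": ...}},
    # so a key-membership test against this list suffices in the loop below.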

    # Google groups CT logs by operator; we want a flat list
    upstream_logs_raw = []
    for operator in log_list_json["operators"]:
        for ctlog in operator["logs"]:
            ctlog["operator"] = operator["name"]
            ctlog["admissible"] = any(
                state in ctlog["state"] for state in admissible_states
            )
            upstream_logs_raw.append(ctlog)

    # Translate |upstream_logs_raw| to our schema (and remove unused fields)
    upstream_logs = [
        {
            "admissible": ctlog["admissible"],
            "crlite_enrolled": True,
            "description": ctlog["description"],
            "key": ctlog["key"],
            "logID": ctlog["log_id"],
            "mmd": ctlog["mmd"],
            "operator": ctlog["operator"],
            "url": ctlog["url"],
        }
        for ctlog in upstream_logs_raw
    ]
    upstream_lut = {ctlog["logID"]: ctlog for ctlog in upstream_logs}

    if len(upstream_logs) != len(upstream_lut):
        raise ConsistencyException(
            "We expect the 'log_id' field to be unique in log_list.json"
        )

    # Per RFC 6962, the logID is the SHA-256 hash of the log's DER-encoded public key
    for upstream_log in upstream_logs:
        rfc6962_log_id = base64.b64encode(
            hashlib.sha256(base64.b64decode(upstream_log["key"])).digest()
        )
        if rfc6962_log_id != upstream_log["logID"].encode("utf8"):
            raise ConsistencyException(
                f"Google log list contains incorrectly computed logID {upstream_log}"
            )

    # Fetch our existing Kinto records
    known_logs = rw_client.get_records(collection=settings.KINTO_CTLOGS_COLLECTION)
    known_lut = {ctlog["logID"]: ctlog for ctlog in known_logs}

    if len(known_logs) != len(known_lut):
        raise ConsistencyException(
            "We expect the 'logID' field to be unique the ct-logs collection"
        )

    # Add new logs
    for upstream_id, upstream_log in upstream_lut.items():
        if upstream_id in known_lut:
            continue

        if args.noop:
            log.info(
                f"Noop enabled, skipping upload of \"{upstream_log['description']}\"."
            )
            continue

        log.info(f"Uploading new log {upstream_log}")
        try:
            rw_client.create_record(
                collection=settings.KINTO_CTLOGS_COLLECTION,
                data=upstream_log,
                permissions={"read": ["system.Everyone"]},
            )
        except KintoException as ke:
            log.error(f"Upload failed, {ke}")

    # Delete logs that have been removed from Google's list
    # (this probably doesn't happen)
    for known_id, known_log in known_lut.items():
        if known_id in upstream_lut:
            continue

        if args.noop:
            log.info(f"Noop enabled, skipping deletion of {known_log}.")
            continue

        log.info(
            f"Removing log {known_log}, which has been deleted from Google's list."
        )
        try:
            rw_client.delete_record(
                collection=settings.KINTO_CTLOGS_COLLECTION,
                id=known_log["id"],
            )
        except KintoException as ke:
            log.error(f"Deletion failed, {ke}")

    # Update logs if upstream metadata has changed.
    # (These will be unenrolled and manually reviewed.)
    for known_id, known_log in known_lut.items():
        if known_id not in upstream_lut:  # skip deletions
            continue

        upstream_log = upstream_lut[known_id]

        # This script is not responsible for updating crlite enrollment,
        # so preserve the existing value.
        upstream_log["crlite_enrolled"] = known_log["crlite_enrolled"]

        fields = ["description", "key", "url", "mmd", "admissible", "operator"]
        need_update = any(
            upstream_log[field] != known_log.get(field) for field in fields
        )
        if not need_update:
            continue

        if args.noop:
            log.info(f"Noop enabled, skipping update log with id {known_id}.")
            continue

        log.info(f"Changing {known_log} to {upstream_log}")
        try:
            rw_client.update_record(
                collection=settings.KINTO_CTLOGS_COLLECTION,
                data=upstream_log,
                id=known_log["id"],
            )
        except KintoException as ke:
            log.error(f"Update failed, {ke}")

    if not args.noop:
        rw_client.request_review_of_collection(
            collection=settings.KINTO_CTLOGS_COLLECTION
        )
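

# A hypothetical dry-run invocation (sketch only; how |rw_client| is constructed
# depends on the rest of moz_kinto_publisher and is not shown here):
#
#   from argparse import Namespace
#   publish_ctlogs(args=Namespace(noop=True), rw_client=rw_client)
#
# With noop set, the function still fetches and validates Google's log list, but
# it skips all create/update/delete calls and does not request review of the
# collection.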