in moz_kinto_publisher/main.py [0:0]
def publish_ctlogs(*, args, rw_client):
    """Mirror Google's v3 CT log list into our ct-logs Kinto collection.

    The upstream list is downloaded, flattened into our schema and sanity
    checked. Kinto records are then created, deleted and updated so that the
    collection matches upstream. Unless ``args.noop`` is set, a review of the
    collection is requested at the end.

    Schema for our ct-logs kinto collection:
        {
            "admissible": boolean,
            "crlite_enrolled": boolean,
            "description": string,
            "key": string,
            "logID": string,
            "mmd": integer,
            "operator": string,
            "url": string
        }

    Args:
        args: Object exposing a ``noop`` attribute. When it is truthy the
            intended changes are only logged and no review is requested.
        rw_client: Kinto client providing ``get_records``, ``create_record``,
            ``delete_record``, ``update_record`` and
            ``request_review_of_collection``.

    Raises:
        requests.exceptions.RequestException: If the log list cannot be
            downloaded in time or the server answers with an HTTP error.
        ConsistencyException: If log IDs are not unique (upstream or in
            Kinto), or if an upstream log ID is not the base64-encoded
            SHA-256 digest of the log's key.
    """
    # Bound the download so an unresponsive server cannot hang the publisher,
    # and fail on HTTP errors rather than trying to parse an error page.
    response = requests.get(
        "https://www.gstatic.com/ct/log_list/v3/log_list.json",
        timeout=60,
    )
    response.raise_for_status()
    log_list_json = response.json()

    # The "state" of a log determines whether its SCTs are admissible in policy checks.
    # We largely follow Chrome's behavior defined here:
    #    https://googlechrome.github.io/CertificateTransparency/log_states.html,
    # except we are not enforcing the restrictions on "retired" logs.
    admissible_states = ("qualified", "usable", "readonly", "retired")

    # Google groups CT logs according to their operators; flatten them into a
    # single list in our schema (dropping unused fields) without modifying
    # the parsed upstream document.
    # NOTE(review): new records are created with crlite_enrolled=True, so
    # reviewers are approving an enrollment rather than performing one by
    # hand -- confirm that this is the intended workflow.
    upstream_logs = [
        {
            "admissible": any(
                state in ctlog["state"] for state in admissible_states
            ),
            "crlite_enrolled": True,
            "description": ctlog["description"],
            "key": ctlog["key"],
            "logID": ctlog["log_id"],
            "mmd": ctlog["mmd"],
            "operator": operator["name"],
            "url": ctlog["url"],
        }
        for operator in log_list_json["operators"]
        for ctlog in operator["logs"]
    ]

    # A lookup table keyed by log ID collapses duplicates, so a size mismatch
    # means the upstream list reused a log ID.
    upstream_lut = {ctlog["logID"]: ctlog for ctlog in upstream_logs}
    if len(upstream_logs) != len(upstream_lut):
        raise ConsistencyException(
            "We expect the 'log_id' field to be unique in log_list.json"
        )

    # LogID is supposed to be a hash of the CT Log's key
    for upstream_log in upstream_logs:
        rfc6962_log_id = base64.b64encode(
            hashlib.sha256(base64.b64decode(upstream_log["key"])).digest()
        )
        if rfc6962_log_id != upstream_log["logID"].encode("utf8"):
            raise ConsistencyException(
                f"Google log list contains incorrectly computed logID {upstream_log}"
            )

    # Fetch our existing Kinto records
    known_logs = rw_client.get_records(collection=settings.KINTO_CTLOGS_COLLECTION)
    known_lut = {ctlog["logID"]: ctlog for ctlog in known_logs}
    if len(known_logs) != len(known_lut):
        raise ConsistencyException(
            "We expect the 'logID' field to be unique the ct-logs collection"
        )

    # Add new logs
    for upstream_id, upstream_log in upstream_lut.items():
        if upstream_id in known_lut:
            continue

        if args.noop:
            log.info(
                f"Noop enabled, skipping upload of \"{upstream_log['description']}\"."
            )
            continue

        log.info(f"Uploading new log {upstream_log}")
        try:
            rw_client.create_record(
                collection=settings.KINTO_CTLOGS_COLLECTION,
                data=upstream_log,
                permissions={"read": ["system.Everyone"]},
            )
        except KintoException as ke:
            # Best effort: report the failure and carry on with the other logs.
            log.error(f"Upload failed, {ke}")

    # Delete logs that have been removed from Google's list
    # (this probably doesn't happen)
    for known_id, known_log in known_lut.items():
        if known_id in upstream_lut:
            continue

        if args.noop:
            log.info(f"Noop enabled, skipping deletion of {known_log}.")
            continue

        log.info(
            f"Removing log {known_log}, which has been deleted from Google's list."
        )
        try:
            rw_client.delete_record(
                collection=settings.KINTO_CTLOGS_COLLECTION,
                id=known_log["id"],
            )
        except KintoException as ke:
            log.error(f"Deletion failed, {ke}")

    # Update logs if upstream metadata has changed.
    compared_fields = ("description", "key", "url", "mmd", "admissible", "operator")
    for known_id, known_log in known_lut.items():
        if known_id not in upstream_lut:  # skip deletions
            continue

        # This script is not responsible for updating crlite enrollment,
        # so carry the existing value over. A copy is used so the shared
        # lookup-table entry is left untouched.
        upstream_log = {
            **upstream_lut[known_id],
            "crlite_enrolled": known_log["crlite_enrolled"],
        }

        # Older Kinto records may lack some fields; a missing field counts
        # as a difference because it compares unequal to the upstream value.
        need_update = any(
            upstream_log[field] != known_log.get(field) for field in compared_fields
        )
        if not need_update:
            continue

        if args.noop:
            log.info(f"Noop enabled, skipping update log with id {known_id}.")
            continue

        log.info(f"Changing {known_log} to {upstream_log}")
        try:
            rw_client.update_record(
                collection=settings.KINTO_CTLOGS_COLLECTION,
                data=upstream_log,
                id=known_log["id"],
            )
        except KintoException as ke:
            log.error(f"Update failed, {ke}")

    if not args.noop:
        rw_client.request_review_of_collection(
            collection=settings.KINTO_CTLOGS_COLLECTION
        )