moz_kinto_publisher/main.py (1,009 lines of code) (raw):

#!/usr/bin/env python3 import argparse import base64 import hashlib import json import math import re import tempfile import time from datetime import datetime, timedelta, timezone from functools import lru_cache from pathlib import Path import requests from collections import namedtuple from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from kinto_http import Client from kinto_http.exceptions import KintoException from kinto_http.patch_type import BasicPatch import glog as log import workflow import settings # To add a channel: give it a unique slug and add it to the CHANNELS list with # the enabled flag set to True. # # To remove a channel: set its enabled flag to False in the CHANNELS list. # It can be removed from the CHANNELS list once all records for the channel # are removed from remote settings. # # NOTE: Channel names cannot contain underscores. CHANNEL_EXPERIMENTAL_DELTAS = "experimental+deltas" CHANNEL_DEFAULT = "default" CHANNEL_COMPAT = "compat" Channel = namedtuple( "Channel", [ "slug", "dir", "delta_filename", "max_filter_age_days", "supported_version", "mimetype", "enabled", ], ) CHANNELS = [ Channel( slug=CHANNEL_EXPERIMENTAL_DELTAS, dir="clubcard-all", max_filter_age_days=20, delta_filename="filter.delta", supported_version=133, # The metadata in clubcards produced by clubcard-crlite version 0.3.* # is somewhat compressible, so set a mimetype that encourages our CDN to use # compression (see: https://cloud.google.com/cdn/docs/dynamic-compression). mimetype="application/x-protobuf", enabled=False, ), Channel( slug=CHANNEL_DEFAULT, dir="clubcard-all", max_filter_age_days=45, delta_filename="filter.delta", supported_version=133, # The metadata in clubcards produced by clubcard-crlite version 0.3.* # is somewhat compressible, so set a mimetype that encourages our CDN to use # compression (see: https://cloud.google.com/cdn/docs/dynamic-compression). mimetype="application/x-protobuf", enabled=True, ), Channel( slug=CHANNEL_COMPAT, dir="clubcard-priority", max_filter_age_days=45, delta_filename="filter.delta", supported_version=133, # The metadata in clubcards produced by clubcard-crlite version 0.3.* # is somewhat compressible, so set a mimetype that encourages our CDN to use # compression (see: https://cloud.google.com/cdn/docs/dynamic-compression). mimetype="application/x-protobuf", enabled=True, ), ] class IntermediateRecordError(KintoException): pass class TimeoutException(Exception): pass class ConsistencyException(Exception): pass class PublishedRunDB: def __init__(self, filter_bucket): self.filter_bucket = filter_bucket self.run_identifiers = workflow.get_run_identifiers(self.filter_bucket) self.cached_run_times = {} def __len__(self): return len(self.run_identifiers) def is_run_valid(self, run_id, channel): is_valid = ( workflow.google_cloud_file_exists( self.filter_bucket, f"{run_id}/{channel.dir}/filter" ) and workflow.google_cloud_file_exists( self.filter_bucket, f"{run_id}/{channel.dir}/{channel.delta_filename}" ) and workflow.google_cloud_file_exists( self.filter_bucket, f"{run_id}/ct-logs.json" ) ) log.debug(f"{run_id} {'Is Valid' if is_valid else 'Is Not Valid'}") return is_valid def is_run_ready(self, run_id): is_ready = workflow.google_cloud_file_exists( self.filter_bucket, f"{run_id}/completed" ) log.debug(f"{run_id}/completed {'is ready' if is_ready else 'is not ready'}") return is_ready def await_most_recent_run(self, *, timeout=timedelta(minutes=5)): run_id = self.most_recent_id() time_start = datetime.now() while run_id is None or not self.is_run_ready(run_id): time_waiting = datetime.now() - time_start if time_waiting >= timeout: raise TimeoutException(f"{time_waiting}") log.warning( f"{run_id}/completed not found, retrying (waiting={time_waiting}, " + f"deadline={timeout-time_waiting})" ) time.sleep(30) run_id = self.most_recent_id() def most_recent_id(self): return self.run_identifiers[-1] if len(self.run_identifiers) else None def get_run_timestamp(self, run_id): if run_id not in self.cached_run_times: byte_str = workflow.download_from_google_cloud_to_string( self.filter_bucket, f"{run_id}/timestamp" ) self.cached_run_times[run_id] = datetime.fromisoformat( byte_str.decode("utf-8") ).replace(tzinfo=timezone.utc) return self.cached_run_times[run_id] def asciiPemToBinaryDer(pem: str) -> bytes: matches = re.search( r"(?<=-----BEGIN CERTIFICATE-----).*?(?=-----END CERTIFICATE-----)", pem, flags=re.DOTALL, ) return base64.b64decode(matches.group(0)) @lru_cache def get_attachments_base_url(client): return client.server_info()["capabilities"]["attachments"]["base_url"] class PublisherClient(Client): def attach_file( self, *, collection=None, filePath=None, fileName="file", fileContents=None, mimeType="application/octet-stream", recordId=None, ): if not ((filePath is None) ^ (fileContents is None)): raise Exception("Must specify either filePath or fileContents") if filePath: with open(filePath, "rb") as f: fileContents = f.read() attachmentEndpoint = "buckets/{}/collections/{}/records/{}/attachment".format( self.bucket_name, collection or self.collection_name, recordId ) response = requests.post( self.session.server_url + attachmentEndpoint, files=[("attachment", (fileName, fileContents, mimeType))], auth=self.session.auth, ) if response.status_code > 200: raise KintoException( f"Couldn't attach file at endpoint {self.session.server_url}{attachmentEndpoint}: " + f"{response.content.decode('utf-8')}" ) def request_review_of_collection(self, *, collection=None): try: resp = self.get_collection(id=collection) except KintoException as e: log.error("Couldn't determine {collection} review status") raise e original = resp.get("data") if original is None: raise KintoException("Malformed response from Kinto") status = original.get("status") if status is None: raise KintoException("Malformed response from Kinto") if status != "work-in-progress": log.info(f"Collection {collection} is unchanged. Does not need review.") return try: resp = self.patch_collection( original=original, changes=BasicPatch({"status": "to-review"}) ) except KintoException as e: log.error("Couldn't request review of {collection}") raise e class AttachedPem: def __init__(self, **kwargs): self.filename = kwargs["filename"] self.size = kwargs["size"] self.location = kwargs["location"] self.mimetype = kwargs["mimetype"] self.hash = kwargs["hash"] def _get_attributes(self): return { "filename": self.filename, "size": self.size, "location": self.location, "mimetype": self.mimetype, "hash": self.hash, } def __str__(self): return "{{PEM: {} [h={} s={}]}}".format(self.filename, self.hash, self.size) def allIn(keys, record): return all(k in record for k in keys) def exactlyOneIn(keys, record): return sum(k in record for k in keys) == 1 class Intermediate: cert: x509.Certificate derHash: bytes kinto_id: str pemAttachment: AttachedPem pemData: str pemHash: str pubKeyHash: bytes subject: str def __init__(self, **kwargs): self.derHash = None self.kinto_id = None self.pemAttachment = None self.pemData = None self.pemHash = None parseError = IntermediateRecordError(f"Malformed record: {kwargs}") # Local records have a "pem" field. # RemoteSettings records have "attachment". # TODO(jms): These should be versioned (See Issue 180). if not exactlyOneIn(["pem", "attachment"], kwargs): raise parseError if "pem" in kwargs and not allIn( [ "pubKeyHash", "subject", ], kwargs, ): raise parseError if "attachment" in kwargs and not allIn( [ "derHash", "id", ], kwargs, ): raise parseError try: if "pubKeyHash" in kwargs: self.pubKeyHash = base64.b64decode( kwargs["pubKeyHash"], altchars="-_", validate=True ) # sha256 of the SPKI else: self.pubKeyHash = None except base64.binascii.Error: raise parseError if self.pubKeyHash and len(self.pubKeyHash) != 32: raise IntermediateRecordError(f"Invalid pubkey hash: {kwargs}") try: if "derHash" in kwargs: self.derHash = base64.b64decode( kwargs["derHash"], altchars="-_", validate=True ) else: self.derHash = None except base64.binascii.Error: raise parseError if self.derHash and len(self.derHash) != 32: raise IntermediateRecordError(f"Invalid DER hash. {kwargs}") if "pem" in kwargs: self.set_pem(kwargs["pem"]) if "attachment" in kwargs: self.kinto_id = kwargs["id"] self.cert = None self.pemAttachment = AttachedPem(**kwargs["attachment"]) self.pemHash = self.pemAttachment.hash if len(self.pemHash) != 64: # sha256 hexdigest raise IntermediateRecordError(f"Invalid hash. {kwargs}") def __str__(self): return ( f"{{Int: {self.subject} " + f"[h={base64.b85encode(self.pubKeyHash).decode('utf-8')}" ) def unique_id(self): return ( f"{base64.b85encode(self.pubKeyHash).decode('utf-8')}" + f"-{self.subject}-{self.derHash}" ) def _get_attributes(self, *, complete=False): attributes = { "derHash": base64.standard_b64encode(self.derHash).decode("utf-8"), } if complete and self.pemAttachment: attributes["attachment"] = self.pemAttachment._get_attributes() return attributes def _upload_pem(self, *, rw_client=None, kinto_id=None): rw_client.attach_file( collection=settings.KINTO_INTERMEDIATES_COLLECTION, fileContents=self.pemData, fileName=f"{base64.urlsafe_b64encode(self.pubKeyHash).decode('utf-8')}.pem", mimeType="text/plain", recordId=kinto_id or self.kinto_id, ) def equals(self, *, remote_record=None): return self._get_attributes() == remote_record._get_attributes() def set_pem(self, pem_data): self.pemData = pem_data self.pemHash = hashlib.sha256(pem_data.encode("utf-8")).hexdigest() derCert = asciiPemToBinaryDer(pem_data) try: self.cert = x509.load_pem_x509_certificate( pem_data.encode("utf-8"), default_backend() ) except Exception as e: raise IntermediateRecordError("Cannot parse PEM data: {}".format(e)) derHash = hashlib.sha256(self.cert.public_bytes(Encoding.DER)).digest() if self.derHash and self.derHash != derHash: raise IntermediateRecordError("DER hash does not match") self.derHash = derHash self.subject = self.cert.subject.rfc4514_string() derSpki = self.cert.public_key().public_bytes( encoding=Encoding.DER, format=PublicFormat.SubjectPublicKeyInfo ) spkiHash = hashlib.sha256(derSpki).digest() if self.pubKeyHash and self.pubKeyHash != spkiHash: raise IntermediateRecordError("SPKI hash does not match") self.pubKeyHash = spkiHash def download_pem(self, kinto_client): if not self.pemAttachment: raise Exception("pemAttachment not set") r = requests.get( f"{get_attachments_base_url(kinto_client)}{self.pemAttachment.location}" ) r.raise_for_status() self.set_pem(r.text) def is_expired(self, kinto_client=None): if not self.cert: if not kinto_client: raise Exception("cannot download PEM without client") self.download_pem(kinto_client) return self.cert.not_valid_after <= datetime.now(datetime.UTC) def delete_from_kinto(self, *, rw_client=None): if self.kinto_id is None: raise IntermediateRecordError( "Cannot delete a record not at Kinto: {}".format(self) ) rw_client.delete_record( collection=settings.KINTO_INTERMEDIATES_COLLECTION, id=self.kinto_id, ) def update_kinto(self, *, remote_record=None, rw_client=None): if self.pemData is None: raise IntermediateRecordError( "Cannot upload a record not local: {}".format(self) ) if remote_record is None: raise IntermediateRecordError("Must provide a remote record") if remote_record.kinto_id is None: raise IntermediateRecordError( "No kinto ID available {}".format(remote_record) ) if not remote_record.pemHash == self.pemHash: log.warning("Attachment update needed for {}".format(self)) log.warning("New: {}".format(self.pemData)) # TODO: Do we delete the record? Right now it'll get caught at the end but # not get fixed. raise IntermediateRecordError( "Attachment is incorrect for ID {}".format(remote_record.kinto_id) ) # Make sure to put back the existing PEM attachment data self.pemAttachment = remote_record.pemAttachment rw_client.update_record( collection=settings.KINTO_INTERMEDIATES_COLLECTION, data=self._get_attributes(complete=True), id=remote_record.kinto_id, ) def add_to_kinto(self, *, rw_client=None): if self.pemData is None: raise IntermediateRecordError( "Cannot upload a record not local: {}".format(self) ) attributes = self._get_attributes() perms = {"read": ["system.Everyone"]} record = rw_client.create_record( collection=settings.KINTO_INTERMEDIATES_COLLECTION, data=attributes, permissions=perms, ) self.kinto_id = record["data"]["id"] try: self._upload_pem(rw_client=rw_client) except KintoException as ke: log.error( "Failed to upload attachment. Removing stale intermediate record {}.".format( self.kinto_id ) ) rw_client.delete_record( collection=settings.KINTO_INTERMEDIATES_COLLECTION, id=self.kinto_id, ) log.error("Stale record deleted.") raise ke def details(self): return self._get_attributes() def load_local_intermediates(*, intermediates_path): local_intermediates = {} with intermediates_path.open("r") as f: entries = json.load(f) for entry in entries: try: intObj = Intermediate(**entry) if intObj.unique_id() in local_intermediates: log.warning( f"[{intObj.unique_id()}] Local collision: {intObj} with " + f"{local_intermediates[intObj.unique_id()]}" ) continue local_intermediates[intObj.unique_id()] = intObj except IntermediateRecordError: log.warning( "IntermediateRecordError: {} while importing from {}".format( entry, f.name ) ) continue except Exception as e: log.error("Error importing file from {}: {}".format(f.name, e)) log.error("Record: {}".format(entry)) raise e return local_intermediates def load_remote_intermediates(*, kinto_client): remote_intermediates = {} remote_error_records = [] for record in kinto_client.get_records( collection=settings.KINTO_INTERMEDIATES_COLLECTION ): try: intObj = Intermediate(**record) intObj.download_pem( kinto_client ) # intObj.pemAttachment was set by constructor if intObj.unique_id() in remote_intermediates: log.warning(f"Will remove duplicate intermediate: {intObj}") remote_error_records.append(record) else: remote_intermediates[intObj.unique_id()] = intObj except IntermediateRecordError as ire: log.warning("Skipping broken intermediate record at Kinto: {}".format(ire)) remote_error_records.append(record) return remote_intermediates, remote_error_records def publish_intermediates(*, args, rw_client): if args.enrolled_json: # when using a local copy of enrolled.json we don't need to determine # the most recent run identifier. run_id = "local" else: run_identifiers = workflow.get_run_identifiers(args.filter_bucket) if not run_identifiers: log.warning("No run identifiers found") return run_id = run_identifiers[-1] run_id_path = args.download_path / Path(run_id) run_id_path.mkdir(parents=True, exist_ok=True) if args.enrolled_json: intermediates_path = Path(args.enrolled_json) else: intermediates_path = run_id_path / Path("enrolled.json") workflow.download_and_retry_from_google_cloud( args.filter_bucket, f"{run_id}/enrolled.json", intermediates_path, timeout=timedelta(minutes=5), ) local_intermediates = load_local_intermediates( intermediates_path=intermediates_path ) remote_intermediates, remote_error_records = load_remote_intermediates( kinto_client=rw_client ) remote_only = set(remote_intermediates.keys()) - set(local_intermediates.keys()) to_upload = set(local_intermediates.keys()) - set(remote_intermediates.keys()) to_update = set() for i in set(local_intermediates.keys()) & set(remote_intermediates.keys()): if not local_intermediates[i].equals(remote_record=remote_intermediates[i]): to_update.add(i) remote_expired = set() for i in remote_only: try: if remote_intermediates[i].is_expired(kinto_client=rw_client): remote_expired.add(i) except Exception as e: log.warning(f"Failed to track expiration for {i}: {e}") log.info(f"Local intermediates: {len(local_intermediates)}") log.info(f"Remote intermediates: {len(remote_intermediates)}") log.info(f"- Expired: {len(remote_expired)}") log.info(f"- In error: {len(remote_error_records)}") log.info(f"To add: {len(to_upload)}") log.info(f"To update: {len(to_update)}") log.info(f"To delete: {len(remote_only)}") if args.noop: log.info("Noop flag set, exiting before any intermediate updates") return # All intermediates must be in the local list for unique_id in remote_only: record = remote_intermediates[unique_id] log.info(f"Removing deleted intermediate {record}") try: record.delete_from_kinto(rw_client=rw_client) except KintoException as ke: log.error(f"Couldn't delete record {record}: {ke}") # Delete any remote records that had errors # (note these "records" are just dictionaries) for raw_record in remote_error_records: log.info(f"Deleting remote record with error: {raw_record}") try: rw_client.delete_record( collection=settings.KINTO_INTERMEDIATES_COLLECTION, id=raw_record["id"], ) except KintoException as ke: log.error(f"Couldn't delete record id {raw_record['id']}: {ke}") except KeyError: # raw_record doesn't have "id" log.error(f"Couldn't delete record: {raw_record}") # New records for unique_id in to_upload: record = local_intermediates[unique_id] log.info(f"Adding new record: {record}") try: record.add_to_kinto(rw_client=rw_client) except KintoException as ke: log.error(f"Couldn't add record {record}: {ke}") # Updates for unique_id in to_update: local_int = local_intermediates[unique_id] remote_int = remote_intermediates[unique_id] log.info(f"Updating record: {remote_int} to {local_int}") try: local_int.update_kinto( rw_client=rw_client, remote_record=remote_int, ) except KintoException as ke: log.error( f"Failed to update local={local_int} remote={remote_int} exception={ke}" ) log.info("Verifying correctness...") verified_intermediates, verified_error_records = load_remote_intermediates( kinto_client=rw_client ) if len(verified_error_records) > 0: raise KintoException( f"There are {len(verified_error_records)} broken intermediates. Re-run to fix." ) # Every local intermediate should be in the remote list for unique_id, local_int in local_intermediates.items(): if unique_id not in verified_intermediates: raise KintoException(f"Failed to upload {unique_id}") if not local_int.equals(remote_record=verified_intermediates[unique_id]): raise KintoException( "Local/Remote metadata mismatch for uniqueId={}".format(unique_id) ) # Every remote intermediate should be in the local list for unique_id in verified_intermediates.keys(): if unique_id not in local_intermediates: raise KintoException(f"Failed to remove {unique_id}") rw_client.request_review_of_collection( collection=settings.KINTO_INTERMEDIATES_COLLECTION ) def clear_crlite_filters(*, rw_client, noop, channel): if noop: log.info("Would clean up CRLite filters, but no-op set") return existing_records = rw_client.get_records( collection=settings.KINTO_CRLITE_COLLECTION ) existing_records = [ x for x in existing_records if x.get("channel", CHANNEL_DEFAULT) == channel.slug ] existing_filters = filter(lambda x: x["incremental"] is False, existing_records) for filter_record in existing_filters: log.info(f"Cleaning up stale filter record {filter_record['id']}.") rw_client.delete_record( collection=settings.KINTO_CRLITE_COLLECTION, id=filter_record["id"], ) def clear_crlite_deltas(*, rw_client, noop, channel): if noop: log.info("Would clean up CRLite deltas, but no-op set") return existing_records = rw_client.get_records( collection=settings.KINTO_CRLITE_COLLECTION ) existing_records = [ x for x in existing_records if x.get("channel", CHANNEL_DEFAULT) == channel.slug ] existing_deltas = filter(lambda x: x["incremental"] is True, existing_records) for delta in existing_deltas: log.info(f"Cleaning up stale delta record {delta['id']}.") rw_client.delete_record( collection=settings.KINTO_CRLITE_COLLECTION, id=delta["id"], ) def publish_crlite_record( *, attributes, attachment_path, attachment_name, rw_client, channel, noop, ): if noop: log.info("NoOp mode enabled") return attributes["details"]["name"] attributes["channel"] = channel.slug # You can test a filter expression relative to a mock context using the # Firefox browser console as follows. # let {FilterExpressions} = ChromeUtils.importESModule("resource://gre/modules/components-utils/FilterExpressions.sys.mjs") # let expression = "env.version|versionCompare('124.0a1') >= 0" # let context = {env: {version:"130.0.1"}} # await FilterExpressions.eval(expression, context) # See https://remote-settings.readthedocs.io/en/latest/target-filters.html # for the expression syntax and the definition of env. attributes[ "filter_expression" ] = f"env.version|versionCompare('{channel.supported_version}.!') >= 0 && '{channel.slug}' == 'security.pki.crlite_channel'|preferenceValue('none')" record = rw_client.create_record( collection=settings.KINTO_CRLITE_COLLECTION, data=attributes, permissions={"read": ["system.Everyone"]}, ) recordid = record["data"]["id"] try: rw_client.attach_file( collection=settings.KINTO_CRLITE_COLLECTION, fileName=attachment_name, filePath=attachment_path, recordId=recordid, mimeType=channel.mimetype, ) except KintoException as ke: log.error( f"Failed to upload attachment. Removing stale CRLite record {recordid}: {ke}" ) rw_client.delete_record( collection=settings.KINTO_CRLITE_COLLECTION, id=recordid, ) log.error("Stale record deleted.") raise ke return recordid def publish_crlite_main_filter( *, rw_client, filter_path, filename, timestamp, noop, channel ): record_time = timestamp.isoformat(timespec="seconds") record_epoch_time_ms = math.floor(timestamp.timestamp() * 1000) identifier = f"{record_time}Z-full" attributes = { "details": {"name": identifier}, "incremental": False, "effectiveTimestamp": record_epoch_time_ms, "coverage": [], # legacy attribute "enrolledIssuers": [], # legacy attribute } log.info(f"Publishing full filter {filter_path} {timestamp}") return publish_crlite_record( rw_client=rw_client, attributes=attributes, attachment_path=filter_path, attachment_name=filename, channel=channel, noop=noop, ) def publish_crlite_delta( *, rw_client, delta_path, filename, timestamp, previous_id, noop, channel ): record_time = timestamp.isoformat(timespec="seconds") record_epoch_time_ms = math.floor(timestamp.timestamp() * 1000) identifier = f"{record_time}Z-diff" attributes = { "details": {"name": identifier}, "incremental": True, "effectiveTimestamp": record_epoch_time_ms, "parent": previous_id, } log.info( f"Publishing incremental filter {delta_path} {timestamp} previous={previous_id}" ) return publish_crlite_record( rw_client=rw_client, attributes=attributes, attachment_path=delta_path, attachment_name=filename, channel=channel, noop=noop, ) def timestamp_from_record(record): iso_string = record["details"]["name"].split("Z-")[0] return datetime.fromisoformat(iso_string).replace(tzinfo=timezone.utc) def crlite_verify_record_consistency(*, existing_records, channel): # This function assumes that existing_records is sorted according to # record["details"]["name"], which is a "YYYY-MM-DDTHH:MM:SS+00:00Z" # timestamp. existing_records = [ x for x in existing_records if x.get("channel", CHANNEL_DEFAULT) == channel.slug ] # It's OK if there are no records yet. if len(existing_records) == 0: return for r in existing_records: if not ("id" in r and "incremental" in r and "attachment" in r): raise ConsistencyException(f"Malformed record {r}.") if r["incremental"] and not "parent" in r: raise ConsistencyException(f"Malformed record {r}.") # There must be exactly 1 full filter in the existing records. full_filters = [r for r in existing_records if not r["incremental"]] if len(full_filters) == 0: raise ConsistencyException("No full filters.") if len(full_filters) >= 2: raise ConsistencyException(f"Multiple full filters: {full_filters}") # Each incremental filter should be a descendent of the full filter ids = {r["id"]: r for r in existing_records} maxHeight = 0 for r in existing_records: ptr = r["id"] height = 0 while ids[ptr]["incremental"]: ptr = ids[ptr]["parent"] if ptr not in ids: raise ConsistencyException(f"Record {r['id']} has unknown parent {ptr}") height += 1 if height > len(existing_records): raise ConsistencyException(f"Record parent cycle") maxHeight = max(height, maxHeight) # The incremental filters should form a chain (no branching), hence there's # an incremental filter len(existing_records)-1 steps away from the full # filter. if maxHeight != len(existing_records) - 1: raise ConsistencyException(f"Multiple filter descendents: {full_filters}") def crlite_verify_run_id_consistency(*, run_db, identifiers_to_check, channel): # The runs should be complete. for r in identifiers_to_check: if not run_db.is_run_ready(r): raise ConsistencyException(f"Run is not ready: {r}") # Each run should be valid. for r in identifiers_to_check: if not run_db.is_run_valid(r, channel): raise ConsistencyException(f"Not a valid run: {r}") # When sorted by run ID, the runs should have increasing timestamps. identifiers_to_check.sort(key=lambda x: [int(y) for y in x.split("-")]) ts = [run_db.get_run_timestamp(r) for r in identifiers_to_check] for x, y in zip(ts, ts[1:]): if x > y: raise ConsistencyException(f"Out-of-order timestamp: {ts}") def crlite_determine_publish(*, existing_records, run_db, channel): assert len(run_db) > 0, "There must be run identifiers" # The default behavior is to clear all records and upload a full # filter based on the most recent run. We'll check if we can do # an incremental update instead. new_run_id = run_db.most_recent_id() default = {"clear_all": True, "upload": [new_run_id]} # If there are no existing records, publish a full filter. if not existing_records: log.info("No existing records") return default # If the existing records are bad, publish a full filter. try: crlite_verify_record_consistency( existing_records=existing_records, channel=channel ) except ConsistencyException as se: log.error(f"Failed to verify existing record consistency: {se}") return default # A run ID is a "YYYYMMDD" date and an index, e.g. "20210101-3". # The record["attachment"]["filename"] field of an existing record is # in the format "<run id>-channel.filter", "<run id>-channel.filter.stash", # or "<run id>-channel.filter.delta". record_run_ids = [ record["attachment"]["filename"].rsplit("-", 1)[0] for record in existing_records ] # Get a list of run IDs that are newer than any existing record. # These are candidates for inclusion in an incremental update. old_run_ids = [] new_run_ids = [] cut_date, cut_idx = [int(x) for x in record_run_ids[-1].split("-")] for run_id in run_db.run_identifiers: run_date, run_idx = [int(x) for x in run_id.split("-")] if run_date < cut_date or (run_date == cut_date and run_idx <= cut_idx): old_run_ids.append(run_id) else: new_run_ids.append(run_id) # If we don't have data from old runs, publish a full filter. for run_id in record_run_ids: if run_id not in old_run_ids: log.error("We do not have data to support existing records.") return default # If the new runs fail a consistency check, publish a full filter. try: crlite_verify_run_id_consistency( run_db=run_db, identifiers_to_check=new_run_ids, channel=channel ) except ConsistencyException as se: log.error(f"Failed to verify run ID consistency: {se}") return default # If the full filter is too old, publish a full filter. earliest_timestamp = run_db.get_run_timestamp(min(record_run_ids)) new_timestamp = run_db.get_run_timestamp(new_run_id) if new_timestamp - earliest_timestamp >= timedelta( days=channel.max_filter_age_days ): log.info(f"Published full filter is >= {channel.max_filter_age_days} days old") return default return {"clear_all": False, "upload": new_run_ids} def publish_crlite(*, args, rw_client, channel, timeout=timedelta(minutes=5)): # returns the run_id of a new full filter if one is published, otherwise None rv = None existing_records = rw_client.get_records( collection=settings.KINTO_CRLITE_COLLECTION ) existing_records = [ x for x in existing_records if x.get("channel", CHANNEL_DEFAULT) == channel.slug ] # Sort existing_records for crlite_verify_record_consistency, # which gets called in crlite_determine_publish. existing_records = sorted(existing_records, key=lambda x: x["details"]["name"]) published_run_db = PublishedRunDB(args.filter_bucket) # Wait for the most recent run to finish. try: published_run_db.await_most_recent_run(timeout=timeout) except TimeoutException as te: log.warning(f"The most recent run is not ready to be published (waited {te}).") return rv tasks = crlite_determine_publish( existing_records=existing_records, run_db=published_run_db, channel=channel ) log.debug(f"crlite_determine_publish tasks={tasks}") if not tasks["upload"]: log.info("Nothing to do.") return rv args.download_path.mkdir(parents=True, exist_ok=True) final_run_id = tasks["upload"][-1] final_run_id_path = args.download_path / Path(final_run_id) final_run_id_path.mkdir(parents=True, exist_ok=True) filter_path = final_run_id_path / Path("filter") workflow.download_and_retry_from_google_cloud( args.filter_bucket, f"{final_run_id}/{channel.dir}/filter", filter_path, timeout=timedelta(minutes=5), ) if tasks["clear_all"]: log.info(f"Uploading a full filter based on {final_run_id}.") clear_crlite_filters(rw_client=rw_client, noop=args.noop, channel=channel) clear_crlite_deltas(rw_client=rw_client, noop=args.noop, channel=channel) assert filter_path.is_file(), "Missing local copy of filter" publish_crlite_main_filter( filter_path=filter_path, filename=f"{final_run_id}-{channel.slug}.filter", rw_client=rw_client, timestamp=published_run_db.get_run_timestamp(final_run_id), channel=channel, noop=args.noop, ) rv = final_run_id else: log.info("Uploading deltas.") previous_id = existing_records[-1]["id"] for run_id in tasks["upload"]: run_id_path = args.download_path / Path(run_id) run_id_path.mkdir(parents=True, exist_ok=True) delta_path = run_id_path / Path("delta") workflow.download_and_retry_from_google_cloud( args.filter_bucket, f"{run_id}/{channel.dir}/{channel.delta_filename}", delta_path, timeout=timedelta(minutes=5), ) assert delta_path.is_file(), "Missing local copy of delta" previous_id = publish_crlite_delta( delta_path=delta_path, filename=f"{run_id}-{channel.slug}.{channel.delta_filename}", rw_client=rw_client, previous_id=previous_id, timestamp=published_run_db.get_run_timestamp(run_id), channel=channel, noop=args.noop, ) if not args.noop: rw_client.request_review_of_collection( collection=settings.KINTO_CRLITE_COLLECTION ) return rv def publish_ctlogs(*, args, rw_client): # Copy CT log metadata from google's v3 log_list to our kinto collection. # This will notify reviewers who can then manually enroll the log in CRLite. # # Schema for our ct-logs kinto collection: # { # "admissible": boolean # "crlite_enrolled": boolean, # "description": string, # "key": string, # "logID": string, # "mmd": integer, # "operator": string # "url": string # } # log_list_json = requests.get( "https://www.gstatic.com/ct/log_list/v3/log_list.json" ).json() # The "state" of a log determines whether its SCTs are admissible in policy checks. # We largely follow Chrome's behavior defined here: # https://googlechrome.github.io/CertificateTransparency/log_states.html, # except we are not enforcing the restrictions on "retired" logs. admissible_states = ["qualified", "usable", "readonly", "retired"] # Google groups CT logs according to their operators, we want a flat list upstream_logs_raw = [] for operator in log_list_json["operators"]: for ctlog in operator["logs"]: ctlog["operator"] = operator["name"] ctlog["admissible"] = any( state in ctlog["state"] for state in admissible_states ) upstream_logs_raw.append(ctlog) # Translate |upstream_logs_raw| to our schema (and remove unused fields) upstream_logs = [ { "admissible": ctlog["admissible"], "crlite_enrolled": True, "description": ctlog["description"], "key": ctlog["key"], "logID": ctlog["log_id"], "mmd": ctlog["mmd"], "operator": ctlog["operator"], "url": ctlog["url"], } for ctlog in upstream_logs_raw ] upstream_lut = {ctlog["logID"]: ctlog for ctlog in upstream_logs} if len(upstream_logs) != len(upstream_lut): raise ConsistencyException( "We expect the 'log_id' field to be unique in log_list.json" ) # LogID is supposed to be a hash of the CT Log's key for upstream_log in upstream_logs: rfc6962_log_id = base64.b64encode( hashlib.sha256(base64.b64decode(upstream_log["key"])).digest() ) if rfc6962_log_id != upstream_log["logID"].encode("utf8"): raise ConsistencyException( f"Google log list contains incorrectly computed logID {upstream_log}" ) # Fetch our existing Kinto records known_logs = rw_client.get_records(collection=settings.KINTO_CTLOGS_COLLECTION) known_lut = {ctlog["logID"]: ctlog for ctlog in known_logs} if len(known_logs) != len(known_lut): raise ConsistencyException( "We expect the 'logID' field to be unique the ct-logs collection" ) # Add new logs for upstream_id, upstream_log in upstream_lut.items(): if upstream_id in known_lut: continue if args.noop: log.info( f"Noop enabled, skipping upload of \"{upstream_log['description']}\"." ) continue log.info(f"Uploading new log {upstream_log}") try: rw_client.create_record( collection=settings.KINTO_CTLOGS_COLLECTION, data=upstream_log, permissions={"read": ["system.Everyone"]}, ) except KintoException as ke: log.error(f"Upload failed, {ke}") # Delete logs that have been removed from Google's list # (this probably doesn't happen) for known_id, known_log in known_lut.items(): if known_id in upstream_lut: continue if args.noop: log.info(f"Noop enabled, skipping deletion of {known_log}.") continue log.info( f"Removing log {known_log}, which has been deleted from Google's list." ) try: rw_client.delete_record( collection=settings.KINTO_CTLOGS_COLLECTION, id=known_log["id"], ) except KintoException as ke: log.error(f"Deletion failed, {ke}") # Update logs if upstream metadata has changed. # (These will be unenrolled and manually reviewed.) for known_id, known_log in known_lut.items(): if known_id not in upstream_lut: # skip deletions continue upstream_log = upstream_lut[known_id] # This script is not responsible for updating crlite enrollment, # so preserve the existing value. upstream_log["crlite_enrolled"] = known_log["crlite_enrolled"] need_update = False for i in ["description", "key", "url", "mmd", "admissible", "operator"]: if upstream_log[i] != known_log.get(i, None): need_update = True if not need_update: continue if args.noop: log.info(f"Noop enabled, skipping update log with id {known_id}.") continue log.info(f"Changing {known_log} to {upstream_log}") try: rw_client.update_record( collection=settings.KINTO_CTLOGS_COLLECTION, data=upstream_log, id=known_log["id"], ) except KintoException as ke: log.error(f"Update failed, {ke}") if not args.noop: rw_client.request_review_of_collection( collection=settings.KINTO_CTLOGS_COLLECTION ) def main(): parser = argparse.ArgumentParser( description="Upload MLBF files to Kinto as records" ) parser.add_argument("--noop", action="store_true", help="Don't update Kinto") parser.add_argument( "--download-path", type=Path, default=Path(tempfile.TemporaryDirectory().name), help="Path to temporarily store CRLite downloaded artifacts", ) parser.add_argument("--filter-bucket", default="crlite_filters") parser.add_argument("--verbose", "-v", help="Be more verbose", action="store_true") parser.add_argument("--enrolled-json", help="Path to local copy of enrolled.json") args = parser.parse_args() if args.verbose: log.setLevel("DEBUG") if args.noop: log.info("The --noop flag is set, will not make changes.") if "KINTO_AUTH_USER" not in dir(settings): raise Exception("KINTO_AUTH_USER must be defined in settings.py") if "KINTO_AUTH_PASSWORD" not in dir(settings): raise Exception("KINTO_AUTH_PASSWORD must be defined in settings.py") auth = ( None if args.noop else requests.auth.HTTPBasicAuth( settings.KINTO_AUTH_USER, settings.KINTO_AUTH_PASSWORD ) ) log.info( "Using username/password authentication. Username={}".format( settings.KINTO_AUTH_USER ) ) log.info(f"Connecting... {settings.KINTO_RW_SERVER_URL}") rw_client = PublisherClient( server_url=settings.KINTO_RW_SERVER_URL, auth=auth, bucket=settings.KINTO_BUCKET, retry=5, ) try: log.info("Updating ct-logs collection") publish_ctlogs(args=args, rw_client=rw_client) log.info("Updating cert-revocations collection") for channel in CHANNELS: if channel.enabled: publish_crlite(args=args, channel=channel, rw_client=rw_client) log.info("Removing records for unused channels") for channel in CHANNELS: if not channel.enabled: clear_crlite_filters( noop=args.noop, channel=channel, rw_client=rw_client ) clear_crlite_deltas( noop=args.noop, channel=channel, rw_client=rw_client ) log.info("Updating intermediates collection") publish_intermediates(args=args, rw_client=rw_client) except KintoException as ke: log.error("An exception at Kinto occurred: {}".format(ke)) raise ke except Exception as e: log.error("A general exception occurred: {}".format(e)) raise e if __name__ == "__main__": main()