foundation_security_advisories/common_cve.py (288 lines of code) (raw):

#!/usr/bin/env python3

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

"""Helpers for comparing local CVE advisories with CVE Services and updating them."""

import os
import sys
import subprocess
from datetime import datetime, timezone
from json import dumps
import difflib
from bisect import insort

import requests
from cvelib.cve_api import CveApi
from requests import HTTPError

from foundation_security_advisories.common import (
    get_all_files,
    parse_yml_file,
    CVEAdvisory,
    CVEAdvisoryInstance,
)

# Shared CVE Services client, configured entirely from environment variables.
cve_api = CveApi(
    username=os.getenv("CVE_USER"),
    org=os.getenv("CVE_ORG"),
    api_key=os.getenv("CVE_API_KEY"),
    env=os.getenv("CVE_ENV"),
)

# CVE-IDs for which a "-> CVE-..." heading has already been printed.
announced_cve_steps: list[str] = []


def print_cve_step(cve_id: str):
    """Print a `-> {cve_id}` heading, but only the first time the id is seen."""
    if cve_id not in announced_cve_steps:
        print(f"\n-> {cve_id}")
        announced_cve_steps.append(cve_id)


def publish_cve(cve_id: str, cve_json: dict):
    """
    CVE Services: Publish the content for an already existing and given CVE-ID
    with the given data in CVE JSON format.

    The references of the CNA container in `cve_json` are sorted in place by
    URL, the content is printed as a diff against an empty remote, and the
    user is asked for confirmation before anything is sent.

    Returns False if the user declined to publish, None otherwise.
    Raises Exception if the API answers with an HTTP error.
    """
    cve_json["containers"]["cna"]["references"].sort(
        key=lambda reference: reference["url"]
    )
    # Nothing is published yet, so diff against an empty "remote" side.
    diff = difflib.unified_diff(
        [],
        dumps(cve_json, indent=2, sort_keys=True).split("\n"),
        lineterm="",
        fromfile="Remote (not yet published)",
        tofile="Local",
    )
    for line in diff:
        print(line)
    if not prompt_yes_no(f"\nShould this content be published for {cve_id}?"):
        print(f"Skipping {cve_id}")
        return False
    print(f"Publishing {cve_id}")
    try:
        cve_api.publish(cve_id, cve_json)
        # The timestamp on the API needs to be younger than the commit timestamp so that
        # the file does not get registered as modified.
        touch_cve_id(cve_id)
    except HTTPError as e:
        raise Exception(f"Failed to publish {cve_id}, {e.response.text}") from e


def get_cve(cve_id: str):
    """
    CVE Services: Get the CVE record for the given CVE-ID.

    Raises Exception if the API answers with an HTTP error.
    """
    try:
        return cve_api.show_cve_record(cve_id)
    except HTTPError as e:
        raise Exception(f"Failed to fetch {cve_id}, {e.response.text}") from e


def touch_cve_id(cve_id: str):
    """CVE Services: Update the timestamp of the given CVE-ID to the current date."""
    print(
        f"Updating timestamp on {cve_id} to current date {pretty_date(datetime.now(tz=timezone.utc).timestamp())}"
    )
    return cve_api._put(f"cve-id/{cve_id}").json()


def update_published_cve(cve_id: str, cve_json: dict):
    """
    CVE Services: Update the content of the given CVE-ID with the given data
    in CVE JSON 5.1 format.

    Raises Exception if the API answers with an HTTP error.
    """
    print(f"Updating {cve_id}")
    try:
        cve_api.update_published(cve_id, cve_json)
        # We need to update the timestamp on the CVE-ID itself, because that is what we use
        # later to check for modified files.
        touch_cve_id(cve_id)
    except HTTPError as e:
        raise Exception(f"Failed to update {cve_id}, {e.response.text}") from e


def try_update_published_cve(local_cve: CVEAdvisory, local_date: int, remote_date: float):
    """
    Check if there is a difference between the local and the remote CVE.
    If there is one, update the CVE.

    Nothing is done when the remote is newer than the local copy, unless the
    `FORCE_UPDATE` environment variable is set. When no difference is found,
    only the remote timestamp is refreshed.

    Returns False if the user declined the update, None in every other case.
    """
    if remote_date > local_date and not os.getenv("FORCE_UPDATE"):
        return

    remote_date_str = pretty_date(remote_date)
    local_date_str = pretty_date(local_date)
    print_cve_step(local_cve.id)

    # We need to modify the remote and local json a bit to make sure we only
    # detect a diff if something actually changed.
    remote_cve_json = get_cve(local_cve.id)
    remote_cve_json.pop("cveMetadata")
    remote_cna = remote_cve_json["containers"]["cna"]
    remote_cna.pop("providerMetadata")
    remote_cna.pop("x_legacyV4Record", None)

    local_cve_json = local_cve.to_json()
    local_cna = local_cve_json["containers"]["cna"]
    local_reference_urls = {
        local_reference[0]
        for local_instance in local_cve.instances
        for local_reference in local_instance.references
    }

    # If there are references which we did not add automatically, we probably don't
    # want to remove them, so we move them to our to-be-published object.
    own_url_prefixes = (
        "https://bugzilla.mozilla.org",
        "https://www.bugzilla.mozilla.org",
        "https://mozilla.org",
        "https://www.mozilla.org",
    )
    remote_extra_references = [
        reference
        for reference in remote_cna["references"]
        if reference["url"] not in local_reference_urls
        and not reference["url"].startswith(own_url_prefixes)
    ]
    local_cna["references"].extend(remote_extra_references)

    # Sort the references to make sure we detect the diff correctly.
    remote_cna["references"].sort(key=lambda reference: reference["url"])
    local_cna["references"].sort(key=lambda reference: reference["url"])

    # Include any other containers from the remote we do not know about (like "adp")
    for container_name, container in remote_cve_json["containers"].items():
        local_cve_json["containers"].setdefault(container_name, container)

    diff_lines = list(
        difflib.unified_diff(
            dumps(remote_cve_json, indent=2, sort_keys=True).split("\n"),
            dumps(local_cve_json, indent=2, sort_keys=True).split("\n"),
            lineterm="",
            fromfile="Remote",
            fromfiledate=remote_date_str,
            tofile="Local ",
            tofiledate=local_date_str,
        )
    )
    for line in diff_lines:
        print(line)

    if not diff_lines:
        # There seems to be no actual difference, lets update the
        # timestamp so that we won't be here again next time.
        print(f"--- Remote\t{remote_date_str}")
        print(f"+++ Local \t{local_date_str}")
        print(f"Not actual difference found for {local_cve.id}")
        touch_cve_id(local_cve.id)
        return

    if local_cve.year < 2023:
        # Old CVEs default to "no"; touching the id keeps us from asking again next time.
        if not prompt_yes_no(
            f"\nThis CVE lies before the cutoff year 2023. Should the content still be updated for {local_cve.id}?",
            default=False,
        ):
            print(f"Skipping {local_cve.id} because it lies before the cutoff year")
            touch_cve_id(local_cve.id)
            return False
    else:
        if not prompt_yes_no(f"\nShould this content be updated for {local_cve.id}?"):
            print(f"Skipping {local_cve.id}")
            return False

    update_published_cve(local_cve.id, local_cve_json)


def reserve_cve_id(year: int | str):
    """
    CVE Services: Reserve a new CVE-ID for a given year and return that new id.

    Raises Exception if the API answers with an HTTP error, and ValueError if
    the response does not contain exactly one CVE-ID.
    """
    print(f"Reserving CVE-ID for year {year}")
    try:
        response = cve_api.reserve(1, False, year)
    except HTTPError as e:
        raise Exception(f"Failed to reserve CVE-ID, {e.response.text}") from e
    if (
        "cve_ids" not in response
        or len(response["cve_ids"]) != 1
        or "cve_id" not in response["cve_ids"][0]
    ):
        raise ValueError("API did not respond with valid CVE-ID")
    return response["cve_ids"][0]["cve_id"]


def get_owned_cve_ids():
    """
    CVE Services: Get all the CVE-IDs owned by the current CNA.

    Returns a tuple containing:
    - A list of all the owned IDs, regardless of their state
    - A dictionary of all the IDs with the state `PUBLISHED` or `REJECTED`,
      mapped to the time they were last modified (as a unix timestamp).

    Raises ValueError if an ID is in a state other than `PUBLISHED`,
    `REJECTED` or `RESERVED`.
    """
    published_dates: dict[str, float] = {}
    owned_ids = []
    print("-> Fetching already owned CVE-IDs")
    for cve_advisory in cve_api.list_cves():
        cve_id = cve_advisory["cve_id"]
        state = cve_advisory["state"]
        owned_ids.append(cve_id)
        if state in ("PUBLISHED", "REJECTED"):
            published_dates[cve_id] = parse_iso_date(cve_advisory["time"]["modified"])
        elif state != "RESERVED":
            raise ValueError(f"Invalid CVE state '{state}'")
    return owned_ids, published_dates


def replace_cve_id(cve: CVEAdvisory):
    """
    Replace the id of a given `CVEAdvisory` with a new CVE-ID.

    After confirmation a new id is reserved for the advisory's year, and every
    occurrence of `{old_id}:` in the files of the advisory's instances is
    rewritten to the new id. When the `CI` environment variable is set, the
    changed files are also staged with `git add`.

    Returns True if the id of the given advisory has been changed and False if it hasn't.
    """
    old_id = cve.id
    if not prompt_yes_no(f"Should a new CVE-ID be reserved to replace {old_id}?"):
        print(f"Skipping {old_id}")
        return False
    print(f"Replacing CVE-ID for {old_id}")
    new_id = reserve_cve_id(cve.year)
    print(f"Reserved {new_id}")
    cve.id = new_id
    for instance in cve.instances:
        # Pin the encoding so the rewrite does not depend on the platform locale.
        with open(instance.file_name, encoding="utf-8") as r:
            file_content = r.read().replace(old_id + ":", new_id + ":")
        with open(instance.file_name, "w", encoding="utf-8") as w:
            w.write(file_content)
        if os.getenv("CI"):
            subprocess.run(["git", "add", instance.file_name])
    print(f"Renamed {old_id} to {new_id}")
    return True


def parse_iso_date(date_string: str):
    """
    Parse the given date string in the format used by CVE Services and return the
    corresponding date as a unix timestamp.
    """
    return datetime.fromisoformat(date_string).timestamp()


def pretty_date(utc_timestamp: float):
    """
    Return the given Unix UTC timestamp as a string in the following format:
    %Y-%m-%d %H:%M:%S UTC
    """
    return datetime.fromtimestamp(utc_timestamp, timezone.utc).strftime(
        "%Y-%m-%d %H:%M:%S UTC"
    )


def parse_bug(bug: dict):
    """
    Parse a single given bug from the advisory YAML, and return the corresponding
    URL and (optionally) description.

    A `url` value that does not start with `http` is treated as a bug number
    (or a comma separated list of bug numbers) and expanded to a Bugzilla URL.
    The description is None when the bug has no `desc` key.
    """
    url = str(bug["url"])
    desc = str(bug["desc"]) if "desc" in bug else None
    if not url.startswith("http"):
        if "," in url:
            # Several ids: link to a bug list, with the commas percent-encoded.
            url = "https://bugzilla.mozilla.org/buglist.cgi?bug_id=" + url.replace(
                " ", ""
            ).replace(",", "%2C")
        else:
            url = "https://bugzilla.mozilla.org/show_bug.cgi?id=" + url
    return url, desc


def prompt_yes_no(question: str, default=True):
    """
    Ask the user a yes/no question on stdin and return the answer as a bool.

    When the `CI` or `PROMPT_CHOOSE_DEFAULT` environment variable is set, no
    prompt is shown and `default` is returned. An empty answer counts as "yes"
    only when `default` is True. Ctrl-C at the prompt exits the process with
    status 0.
    """
    if os.getenv("CI") or os.getenv("PROMPT_CHOOSE_DEFAULT"):
        return default
    try:
        response = input(question + (" (Y/n)" if default else " (y/N)"))
    except KeyboardInterrupt:
        # sys.exit instead of the `exit` helper, which only exists when the
        # `site` module has been loaded.
        sys.exit(0)
    return response.strip().lower() in (["", "y", "yes"] if default else ["y", "yes"])


def _get_last_commit_timestamp(file_name: str) -> int:
    """Return the author timestamp (`%at`) of the last commit on HEAD touching the file."""
    output = subprocess.run(
        [
            "git",
            "log",
            "--pretty=format:%at",
            "-1",
            "HEAD",
            "--",
            file_name,
        ],
        capture_output=True,
    ).stdout.strip()
    try:
        return int(output)
    except ValueError as e:
        # git prints nothing for files without history; name the file instead
        # of failing with a bare "invalid literal for int()".
        raise ValueError(
            f"Could not determine the last commit timestamp of {file_name}"
        ) from e


def get_local_cve_advisories():
    """
    Get all the CVE advisories located in this repository as `CVEAdvisory` objects.

    Every `.yml` file returned by `get_all_files()` is parsed. For each CVE
    listed under `advisories`, one `CVEAdvisoryInstance` is created per entry
    in the file's `fixed_in` list.

    Returns a dictionary of all the local CVE-IDs mapped to their respective
    `CVEAdvisory` objects.
    """
    local_advisories: dict[str, CVEAdvisory] = {}
    print("\n-> Checking local files")
    for file_name in get_all_files():
        if not file_name.endswith(".yml"):
            continue
        file_data: dict = parse_yml_file(file_name)
        file_last_modified = _get_last_commit_timestamp(file_name)
        if "advisories" not in file_data:
            continue
        for cve_id, cve_data in file_data["advisories"].items():
            if cve_id not in local_advisories:
                # The year is the second to last dash separated part of the id.
                year = int(cve_id.split("-")[-2])
                local_advisories[cve_id] = CVEAdvisory(id=cve_id, year=year)
            for fixed_in in file_data["fixed_in"]:
                # The last whitespace separated token is the version, the rest the product.
                product, version_fixed = fixed_in.rsplit(None, 1)
                references = [parse_bug(bug) for bug in cve_data["bugs"]]
                cve_instance = CVEAdvisoryInstance(
                    parent=local_advisories[cve_id],
                    title=cve_data["title"],
                    description=cve_data["description"].strip(),
                    reporter=cve_data["reporter"],
                    references=references,
                    mfsa_id=file_data["mfsa_id"],
                    product=product,
                    version_fixed=version_fixed,
                    file_name=file_name,
                    file_last_modified=file_last_modified,
                )
                # We want the instances to be sorted by the mfsa id to avoid pushing updates
                # to the API where the only thing that changes is the order of the instances.
                insort(
                    local_advisories[cve_id].instances,
                    cve_instance,
                    key=lambda x: x.mfsa_id,
                )
    return local_advisories


def try_set_bugzilla_alias(bug: str, cve_id: str):
    """
    Try to set the alias of the given bugzilla bug to the given CVE-ID.

    The bug number is supposed to come from the temporary MFSA-RESERVE-{year}-{id} IDs,
    where {id} potentially is a bugzilla bug number. All {id}s smaller than 100000 will
    be ignored. Nothing is done when no `BUGZILLA_API_KEY` environment variable is set.

    Will return without error if anything fails; failures are only printed.
    """
    try:
        # Check if we have a bugzilla API key available
        BUGZILLA_API_KEY = os.getenv("BUGZILLA_API_KEY")
        if not BUGZILLA_API_KEY:
            print(
                f"Skipping alias assignment for {cve_id} (bug {bug}) as no BUGZILLA_API_KEY was provided"
            )
            return
        # Make sure this is actually a number
        bug_number = int(bug)
        # Skip smaller numbers as there is a high chance these aren't any actual bugzilla bug numbers
        if bug_number < 100000:
            print(
                f"Skipping alias assignment for {cve_id} as '{bug_number}' does not seem to be a bug number"
            )
            return
        if not prompt_yes_no(
            f"Should '{cve_id}' be set as an alias for bug {bug_number} on bugzilla?"
        ):
            print(f"Skipping alias assignment for {cve_id} (bug {bug})")
            return
        # Try to update the alias for the given bug number. If this fails our try block will catch it.
        response = requests.put(
            f"https://bugzilla.mozilla.org/rest/bug/{bug_number}",
            data={"alias": cve_id},
            headers={"X-BUGZILLA-API-KEY": BUGZILLA_API_KEY},
            timeout=30,
        )
        # requests does not raise on 4xx/5xx by itself; without this check a
        # rejected request would still be reported as a success below.
        response.raise_for_status()
        print(f"Assigned alias {cve_id} to bug {bug}")
    except Exception as e:
        print(f"Failed to assign alias {cve_id} to bug {bug} - {e}")