foundation_security_advisories/common.py (225 lines of code) (raw):
#!/usr/bin/env python3
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import codecs
import fnmatch
import os
import re
from glob import glob
from subprocess import check_output
from dataclasses import dataclass, field
import yaml
from markdown import markdown
# Path of the git executable; can be overridden through the GIT_BIN env var.
GIT = os.getenv("GIT_BIN", "git")
# Directories (relative to the repo root) holding advisories and the
# bug bounty hall-of-fame data.
ADVISORIES_DIR = "announce"
HOF_DIR = "bug-bounty-hof"
# Raw strings are required here: "\d", "\." and "\w" are not valid string
# escapes and trigger a SyntaxWarning on recent Python versions.
# Group 1 captures the advisory ID (e.g. "2024-01"), group 2 the extension.
MFSA_FILENAME_RE = re.compile(r"mfsa(\d{4}-\d{2,3})\.(md|yml)$")
HOF_FILENAME_RE = re.compile(r"bug-bounty-hof/\w+\.yml$")
# Patterns used when converting HTML-ish advisory text to plain text.
HTML_BR_TAG_RE = re.compile(r"<br */?>")
HTML_CODE_TAG_RE = re.compile(r"</?code>")
HTML_TAG_RE = re.compile(r"<[^>]+>")
HTML_NEWLINE_RE = re.compile(r"\n")
HTML_DOUBLE_NEWLINE_RE = re.compile(r"\n\n")
def mfsa_id_from_filename(filename):
    """
    Return the advisory ID captured by MFSA_FILENAME_RE from filename.

    :param filename: str file name or path to inspect.
    :return: str first capture group of the match, or None when the
        pattern is not found in filename.
    """
    found = MFSA_FILENAME_RE.search(filename)
    return found.group(1) if found else None
def git_diff(staged):
    """
    Return the modified advisory and hall-of-fame files in the repo.

    Runs ``git diff --name-only`` (adding ``--cached`` when ``staged`` is
    true) and keeps only the paths that match ``MFSA_FILENAME_RE`` or
    ``HOF_FILENAME_RE``.

    :param staged: boolean return only those changes staged in git
    :return: list modified file names.
    :raises subprocess.CalledProcessError: if git exits with a non-zero status.
    """
    command = [GIT, "diff", "--name-only"]
    if staged:
        command.append("--cached")
    # git prints one path per line; split on line boundaries rather than on
    # arbitrary whitespace so a path containing spaces stays in one piece.
    git_out = check_output(command, universal_newlines=True).splitlines()
    return [
        fn
        for fn in git_out
        if MFSA_FILENAME_RE.search(fn) or HOF_FILENAME_RE.search(fn)
    ]
def get_modified_files(staged_only):
    """
    List the changed file names reported by git_diff.

    :param staged_only: boolean; when true only staged changes are
        returned, otherwise staged and unstaged changes are merged.
    :return: list of file names. When merged, duplicates are removed and
        the order is unspecified (the names pass through a set).
    """
    staged = git_diff(staged=True)
    if not staged_only:
        combined = set(staged)
        combined.update(git_diff(staged=False))
        return list(combined)
    return staged
def get_all_files():
    """
    Yield every advisory and hall-of-fame file name in the repo.

    First walks ADVISORIES_DIR recursively and yields each file matching
    ``mfsa*.*``, then yields the ``*.yml`` files directly inside HOF_DIR.

    :return: generator of file names.
    """
    for dirpath, _subdirs, names in os.walk(ADVISORIES_DIR):
        yield from (
            os.path.join(dirpath, name)
            for name in fnmatch.filter(names, "mfsa*.*")
        )
    yield from glob("{}/*.yml".format(HOF_DIR))
def parse_md_front_matter(lines):
    """Split a document into its YAML front matter and Markdown body.

    The front matter is delimited by two lines that consist of ``---``
    (surrounding whitespace ignored). Anything before the opening
    delimiter is discarded; everything after the closing delimiter is
    Markdown, including any later ``---`` lines.

    :param: lines iterator
    :return: str YAML, str Markdown
    :raises ValueError: if fewer than two delimiters are present.
    """
    delimiters_seen = 0
    front_matter = []
    body = []
    for raw in lines:
        if delimiters_seen == 2:
            # Past the closing delimiter: everything is Markdown.
            body.append(raw)
        elif raw.strip() == "---":
            delimiters_seen += 1
        elif delimiters_seen == 1:
            front_matter.append(raw)
    if delimiters_seen != 2:
        raise ValueError("Front Matter not found.")
    return "".join(front_matter), "".join(body)
def parse_yml_file(file_name):
    """
    Load file_name as UTF-8 YAML and return the parsed data.

    When the document has no "mfsa_id" key, the ID derived from the file
    name (if any) is stored under that key before returning.
    """
    with codecs.open(file_name, encoding="utf8") as stream:
        data = yaml.safe_load(stream)

    if "mfsa_id" in data:
        return data

    derived_id = mfsa_id_from_filename(file_name)
    if derived_id:
        data["mfsa_id"] = derived_id
    return data
def parse_md_file(file_name):
    """
    Parse a Markdown advisory and return its front matter data.

    The file is read as UTF-8 and split into YAML front matter and a
    Markdown body. When the YAML has no "mfsa_id" key, the ID derived
    from the file name (if any) is filled in. The Markdown body is only
    rendered as a sanity check; its output is discarded.
    """
    with codecs.open(file_name, encoding="utf8") as stream:
        front_matter, body = parse_md_front_matter(stream)

    data = yaml.safe_load(front_matter)
    if "mfsa_id" not in data:
        derived_id = mfsa_id_from_filename(file_name)
        if derived_id:
            data["mfsa_id"] = derived_id

    # Render the body purely so that a parser exception surfaces here.
    markdown(body)
    return data
def remove_newlines(content: str | None):
    """
    Collapse markdown-style newlines in content.

    Every '\\n\\n' pair becomes '<br />' and every remaining '\\n' becomes
    a single space. Returns None when content is None or empty.
    """
    if content:
        with_breaks = HTML_DOUBLE_NEWLINE_RE.sub("<br />", content)
        return HTML_NEWLINE_RE.sub(" ", with_breaks)
    return None
def remove_html_tags(content: str | None):
    """
    Convert HTML-ish content to plain text.

    Runs `remove_newlines` first, then turns <br> tags into '\\n', <code>
    tags into '`' and drops every other tag. Returns None when content is
    None or empty.
    """
    if not content:
        return None
    text = remove_newlines(content)
    # Order matters: the generic tag pattern must run last, otherwise it
    # would swallow the <br> and <code> tags before they are translated.
    substitutions = (
        (HTML_BR_TAG_RE, "\n"),
        (HTML_CODE_TAG_RE, "`"),
        (HTML_TAG_RE, ""),
    )
    for pattern, replacement in substitutions:
        text = pattern.sub(replacement, text)
    return text
def comma_separated(sequence: list[str], conjunction="and"):
    """
    Return the given strings joined into a human-readable enumeration.

    Examples:
        ["a", "b", "c", "d"] -> "a, b, c, and d"
        ["a", "b"] -> "a and b"
        ["a"] -> "a"
        [] -> ""

    :param sequence: list of strings to join.
    :param conjunction: word placed before the last item (default "and").
    :return: str the joined text; an empty string for an empty sequence.
    """
    # An empty input used to raise IndexError; there is nothing to list.
    if not sequence:
        return ""
    if len(sequence) == 1:
        return sequence[0]
    if len(sequence) == 2:
        return f"{sequence[0]} {conjunction} {sequence[1]}"
    return f"{', '.join(sequence[:-1])}, {conjunction} {sequence[-1]}"
@dataclass
class CVEAdvisory:
    """A collection of `CVEAdvisoryInstance`s with the same CVE-ID."""

    id: str
    year: int
    instances: list["CVEAdvisoryInstance"] = field(default_factory=list)

    @property
    def newest_instance(self):
        """
        Returns the last modified instance of this CVE advisory (determined by git commit time).
        Useful for when only one of the instances is being updated with the latest information.

        The instance with the largest `file_last_modified` wins; on a tie the
        one appearing first in `instances` is returned. Returns None only
        when there are no instances at all.
        """
        # max() keeps the first of several equal maxima and, unlike a
        # comparison against a 0 sentinel, still picks an instance when all
        # timestamps are zero or negative.
        return max(
            self.instances,
            key=lambda instance: instance.file_last_modified,
            default=None,
        )

    @property
    def full_description(self):
        """
        Returns the newest instance's description followed by a sentence
        listing every affected "product < version_fixed" pair.
        """
        description = self.newest_instance.description.strip()
        affected = comma_separated(
            [
                f"{instance.product} < {instance.version_fixed}"
                for instance in self.instances
            ],
        )
        return f"{description} This vulnerability affects {affected}."

    def to_json(self):
        """
        Convert advisory in yml format into
        [CVE JSON](https://cveproject.github.io/cve-schema/schema/docs/) format.

        Title, references and reporter are taken from the newest instance;
        "problemTypes" and "credits" are only emitted when the title and
        reporter, respectively, are set.
        """
        # Both properties scan all instances, so compute them once.
        newest = self.newest_instance
        description = self.full_description

        cna = {
            "affected": [
                {
                    "product": instance.product,
                    "vendor": "Mozilla",
                    "versions": [
                        {
                            "lessThan": instance.version_fixed,
                            "status": "affected",
                            "version": "unspecified",
                            "versionType": "custom",
                        }
                    ],
                }
                for instance in self.instances
            ],
            "descriptions": [
                {
                    "lang": "en",
                    "value": remove_html_tags(description),
                    "supportingMedia": [
                        {
                            "type": "text/html",
                            "base64": False,
                            "value": remove_newlines(description),
                        }
                    ],
                }
            ],
        }

        if newest.title:
            cna["problemTypes"] = [
                {
                    "descriptions": [
                        {
                            "description": remove_html_tags(newest.title),
                            "lang": "en",
                            "type": "text",
                        }
                    ]
                }
            ]

        # One link per distinct MFSA advisory, followed by the extra
        # references recorded on the newest instance.
        advisory_links = [
            {"url": f"https://www.mozilla.org/security/advisories/mfsa{mfsa_id}/"}
            for mfsa_id in sorted({instance.mfsa_id for instance in self.instances})
        ]
        extra_links = [
            {
                "url": url,
                **({"name": desc} if desc else {}),
            }
            for url, desc in newest.references
        ]
        cna["references"] = advisory_links + extra_links

        if newest.reporter:
            cna["credits"] = [
                {
                    "lang": "en",
                    "value": remove_html_tags(newest.reporter),
                }
            ]

        return {
            "containers": {"cna": cna},
            "dataType": "CVE_RECORD",
            "dataVersion": "5.1",
        }
@dataclass
class CVEAdvisoryInstance:
    """
    A manifestation of a CVE advisory in this repository.
    Objects of this class correspond to the entries in the
    `advisories:` section of the advisory YAML format.
    """

    # The advisory grouping this instance (presumably the CVEAdvisory whose
    # `instances` list contains it -- confirm against the code building these).
    parent: CVEAdvisory
    # Emitted as the "problemTypes" description by CVEAdvisory.to_json.
    title: str
    # Stripped and used as the start of CVEAdvisory.full_description.
    description: str
    # Emitted as the "credits" value when set.
    reporter: str | None
    # (url, description) pairs; the description may be None.
    references: list[tuple[str, str | None]]
    # Used to build the https://www.mozilla.org/security/advisories/mfsa<id>/ link.
    mfsa_id: str
    # Reported together as "<product> < <version_fixed>".
    product: str
    version_fixed: str
    # Presumably the advisory file this instance was read from -- confirm.
    file_name: str
    # Compared across instances to find the most recently modified one.
    file_last_modified: int