hghooks/mozhghooks/check/check_bug_references.py (120 lines of code) (raw):
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
import json
from mozautomation import commitparser
from mercurial import (
pycompat,
urllibcompat,
)
from ..checks import (
PreTxnCloseCheck,
print_banner,
)
BUGZILLA_API_BASE_URL = b"https://bugzilla.mozilla.org/rest"
HEADERS = {b"User-Agent": b"bmo-api-client/vct-checks/0.1"}
OVERRIDE_FLAG = b"SKIP_BMO_CHECK"
OVERRIDE_INSTRUCTIONS = (
b"""
To push this commit anyway and ignore this warning, include %s
in your commit message.
"""
% OVERRIDE_FLAG
)
OVERRIDE_WARNING = b"""
You have chosen to ignore or skip checking bugs IDs referenced in your commit
message. Security bugs or invalid bugs referenced will not block your push.
"""
ERROR_MESSAGE_NOT_FOUND = b"""
Your commit message references a bug that does not exist. Please check your
commit message and try again.
Affected bug: %s
%s
"""
ERROR_MESSAGE_UNAUTHORIZED = b"""
Your commit message references bugs that are currently private. To avoid
disclosing the nature of these bugs publicly, please remove the affected bug ID
from the commit message.
Affected bug: %s
Visit https://wiki.mozilla.org/Security/Bug_Approval_Process for more
information.
%s
"""
ERROR_MESSAGE_OTHER = b"""
While checking if a bug referenced in your commit message is a security bug, an
error occurred and the bug could not be verified.
Affected bug: %s
%s
"""
ERROR_MESSAGE_NO_BUGZILLA = b"""
Could not access bugzilla.mozilla.org to check if a bug referenced in your
commit message is a security bug. Please try again later.
%s
"""
ERROR_MESSAGES = {
b"NO_BUGZILLA": ERROR_MESSAGE_NO_BUGZILLA,
b"OTHER": ERROR_MESSAGE_OTHER,
404: ERROR_MESSAGE_NOT_FOUND,
401: ERROR_MESSAGE_UNAUTHORIZED,
}
def parse_bug_ids(string):
"""
Parses a given string of a commit message into a set of bug IDs.
Args:
string (str): the string representing the commit message
Returns:
set of bytes: a set of strings representing bug IDs
"""
return {
pycompat.bytestr(b) for b in commitparser.parse_bugs(string, conservative=True)
}
class BMOAPIClient(object):
"""
A thin wrapper to communicate via the BMO API for the purposes of hooks.
"""
def __init__(self, base_url, headers):
self.base_url = base_url
self.headers = headers
def _get(self, path, params=None):
"""
Compile a request using given url and params, and send it via a GET
request to the BMO API.
Args:
path (str): the path to a particular resource (e.g. /bug/123456)
params (tuple): parameters that will be converted to query params
Returns:
file: a file-like object representing the response
Raises:
HTTPError: if the requested resource can not be accessed
"""
params = params or {}
url = self.base_url + path
if params:
url += b"?" + urllibcompat.urlreq.urlencode(params)
request = urllibcompat.urlreq.request(pycompat.sysstr(url))
for k, v in self.headers.items():
request.add_header(k, v)
response = urllibcompat.urlreq.urlopen(request)
return response
def search_bugs(self, bug_ids):
"""
Given an iterable of bug IDs, query the Bugzilla server and return only
the ones that are valid and accessible.
Args:
bug_ids (set of bytes): the set of bug IDs to check
Returns:
set of bytes: the set of valid, accessible bugs
"""
url = b"/bug"
params = (
(b"id", b",".join(sorted(list(bug_ids)))),
(b"include_fields", b"id"),
)
data = json.load(self._get(url, params))
bugs = data.get("bugs", [])
return {pycompat.bytestr(b["id"]) for b in bugs}
def get_status_code_for_bug(self, bug_id):
"""
Given a particular bug ID, query the Bugzilla API's 'get bug' endpoint
and return the status code.
Args:
bug_id (bytes)
Returns:
int: the HTTP status code
"""
url = b"/bug/" + bug_id
try:
code = self._get(url).getcode()
except urllibcompat.urlerr.httperror as e:
# code could be 400, 401, 404, or possibly another server error
code = e.getcode()
return code
class CheckBugReferencesCheck(PreTxnCloseCheck):
"""
This pre-transaction check iterates through all commits in a given push
attempt, filters out any bug IDs that are present in any commit messages
then queries the BMO API to check if any of the bugs are either invalid
or inaccessible. The check will print out a message to the user indicating
what they should do to fix the problem.
Attributes:
bug_ids (set of bytes): bugs IDs collected from all commit messages
bmo_client (BMOAPIClient): a wrapper to communicate with the BMO API
_skip_check:
A flag normally passed via a commit message to skip validating
bugs against the BMO API.
"""
@property
def name(self):
return b"check_bug_references_check"
def pre(self, node):
"""
Initialize the check with an empty set of bug_ids.
"""
self.bug_ids = set()
self.bmo_client = BMOAPIClient(BUGZILLA_API_BASE_URL, HEADERS)
self._skip_check = False
def relevant(self):
"""
Checks if the destination repository should be checked or not.
"""
repos_to_check = self.ui.configlist(
b"mozilla", b"check_bug_references_repos", default=None
)
if repos_to_check:
return self.repo_metadata[b"path"] in repos_to_check
return False
def check(self, ctx):
"""
Check the current commit's message and add any bug IDs to self.bug_ids.
Always returns True as the full check will occur in `post_check`.
This check skips over bugs that are not in draft phase.
"""
if ctx.phasestr() != b"draft":
return True
commit_message = ctx.description()
if OVERRIDE_FLAG in commit_message:
self._skip_check = True
self.bug_ids |= parse_bug_ids(commit_message)
return True
def post_check(self):
"""
If any bug IDs are detected in the commit checks, first search for all
the bug IDs on BMO API. If none are filtered out, this means that all
the bug IDs are valid. If any IDs are filtered out, check the first ID
that is excluded and print a relevant message.
"""
if not self.bug_ids:
return True
if self.bug_ids and self._skip_check:
# TODO: improve this check so that it provides a more specific
# warning (e.g. security bugs were found but ignored, etc...)
print_banner(self.ui, b"warning", OVERRIDE_WARNING)
return True
try:
found_bugs = self.bmo_client.search_bugs(self.bug_ids)
except (urllibcompat.urlerr.httperror, urllibcompat.urlerr.urlerror):
print_banner(
self.ui,
b"error",
ERROR_MESSAGES[b"NO_BUGZILLA"] % OVERRIDE_INSTRUCTIONS,
)
return False
invalid_bugs = self.bug_ids - found_bugs
if not invalid_bugs:
return True
# Check a single bug only.
bug = invalid_bugs.pop()
try:
status_code = self.bmo_client.get_status_code_for_bug(bug)
except urllibcompat.urlerr.urlerror:
print_banner(
self.ui,
b"error",
ERROR_MESSAGES[b"NO_BUGZILLA"] % OVERRIDE_INSTRUCTIONS,
)
return False
if status_code in (401, 404):
message = ERROR_MESSAGES[status_code] % (bug, OVERRIDE_INSTRUCTIONS)
else:
message = ERROR_MESSAGES[b"OTHER"] % (bug, OVERRIDE_INSTRUCTIONS)
print_banner(self.ui, b"error", message)
return False