bot/code_review_bot/__init__.py (220 lines of code) (raw):
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
import abc
import enum
import hashlib
import json
import os
from functools import cached_property
import requests
import structlog
from libmozdata.phabricator import LintResult, UnitResult, UnitResultState
from taskcluster.helper import TaskclusterConfig
from code_review_bot.config import settings
from code_review_bot.stats import InfluxDb
from code_review_bot.tasks.base import AnalysisTask
# Module-level structured logger, used by positive_int and the Issue class below
logger = structlog.get_logger(__name__)
def positive_int(name, x):
    """
    Return `x` unchanged when it is an integer greater than or equal to zero.

    Any value that is not an integer yields None silently.
    A negative integer also yields None, after a warning mentioning `name`
    has been logged.
    """
    # Anything that is not an integer is discarded without logging
    if not isinstance(x, int):
        return None

    # Negative integers are reported, then discarded
    if x < 0:
        logger.warning(f"Negative {name} value found, defaults to None", value=x)
        return None

    return x
class AnalysisException(Exception):
    """
    Exception raised for controlled errors.

    It carries a `code` attribute next to the usual exception message.
    """

    def __init__(self, code, message):
        # The message is handled by the base Exception class
        super().__init__(message)
        # The code is kept on the instance for the callers
        self.code = code
class Level(enum.Enum):
    """
    Severity level attached to a reported issue
    """

    # Critical issue: it breaks CI, so it must always be reported
    Error = "error"

    # Reported only when located in the patch
    Warning = "warning"
class Issue(abc.ABC):
    """
    Common reported issue interface

    An issue is produced by an analyzer task for a given revision, and targets
    a relative file path, optionally restricted to a range of lines.
    Subclasses must implement `validates`, `as_text` and `as_markdown`.
    """

    # Class-level default, overridden on every instance by __init__
    revision = None

    def __init__(
        self,
        analyzer: AnalysisTask,
        revision,
        path: str,
        line: int,
        nb_lines: int,
        check: str,
        column: int = None,
        message: str = None,
        level: Level = Level.Warning,
        fix: str = None,
        language: str = None,
    ):
        """
        Build an issue

        `line` and `nb_lines` are normalized through `positive_int`: negative
        or non-integer values become None. A `line` of 0 is also converted to
        None, which makes the issue a full file issue.
        A `fix` can only be provided along with its `language`.
        """
        # Check while avoiding circular dependencies
        from code_review_bot.revisions import Revision

        assert isinstance(revision, Revision)
        assert isinstance(analyzer, AnalysisTask)

        # Base required fields for all issues
        assert not os.path.isabs(path), f"Issue path can not be absolute {path}"
        self.revision = revision
        self.analyzer = analyzer
        self.check = check
        self.path = path
        self.line = positive_int("line", line)
        self.nb_lines = positive_int("nb_lines", nb_lines)

        # Support line 0 for full file issues like `source-test-mozlint-test-manifest`.
        if self.line == 0:
            logger.info("Line 0 is not supported, falling back to full file issue")
            self.line = None

        # Optional common fields
        self.column = column
        self.message = message
        self.level = level

        # Reserved payload for backend
        self.on_backend = None

        # Store information when a fix is available
        self.fix = fix
        self.language = language
        if self.fix is not None:
            assert self.language is not None, "Missing fix language"

        # Mark the issue as known by default, so only errors are reported
        # The before/after feature may tag some issues as new, so they are reported
        self.new_issue = False

    def __str__(self):
        line = f"line {self.line}" if self.line is not None else "full file"
        return f"{self.analyzer.name} issue {self.check}@{self.level.value} {self.path} {line}"

    @property
    def display_name(self):
        """
        Issue's base name (by default analyzer's name)
        But can be overridden by subclasses
        """
        return self.analyzer.display_name

    def build_extra_identifiers(self):
        """
        Used to add information when building an issue unique hash
        The returned dict is serialized as JSON in the hash payload
        """
        return {}

    @property
    def allow_before_and_after_publish(self):
        """
        Allow the possibility for an issue to avoid being published based on before/after.
        This allows publishing issues based on other criteria, like in_patch.

        The feature is disabled for this issue when the taskcluster secret
        `<ANALYZER NAME>_DISABLE_PUBLICATION_BEFORE_AFTER` is set to a truthy value.
        Otherwise the revision's `before_after_feature` value is returned.
        """
        if taskcluster.secrets.get(
            f"{self.analyzer.name.upper()}_DISABLE_PUBLICATION_BEFORE_AFTER", False
        ):
            return False
        return self.revision.before_after_feature

    def is_publishable(self):
        """
        Is this issue publishable on reporters ?
        """
        assert self.revision is not None, "Missing revision"

        # Always check specific rules validate
        if not self.validates():
            return False

        if self.allow_before_and_after_publish:
            # Only publish new issues or issues inside the diff
            return self.new_issue or self.in_patch

        # An error is always published
        if self.level == Level.Error:
            return True

        # Then check if the backend marks this issue as publishable
        if self.on_backend is not None:
            return self.on_backend["publishable"]

        # Fallback to in_patch detection
        return self.in_patch

    @property
    def in_patch(self):
        """
        Is this issue contained in the revision, according to `revision.contains` ?
        """
        return self.revision.contains(self)

    def _load_file_content(self):
        """
        Load the content of the file targeted by this issue
        Returns None when the file is autogenerated, can not be found
        (locally or with a 404 HTTP error), or when its path is erroneous
        Any other HTTP error is raised to the caller
        """
        # An autogenerated file resides in the build directory that it has the
        # format `obj-x86_64-pc-linux-gnu`: no content is loaded for these
        if "/obj-" in self.path:
            return None

        if settings.mercurial_cache_checkout:
            logger.debug("Using the local repository to build issue's hash")
            try:
                with (settings.mercurial_cache_checkout / self.path).open() as f:
                    return f.read()
            except (FileNotFoundError, IsADirectoryError):
                logger.warning("Failed to find issue's related file", path=self.path)
                return None

        try:
            # Load the file through the revision
            return self.revision.load_file(self.path)
        except ValueError:
            # No content is available in case the path is erroneous
            return None
        except requests.exceptions.HTTPError as e:
            if e.response.status_code != 404:
                # When encountering another HTTP error, raise the issue
                raise
            logger.warning("Failed to download a file with an issue", path=self.path)
            return None

    @cached_property
    def hash(self):
        """
        Build a unique hash identifying that issue and cache the resulting value
        The text concerned by the issue is used and not its position in the file
        Message content is hashed as a single linter may return multiple issues on a single line
        We make the assumption that the message does not contain the line number
        If an error occurs reading the file content (locally or remotely), None is returned
        """
        assert self.revision is not None, "Missing revision"

        # No hash can be built without the file content
        file_content = self._load_file_content()
        if file_content is None:
            return None

        # Build raw content:
        # 1. lines affected by patch
        # 2. without any spaces around each line
        file_lines = file_content.splitlines()
        if self.line is None or self.nb_lines is None:
            # Use full file when line is not specified
            lines = file_lines
        else:
            # Use a range of lines
            start = self.line - 1  # file_lines start at 0, not 1
            lines = file_lines[start : start + self.nb_lines]
        raw_content = "\n".join(line.strip() for line in lines)

        # Build hash payload using issue data
        # excluding file position information (lines & char)
        extras = json.dumps(self.build_extra_identifiers(), sort_keys=True)
        payload = ":".join(
            [
                self.analyzer.name,
                self.path,
                self.level.value,
                self.check,
                extras,
                raw_content,
                # The message is optional: joining None would raise a TypeError
                self.message if self.message is not None else "",
            ]
        ).encode("utf-8")

        # Finally build the MD5 hash
        return hashlib.md5(payload).hexdigest()

    @abc.abstractmethod
    def validates(self):
        """
        Is this issue publishable on reporters using IN_PATCH publication ?
        Should check specific rules and return a boolean
        """
        raise NotImplementedError

    @abc.abstractmethod
    def as_text(self):
        """
        Build the text content for reporters
        """
        raise NotImplementedError

    @abc.abstractmethod
    def as_markdown(self):
        """
        Build the Markdown content for debug email
        """
        raise NotImplementedError

    def as_error(self):
        """
        Build the Markdown content for build error issues
        Not abstract: subclasses only need to implement it when they use it
        """
        raise NotImplementedError

    def as_dict(self):
        """
        Build the serializable dict representation of the issue
        Used by debugging tools
        The hash is set to None when its computation fails
        """
        issue_hash = None
        try:
            issue_hash = self.hash
        except Exception as e:
            # Best effort: a failing hash must not prevent the serialization
            logger.warning("Failed to build issue hash", error=str(e), issue=str(self))

        return {
            "analyzer": self.analyzer.name,
            "path": self.path,
            "line": self.line,
            "nb_lines": self.nb_lines,
            "column": self.column,
            "check": self.check,
            "level": self.level.value,
            "message": self.message,
            "in_patch": self.in_patch,
            "validates": self.validates(),
            "publishable": self.is_publishable(),
            "hash": issue_hash,
            "fix": self.fix,
        }

    def as_phabricator_lint(self):
        """
        Build the Phabricator LintResult instance
        """
        # Add the level to the issue message
        if self.level == Level.Error:
            # We use the IMPORTANT red block silently
            prefix = "(IMPORTANT) ERROR:"
        else:
            prefix = "WARNING:"
        description = f"{prefix} {self.message}"

        # Add a fix when available
        # Prefix each line with 2 spaces as required by phabricator to trigger a code block
        # with syntax highlighting
        if self.fix is not None:
            fix = "\n".join(f"  {line}" for line in self.fix.splitlines())
            description += f"\n\n  lang={self.language}\n{fix}"

        return LintResult(
            name=self.display_name,
            description=description,
            code=self.check,
            severity=self.level.value,
            path=self.path,
            # Report full file issues on line 1
            line=self.line if self.line is not None else 1,
            char=self.column,
        )

    def as_phabricator_unitresult(self):
        """
        Build a Phabricator UnitResult for build errors
        """
        assert (
            self.is_build_error()
        ), "Only build errors may be published as unit results"

        return UnitResult(
            namespace="code-review",
            name="general",
            result=UnitResultState.Fail,
            details=f"Code review bot found a **build error**: \n{self.message}",
            format="remarkup",
        )

    def is_build_error(self):
        """
        Is this issue a build error?
        Default is False
        """
        return False
class Reliability(enum.Enum):
    """
    Reliability attached to a reported result
    """

    Unknown = "unknown"
    High = "high"
    Medium = "medium"
    Low = "low"

    @property
    def invert(self):
        """
        Give the word describing the opposite reliability, so it can be used
        in coherent sentences.
        High and low are swapped, medium stays medium, anything else is unknown.
        """
        if self is Reliability.High:
            return "low"
        if self is Reliability.Low:
            return "high"
        if self is Reliability.Medium:
            return "medium"
        return "unknown"
# Create common stats instance
# It is built once, when this module is imported
stats = InfluxDb()
# Create common taskcluster config, built with the firefox-ci-tc URL
# Its `secrets` are read by Issue.allow_before_and_after_publish at call time
taskcluster = TaskclusterConfig("https://firefox-ci-tc.services.mozilla.com")