bugbug/tools/code_review.py
# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import enum
import json
import os
import re
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import asdict, dataclass
from datetime import datetime
from functools import cached_property
from itertools import chain
from logging import INFO, basicConfig, getLogger
from typing import Iterable, Literal, Optional
import tenacity
from langchain.chains import ConversationChain, LLMChain
from langchain.memory import ConversationBufferMemory
from langchain.prompts import PromptTemplate
from langchain_openai import OpenAIEmbeddings
from libmozdata.phabricator import ConduitError
from tenacity import retry, retry_if_exception_type, stop_after_attempt
from tqdm import tqdm
from unidiff import Hunk, PatchedFile, PatchSet
from unidiff.errors import UnidiffParseError
from bugbug import db, phabricator, utils
from bugbug.code_search.function_search import FunctionSearch
from bugbug.generative_model_tool import GenerativeModelTool, get_tokenizer
from bugbug.utils import get_secret
from bugbug.vectordb import PayloadScore, QueryFilter, VectorDB, VectorPoint
basicConfig(level=INFO)
logger = getLogger(__name__)
@dataclass
class InlineComment:
filename: str
start_line: int
end_line: int
content: str
on_removed_code: bool | None
id: int | None = None
date_created: int | None = None
date_modified: int | None = None
is_done: bool | None = None
hunk_start_line: int | None = None
hunk_end_line: int | None = None
is_generated: bool | None = None
class ModelResultError(Exception):
"""Occurs when the model returns an unexpected result."""
class FileNotInPatchError(ModelResultError):
"""Occurs when the file in the model result is not part of the patch."""
class HunkNotInPatchError(ModelResultError):
"""Occurs when the hunk in the model result is not part of the patch."""
class LargeDiffError(Exception):
"""Occurs when the diff is too large to be processed."""
TARGET_SOFTWARE: str | None = None
PROMPT_TEMPLATE_SUMMARIZATION = """You are an expert reviewer for {experience_scope}, with experience on source code reviews.
Please, analyze the code provided and report a summarization about the new changes; for that, focus on the code added represented by lines that start with "+".
{patch}"""
PROMPT_TEMPLATE_REVIEW = """**Task**:
Generate code review comments for the patch provided below.
**Instructions**:
1. **Understand the Changes**:
- Analyze the changes made in the patch.
- Use the previously reported summarization as context.
2. **Identify Potential Issues**:
- Look for code that might result in possible bugs.
- Identify major regressions, issues, or similar concerns.
3. **Validate Each Issue**:
- Ensure each identified problem is valid.
- Confirm consistency with the {target_code_consistency} source code standards.
**Guidelines for Writing Comments**:
- **Style**:
- **Clarity**: Comments should be short and to the point.
- **Focus**: Concentrate on the changed code only.
- **Tone**: Do not form suggestions as questions.
- **Avoid Comments That**:
- Focus on documentation or tests.
- Ask for confirmation of existence (e.g., "Does this function exist?").
- Instruct the author to ensure correctness (avoid "Ensure", "Verify", "Check").
- Merely describe the code without providing constructive feedback.
- Offer praise (e.g., "This is a good addition to the code.").
- Refer to code not added in this patch (lines without a '+' at the start).
**Output Format**:
- Write down the comments in a JSON list as shown in the valid comment examples.
- Do **not** include any explanations about your choices.
- Only return the JSON list.
**Valid Comment Examples**:
{comment_examples}
{approved_examples}
**Patch to Review**:
{patch}
"""
TEMPLATE_COMMENT_EXAMPLE = """Patch example {example_number}:
{patch}
Review comments for example {example_number}:
{comments}"""
PROMPT_TEMPLATE_FILTERING_ANALYSIS = """Filter review comments to keep those that:
- are consistent with the {target_code_consistency} source code;
- focus on reporting possible bugs, functional regressions, issues, or similar concerns;
- report readability or design concerns.
Exclude comments that:
- only describe the change;
- restate obvious facts like renamed variables or replaced code;
- include praising;
- ask if changes are intentional or ask to ensure things exist.
Do not report any explanation about your choice. Only return a valid JSON list.
Comments:
{comments}
As examples of unwanted comments that are not related to the current patch, check the ones below:
- {rejected_examples}
"""
DEFAULT_REJECTED_EXAMPLES = """Please note that these are minor improvements and the overall quality of the patch is good. The documentation is being expanded in a clear and structured way, which will likely be beneficial for future development.
- Please note that these are just suggestions and the code might work perfectly fine as it is. It's always a good idea to test all changes thoroughly to ensure they work as expected.
- Overall, the patch seems to be well implemented with no major concerns. The developers have made a conscious decision to align with Chrome's behavior, and the reasoning is well documented.
- There are no complex code changes in this patch, so there's no potential for major readability regressions or bugs introduced by the changes.
- The `focus(...)` method is called without checking if the element and its associated parameters exist or not. It would be better to check if the element exists before calling the `focus()` method to avoid potential errors.
- It's not clear if the `SearchService.sys.mjs` file exists or not. If it doesn't exist, this could cause an error. Please ensure that the file path is correct.
- This is a good addition to the code."""
PROMPT_TEMPLATE_DEDUPLICATE = """Please, double check the code review comments below.
Report only the comments that are not redundant and do not duplicate each other.
Do not change the contents of the comments or the report format.
Adopt the template below as the report format:
[
{{
"file": "com/br/main/Pressure.java",
"code_line": 458,
"comment" : "In the third code block, you are using `nsAutoStringN<256>` instead of `nsString`. This is a good change as `nsAutoStringN<256>` is more efficient for small strings. However, you should ensure that the size of `tempString` does not exceed 256 characters, as `nsAutoStringN<256>` has a fixed size."
}}
]
Do not report any explanation about your choice. Only return a valid JSON list.
Review:
{review}"""
PROMPT_TEMPLATE_FURTHER_INFO = """Based on the patch provided below and its related summarization, identify the functions you don't know and need to look up for reviewing the patch.
List the names of these functions, providing only the function names, with each name on a separate line.
Avoid using list indicators such as hyphens or numbers.
If no function declaration is required, just return "".
{patch}
{summarization}"""
PROMPT_TEMPLATE_FURTHER_CONTEXT_LINES = """Based on the patch provided below and its related summarization, report the code lines more context is required.
For that, list the lines with the their associated line numbers, grouping each one on a separated line.
Avoid using list indicators such as hyphens or numbers. If no code line is required, just return "".
Examples of valid code lines:
- '152 const selector = notification.getDescription();'
- '56 file.getElement(this.targetElement());'
{patch}
{summarization}"""
STATIC_COMMENT_EXAMPLES = [
{
"comment": {
"filename": "netwerk/streamconv/converters/mozTXTToHTMLConv.cpp",
"start_line": 1211,
"content": "You are using `nsAutoStringN<256>` instead of `nsString`. This is a good change as `nsAutoStringN<256>` is more efficient for small strings. However, you should ensure that the size of `tempString` does not exceed 256 characters, as `nsAutoStringN<256>` has a fixed size.",
},
"raw_hunk": """@@ -1206,11 +1206,11 @@
} else {
uint32_t start = uint32_t(i);
i = aInString.FindChar('<', i);
if (i == kNotFound) i = lengthOfInString;
- nsString tempString;
+ nsAutoStringN<256> tempString;
tempString.SetCapacity(uint32_t((uint32_t(i) - start) * growthRate));
UnescapeStr(uniBuffer, start, uint32_t(i) - start, tempString);
ScanTXT(tempString, whattodo, aOutString);
}
}""",
},
{
"comment": {
"filename": "toolkit/components/extensions/ExtensionDNR.sys.mjs",
"start_line": 1837,
"content": "The `filterAAR` function inside `#updateAllowAllRequestRules()` is created every time the method is called. Consider defining this function outside of the method to avoid unnecessary function creation.",
},
"raw_hunk": """@@ -1812,18 +1821,27 @@
rulesets.push(
this.makeRuleset(id, idx + PRECEDENCE_STATIC_RULESETS_BASE, rules)
);
}
this.enabledStaticRules = rulesets;
+ this.#updateAllowAllRequestRules();
}
getSessionRules() {
return this.sessionRules.rules;
}
getDynamicRules() {
return this.dynamicRules.rules;
+ }
+
+ #updateAllowAllRequestRules() {
+ const filterAAR = rule => rule.action.type === "allowAllRequests";
+ this.hasRulesWithAllowAllRequests =
+ this.sessionRules.rules.some(filterAAR) ||
+ this.dynamicRules.rules.some(filterAAR) ||
+ this.enabledStaticRules.some(ruleset => ruleset.rules.some(filterAAR));
}
}
function getRuleManager(extension, createIfMissing = true) {
let ruleManager = gRuleManagers.find(rm => rm.extension === extension);""",
},
{
"comment": {
"filename": "devtools/shared/network-observer/NetworkUtils.sys.mjs",
"start_line": 496,
"content": "The condition in the `if` statement is a bit complex and could be simplified for better readability. Consider extracting `!Components.isSuccessCode(status) && blockList.includes(ChromeUtils.getXPCOMErrorName(status))` into a separate function with a descriptive name, such as `isBlockedError`.",
},
"raw_hunk": """@@ -481,26 +481,21 @@
}
} catch (err) {
// "cancelledByExtension" doesn't have to be available.
}
- const ignoreList = [
- // This is emitted when the request is already in the cache.
- "NS_ERROR_PARSED_DATA_CACHED",
- // This is emitted when there is some issues around imgages e.g When the img.src
- // links to a non existent url. This is typically shown as a 404 request.
- "NS_IMAGELIB_ERROR_FAILURE",
- // This is emitted when there is a redirect. They are shown as 301 requests.
- "NS_BINDING_REDIRECTED",
+ const blockList = [
+ // When a host is not found (NS_ERROR_UNKNOWN_HOST)
+ "NS_ERROR_UNKNOWN_HOST",
];
// If the request has not failed or is not blocked by a web extension, check for
// any errors not on the ignore list. e.g When a host is not found (NS_ERROR_UNKNOWN_HOST).
if (
blockedReason == 0 &&
!Components.isSuccessCode(status) &&
- !ignoreList.includes(ChromeUtils.getXPCOMErrorName(status))
+ blockList.includes(ChromeUtils.getXPCOMErrorName(status))
) {
blockedReason = ChromeUtils.getXPCOMErrorName(status);
}
return { blockingExtension, blockedReason };""",
},
]
TEMPLATE_PATCH_FROM_HUNK = """diff --git a/{filename} b/{filename}
--- a/{filename}
+++ b/{filename}
{raw_hunk}
"""
class ReviewRequest:
patch_id: str
def __init__(self, patch_id) -> None:
super().__init__()
self.patch_id = patch_id
class Patch(ABC):
def __init__(self, patch_id: str) -> None:
self.patch_id = patch_id
@property
@abstractmethod
def base_commit_hash(self) -> str: ...
@property
@abstractmethod
def raw_diff(self) -> str: ...
@property
@abstractmethod
def date_created(self) -> datetime: ...
@cached_property
def patch_set(self) -> PatchSet:
return PatchSet.from_string(self.raw_diff)
class PhabricatorPatch(Patch):
def __init__(self, patch_id: str) -> None:
super().__init__(patch_id)
self.diff_id = int(patch_id)
@cached_property
def raw_diff(self) -> str:
assert phabricator.PHABRICATOR_API is not None
raw_diff = phabricator.PHABRICATOR_API.load_raw_diff(self.diff_id)
return raw_diff
@staticmethod
def _commit_available(commit_hash: str) -> bool:
r = utils.get_session("hgmo").get(
f"https://hg.mozilla.org/mozilla-unified/json-rev/{commit_hash}",
headers={
"User-Agent": utils.get_user_agent(),
},
)
return r.ok
@cached_property
def _diff_metadata(self) -> dict:
assert phabricator.PHABRICATOR_API is not None
diffs = phabricator.PHABRICATOR_API.search_diffs(diff_id=self.diff_id)
assert len(diffs) == 1
diff = diffs[0]
return diff
@cached_property
def base_commit_hash(self) -> str:
diff = self._diff_metadata
try:
base_commit_hash = diff["refs"]["base"]["identifier"]
if self._commit_available(base_commit_hash):
return base_commit_hash
except KeyError:
pass
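        # The base commit is unknown or not available on hg.mozilla.org; fall
        # back to the tip of the push closest to (but not after) the diff
        # creation time, searching pushes from the 24 hours before the diff.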
end_date = datetime.fromtimestamp(diff["dateCreated"])
start_date = datetime.fromtimestamp(diff["dateCreated"] - 86400)
end_date_str = end_date.strftime("%Y-%m-%d %H:%M:%S")
start_date_str = start_date.strftime("%Y-%m-%d %H:%M:%S")
r = utils.get_session("hgmo").get(
f"https://hg.mozilla.org/mozilla-central/json-pushes?startdate={start_date_str}&enddate={end_date_str}&version=2&tipsonly=1",
headers={
"User-Agent": utils.get_user_agent(),
},
)
pushes = r.json()["pushes"]
closest_push = None
for push_id, push in pushes.items():
if diff["dateCreated"] - push["date"] < 0:
continue
if (
closest_push is None
or diff["dateCreated"] - push["date"]
< diff["dateCreated"] - closest_push["date"]
):
closest_push = push
assert closest_push is not None
return closest_push["changesets"][0]
@property
def date_created(self) -> datetime:
return datetime.fromtimestamp(self._diff_metadata["dateCreated"])
class ReviewData(ABC):
NIT_PATTERN = re.compile(r"[^a-zA-Z0-9]nit[\s:,]", re.IGNORECASE)
@abstractmethod
def get_review_request_by_id(self, review_id: int) -> ReviewRequest:
raise NotImplementedError
@abstractmethod
def get_patch_by_id(self, patch_id: str) -> Patch:
raise NotImplementedError
@abstractmethod
def get_all_inline_comments(
self, comment_filter
) -> Iterable[tuple[int, list[InlineComment]]]:
raise NotImplementedError
def load_raw_diff_by_id(self, diff_id) -> str:
"""Load a patch from local cache if it exists.
        If the patch is not in the local cache, it will be requested from the
        provider and cached locally.
Args:
diff_id: The ID of the patch.
Returns:
The patch.
"""
try:
with open(f"patches/{diff_id}.patch", "r") as f:
raw_diff = f.read()
except FileNotFoundError:
with open(f"patches/{diff_id}.patch", "w") as f:
patch = self.get_patch_by_id(diff_id)
raw_diff = patch.raw_diff
f.write(raw_diff)
return raw_diff
def get_matching_hunk(
self, patched_file: PatchedFile, comment: InlineComment
) -> Hunk:
def source_end(hunk: Hunk) -> int:
return hunk.source_start + hunk.source_length
def target_end(hunk: Hunk) -> int:
return hunk.target_start + hunk.target_length
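        # When we do not know whether the comment was left on removed code,
        # accept hunks that contain the line in either the old (source) or the
        # new (target) version of the file.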
if comment.on_removed_code is None:
matching_hunks = [
hunk
for hunk in patched_file
if hunk.target_start <= comment.start_line < target_end(hunk)
or hunk.source_start <= comment.start_line < source_end(hunk)
]
# If there is more than one matching hunk, choose the one where the
# line number of the comment corresponds to an added or deleted line. We
# prioritize added lines over deleted lines because comments are more
# likely to be on added lines than deleted lines.
if len(matching_hunks) > 1:
logger.warning(
"Multiple matching hunks found for comment %s in file %s",
comment.id,
comment.filename,
)
for hunk in matching_hunks:
for line in hunk:
if line.is_added and line.target_line_no == comment.start_line:
return hunk
for line in hunk:
if (
line.is_removed
and line.source_line_no == comment.start_line
):
return hunk
if len(matching_hunks) != 0:
return matching_hunks[0]
elif comment.on_removed_code:
for hunk in patched_file:
if hunk.source_start <= comment.start_line < source_end(hunk):
return hunk
else:
for hunk in patched_file:
if hunk.target_start <= comment.start_line < target_end(hunk):
return hunk
def retrieve_comments_with_hunks(self):
def comment_filter(comment: InlineComment):
# We want to keep all generated comments
if comment.is_generated:
return True
comment_content = comment.content
# Ignore very short and very long comments
if not 50 < len(comment_content) < 500:
return False
# Ignore comments with URLs
if "https://" in comment_content or "http://" in comment_content:
return False
# Ignore nit comments
if self.NIT_PATTERN.search(comment_content):
return False
# Ignore comments with code blocks
if "```" in comment_content:
return False
comment_lower = comment_content.lower()
if any(
phrase in comment_lower
for phrase in [
"wdyt?",
"what do you think?",
"you explain",
"understand",
]
):
return False
return True
for diff_id, comments in self.get_all_inline_comments(comment_filter):
try:
patch_set = PatchSet.from_string(self.load_raw_diff_by_id(diff_id))
except UnidiffParseError:
# TODO: use log instead of print
print(f"Failed to parse {diff_id}")
continue
except ConduitError:
logger.warning("Failed to load %d", diff_id)
continue
file_map = {
patched_file.path: patched_file
for patched_file in patch_set
if patched_file.is_modified_file
}
for comment in comments:
patched_file = file_map.get(comment.filename)
if not patched_file:
continue
hunk = self.get_matching_hunk(patched_file, comment)
if not hunk:
continue
yield hunk, comment
class PhabricatorReviewData(ReviewData):
def __init__(self):
super().__init__()
phabricator.set_api_key(
get_secret("PHABRICATOR_URL"), get_secret("PHABRICATOR_TOKEN")
)
def get_review_request_by_id(self, revision_id: int) -> ReviewRequest:
revisions = phabricator.get(rev_ids=[int(revision_id)])
assert len(revisions) == 1
return ReviewRequest(revisions[0]["fields"]["diffID"])
@tenacity.retry(
stop=tenacity.stop_after_attempt(7),
wait=tenacity.wait_exponential(multiplier=2, min=2),
reraise=True,
)
def get_patch_by_id(self, patch_id: str) -> Patch:
return PhabricatorPatch(patch_id)
def get_all_inline_comments(
self, comment_filter
) -> Iterable[tuple[int, list[InlineComment]]]:
db.download(phabricator.REVISIONS_DB)
revision_count = sum(1 for _ in phabricator.get_revisions())
for revision in tqdm(phabricator.get_revisions(), total=revision_count):
diff_comments: dict[int, list[InlineComment]] = defaultdict(list)
for transaction in revision["transactions"]:
if transaction["type"] != "inline":
continue
# Ignore replies
if transaction["fields"]["replyToCommentPHID"] is not None:
continue
if len(transaction["comments"]) != 1:
# Follow up: https://github.com/mozilla/bugbug/issues/4218
logger.warning(
"Unexpected number of comments in transaction %s",
transaction["id"],
)
continue
transaction_comment = transaction["comments"][0]
comment_content = transaction_comment["content"]["raw"]
is_generated = (
# This includes comments generated by Review Helper, but
# excludes any comments that have been edited by the user.
"> This comment was generated automatically and has been approved by"
in comment_content
)
# Ignore bot comments, except the ones by Review Helper
if (
transaction["authorPHID"] == "PHID-USER-cje4weq32o3xyuegalpj"
and not is_generated
):
continue
comment_id = transaction_comment["id"]
date_created = transaction_comment["dateCreated"]
diff_id = transaction["fields"]["diff"]["id"]
filename = transaction["fields"]["path"]
start_line = transaction["fields"]["line"]
end_line = (
transaction["fields"]["line"] + transaction["fields"]["length"] - 1
)
                # Unfortunately, we do not have this information due to a
                # limitation in Phabricator's API.
on_removed_code = None
                # Store the last modified date and whether the comment has been
                # marked as done.
date_modified = transaction_comment["dateModified"]
is_done = transaction["fields"]["isDone"]
# TODO: we could create an extended dataclass for this
# instead of adding optional fields.
comment = InlineComment(
filename=filename,
start_line=start_line,
end_line=end_line,
content=comment_content,
on_removed_code=on_removed_code,
id=comment_id,
date_created=date_created,
date_modified=date_modified,
is_done=is_done,
is_generated=is_generated,
)
if not comment_filter(comment):
continue
diff_comments[diff_id].append(comment)
for diff_id, comments in diff_comments.items():
yield diff_id, comments
class SwarmPatch(Patch):
def __init__(self, patch_id: str, auth: dict) -> None:
super().__init__(patch_id)
self.auth = auth
self.rev_id = int(patch_id)
@cached_property
def _revision_metadata(self) -> dict:
import swarm
revisions = swarm.get(self.auth, rev_ids=[self.rev_id])
assert len(revisions) == 1
return revisions[0]
@property
def raw_diff(self) -> str:
revision = self._revision_metadata
return revision["fields"]["diff"]
@cached_property
def base_commit_hash(self) -> str:
raise NotImplementedError
@property
def date_created(self) -> datetime:
revision = self._revision_metadata
return datetime.fromtimestamp(revision["fields"]["created"])
class SwarmReviewData(ReviewData):
def __init__(self):
self.auth = {
"user": get_secret("SWARM_USER"),
"password": get_secret("SWARM_PASS"),
"port": get_secret("SWARM_PORT"),
"instance": get_secret("SWARM_INSTANCE"),
}
def get_review_request_by_id(self, revision_id: int) -> ReviewRequest:
return ReviewRequest(revision_id)
def get_patch_by_id(self, patch_id: str) -> Patch:
return SwarmPatch(patch_id, self.auth)
def get_all_inline_comments(
self, comment_filter
) -> Iterable[tuple[int, list[InlineComment]]]:
        # TODO
raise NotImplementedError
review_data_classes = {
"phabricator": PhabricatorReviewData,
"swarm": SwarmReviewData,
}
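# Given a file from a patch and a line number, find the hunk that contains the
# line and return the range of changed lines a comment should be attached to,
# plus whether the hunk contains added lines.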
def find_comment_scope(file: PatchedFile, line_number: int):
hunks_based_on_added = (
hunk
for hunk in file
if hunk.target_start <= line_number <= hunk.target_start + hunk.target_length
)
hunks_based_on_deleted = (
hunk
for hunk in file
if hunk.source_start <= line_number <= hunk.source_start + hunk.source_length
)
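    # Prefer a hunk that contains the line among its added lines; fall back to
    # a hunk that contains it among its removed lines.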
try:
hunk = next(chain(hunks_based_on_added, hunks_based_on_deleted))
except StopIteration as e:
raise HunkNotInPatchError("Line number not found in the patch") from e
has_added_lines = any(line.is_added for line in hunk)
has_deleted_lines = any(line.is_removed for line in hunk)
if has_added_lines and has_deleted_lines:
first_line, last_line = find_mixed_lines_range(hunk)
elif has_added_lines:
first_line, last_line = find_added_lines_range(hunk)
else:
first_line, last_line = find_removed_lines_range(hunk)
return {
"line_start": first_line,
"line_end": last_line,
"has_added_lines": has_added_lines,
}
def find_added_lines_range(hunk: Hunk):
added_lines = [line.target_line_no for line in hunk if line.is_added]
return added_lines[0], added_lines[-1]
def find_removed_lines_range(hunk: Hunk):
removed_lines = [line.source_line_no for line in hunk if line.is_removed]
return removed_lines[0], removed_lines[-1]
def find_mixed_lines_range(hunk: Hunk):
def get_first_line(_hunk: Hunk, default: int | None = None):
for i, line in enumerate(_hunk):
if line.is_context:
continue
if line.target_line_no is None:
if i == 0:
                    # If this is the first line of the hunk, the change starts
                    # at the very beginning of the file, so there is no
                    # preceding line to anchor to.
return default
return _hunk[i - 1].target_line_no
return line.target_line_no
# This should never happen
raise ValueError("Cannot find the line number")
first_line = get_first_line(hunk, 1)
last_line = get_first_line(list(reversed(hunk)))
if last_line is None:
_, last_line = find_added_lines_range(hunk)
return first_line, last_line
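# Render a hunk with each line prefixed by its line number: target line numbers
# for added and context lines, source line numbers for removed lines.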
def get_hunk_with_associated_lines(hunk):
hunk_with_lines = ""
for line in hunk:
if line.is_added:
hunk_with_lines += f"{line.target_line_no} + {line.value}"
elif line.is_removed:
hunk_with_lines += f"{line.source_line_no} - {line.value}"
elif line.is_context:
hunk_with_lines += f"{line.target_line_no} {line.value}"
return hunk_with_lines
def format_patch_set(patch_set):
output = ""
for patch in patch_set:
for hunk in patch:
output += f"Filename: {patch.target_file}\n"
output += f"{get_hunk_with_associated_lines(hunk)}\n"
return output
def get_associated_file_to_function(function_name, patch):
for patch_by_file in patch:
for one_patch in patch_by_file:
if function_name in str(one_patch.source):
return patch_by_file.path
return None
def get_associated_file_to_line_context(context_line, patch):
for key, value in patch.items():
if context_line in str(value):
return key
return None
def parse_text_for_dict(text):
file_content = {}
current_filename = None
current_lines = []
lines = text.split("\n")
for line in lines:
if line.startswith("Filename:"):
filename = line.split(":", 1)[1].strip()
            # Remove the "a/" or "b/" prefix from the filename
filename = filename[2:]
current_filename = filename
current_lines = []
else:
current_lines.append(line)
# If we have content and filename, store it
if current_filename is not None and len(current_lines) > 0:
if file_content.get(current_filename) is not None:
file_content[current_filename] = (
file_content[current_filename] + "\n" + str(line)
)
else:
file_content[current_filename] = "\n".join(current_lines)
return file_content
def len_common_path(f1, f2):
"""Find length of the common path."""
f1_subsystems = f1.split("/")
if f1 == f2:
return len(f1_subsystems)
f2_subsystems = f2.split("/")
    max_common_path_length = next(
        (
            idx
            for idx, (sub1, sub2) in enumerate(zip(f1_subsystems, f2_subsystems))
            if sub1 != sub2
        ),
        # If one path is a prefix of the other, the whole shorter path is common.
        min(len(f1_subsystems), len(f2_subsystems)),
    )
    return max_common_path_length
def solve_conflict_definitions(target_path, functions):
functions_common_path = [
(len_common_path(target_path, fun.file), fun) for fun in functions
]
max_common_path_length = max(
[common_path_length for (common_path_length, _) in functions_common_path]
)
functions = [
fun
for (common_path_length, fun) in functions_common_path
if common_path_length == max_common_path_length
]
if len(functions) == 1:
return functions
else:
return [] # could not solve conflict
def request_for_function_declarations(
function_search, commit_hash, functions_list, patch_set
):
functions_declarations = []
if functions_list is not None:
for function_name in functions_list:
if (
function_name != "Not found"
and function_name != "N/A"
and function_name != "None"
and function_name != ""
and len(function_name) < 50
):
target_path = get_associated_file_to_line_context(
function_name, parse_text_for_dict(format_patch_set(patch_set))
)
if target_path:
definitions = function_search.get_function_by_name(
commit_hash,
path=target_path,
function_name=function_name,
)
if len(definitions) > 1:
definitions = solve_conflict_definitions(
target_path, definitions
)
collect_function_definitions(
functions_declarations, function_name, definitions
)
return functions_declarations
def is_code_line_already_covered(code_line, target_file, function_declarations):
for function_declaration in function_declarations:
if (
function_declaration[1] == target_file
and code_line in function_declaration[2]
):
return True
return False
def collect_function_definitions(function_declarations, target_element, definitions):
for definition in definitions:
function_declarations.append(
[
target_element,
definition.file,
definition.source,
]
)
def gather_line_context(line_context):
r"""Reformat the line context list and remove duplicates.
Args:
line_context: List of lists, where each list is [line, file, function].
Returns:
List of tuples, where each tuple is (gathered_line, file, function). The
        'gathered_line' is a str concatenating, with a `\n` separator, all the
        lines that required the same function.
"""
file_dir = {}
for line, file, func in line_context:
if file not in file_dir:
file_dir[file] = {}
if func not in file_dir[file]:
file_dir[file][func] = []
file_dir[file][func].append(line)
gathered_context = []
for file, funcs in file_dir.items():
for func, lines in funcs.items():
gathered_requested_lines = "\n".join(lines)
gathered_context.append((gathered_requested_lines, file, func))
return gathered_context
def request_for_context_lines(function_search, commit_hash, context_line_codes, patch):
functions_declarations = []
if context_line_codes is not None:
for context_line in context_line_codes:
try:
line_number = int(re.search(r"\b(\d+)\b", context_line).group(1))
except (AttributeError, ValueError):
print("Unexpected Line Number Format")
continue
try:
content_line = str(context_line.split(str(line_number))[1]).lstrip()[1:]
except (IndexError, TypeError):
print("Unexpected content line")
continue
target_path = get_associated_file_to_line_context(
content_line, parse_text_for_dict(patch)
)
if (
target_path
and content_line
and not is_code_line_already_covered(
content_line, target_path, functions_declarations
)
):
definitions = function_search.get_function_by_line(
commit_hash=commit_hash,
path=target_path,
line=line_number,
)
collect_function_definitions(
functions_declarations, context_line, definitions
)
functions_declarations = gather_line_context(functions_declarations)
return functions_declarations
def get_structured_functions(target, functions_declaration):
function_declaration_text = "\n"
for function in functions_declaration:
try:
current_function_info = ""
current_function_info += target + ": " + function[0] + "\n"
current_function_info += "File: " + function[1] + "\n"
if isinstance(function[2], list):
current_function = ""
for line in function[2]:
current_function += "\n" + line
current_function_info += (
"Function Declaration: " + current_function + "\n\n"
)
else:
current_function_info += (
"Function Declaration: \n" + function[2] + "\n\n"
)
function_declaration_text += current_function_info
except IndexError:
print("Function does not present all required information")
continue
return function_declaration_text
def parse_model_output(output: str) -> list[dict]:
output = output.strip()
if output.startswith("Review:"):
output = output[len("Review:") :].strip()
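    # Strip a Markdown ```json fence if the model wrapped its answer in one.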
if output.startswith("```json") and output.endswith("```"):
output = output[7:-3]
comments = json.loads(output)
return comments
def generate_processed_output(output: str, patch: PatchSet) -> Iterable[InlineComment]:
comments = parse_model_output(output)
patched_files_map = {
patched_file.target_file: patched_file for patched_file in patch
}
for comment in comments:
file_path = comment["file"]
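        # unidiff prefixes target files with "b/" and source files with "a/";
        # normalize the model-provided path so it matches patched_files_map.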
if not file_path.startswith("b/") and not file_path.startswith("a/"):
file_path = "b/" + file_path
# FIXME: currently, we do not handle renamed files
patched_file = patched_files_map.get(file_path)
if patched_file is None:
raise FileNotInPatchError(
f"The file `{file_path}` is not part of the patch: {list(patched_files_map)}"
)
line_number = comment["code_line"]
if not isinstance(line_number, int):
raise ModelResultError("Line number must be an integer")
scope = find_comment_scope(patched_file, line_number)
yield InlineComment(
filename=(
patched_file.target_file[2:]
if scope["has_added_lines"]
else patched_file.source_file[2:]
),
start_line=line_number,
end_line=line_number,
hunk_start_line=scope["line_start"],
hunk_end_line=scope["line_end"],
content=comment["comment"],
on_removed_code=not scope["has_added_lines"],
)
class CodeReviewTool(GenerativeModelTool):
version = "0.0.1"
def __init__(
self,
comment_gen_llms,
llm=None,
function_search: Optional[FunctionSearch] = None,
review_comments_db: Optional["ReviewCommentsDB"] = None,
show_patch_example: bool = False,
verbose: bool = True,
suggestions_feedback_db: Optional["SuggestionsFeedbackDB"] = None,
target_software: Optional[str] = None,
) -> None:
super().__init__()
self.target_software = target_software or TARGET_SOFTWARE
self.comment_gen_llms = comment_gen_llms
self.llm = llm if llm is not None else comment_gen_llms[0]
self._tokenizer = get_tokenizer(
comment_gen_llms[0].model_name
if hasattr(comment_gen_llms[0], "model_name")
else ""
)
self.is_experiment_env = os.getenv("EXPERIMENT_ENV", "no").lower() in (
"1",
"yes",
"true",
)
if self.is_experiment_env:
print(
"---------------------- WARNING ---------------------\n"
"You are using the experiment environment.\n"
"This environment is not intended for production use.\n"
"----------------------------------------------------"
)
self.summarization_chain = LLMChain(
prompt=PromptTemplate.from_template(
PROMPT_TEMPLATE_SUMMARIZATION,
partial_variables={
"experience_scope": (
f"the {self.target_software} source code"
if self.target_software
else "a software project"
)
},
),
llm=self.llm,
verbose=verbose,
)
self.filtering_chain = LLMChain(
prompt=PromptTemplate.from_template(
PROMPT_TEMPLATE_FILTERING_ANALYSIS,
partial_variables={
"target_code_consistency": self.target_software or "rest of the"
},
),
llm=self.llm,
verbose=verbose,
)
self.deduplicating_chain = LLMChain(
prompt=PromptTemplate.from_template(PROMPT_TEMPLATE_DEDUPLICATE),
llm=self.llm,
verbose=verbose,
)
self.further_context_chain = LLMChain(
prompt=PromptTemplate.from_template(PROMPT_TEMPLATE_FURTHER_CONTEXT_LINES),
llm=self.llm,
verbose=verbose,
)
self.further_info_chain = LLMChain(
prompt=PromptTemplate.from_template(PROMPT_TEMPLATE_FURTHER_INFO),
llm=self.llm,
verbose=verbose,
)
self.function_search = function_search
self.review_comments_db = review_comments_db
self.show_patch_example = show_patch_example
self.verbose = verbose
self.suggestions_feedback_db = suggestions_feedback_db
def count_tokens(self, text):
return len(self._tokenizer.encode(text))
def _generate_suggestions(self, patch: Patch):
formatted_patch = format_patch_set(patch.patch_set)
if formatted_patch == "":
return None
output_summarization = self.summarization_chain.invoke(
{"patch": formatted_patch},
return_only_outputs=True,
)["text"]
if self.verbose:
GenerativeModelTool._print_answer(output_summarization)
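        # Optionally gather extra context: ask the model which code lines and
        # function definitions it needs, then look them up in the codebase at
        # the patch's base commit.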
if self.function_search is not None:
line_code_list = self.further_context_chain.run(
patch=formatted_patch, summarization=output_summarization
).split("\n")
if self.verbose:
GenerativeModelTool._print_answer(line_code_list)
requested_context_lines = request_for_context_lines(
self.function_search,
patch.base_commit_hash,
line_code_list,
formatted_patch,
)
function_list = [
function_name.strip()
for function_name in self.further_info_chain.run(
patch=formatted_patch, summarization=output_summarization
).split("\n")
]
if self.verbose:
GenerativeModelTool._print_answer(function_list)
requested_functions = request_for_function_declarations(
self.function_search,
patch.base_commit_hash,
function_list,
patch.patch_set,
)
output = ""
for comment_gen_llm in self.comment_gen_llms:
memory = ConversationBufferMemory()
conversation_chain = ConversationChain(
llm=comment_gen_llm,
memory=memory,
verbose=self.verbose,
)
experience_scope = (
f"the {self.target_software} source code"
if self.target_software
else "a software project"
)
memory.save_context(
{
"input": f"You are an expert reviewer for {experience_scope}, with experience on source code reviews."
},
{
"output": f"Sure, I'm aware of source code practices in {self.target_software or 'the development community'}."
},
)
memory.save_context(
{
"input": 'Please, analyze the code provided and report a summarization about the new changes; for that, focus on the code added represented by lines that start with "+".\n'
+ patch.raw_diff
},
{"output": output_summarization},
)
if self.function_search is not None and len(requested_functions) > 0:
function_declaration_text = get_structured_functions(
"Function Name", requested_functions
)
memory.save_context(
{
"input": "Attached, you can find some function definitions that are used in the current patch and might be useful to you to have more context about the code under analysis. These functions already exist in the codebase before the patch, and can't be modified. "
+ function_declaration_text
},
{
"output": "Okay, I will consider the provided function definitions as additional context to the given patch."
},
)
if self.function_search is not None and len(requested_context_lines) > 0:
context_text = get_structured_functions(
"Requested Context for Line", requested_context_lines
)
memory.save_context(
{
"input": "Attached, you can also have more context of the target code under analysis."
+ context_text
},
{
"output": "Okay, I will also consider the code as additional context to the given patch."
},
)
created_before = patch.date_created if self.is_experiment_env else None
cur_output = conversation_chain.predict(
input=PROMPT_TEMPLATE_REVIEW.format(
patch=formatted_patch,
comment_examples=self._get_comment_examples(patch, created_before),
approved_examples=self._get_generated_examples(
patch, created_before
),
target_code_consistency=self.target_software or "rest of the",
)
)
output += cur_output
if self.verbose:
GenerativeModelTool._print_answer(cur_output)
memory.clear()
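        # When multiple LLMs produced comments, merge them and drop duplicates.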
if len(self.comment_gen_llms) > 1:
output = self.deduplicating_chain.invoke(
{"review": output},
return_only_outputs=True,
)["text"]
if self.verbose:
GenerativeModelTool._print_answer(output)
return output
@retry(retry=retry_if_exception_type(ModelResultError), stop=stop_after_attempt(3))
def run(self, patch: Patch) -> list[InlineComment] | None:
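        # Bail out early on very large diffs to keep the prompt size manageable.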
if self.count_tokens(patch.raw_diff) > 5000:
raise LargeDiffError("The diff is too large")
output = self._generate_suggestions(patch)
unfiltered_suggestions = parse_model_output(output)
if not unfiltered_suggestions:
logger.info("No suggestions were generated")
return []
rejected_examples = (
"\n - ".join(self.get_similar_rejected_comments(unfiltered_suggestions))
if self.suggestions_feedback_db
else DEFAULT_REJECTED_EXAMPLES
)
raw_output = self.filtering_chain.invoke(
{
"comments": output,
"rejected_examples": rejected_examples,
},
return_only_outputs=True,
)["text"]
if self.verbose:
GenerativeModelTool._print_answer(raw_output)
return list(generate_processed_output(raw_output, patch.patch_set))
def _get_generated_examples(self, patch, created_before: datetime | None = None):
"""Get examples of comments that were generated by an LLM.
        Since these comments were posted, they must have been approved by a
        reviewer. Thus, we can use them as examples of good comments.
"""
if not self.review_comments_db:
return ""
comment_examples = [
result.payload["comment"]["content"]
for result in self.review_comments_db.find_similar_patch_comments(
patch, limit=5, generated=True, created_before=created_before
)
]
if not comment_examples:
return ""
template = """
**Examples of comments that you suggested on other patches and developers found useful**:
- {comment_examples}
"""
return template.format(comment_examples="\n - ".join(comment_examples))
def _get_comment_examples(self, patch, created_before: datetime | None = None):
comment_examples = []
if self.review_comments_db:
comment_examples = [
result.payload
for result in self.review_comments_db.find_similar_patch_comments(
patch, limit=10, generated=False, created_before=created_before
)
]
if not comment_examples:
comment_examples = STATIC_COMMENT_EXAMPLES
def format_comment(comment):
# TODO: change the schema that we expect the model to return so we
# can remove this function.
return {
"file": comment["filename"],
"code_line": comment["start_line"],
"comment": comment["content"],
}
def generate_formatted_patch_from_raw_hunk(raw_hunk, filename):
patch = TEMPLATE_PATCH_FROM_HUNK.format(
filename=filename, raw_hunk=raw_hunk
)
patch_set = PatchSet.from_string(patch)
return format_patch_set(patch_set)
if not self.show_patch_example:
return json.dumps(
[format_comment(example["comment"]) for example in comment_examples],
indent=2,
)
return "\n\n".join(
TEMPLATE_COMMENT_EXAMPLE.format(
example_number=num + 1,
patch=generate_formatted_patch_from_raw_hunk(
example["raw_hunk"], example["comment"]["filename"]
),
comments=json.dumps(
[format_comment(example["comment"])],
indent=2,
),
)
for num, example in enumerate(comment_examples)
)
def get_similar_rejected_comments(self, suggestions) -> Iterable[str]:
if not self.suggestions_feedback_db:
raise Exception("Suggestions feedback database is not available")
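        # Spread a budget of ten examples across the suggestions, keeping at
        # least one example per suggestion.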
num_examples_per_suggestion = 10 // len(suggestions) or 1
seen_ids: set[int] = set()
for suggestion in suggestions:
similar_rejected_suggestions = (
self.suggestions_feedback_db.find_similar_rejected_suggestions(
suggestion["comment"],
limit=num_examples_per_suggestion,
excluded_ids=seen_ids,
)
)
for rejected_suggestion in similar_rejected_suggestions:
seen_ids.add(rejected_suggestion.id)
yield rejected_suggestion.comment
class ReviewCommentsDB:
NAV_PATTERN = re.compile(r"\{nav, [^}]+\}")
WHITESPACE_PATTERN = re.compile(r"[\n\s]+")
def __init__(self, vector_db: VectorDB) -> None:
self.vector_db = vector_db
self.embeddings = OpenAIEmbeddings(
model="text-embedding-3-large", api_key=get_secret("OPENAI_API_KEY")
)
def clean_comment(self, comment: str):
        # We do not want to keep the LLM note in the comment; it is not useful
        # when using the comment as an example.
llm_note_index = comment.find("> This comment was generated automatically ")
if llm_note_index != -1:
comment = comment[:llm_note_index]
# TODO: use the nav info instead of removing it
comment = self.NAV_PATTERN.sub("", comment)
comment = self.WHITESPACE_PATTERN.sub(" ", comment)
comment = comment.strip()
return comment
def add_comments_by_hunk(self, items: Iterable[tuple[Hunk, InlineComment]]):
point_ids = set(self.vector_db.get_existing_ids())
logger.info("Will skip %d comments that already exist", len(point_ids))
def vector_points():
for hunk, comment in items:
if comment.id in point_ids:
continue
str_hunk = str(hunk)
vector = self.embeddings.embed_query(str_hunk)
comment_data = asdict(comment)
comment_data["content"] = self.clean_comment(comment.content)
payload = {
"hunk": str_hunk,
"comment": comment_data,
"version": 2,
}
yield VectorPoint(id=comment.id, vector=vector, payload=payload)
self.vector_db.insert(vector_points())
def find_similar_hunk_comments(
self,
hunk: Hunk,
generated: bool | None = None,
created_before: datetime | None = None,
):
return self.vector_db.search(
self.embeddings.embed_query(str(hunk)),
filter=(
QueryFilter(
must_match=(
{"comment.is_generated": generated}
if generated is not None
else None
),
must_range=(
{
"comment.date_created": {
"lt": created_before.timestamp(),
}
}
if created_before is not None
else None
),
)
),
)
def find_similar_patch_comments(
self,
patch: Patch,
limit: int,
generated: bool | None = None,
created_before: datetime | None = None,
):
assert limit > 0, "Limit must be greater than 0"
patch_set = PatchSet.from_string(patch.raw_diff)
# We want to avoid returning the same comment multiple times. Thus, if
# a comment matches multiple hunks, we will only consider it once.
max_score_per_comment: dict = {}
for patched_file in patch_set:
if not patched_file.is_modified_file:
continue
for hunk in patched_file:
for result in self.find_similar_hunk_comments(
hunk, generated, created_before
):
if result is not None and (
result.id not in max_score_per_comment
or result.score > max_score_per_comment[result.id].score
):
max_score_per_comment[result.id] = result
return sorted(max_score_per_comment.values())[-limit:]
class EvaluationAction(enum.Enum):
APPROVE = 1
REJECT = 2
IGNORE = 3
@dataclass
class SuggestionFeedback:
id: int
comment: str
file_path: str
action: Literal["APPROVE", "REJECT", "IGNORE"]
user: str
@staticmethod
def from_payload_score(point: PayloadScore):
return SuggestionFeedback(
id=point.id,
comment=point.payload["comment"],
file_path=point.payload["file_path"],
action=point.payload["action"],
user=point.payload["user"],
)
class SuggestionsFeedbackDB:
def __init__(self, vector_db: VectorDB) -> None:
self.vector_db = vector_db
self.embeddings = OpenAIEmbeddings(
model="text-embedding-3-large", api_key=get_secret("OPENAI_API_KEY")
)
def add_suggestions_feedback(self, suggestions: Iterable[SuggestionFeedback]):
def vector_points():
for suggestion in suggestions:
vector = self.embeddings.embed_query(suggestion.comment)
payload = {
"comment": suggestion.comment,
"file_path": suggestion.file_path,
"action": suggestion.action,
"user": suggestion.user,
}
yield VectorPoint(id=suggestion.id, vector=vector, payload=payload)
self.vector_db.insert(vector_points())
def find_similar_suggestions(self, comment: str):
return (
SuggestionFeedback.from_payload_score(point)
for point in self.vector_db.search(self.embeddings.embed_query(comment))
)
def find_similar_rejected_suggestions(
self, comment: str, limit: int, excluded_ids: Iterable[int] = ()
):
return (
SuggestionFeedback.from_payload_score(point)
for point in self.vector_db.search(
self.embeddings.embed_query(comment),
filter=QueryFilter(
must_match={"action": "REJECT"},
must_not_has_id=list(excluded_ids),
),
limit=limit,
)
)