bot/code_review_bot/workflow.py:

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import asyncio
import time
from datetime import datetime, timedelta
from itertools import groupby

import structlog
from libmozdata.phabricator import BuildState, PhabricatorAPI
from libmozevent.mercurial import MercurialWorker, Repository
from libmozevent.phabricator import PhabricatorActions, PhabricatorBuildState
from taskcluster.utils import stringDate

from code_review_bot import Level, stats
from code_review_bot.analysis import (
    RevisionBuild,
    publish_analysis_lando,
    publish_analysis_phabricator,
)
from code_review_bot.backend import BackendAPI
from code_review_bot.config import REPO_AUTOLAND, REPO_MOZILLA_CENTRAL, settings
from code_review_bot.mercurial import robust_checkout
from code_review_bot.report.debug import DebugReporter
from code_review_bot.revisions import Revision
from code_review_bot.tasks.base import AnalysisTask, BaseTask, NoticeTask
from code_review_bot.tasks.clang_format import ClangFormatTask
from code_review_bot.tasks.clang_tidy import ClangTidyTask
from code_review_bot.tasks.clang_tidy_external import ExternalTidyTask
from code_review_bot.tasks.coverage import ZeroCoverageTask
from code_review_bot.tasks.default import DefaultTask
from code_review_bot.tasks.docupload import DocUploadTask
from code_review_bot.tasks.lint import MozLintTask
from code_review_bot.tasks.tgdiff import TaskGraphDiffTask

logger = structlog.get_logger(__name__)

TASKCLUSTER_NAMESPACE = "project.relman.{channel}.code-review.{name}"
TASKCLUSTER_INDEX_TTL = 7  # in days


class Workflow:
    """
    Full static analysis workflow
    - setup remote analysis workflow
    - find issues from remote tasks
    - publish issues
    """

    def __init__(
        self,
        reporters,
        index_service,
        queue_service,
        phabricator_api,
        zero_coverage_enabled=True,
        update_build=True,
        task_failures_ignored=[],
    ):
        self.zero_coverage_enabled = zero_coverage_enabled
        self.update_build = update_build
        self.task_failures_ignored = task_failures_ignored
        logger.info("Will ignore task failures", names=self.task_failures_ignored)

        # Use shared Phabricator API client
        assert isinstance(phabricator_api, PhabricatorAPI)
        self.phabricator = phabricator_api

        # Load reporters to use
        self.reporters = reporters
        if not self.reporters:
            logger.warn("No reporters configured, this analysis will not be published")

        # Always add debug reporter and Diff reporter
        self.reporters["debug"] = DebugReporter(
            output_dir=settings.taskcluster.results_dir
        )

        # Use TC services client
        self.index_service = index_service
        self.queue_service = queue_service

        # Setup Backend API client
        self.backend_api = BackendAPI()

        # Is the local clone already set up?
        self.clone_available = False

    def run(self, revision):
        """
        Find all issues on remote tasks and publish them
        """
        # Index the Taskcluster task for this revision ASAP
        self.index(revision, state="started")

        # Set the Phabricator build as running
        self.update_status(revision, state=BuildState.Work)
        if settings.taskcluster_url:
            self.publish_link(
                revision,
                slug="publication",
                name="Publication task",
                url=settings.taskcluster_url,
            )

        # Analyze revision patch to get files/lines data
        revision.analyze_patch()

        # Find issues on remote tasks
        issues, task_failures, notices, reviewers = self.find_issues(
            revision, settings.try_group_id
        )

        # Analyze issues in case the before/after feature is enabled
        if revision.before_after_feature:
            logger.info("Running the before/after feature")

            # Search a base revision from the decision task
            decision = self.queue_service.task(settings.try_group_id)
            base_rev_changeset = (
                decision.get("payload", {}).get("env", {}).get("GECKO_BASE_REV")
            )
            if not base_rev_changeset:
                logger.warning(
                    "Base revision changeset could not be fetched from Phabricator, "
                    "looking for existing issues based on the current date",
                    task=settings.try_group_id,
                )

            # Clone local repo when required
            # as find_previous_issues will build the hashes
            self.clone_repository(revision)

            # Mark known issues to avoid publishing them on this patch
            self.find_previous_issues(issues, base_rev_changeset)
            new_issues_count = sum(issue.new_issue for issue in issues)
            logger.info(
                f"Found {new_issues_count} new issues (over {len(issues)} total detected issues)",
                task=settings.try_group_id,
            )
        else:
            # Clone local repo when required
            # as publication needs the hashes
            self.clone_repository(revision)

        if (
            all(issue.new_issue is False for issue in issues)
            and not task_failures
            and not notices
        ):
            logger.info("No issues nor notices, stopping there.")

        # Publish all issues
        self.publish(revision, issues, task_failures, notices, reviewers)

        return issues

    def ingest_revision(self, revision, group_id):
        """
        Simpler workflow to ingest a revision
        """
        assert revision.head_repository in (
            REPO_AUTOLAND,
            REPO_MOZILLA_CENTRAL,
        ), "Need a revision from autoland or mozilla-central"
        logger.info(
            "Starting revision ingestion",
            bugzilla=revision.bugzilla_id,
            title=revision.title,
            head_repository=revision.head_repository,
            head_changeset=revision.head_changeset,
        )

        assert (
            self.backend_api.enabled
        ), "Backend storage is disabled, revision ingestion is not possible"

        supported_tasks = []

        def _build_tasks(tasks):
            for task_status in tasks["tasks"]:
                try:
                    task_name = task_status["task"]["metadata"]["name"]

                    # Only analyze tasks starting with `source-test-` to avoid checking artifacts every time
                    if not task_name.startswith("source-test-"):
                        logger.debug(
                            f"Task with name '{task_name}' is not supported during the ingestion of a revision"
                        )
                        continue

                    task = self.build_task(task_status)
                except Exception as e:
                    logger.warning(f"Could not process task {task_name}: {e}")
                    continue

                if task is None or getattr(task, "parse_issues", None) is None:
                    # Ignore tasks that cannot be parsed for issues
                    continue

                supported_tasks.append(task)

        # Find potential issues in the task group
        self.queue_service.listTaskGroup(group_id, paginationHandler=_build_tasks)
        logger.info(
            "Loaded all supported tasks in the task group",
            group_id=group_id,
            nb=len(supported_tasks),
        )

        # Load all the artifacts and potential issues
        issues = []
        for task in supported_tasks:
            artifacts = task.load_artifacts(self.queue_service)
            if artifacts is not None:
                task_issues = task.parse_issues(artifacts, revision)
                logger.info(
                    f"Found {len(task_issues)} issues",
                    task=task.name,
                    id=task.id,
                )
                issues += task_issues

        # Store the revision & diff in the backend
        self.backend_api.publish_revision(revision)

        # Publish issues when there are some
        if not issues:
            logger.info("No issues for that revision")
            return

        # Clone local repo when required
        self.clone_repository(revision)

        # Publish issues in the backend
        self.backend_api.publish_issues(issues, revision)

    def start_analysis(self, revision):
        """
        Apply a patch on a local clone and push to try
        to trigger a new Code review analysis
        """
        logger.info("Starting revision analysis", revision=revision)

        # Do not process revisions from black-listed users
        if revision.is_blacklisted:
            logger.warning("Blacklisted author, stopping there.")
            return

        # Cannot run without mercurial cache configured
        if not settings.mercurial_cache:
            raise Exception("Mercurial cache must be configured to start analysis")

        # Cannot run without ssh key
        if not settings.ssh_key:
            raise Exception("SSH Key must be configured to start analysis")

        # Set the Phabricator build as running
        self.update_status(revision, state=BuildState.Work)
        if settings.taskcluster_url:
            self.publish_link(
                revision,
                slug="analysis",
                name="Analysis task",
                url=settings.taskcluster_url,
            )

        # Initialize Phabricator build using revision
        build = RevisionBuild(revision)

        # Copy internal Phabricator credentials to setup libmozevent
        phabricator = PhabricatorActions(
            url=self.phabricator.url,
            api_key=self.phabricator.api_key,
        )

        # Initialize mercurial repository
        repository = Repository(
            config={
                "name": revision.base_repository_conf.name,
                "try_name": revision.base_repository_conf.try_name,
                "url": revision.base_repository_conf.url,
                "try_url": revision.base_repository_conf.try_url,
                # Setup ssh identity
                "ssh_user": revision.base_repository_conf.ssh_user,
                "ssh_key": settings.ssh_key,
                # Force usage of robustcheckout
                "checkout": "robust",
            },
            cache_root=settings.mercurial_cache,
        )
        worker = MercurialWorker(
            # We are not using the mercurial workflow
            # so we can initialize with empty data here
            queue_name=None,
            queue_phabricator=None,
            repositories={},
        )

        # Try to update the state 5 consecutive times
        for i in range(5):
            # Update the internal build state using Phabricator infos
            phabricator.update_state(build)

            # Continue with workflow once the build is public
            if build.state is PhabricatorBuildState.Public:
                break

            # Retry later if the build is not yet seen as public
            logger.warning(
                "Build is not public, retrying in 30s",
                build=build,
                retries_left=build.retries,
            )
            time.sleep(30)

        # Make sure the build is now public
        if build.state is not PhabricatorBuildState.Public:
            raise Exception("Cannot process private builds")

        # When the build is public, load needed details
        try:
            phabricator.load_patches_stack(build)
            logger.info("Loaded stack of patches", build=str(build))
        except Exception as e:
            logger.warning(
                "Failed to load build details", build=str(build), error=str(e)
            )
            raise

        if not build.stack:
            raise Exception("No stack of patches to apply.")

        # We'll clone the required repository
        repository.clone()

        # Apply the stack of patches using asynchronous method
        # that runs directly in that process
        output = asyncio.run(worker.handle_build(repository, build))

        # Update final state using worker output
        if self.update_build:
            publish_analysis_phabricator(output, self.phabricator)
        else:
            logger.info("Skipping Phabricator publication")

        # Send Build in progress or errors to Lando
        lando_reporter = self.reporters.get("lando")
        if lando_reporter is not None:
            publish_analysis_lando(output, lando_reporter.lando_api)
        else:
            logger.info("Skipping Lando publication")

    def clone_repository(self, revision):
        """
        Clone the repo locally when configured
        On production this should use a Taskcluster cache
        """
        if not settings.mercurial_cache:
            logger.debug("Local clone not required")
            return
        if self.clone_available:
            logger.debug("Local clone already setup")
            return

        logger.info(
            "Cloning revision to build issues",
            repo=revision.base_repository,
            changeset=revision.head_changeset,
            dest=settings.mercurial_cache_checkout,
        )
        robust_checkout(
            repo_upstream_url=revision.base_repository,
            repo_url=revision.head_repository,
            revision=revision.head_changeset,
            checkout_dir=settings.mercurial_cache_checkout,
            sharebase_dir=settings.mercurial_cache_sharebase,
        )
        self.clone_available = True

    def publish(self, revision, issues, task_failures, notices, reviewers):
        """
        Publish issues on selected reporters
        """
        # Publish patches on Taskcluster
        # or write locally for local development
        for patch in revision.improvement_patches:
            if settings.taskcluster.local:
                patch.write()
            else:
                patch.publish()

        # Publish issues on backend to retrieve their comparison state
        publishable_issues = [i for i in issues if i.is_publishable()]
        self.backend_api.publish_issues(publishable_issues, revision)

        # Report issues publication stats
        nb_issues = len(issues)
        nb_publishable = len(publishable_issues)
        nb_publishable_errors = len(
            [i for i in publishable_issues if i.level == Level.Error]
        )

        self.index(
            revision,
            state="analyzed",
            issues=nb_issues,
            issues_publishable=nb_publishable,
        )
        stats.add_metric("analysis.issues.publishable", nb_publishable)

        # Publish reports about these issues
        with stats.timer("runtime.reports"):
            for reporter in self.reporters.values():
                reporter.publish(issues, revision, task_failures, notices, reviewers)

        self.index(
            revision, state="done", issues=nb_issues, issues_publishable=nb_publishable
        )

        # Publish final HarborMaster state
        self.update_status(
            revision,
            BuildState.Fail
            if nb_publishable_errors > 0 or task_failures
            else BuildState.Pass,
        )

    def index(self, revision, **kwargs):
        """
        Index current task on Taskcluster index
        """
        assert isinstance(revision, Revision)

        if settings.taskcluster.local or self.index_service is None:
            logger.info("Skipping taskcluster indexing", rev=str(revision), **kwargs)
            return

        # Build payload
        payload = revision.as_dict()
        payload.update(kwargs)

        # Always add the indexing
        now = datetime.utcnow()
        payload["indexed"] = stringDate(now)

        # Always add the source and try config
        payload["source"] = "try"
        payload["try_task_id"] = settings.try_task_id
        payload["try_group_id"] = settings.try_group_id

        # Always add the repository we are working on
        # This is mainly used by the frontend to list & filter diffs
        payload["repository"] = revision.base_repository

        # Add restartable flag for monitoring
        payload["monitoring_restart"] = payload["state"] == "error" and payload.get(
            "error_code"
        ) in ("watchdog", "mercurial")

        # Add a sub namespace with the task id to be able to list
        # tasks from the parent namespace
        namespaces = revision.namespaces + [
            f"{namespace}.{settings.taskcluster.task_id}"
            for namespace in revision.namespaces
        ]

        # Build complete namespaces list, with monitoring update
        full_namespaces = [
            TASKCLUSTER_NAMESPACE.format(channel=settings.app_channel, name=name)
            for name in namespaces
        ]

        # Index for all required namespaces
        for namespace in full_namespaces:
            self.index_service.insertTask(
                namespace,
                {
                    "taskId": settings.taskcluster.task_id,
                    "rank": 0,
                    "data": payload,
                    "expires": stringDate(now + timedelta(days=TASKCLUSTER_INDEX_TTL)),
                },
            )

    def find_previous_issues(self, issues, base_rev_changeset=None):
        """
        Look for known issues in the backend matching the given list of issues
        If a base revision ID is provided, compare to issues detected on this revision
        Otherwise, compare to issues detected on last ingested revision
        """
        assert (
            self.backend_api.enabled
        ), "Backend storage is disabled, comparing issues is not possible"

        current_date = datetime.now().strftime("%Y-%m-%d")

        # Group issues by path, so we only list known issues for the affected files
        issues_groups = groupby(
            sorted(issues, key=lambda i: i.path),
            lambda i: i.path,
        )
        logger.info(
            "Checking for existing issues in the backend",
            base_revision_changeset=base_rev_changeset,
        )
        for path, group_issues in issues_groups:
            known_issues = self.backend_api.list_repo_issues(
                "mozilla-central",
                date=current_date,
                revision_changeset=base_rev_changeset,
                path=path,
            )
            hashes = [issue["hash"] for issue in known_issues]
            for issue in group_issues:
                issue.new_issue = bool(issue.hash and issue.hash not in hashes)

    def find_issues(self, revision, group_id):
        """
        Find all issues on remote Taskcluster task group
        """
        # Load all tasks in task group
        tasks = self.queue_service.listTaskGroup(group_id)
        assert "tasks" in tasks
        tasks = {task["status"]["taskId"]: task for task in tasks["tasks"]}
        assert len(tasks) > 0
        logger.info("Loaded Taskcluster group", id=group_id, tasks=len(tasks))

        # Store the revision in the backend (or retrieve an existing one)
        rev = self.backend_api.publish_revision(revision)
        assert (
            rev is not None
        ), "Stopping early because revision could not be created nor retrieved from the backend"

        # Load task description
        task = tasks.get(settings.try_task_id)
        assert task is not None, f"Missing task {settings.try_task_id}"
        dependencies = task["task"]["dependencies"]
        assert len(dependencies) > 0, "No task dependencies to analyze"

        # Skip dependencies not in group
        # But log all skipped tasks
        def _in_group(dep_id):
            if dep_id not in tasks:
                # Used for docker images produced in tree
                # and other artifacts
                logger.info("Skip dependency not in group", task_id=dep_id)
                return False
            return True

        dependencies = [dep_id for dep_id in dependencies if _in_group(dep_id)]

        # Do not run parsers when we only have a gecko decision task
        # That means no analyzers were triggered by the taskgraph decision task
        # This can happen if the patch only touches file types for which we have no analyzer defined
        # See issue https://github.com/mozilla/release-services/issues/2055
        if len(dependencies) == 1:
            task = tasks[dependencies[0]]
            if task["task"]["metadata"]["name"] == "Gecko Decision Task":
                logger.warn("Only dependency is a Decision Task, skipping analysis")
                return [], [], [], []

        # Add zero-coverage task
        if self.zero_coverage_enabled:
            dependencies.append(ZeroCoverageTask)

        # Find issues and patches in dependencies
        issues = []
        task_failures = []
        notices = []
        for dep in dependencies:
            try:
                if isinstance(dep, type) and issubclass(dep, AnalysisTask):
                    # Build a class instance from its definition and route
                    task = dep.build_from_route(self.index_service, self.queue_service)
                else:
                    # Use a task from its id & description
                    task = self.build_task(tasks[dep])
                if task is None:
                    continue
                artifacts = task.load_artifacts(self.queue_service)
                if artifacts is not None:
                    task_issues, task_patches = [], []
                    if isinstance(task, AnalysisTask):
                        task_issues = task.parse_issues(artifacts, revision)
                        logger.info(
                            f"Found {len(task_issues)} issues",
                            task=task.name,
                            id=task.id,
                        )
                        stats.report_task(task, task_issues)
                        issues += task_issues

                        task_patches = task.build_patches(artifacts)
                        for patch in task_patches:
                            revision.add_improvement_patch(task, patch)
                    elif isinstance(task, NoticeTask):
                        notice = task.build_notice(artifacts, revision)
                        if notice:
                            notices.append(notice)

                    # Report a problem when tasks in erroneous state are found
                    # but no issue or patch has been processed by the bot
                    if task.state == "failed" and not task_issues and not task_patches:
                        # Skip tasks that are listed as ignorable (we try to avoid unnecessary spam)
                        if task.name in self.task_failures_ignored:
                            logger.warning(
                                "Ignoring task failure as configured",
                                task=task.name,
                                id=task.id,
                            )
                            continue

                        logger.warning(
                            "An erroneous task processed some artifacts and found no issues or patches",
                            task=task.name,
                            id=task.id,
                        )
                        task_failures.append(task)
            except Exception as e:
                logger.warn(
                    "Failure during task analysis",
                    task=settings.taskcluster.task_id,
                    error=e,
                )
                raise

        reviewers = (
            task.extra_reviewers_groups if task and isinstance(task, BaseTask) else []
        )

        return issues, task_failures, notices, reviewers

    def build_task(self, task_status):
        """
        Create a specific implementation of AnalysisTask according to the task name
        """
        try:
            task_id = task_status["status"]["taskId"]
        except KeyError:
            raise Exception(f"Cannot read task id from {task_status}")
        try:
            name = task_status["task"]["metadata"]["name"]
        except KeyError:
            raise Exception(f"Cannot read task name {task_id}")

        # Default format is used first when the correct artifact is available
        if DefaultTask.matches(task_id):
            return DefaultTask(task_id, task_status)
        elif name.startswith("source-test-mozlint-"):
            return MozLintTask(task_id, task_status)
        elif name == "source-test-clang-tidy":
            return ClangTidyTask(task_id, task_status)
        elif name == "source-test-clang-format":
            return ClangFormatTask(task_id, task_status)
        elif name == "source-test-doc-upload":
            return DocUploadTask(task_id, task_status)
        elif name == "source-test-clang-external":
            return ExternalTidyTask(task_id, task_status)
        elif name == "source-test-taskgraph-diff":
            return TaskGraphDiffTask(task_id, task_status)
        elif settings.autoland_group_id is not None and not name.startswith(
            "source-test-"
        ):
            # Log cleanly on autoland unknown tasks
            logger.info("Skipping unknown task", id=task_id, name=name)
        else:
            return None

    def update_status(self, revision, state):
        """
        Update build status on HarborMaster
        """
        assert isinstance(state, BuildState)
        if not revision.build_target_phid:
            logger.info(
                "No build target found, skipping HarborMaster update", state=state.value
            )
            return

        if not self.update_build:
            logger.info(
                "Update build disabled, skipping HarborMaster update", state=state.value
            )
            return

        self.phabricator.update_build_target(revision.build_target_phid, state)
        logger.info("Updated HarborMaster status", state=state, revision=revision)

    def publish_link(self, revision: Revision, slug: str, name: str, url: str):
        """
        Publish a link as a HarborMaster artifact
        """
        if not revision.build_target_phid:
            logger.info(
                "No build target found, skipping HarborMaster link creation",
                slug=slug,
                url=url,
            )
            return

        if not self.update_build:
            logger.info(
                "Update build disabled, skipping HarborMaster link creation",
                slug=slug,
                url=url,
            )
            return

        self.phabricator.create_harbormaster_uri(
            revision.build_target_phid, slug, name, url
        )