http_service/bugbug_http/app.py (465 lines of code) (raw):

# -*- coding: utf-8 -*- # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this file, # You can obtain one at http://mozilla.org/MPL/2.0/. import gzip import logging import os import uuid from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone from typing import Any, Callable, Sequence from urllib.parse import urlparse import orjson import zstandard from apispec import APISpec from apispec.ext.marshmallow import MarshmallowPlugin from apispec_webframeworks.flask import FlaskPlugin from cerberus import Validator from flask import Flask, Response, jsonify, render_template, request from flask_cors import cross_origin from libmozdata.bugzilla import Bugzilla from marshmallow import Schema, fields from redis import Redis from rq import Queue from rq.exceptions import NoSuchJobError from rq.job import Job from sentry_sdk.integrations.flask import FlaskIntegration from bugbug import bugzilla, get_bugbug_version, utils from bugbug_http.models import ( MODELS_NAMES, classify_broken_site_report, classify_bug, classify_issue, get_config_specific_groups, schedule_tests, ) from bugbug_http.sentry import setup_sentry if os.environ.get("SENTRY_DSN"): setup_sentry(dsn=os.environ.get("SENTRY_DSN"), integrations=[FlaskIntegration()]) utils.setup_libmozdata() API_TOKEN = "X-Api-Key" API_DESCRIPTION = """ This is the documentation for the BubBug http service, the platform for Bugzilla Machine Learning projects. # Introduction This service can be used to classify a given bug using a pre-trained model. You can classify a single bug or a batch of bugs. The classification happens in the background so you need to call back the service for getting the results. """ spec = APISpec( title="Bugbug", version=get_bugbug_version(), openapi_version="3.0.2", info=dict(description=API_DESCRIPTION), plugins=[FlaskPlugin(), MarshmallowPlugin()], security=[{"api_key": []}], ) application = Flask(__name__) url = urlparse(os.environ.get("REDIS_URL", "redis://localhost/0")) assert url.hostname is not None redis_conn = Redis( host=url.hostname, port=url.port if url.port is not None else 6379, password=url.password, ssl=True if url.scheme == "rediss" else False, ssl_cert_reqs=None, ) # Kill jobs which don't finish within 12 minutes. JOB_TIMEOUT = 12 * 60 # Kill Bugzilla jobs which don't finish within 5 minutes. BUGZILLA_JOB_TIMEOUT = 5 * 60 # Remove jobs from the queue if they haven't started within 7 minutes. QUEUE_TIMEOUT = 7 * 60 # Store the information that a job failed for 3 minutes. FAILURE_TTL = 3 * 60 q = Queue( connection=redis_conn, default_timeout=JOB_TIMEOUT ) # no args implies the default queue VALIDATOR = Validator() BUGZILLA_TOKEN = os.environ.get("BUGBUG_BUGZILLA_TOKEN") GITHUB_TOKEN = os.environ.get("BUGBUG_GITHUB_TOKEN") dctx = zstandard.ZstdDecompressor() logging.basicConfig(level=logging.DEBUG) LOGGER = logging.getLogger() class BugPrediction(Schema): prob = fields.List(fields.Float()) index = fields.Integer() suggestion = fields.Str() extra_data = fields.Dict() class NotAvailableYet(Schema): ready = fields.Boolean(enum=[False]) class ModelName(Schema): model_name = fields.Str(enum=MODELS_NAMES, example="component") class UnauthorizedError(Schema): message = fields.Str(default="Error, missing X-API-KEY") class BranchName(Schema): branch = fields.Str(example="autoland") class Schedules(Schema): tasks = fields.List(fields.Str) groups = fields.List(fields.Str) spec.components.schema(BugPrediction.__name__, schema=BugPrediction) spec.components.schema(NotAvailableYet.__name__, schema=NotAvailableYet) spec.components.schema(ModelName.__name__, schema=ModelName) spec.components.schema(UnauthorizedError.__name__, schema=UnauthorizedError) spec.components.schema(BranchName.__name__, schema=BranchName) spec.components.schema(Schedules.__name__, schema=Schedules) api_key_scheme = {"type": "apiKey", "in": "header", "name": "X-API-Key"} spec.components.security_scheme("api_key", api_key_scheme) @dataclass(init=False, frozen=True) class JobInfo: func: Callable[..., str] args: Sequence[Any] = field(default_factory=list) def __init__(self, func, *args): # Custom __init__ is needed to support *args, and object.__setattr__ is # needed for 'frozen=True'. object.__setattr__(self, "func", func) object.__setattr__(self, "args", args) def __str__(self): return f"{self.func.__name__}:{'_'.join(map(str, self.args))}" @property def mapping_key(self): """The mapping key for this job. Returns: (str) A key to be used to for job ids. """ return f"bugbug:job_id:{self}" @property def result_key(self): """The result key for this job. Returns: (str) A key to be used to for job results. """ return f"bugbug:job_result:{self}" @property def change_time_key(self): """The change time key for this job. Returns: (str) A key to be used to for job change time. """ return f"bugbug:change_time:{self}" def get_job_id() -> str: return uuid.uuid4().hex def init_job(job: JobInfo, job_id: str | None = None) -> str: job_id = job_id or get_job_id() redis_conn.mset({job.mapping_key: job_id}) return job_id def schedule_job( job: JobInfo, job_id: str | None = None, timeout: int | None = None ) -> None: job_id = init_job(job, job_id) q.enqueue( job.func, *job.args, job_id=job_id, job_timeout=timeout, ttl=QUEUE_TIMEOUT, failure_ttl=FAILURE_TTL, ) def prepare_queue_job( job: JobInfo, job_id: str | None = None, timeout: int | None = None ) -> Queue: job_id = init_job(job, job_id) return Queue.prepare_data( job.func, args=job.args, job_id=job_id, timeout=timeout, ttl=QUEUE_TIMEOUT, failure_ttl=FAILURE_TTL, ) def create_bug_classification_jobs( model_name: str, bug_ids: Sequence[int] ) -> tuple[JobInfo, str, int]: """Create job_id and redis connection""" job_id = get_job_id() # Set the mapping before queuing to avoid some race conditions job_id_mapping = {} for bug_id in bug_ids: key = JobInfo(classify_bug, model_name, bug_id).mapping_key job_id_mapping[key] = job_id redis_conn.mset(job_id_mapping) return ( JobInfo(classify_bug, model_name, bug_ids, BUGZILLA_TOKEN), job_id, BUGZILLA_JOB_TIMEOUT, ) def create_broken_site_report_classification_jobs( model_name: str, reports: list[dict] ) -> tuple[JobInfo, str, int]: """Create job_id and redis connection""" job_id = get_job_id() # Set the mapping before queuing to avoid some race conditions job_id_mapping = {} for report in reports: key = JobInfo( classify_broken_site_report, model_name, report["uuid"] ).mapping_key job_id_mapping[key] = job_id redis_conn.mset(job_id_mapping) return ( JobInfo(classify_broken_site_report, model_name, reports), job_id, JOB_TIMEOUT, ) def schedule_issue_classification( model_name: str, owner: str, repo: str, issue_nums: Sequence[int] ) -> None: """Schedule the classification of a issue_id list""" job_id = get_job_id() # Set the mapping before queuing to avoid some race conditions job_id_mapping = {} for issue_num in issue_nums: key = JobInfo(classify_issue, model_name, owner, repo, issue_num).mapping_key job_id_mapping[key] = job_id redis_conn.mset(job_id_mapping) schedule_job( JobInfo(classify_issue, model_name, owner, repo, issue_nums), job_id=job_id, timeout=BUGZILLA_JOB_TIMEOUT, ) def is_pending(job): # Check if there is a job job_id = redis_conn.get(job.mapping_key) if not job_id: LOGGER.debug(f"No job ID mapping {job_id}, False") return False try: job = Job.fetch(job_id.decode("ascii"), connection=redis_conn) except NoSuchJobError: LOGGER.debug(f"No job in DB for {job_id}, False") # The job might have expired from redis return False job_status = job.get_status() if job_status == "started": LOGGER.debug(f"Job {job_id} is running, True") return True # Enforce job timeout as RQ doesn't seems to do it https://github.com/rq/rq/issues/758 timeout_datetime = job.enqueued_at + timedelta(seconds=job.timeout) utcnow = datetime.now(timezone.utc) if timeout_datetime < utcnow: # Remove the timeouted job so it will be requeued job.cancel() job.cleanup() LOGGER.debug(f"Job timeout {job_id}, False") return False if job_status == "queued": LOGGER.debug(f"Job {job_id} is queued, True") return True LOGGER.debug(f"Job {job_id} has status {job_status}, False") return False def get_bugs_last_change_time(bug_ids): bugzilla.set_token(BUGZILLA_TOKEN) old_CHUNK_SIZE = Bugzilla.BUGZILLA_CHUNK_SIZE try: Bugzilla.BUGZILLA_CHUNK_SIZE = 700 bugs = {} def bughandler(bug): bugs[bug["id"]] = bug["last_change_time"] Bugzilla( bugids=bug_ids, bughandler=bughandler, include_fields=["id", "last_change_time"], ).get_data().wait() finally: Bugzilla.BUGZILLA_CHUNK_SIZE = old_CHUNK_SIZE return bugs def get_github_issues_update_time( owner: str, repo: str, issue_nums: Sequence[int] ) -> dict: header = {"Authorization": "token {}".format(GITHUB_TOKEN)} repo_url = f"https://api.github.com/repos/{owner}/{repo}/issues/" issues = {} for issue_num in issue_nums: issue_url = repo_url + str(issue_num) response = utils.get_session("github").get(issue_url, headers=header) response.raise_for_status() raw_issue = response.json() issues[raw_issue["number"]] = raw_issue["updated_at"] return issues def is_prediction_invalidated(job, change_time): # First get the saved change time saved_change_time = redis_conn.get(job.change_time_key) # If we have no last changed time, the bug was not classified yet or the bug was classified by an old worker if not saved_change_time: # We can have a result without a cache time if redis_conn.exists(job.result_key): return True return False return saved_change_time.decode("utf-8") != change_time def clean_prediction_cache(job): # If the bug was modified since last time we classified it, clear the cache to avoid stale answer LOGGER.debug(f"Cleaning results for {job}") redis_conn.delete(job.result_key) redis_conn.delete(job.change_time_key) def get_result(job: JobInfo) -> Any | None: LOGGER.debug(f"Checking for existing results at {job.result_key}") result = redis_conn.get(job.result_key) if result: LOGGER.debug(f"Found {result!r}") try: result = dctx.decompress(result) except zstandard.ZstdError: # Some job results were stored before compression was enabled. # We can remove the exception handling after enough time has passed # since 47114f4f47db6b73214cf946377be8da945d34b5. pass assert result is not None # mypy thinks it could be None return orjson.loads(result) return None def compress_response(data: dict, status_code: int): """Compress data using gzip compressor and frame response :param data: data :type data: dict :param status_code: response status code :type status_code: int :return: response with gzip compressed data :rtype: Response """ gzip_buffer = gzip.compress(orjson.dumps(data), compresslevel=9) response = Response(status=status_code) response.set_data(gzip_buffer) response.headers["Content-Encoding"] = "gzip" response.headers["Content-Length"] = len(gzip_buffer) response.headers["Content-Type"] = "application/json" return response @application.route("/<model_name>/predict/<int:bug_id>") @cross_origin() def model_prediction(model_name, bug_id): """ --- get: description: Classify a single bug using given model, answer either 200 if the bug is processed or 202 if the bug is being processed summary: Classify a single bug parameters: - name: model_name in: path schema: ModelName - name: bug_id in: path schema: type: integer example: 123456 responses: 200: description: A single bug prediction content: application/json: schema: BugPrediction 202: description: A temporary answer for the bug being processed content: application/json: schema: NotAvailableYet 401: description: API key is missing content: application/json: schema: UnauthorizedError """ headers = request.headers auth = headers.get(API_TOKEN) if not auth: return jsonify(UnauthorizedError().dump({})), 401 else: LOGGER.info("Request with API TOKEN %r", auth) if model_name not in MODELS_NAMES: return jsonify({"error": f"Model {model_name} doesn't exist"}), 404 # Get the latest change from Bugzilla for the bug bug = get_bugs_last_change_time([bug_id]) # Change time could be None if it's a security bug job = JobInfo(classify_bug, model_name, bug_id) bug_change_time = bug.get(bug_id) if bug_change_time and is_prediction_invalidated(job, bug[bug_id]): clean_prediction_cache(job) status_code = 200 data = get_result(job) if not data: if not is_pending(job): job_info, job_id, timeout = create_bug_classification_jobs( model_name, [bug_id] ) schedule_job(job_info, job_id=job_id, timeout=timeout) status_code = 202 data = {"ready": False} return compress_response(data, status_code) @application.route( "/<model_name>/predict/github/<string:owner>/<string:repo>/<int:issue_num>" ) @cross_origin() def model_prediction_github(model_name, owner, repo, issue_num): """ --- get: description: Classify a single issue using given model, answer either 200 if the issue is processed or 202 if the issue is being processed summary: Classify a single issue parameters: - name: model_name in: path schema: ModelName - name: owner in: path schema: type: str example: webcompat - name: repo in: path schema: type: str example: web-bugs - name: issue_number in: path schema: type: integer example: 123456 responses: 200: description: A single issue prediction content: application/json: schema: BugPrediction 202: description: A temporary answer for the issue being processed content: application/json: schema: NotAvailableYet 401: description: API key is missing content: application/json: schema: UnauthorizedError """ headers = request.headers auth = headers.get(API_TOKEN) if not auth: return jsonify(UnauthorizedError().dump({})), 401 else: LOGGER.info("Request with API TOKEN %r", auth) if model_name not in MODELS_NAMES: return jsonify({"error": f"Model {model_name} doesn't exist"}), 404 # Get the latest change date from github for the issue update_time = get_github_issues_update_time(owner, repo, [issue_num]) job = JobInfo(classify_issue, model_name, owner, repo, issue_num) issue_change_time = update_time.get(issue_num) if issue_change_time and is_prediction_invalidated(job, update_time[issue_num]): clean_prediction_cache(job) status_code = 200 data = get_result(job) if not data: if not is_pending(job): schedule_issue_classification(model_name, owner, repo, [issue_num]) status_code = 202 data = {"ready": False} return compress_response(data, status_code) @application.route("/<model_name>/predict/batch", methods=["POST"]) @cross_origin() def batch_prediction(model_name): """ --- post: description: > Post a batch of bug ids to classify, answer either 200 if all bugs are processed or 202 if at least one bug is not processed. <br/><br/> Starts by sending a batch of bugs ids like this:<br/> ``` {"bugs": [123, 456]} ```<br/><br> You will likely get a 202 answer that indicates that no result is available yet for any of the bug id you provided with the following body:<br/> ``` {"bugs": {"123": {ready: False}, "456": {ready: False}}} ```<br/><br/> Call back the same endpoint with the same bug ids a bit later, and you will get the results.<br/><br/> You might get the following output if some bugs are not available: <br/> ``` {"bugs": {"123": {"available": False}}} ```<br/><br/> And you will get the following output once the bugs are available: <br/> ``` {"bugs": {"456": {"extra_data": {}, "index": 0, "prob": [0], "suggestion": ""}}} ```<br/><br/> Please be aware that each bug could be in a different state, so the following output, where a bug is returned and another one is still being processed, is valid: <br/> ``` {"bugs": {"123": {"available": False}, "456": {"extra_data": {}, "index": 0, "prob": [0], "suggestion": ""}}} ``` summary: Classify a batch of bugs parameters: - name: model_name in: path schema: ModelName requestBody: description: The list of bugs to classify content: application/json: schema: type: object properties: bugs: type: array items: type: integer examples: cat: summary: An example of payload value: bugs: [123456, 789012] responses: 200: description: A list of results content: application/json: schema: type: object additionalProperties: true example: bugs: 123456: extra_data: {} index: 0 prob: [0] suggestion: string 789012: extra_data: {} index: 0 prob: [0] suggestion: string 202: description: A temporary answer for bugs being processed content: application/json: schema: type: object items: type: object properties: ready: type: boolean enum: [False] example: bugs: 123456: extra_data: {} index: 0 prob: [0] suggestion: string 789012: {ready: False} 401: description: API key is missing content: application/json: schema: UnauthorizedError """ headers = request.headers auth = headers.get(API_TOKEN) if not auth: return jsonify(UnauthorizedError().dump({})), 401 else: LOGGER.info("Request with API TOKEN %r", auth) if model_name not in MODELS_NAMES: return jsonify({"error": f"Model {model_name} doesn't exist"}), 404 # TODO Check is JSON is valid and validate against a request schema batch_body = orjson.loads(request.data) # Validate schema = { "bugs": { "type": "list", "minlength": 1, "schema": {"type": "integer"}, } } validator = Validator() if not validator.validate(batch_body, schema): return jsonify({"errors": validator.errors}), 400 bugs = batch_body["bugs"] status_code = 200 data = {} missing_bugs = [] bug_change_dates = get_bugs_last_change_time(bugs) for bug_id in bugs: job = JobInfo(classify_bug, model_name, bug_id) change_time = bug_change_dates.get(int(bug_id)) # Change time could be None if it's a security bug if change_time and is_prediction_invalidated(job, change_time): clean_prediction_cache(job) data[str(bug_id)] = get_result(job) if not data[str(bug_id)]: if not is_pending(job): missing_bugs.append(bug_id) status_code = 202 data[str(bug_id)] = {"ready": False} queueJobList: Queue = [] for i in range(0, len(missing_bugs), 100): bug_ids = missing_bugs[i : (i + 100)] job_info, job_id, timeout = create_bug_classification_jobs(model_name, bug_ids) queueJobList.append(prepare_queue_job(job_info, job_id=job_id, timeout=timeout)) q.enqueue_many(queueJobList) return compress_response({"bugs": data}, status_code) @application.route("/<model_name>/predict/broken_site_report/batch", methods=["POST"]) @cross_origin() def batch_prediction_broken_site_report(model_name): """ --- post: description: > Post a batch of reports to classify, answer either 200 if all are are processed or 202 if at least one report is not processed. <br/><br/> Starts by sending a batch of reports like this:<br/> ``` {"reports": [{"uuid": "954dbc23-91e6-4d6f-a10a-405f46663e31", "title: "https://example.com", "body": "Loads blank page."}]} ```<br/><br> You will likely get a 202 answer that indicates that no result is available yet for any of the reports id you provided with the following body:<br/> ``` {"reports": {"<uuid 1>": {ready: False}, "<uuid 2>": {ready: False}}} ```<br/><br/> Call back the same endpoint with the same uuids a bit it later, and you will get the results.<br/><br/> You might get the following output if some bugs are not available: <br/> ``` {"reports": {"<uuid 1>": {"available": False}}} ```<br/><br/> And you will get the following output once the bugs are available: <br/> ``` {"reports": {"<uuid 1>": {"extra_data": {}, "index": 0, "prob": [0], "suggestion": ""}}} ```<br/><br/> Please be aware that each report could be in a different state, so the following output, where a report is returned and another one is still being processed, is valid: <br/> ``` {"reports": {"<uuid 1>": {"available": False}, "<uuid 2>": {"extra_data": {}, "index": 0, "prob": [0], "suggestion": ""}}} ``` summary: Classify a batch of reports parameters: - name: model_name in: path schema: ModelName requestBody: description: The list of reports to classify content: application/json: schema: type: object properties: reports: type: array items: type: object properties: uuid: type: string title: type: string body: type: string examples: cat: summary: An example of payload value: reports: - uuid: "954dbc23-91e6-4d6f-a10a-405f46663e31" title: "https://example.com" body: "Loads blank page." responses: 200: description: A list of results content: application/json: schema: type: object additionalProperties: true example: reports: <uuid 1>: extra_data: {} index: 0 prob: [0] suggestion: string <uuid 2>: extra_data: {} index: 0 prob: [0] suggestion: string 202: description: A temporary answer for reports being processed content: application/json: schema: type: object items: type: object properties: ready: type: boolean enum: [False] example: reports: <uuid 1>: extra_data: {} index: 0 prob: [0] suggestion: string <uuid 2>: {ready: False} 401: description: API key is missing content: application/json: schema: UnauthorizedError """ headers = request.headers auth = headers.get(API_TOKEN) if not auth: return jsonify(UnauthorizedError().dump({})), 401 else: LOGGER.info("Request with API TOKEN %r", auth) if model_name not in MODELS_NAMES: return jsonify({"error": f"Model {model_name} doesn't exist"}), 404 batch_body = orjson.loads(request.data) schema = { "reports": { "type": "list", "minlength": 1, "schema": { "type": "dict", "schema": { "uuid": {"type": "string", "required": True}, "title": {"type": "string", "required": True}, "body": {"type": "string", "required": True}, }, }, } } validator = Validator() if not validator.validate(batch_body, schema): return jsonify({"errors": validator.errors}), 400 reports = batch_body["reports"] status_code = 200 data = {} missing_reports = [] for report in reports: report_uuid = report["uuid"] job = JobInfo(classify_broken_site_report, model_name, report_uuid) data[report_uuid] = get_result(job) if not data[report_uuid]: if not is_pending(job): missing_reports.append(report) status_code = 202 data[report_uuid] = {"ready": False} queueJobList: Queue = [] for i in range(0, len(missing_reports), 100): reports = missing_reports[i : (i + 100)] job_info, job_id, timeout = create_broken_site_report_classification_jobs( model_name, reports ) queueJobList.append(prepare_queue_job(job_info, job_id=job_id, timeout=timeout)) q.enqueue_many(queueJobList) return compress_response({"reports": data}, status_code) @application.route("/push/<path:branch>/<rev>/schedules") @cross_origin() def push_schedules(branch, rev): """ --- get: description: Determine which tests and tasks a push should schedule. summary: Get which tests and tasks to schedule. parameters: - name: branch in: path schema: BranchName - name: rev in: path schema: type: str example: 76383a875678 responses: 200: description: A dict of tests and tasks to schedule. content: application/json: schema: Schedules 202: description: Request is still being processed. content: application/json: schema: NotAvailableYet 401: description: API key is missing content: application/json: schema: UnauthorizedError """ headers = request.headers auth = headers.get(API_TOKEN) if not auth: return jsonify(UnauthorizedError().dump({})), 401 else: LOGGER.info("Request with API TOKEN %r", auth) # Support the string 'autoland' for convenience. if branch == "autoland": branch = "integration/autoland" job = JobInfo(schedule_tests, branch, rev) data = get_result(job) if data: return compress_response(data, 200) if not is_pending(job): schedule_job(job) return jsonify({"ready": False}), 202 @application.route("/config_specific_groups/<path:config>") @cross_origin() def config_specific_groups(config: str) -> tuple[Response, int]: """ --- get: description: Determine which groups could possibly exclusively fail on the given configuration. summary: Get config-specific groups. parameters: - name: config in: path schema: type: str example: test-windows7-32/opt-*-e10s responses: 200: description: A list of groups that could specifically fail on the given configuration. content: application/json: schema: type: array items: type: string 202: description: Request is still being processed. content: application/json: schema: NotAvailableYet 401: description: API key is missing content: application/json: schema: UnauthorizedError """ headers = request.headers auth = headers.get(API_TOKEN) if not auth: return jsonify(UnauthorizedError().dump({})), 401 else: LOGGER.info("Request with API TOKEN %r", auth) job = JobInfo(get_config_specific_groups, config) data = get_result(job) if data is not None: return compress_response(data, 200) if not is_pending(job): schedule_job(job) return jsonify({"ready": False}), 202 @application.route("/swagger") @cross_origin() def swagger(): for name, rule in application.view_functions.items(): # Ignore static endpoint as it isn't documented with OpenAPI if name == "static": continue spec.path(view=rule) return jsonify(spec.to_dict()) @application.route("/doc") def doc(): return render_template("doc.html")