treeherder/etl/bugzilla.py (287 lines of code) (raw):

import logging from datetime import datetime, timedelta import dateutil.parser import requests from django.conf import settings from django.db.models import Count, Max from treeherder.model.models import BugJobMap, Bugscache from treeherder.utils.github import fetch_json from treeherder.utils.http import make_request logger = logging.getLogger(__name__) def reopen_request(url, method, headers, json): make_request(url, method=method, headers=headers, json=json) def reopen_intermittent_bugs(minimum_failures_to_reopen=1): # Don't reopen bugs from non-production deployments. if settings.BUGFILER_API_KEY is None: return incomplete_bugs = set( Bugscache.objects.filter(resolution="INCOMPLETE", bugzilla_id__isnull=False).values_list( "bugzilla_id", flat=True ) ) # Intermittent bugs get closed after 3 weeks of inactivity if other conditions don't apply: # https://github.com/mozilla/relman-auto-nag/blob/c7439e247677333c1cd8c435234b3ef3adc49680/auto_nag/scripts/close_intermittents.py#L17 recent_days = 7 recently_used_bugs = set( BugJobMap.objects.filter(created__gt=(datetime.now() - timedelta(recent_days))) .filter(bug__bugzilla_id__isnull=False) .values("bug__bugzilla_id") .annotate(num_failures=Count("bug__bugzilla_id")) .filter(num_failures__gte=minimum_failures_to_reopen) .values_list("bug__bugzilla_id", flat=True) ) bugs_to_reopen = incomplete_bugs & recently_used_bugs for bugzilla_id in bugs_to_reopen: bug_data = ( BugJobMap.objects.filter(bug__bugzilla_id=bugzilla_id) .select_related("job__repository") .order_by("-created") .values("job_id", "job__repository__name")[0] ) job_id = bug_data.get("job_id") repository = bug_data.get("job__repository__name") log_url = f"https://treeherder.mozilla.org/logviewer?job_id={job_id}&repo={repository}" comment = {"body": "New failure instance: " + log_url} url = settings.BUGFILER_API_URL + "/rest/bug/" + str(bugzilla_id) headers = { "x-bugzilla-api-key": settings.BUGFILER_API_KEY, "Accept": "application/json", } data = { "status": "REOPENED", "comment": comment, "comment_tags": "treeherder", } try: reopen_request(url, method="PUT", headers=headers, json=data) # NOTE: this will only toggle 1 bug_job_map entry, not all (if there are retriggers) BugJobMap.objects.filter(job_id=job_id, bug__bugzilla_id=bugzilla_id).update( bug_open=True ) except requests.exceptions.HTTPError as e: try: message = e.response.json()["message"] except (ValueError, KeyError): message = e.response.text logger.error(f"Reopening bug {str(bugzilla_id)} failed: {message}") def fetch_intermittent_bugs(additional_params, limit, duplicate_chain_length): url = settings.BZ_API_URL + "/rest/bug" params = { "include_fields": ",".join( [ "id", "summary", "status", "resolution", "dupe_of", "duplicates", "cf_crash_signature", "keywords", "last_change_time", "whiteboard", ] ), "limit": limit, } params.update(additional_params) response = fetch_json(url, params=params) return response.get("bugs", []) class BzApiBugProcess: minimum_failures_to_reopen = 3 def run(self): year_ago = datetime.utcnow() - timedelta(days=365) last_change_time_max = ( Bugscache.objects.filter(bugzilla_id__isnull=False).aggregate(Max("modified"))[ "modified__max" ] or None ) if last_change_time_max: last_change_time_max -= timedelta(minutes=10) else: last_change_time_max = year_ago max_summary_length = Bugscache._meta.get_field("summary").max_length max_whiteboard_length = Bugscache._meta.get_field("whiteboard").max_length last_change_time_string = last_change_time_max.strftime("%Y-%m-%dT%H:%M:%SZ") bugs_to_duplicates = {} duplicates_to_bugs = {} insert_errors_observed = False duplicates_to_check = set() # The bugs are ingested in different phases: # 1. Intermittent bugs with activity in the bug in the last year # (Bugzilla seed). Iteration 0. # 2. Bugs used for classification (classification seed). They will be # part of the previous phase once a report about the classification # has been posted in the bug (schedule weekly or daily). # Processed as part of iteration 1. # 3. For bugs which have been resolved as duplicates, the bugs as whose # duplicates they have been set will be fetched. The open bugs will # be used to store the classifications. Iterations 1-5. # 4. Duplicates of the bugs from previous phases get fetched. Duplicate # bugs included in those eventually end up here due to inactivity but # are still needed for matching failure lines against bug summaries. # Iterations 6-10. duplicate_chain_length = -1 # make flake8 happy bugs_to_process = [] while duplicate_chain_length < 10: duplicate_chain_length += 1 if duplicate_chain_length > 0: bugs_to_process = list( bugs_to_process - set( Bugscache.objects.filter( processed_update=True, bugzilla_id__isnull=False ).values_list("bugzilla_id", flat=True) ) ) if len(bugs_to_process) == 0: break bug_list = [] bugs_count_limit = 500 bugs_offset = 0 # Keep querying Bugzilla until there are no more results. while True: if duplicate_chain_length == 0: additional_params = { "keywords": "intermittent-failure", "last_change_time": last_change_time_string, "offset": bugs_offset, } else: additional_params = { "id": ",".join( list( map( str, bugs_to_process[bugs_offset : bugs_offset + bugs_count_limit], ) ) ), } bug_results_chunk = fetch_intermittent_bugs( additional_params, bugs_count_limit, duplicate_chain_length ) bug_list += bug_results_chunk bugs_offset += bugs_count_limit if duplicate_chain_length == 0 and len(bug_results_chunk) < bugs_count_limit: break if duplicate_chain_length > 0 and bugs_offset >= len(bugs_to_process): break bugs_to_process_next = set() if bug_list: if duplicate_chain_length == 0: Bugscache.objects.exclude(summary="(no bug data fetched)").exclude( bugzilla_id__in=BugJobMap.objects.distinct("bug__bugzilla_id").values_list( "bug__bugzilla_id", flat=True ) ).filter(modified__lt=year_ago, bugzilla_id__isnull=False).delete() Bugscache.objects.filter(bugzilla_id__isnull=False).update( processed_update=False ) for bug in bug_list: # we currently don't support timezones in treeherder, so # just ignore it when importing/updating the bug to avoid # a ValueError try: dupe_of = bug.get("dupe_of", None) if ( dupe_of is not None and not Bugscache.objects.filter(bugzilla_id=dupe_of).exists() ): Bugscache.objects.update_or_create( bugzilla_id=dupe_of, defaults={ "modified": datetime(1971, 1, 1), "summary": "(no bug data fetched)", "processed_update": False, }, ) Bugscache.objects.update_or_create( bugzilla_id=bug["id"], defaults={ "status": bug.get("status", ""), "resolution": bug.get("resolution", ""), "summary": bug.get("summary", "")[:max_summary_length], "dupe_of": dupe_of, "crash_signature": bug.get("cf_crash_signature", ""), "keywords": ",".join(bug["keywords"]), "modified": dateutil.parser.parse( bug["last_change_time"], ignoretz=True ), "whiteboard": bug.get("whiteboard", "")[:max_whiteboard_length], "processed_update": True, }, ) except Exception as e: logger.error("error inserting bug '%s' into db: %s", bug, e) insert_errors_observed = True continue if dupe_of is not None: openish = ( duplicates_to_bugs[dupe_of] if dupe_of in duplicates_to_bugs else dupe_of ) duplicates_to_bugs[bug["id"]] = openish if openish not in bugs_to_duplicates: bugs_to_process_next.add(openish) bugs_to_duplicates[openish] = set() bugs_to_duplicates[openish].add(bug["id"]) if bug["id"] in bugs_to_duplicates: for duplicate_id in bugs_to_duplicates[bug["id"]]: duplicates_to_bugs[duplicate_id] = openish bugs_to_duplicates[openish] |= bugs_to_duplicates[bug["id"]] duplicates = bug.get("duplicates") if len(duplicates) > 0: duplicates_to_check |= set(duplicates) if duplicate_chain_length == 0: # Phase 2: Bugs used for classification should be kept. # Can return invalid bug numbers (e.g. too large because of # typo) but they don't cause issues. # distinct('bug_id') is not supported by Django + MySQL 5.7 bugs_to_process_next |= set( BugJobMap.objects.filter(bug__bugzilla_id__isnull=False).values_list( "bug__bugzilla_id", flat=True ) ) bugs_to_process = bugs_to_process_next - set( Bugscache.objects.filter(processed_update=True).values_list( "bugzilla_id", flat=True ) ) if duplicate_chain_length == 5 and len(bugs_to_process): logger.warn( "Found a chain of duplicate bugs longer than 6 bugs, stopped following chain. Bugscache's 'dupe_of' column contains duplicates instead of non-duplicate bugs. Unprocessed bugs: " + (" ".join(list(map(str, bugs_to_process)))) ) if 0 <= duplicate_chain_length < 6 and len(bugs_to_process) == 0: # phase 3: looking for open bugs based on duplicates duplicate_chain_length = 5 if duplicate_chain_length >= 5: # phase 4: fetching duplicates bugs_to_process_next = duplicates_to_check duplicates_to_check = set() bugs_to_process = bugs_to_process_next - set( Bugscache.objects.filter( processed_update=True, bugzilla_id__isnull=False ).values_list("bugzilla_id", flat=True) ) if len(bugs_to_process) == 0: break elif duplicate_chain_length == 10 and len(bugs_to_process): logger.warn( "Found a chain of duplicate bugs longer than 6 bugs, stopped following chain. Not all duplicates have been loaded. Unprocessed bugs: " + (" ".join(list(map(str, bugs_to_process)))) ) # Duplicate bugs don't see any activity. Use the modification date of # the bug against which they have been set as duplicate to prevent them # from getting dropped - they are still needed to match the failure line # against the bug summary. for bug_duplicate, bug_openish in duplicates_to_bugs.items(): bug_openish_object = Bugscache.objects.filter(bugzilla_id=bug_openish) if len(bug_openish_object) == 0: # Script does not have access to open bug but to duplicate continue Bugscache.objects.filter(bugzilla_id=bug_duplicate).update( dupe_of=bug_openish, modified=bug_openish_object[0].modified ) # Switch classifications from duplicate bugs to open ones. duplicates_db = set( Bugscache.objects.filter(dupe_of__isnull=False, bugzilla_id__isnull=False).values_list( "bugzilla_id", flat=True ) ) bugs_used = set( BugJobMap.objects.filter(bug__bugzilla_id__isnull=False).values_list( "bug__bugzilla_id", flat=True ) ) duplicates_used = duplicates_db & bugs_used for bugzilla_id in duplicates_used: dupe_of = Bugscache.objects.get(bugzilla_id=bugzilla_id).dupe_of # Jobs both already classified with new duplicate and its open bug. jobs_openish = list( BugJobMap.objects.filter(bug__bugzilla_id=dupe_of).values_list("job_id", flat=True) ) # Delete annotations with duplicate bug for jobs which have also # been classified with the open bug of a duplicate bug. BugJobMap.objects.filter(bug__bugzilla_id=bugzilla_id, job_id__in=jobs_openish).delete() BugJobMap.objects.filter(bug__bugzilla_id=bugzilla_id).update( bug_id=Bugscache.objects.get(bugzilla_id=dupe_of) ) # Delete open bugs and related duplicates if modification date (of open # bug) is too old. Bugscache.objects.exclude( bugzilla_id__in=BugJobMap.objects.distinct("bug__bugzilla_id").values_list( "bug__bugzilla_id", flat=True ) ).filter(modified__lt=year_ago, bugzilla_id__isnull=False).delete() if insert_errors_observed: logger.error( "error inserting some bugs, bugscache is incomplete, bugs updated during run will be ingested again during the next run" ) # Move modification date of bugs inserted/updated during this # run back to attempt to ingest bug data which failed during # this insert/update in the next run. Bugscache.objects.filter( modified__gt=last_change_time_max, bugzilla_id__isnull=False ).update(modified=last_change_time_max) reopen_intermittent_bugs(self.minimum_failures_to_reopen)