def run()

in treeherder/etl/bugzilla.py


    def run(self):
        year_ago = datetime.utcnow() - timedelta(days=365)
        last_change_time_max = Bugscache.objects.filter(
            bugzilla_id__isnull=False
        ).aggregate(Max("modified"))["modified__max"]
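        # Widen the window by 10 minutes so bugs modified close to the newest
        # 'modified' timestamp (e.g. while the previous run was in flight) are
        # fetched again rather than missed.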
        if last_change_time_max:
            last_change_time_max -= timedelta(minutes=10)
        else:
            last_change_time_max = year_ago

        max_summary_length = Bugscache._meta.get_field("summary").max_length
        max_whiteboard_length = Bugscache._meta.get_field("whiteboard").max_length

        last_change_time_string = last_change_time_max.strftime("%Y-%m-%dT%H:%M:%SZ")

        bugs_to_duplicates = {}
        duplicates_to_bugs = {}
        insert_errors_observed = False
        duplicates_to_check = set()

        # The bugs are ingested in several phases:
        # 1. Intermittent bugs with activity in the last year (Bugzilla
        #    seed). Iteration 0.
        # 2. Bugs used for classification (classification seed). They become
        #    part of the previous phase once a report about the classification
        #    has been posted in the bug (scheduled weekly or daily).
        #    Processed as part of iteration 1.
        # 3. For bugs which have been resolved as duplicates, the open bugs
        #    they have been duped against get fetched. The open bugs are used
        #    to store the classifications. Iterations 1-5.
        # 4. Duplicates of the bugs from the previous phases get fetched.
        #    Such duplicates see no activity of their own and would fall out
        #    of phase 1, but they are still needed to match failure lines
        #    against bug summaries. Iterations 6-10.
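        # Example: if bug 3 is a duplicate of bug 2 which is a duplicate of
        # the open bug 1, the loop below records both bug 2 and bug 3 in
        # duplicates_to_bugs as duplicates of bug 1, and the cleanup after
        # the loop rewrites their dupe_of to bug 1.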

        duplicate_chain_length = -1
        # Initialized only to satisfy flake8; reassigned at the end of
        # iteration 0 before it is first read.
        bugs_to_process = []
        while duplicate_chain_length < 10:
            duplicate_chain_length += 1
            if duplicate_chain_length > 0:
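                # Drop bugs which have already been processed during this run;
                # stop following the duplicate chain if nothing new is left.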
                bugs_to_process = list(
                    bugs_to_process
                    - set(
                        Bugscache.objects.filter(
                            processed_update=True, bugzilla_id__isnull=False
                        ).values_list("bugzilla_id", flat=True)
                    )
                )
                if len(bugs_to_process) == 0:
                    break

            bug_list = []
            bugs_count_limit = 500
            bugs_offset = 0

            # Keep querying Bugzilla until there are no more results.
            while True:
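                # Iteration 0 pages through the keyword query by offset; later
                # iterations request explicit bug IDs in batches of
                # bugs_count_limit.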
                if duplicate_chain_length == 0:
                    additional_params = {
                        "keywords": "intermittent-failure",
                        "last_change_time": last_change_time_string,
                        "offset": bugs_offset,
                    }
                else:
                    additional_params = {
                        "id": ",".join(
                            list(
                                map(
                                    str,
                                    bugs_to_process[bugs_offset : bugs_offset + bugs_count_limit],
                                )
                            )
                        ),
                    }
                bug_results_chunk = fetch_intermittent_bugs(
                    additional_params, bugs_count_limit, duplicate_chain_length
                )
                bug_list += bug_results_chunk
                bugs_offset += bugs_count_limit
                if duplicate_chain_length == 0 and len(bug_results_chunk) < bugs_count_limit:
                    break
                if duplicate_chain_length > 0 and bugs_offset >= len(bugs_to_process):
                    break

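            # Open bugs discovered via dupe_of which still need to be fetched.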
            bugs_to_process_next = set()

            if bug_list:
                if duplicate_chain_length == 0:
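                    # Drop stale bugs (no activity for a year and no
                    # classifications attached), then mark the remaining rows
                    # as unprocessed so this run can track what it has touched.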
                    Bugscache.objects.exclude(summary="(no bug data fetched)").exclude(
                        bugzilla_id__in=BugJobMap.objects.distinct("bug__bugzilla_id").values_list(
                            "bug__bugzilla_id", flat=True
                        )
                    ).filter(modified__lt=year_ago, bugzilla_id__isnull=False).delete()
                    Bugscache.objects.filter(bugzilla_id__isnull=False).update(
                        processed_update=False
                    )

                for bug in bug_list:
                    # Treeherder doesn't support timezones, so strip the
                    # timezone when importing/updating the bug to avoid a
                    # ValueError.
                    try:
                        dupe_of = bug.get("dupe_of", None)
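                        # Insert a placeholder row for a not yet fetched open
                        # bug so dupe_of can point at it; a later phase fetches
                        # the real bug data and replaces the placeholder.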
                        if (
                            dupe_of is not None
                            and not Bugscache.objects.filter(bugzilla_id=dupe_of).exists()
                        ):
                            Bugscache.objects.update_or_create(
                                bugzilla_id=dupe_of,
                                defaults={
                                    "modified": datetime(1971, 1, 1),
                                    "summary": "(no bug data fetched)",
                                    "processed_update": False,
                                },
                            )
                        Bugscache.objects.update_or_create(
                            bugzilla_id=bug["id"],
                            defaults={
                                "status": bug.get("status", ""),
                                "resolution": bug.get("resolution", ""),
                                "summary": bug.get("summary", "")[:max_summary_length],
                                "dupe_of": dupe_of,
                                "crash_signature": bug.get("cf_crash_signature", ""),
                                "keywords": ",".join(bug["keywords"]),
                                "modified": dateutil.parser.parse(
                                    bug["last_change_time"], ignoretz=True
                                ),
                                "whiteboard": bug.get("whiteboard", "")[:max_whiteboard_length],
                                "processed_update": True,
                            },
                        )
                    except Exception as e:
                        logger.error("error inserting bug '%s' into db: %s", bug, e)
                        insert_errors_observed = True
                        continue
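                    # Map the duplicate onto the ultimate open(ish) bug and
                    # fold in any duplicates already recorded for this bug so
                    # chains of duplicates collapse onto a single open bug.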
                    if dupe_of is not None:
                        openish = duplicates_to_bugs.get(dupe_of, dupe_of)
                        duplicates_to_bugs[bug["id"]] = openish
                        if openish not in bugs_to_duplicates:
                            bugs_to_process_next.add(openish)
                            bugs_to_duplicates[openish] = set()
                        bugs_to_duplicates[openish].add(bug["id"])
                        if bug["id"] in bugs_to_duplicates:
                            for duplicate_id in bugs_to_duplicates[bug["id"]]:
                                duplicates_to_bugs[duplicate_id] = openish
                            bugs_to_duplicates[openish] |= bugs_to_duplicates[bug["id"]]
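                    # Remember this bug's own duplicates; they get fetched in
                    # phase 4 (iterations 6-10).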
                    duplicates = bug.get("duplicates", [])
                    if duplicates:
                        duplicates_to_check |= set(duplicates)

            if duplicate_chain_length == 0:
                # Phase 2: Bugs used for classification should be kept.
                # The query can return invalid bug numbers (e.g. too large
                # because of a typo), but these don't cause issues.
                # distinct('bug_id') is not supported by Django + MySQL 5.7
                bugs_to_process_next |= set(
                    BugJobMap.objects.filter(bug__bugzilla_id__isnull=False).values_list(
                        "bug__bugzilla_id", flat=True
                    )
                )
            bugs_to_process = bugs_to_process_next - set(
                Bugscache.objects.filter(processed_update=True).values_list(
                    "bugzilla_id", flat=True
                )
            )
            if duplicate_chain_length == 5 and bugs_to_process:
                logger.warning(
                    "Found a chain of duplicate bugs longer than 6 bugs, stopped "
                    "following chain. Bugscache's 'dupe_of' column contains "
                    "duplicates instead of non-duplicate bugs. Unprocessed bugs: %s",
                    " ".join(map(str, bugs_to_process)),
                )
            if 0 <= duplicate_chain_length < 6 and len(bugs_to_process) == 0:
                # phase 3: looking for open bugs based on duplicates
                duplicate_chain_length = 5
            if duplicate_chain_length >= 5:
                # phase 4: fetching duplicates
                bugs_to_process_next = duplicates_to_check
                duplicates_to_check = set()
                bugs_to_process = bugs_to_process_next - set(
                    Bugscache.objects.filter(
                        processed_update=True, bugzilla_id__isnull=False
                    ).values_list("bugzilla_id", flat=True)
                )
                if len(bugs_to_process) == 0:
                    break
                elif duplicate_chain_length == 10:
                    logger.warning(
                        "Found a chain of duplicate bugs longer than 6 bugs, stopped "
                        "following chain. Not all duplicates have been loaded. "
                        "Unprocessed bugs: %s",
                        " ".join(map(str, bugs_to_process)),
                    )

        # Duplicate bugs don't see any activity. Use the modification date of
        # the bug against which they have been set as duplicate to prevent them
        # from getting dropped - they are still needed to match the failure line
        # against the bug summary.
        for bug_duplicate, bug_openish in duplicates_to_bugs.items():
            bug_openish_object = Bugscache.objects.filter(bugzilla_id=bug_openish).first()
            if bug_openish_object is None:
                # The script has access to the duplicate but not to the open
                # bug itself.
                continue
            Bugscache.objects.filter(bugzilla_id=bug_duplicate).update(
                dupe_of=bug_openish, modified=bug_openish_object.modified
            )

        # Switch classifications from duplicate bugs to open ones.
        duplicates_db = set(
            Bugscache.objects.filter(dupe_of__isnull=False, bugzilla_id__isnull=False).values_list(
                "bugzilla_id", flat=True
            )
        )
        bugs_used = set(
            BugJobMap.objects.filter(bug__bugzilla_id__isnull=False).values_list(
                "bug__bugzilla_id", flat=True
            )
        )
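        # Duplicate bugs which still have classifications attached to jobs.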
        duplicates_used = duplicates_db & bugs_used
        for bugzilla_id in duplicates_used:
            dupe_of = Bugscache.objects.get(bugzilla_id=bugzilla_id).dupe_of
            # Jobs which have already been classified with both the duplicate
            # bug and its open bug.
            jobs_openish = list(
                BugJobMap.objects.filter(bug__bugzilla_id=dupe_of).values_list("job_id", flat=True)
            )
            # Delete annotations with duplicate bug for jobs which have also
            # been classified with the open bug of a duplicate bug.
            BugJobMap.objects.filter(bug__bugzilla_id=bugzilla_id, job_id__in=jobs_openish).delete()
            BugJobMap.objects.filter(bug__bugzilla_id=bugzilla_id).update(
                bug_id=Bugscache.objects.get(bugzilla_id=dupe_of)
            )

        # Delete open bugs and related duplicates if modification date (of open
        # bug) is too old.
        Bugscache.objects.exclude(
            bugzilla_id__in=BugJobMap.objects.distinct("bug__bugzilla_id").values_list(
                "bug__bugzilla_id", flat=True
            )
        ).filter(modified__lt=year_ago, bugzilla_id__isnull=False).delete()

        if insert_errors_observed:
            logger.error(
                "error inserting some bugs, bugscache is incomplete; bugs updated "
                "during this run will be ingested again during the next run"
            )
            # Move the modification date of bugs inserted/updated during this
            # run back so bug data whose insert/update failed gets ingested
            # again during the next run.
            Bugscache.objects.filter(
                modified__gt=last_change_time_max, bugzilla_id__isnull=False
            ).update(modified=last_change_time_max)

        reopen_intermittent_bugs(self.minimum_failures_to_reopen)
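
For reference, a minimal sketch of driving this updater by hand, e.g. from a Django shell. The class name BzApiBugProcess and the way the reopen threshold is set are assumptions inferred from this module, not confirmed by the snippet above:

    # Hypothetical usage sketch; BzApiBugProcess and the attribute below are
    # assumptions based on self.minimum_failures_to_reopen being read in run().
    from treeherder.etl.bugzilla import BzApiBugProcess

    process = BzApiBugProcess()
    process.minimum_failures_to_reopen = 3  # assumed threshold for reopen_intermittent_bugs
    process.run()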