def get_current_test_failures()

in treeherder/push_health/tests.py [0:0]


def get_current_test_failures(push, option_map, jobs, investigated_tests=None):
    # Using .distinct(<fields>) here would help by removing duplicate FailureLines
    # for the same job (with different sub-tests), but it's only supported by
    # postgres.  Just using .distinct() has no effect.
    new_failure_lines = FailureLine.objects.filter(
        action__in=["test_result", "log", "crash"],
        job_log__job__push=push,
        job_log__job__result="testfailed",
        job_log__job__tier__lte=2,
    ).select_related(
        "job_log__job__job_type",
        "job_log__job__job_group",
        "job_log__job__machine_platform",
        "job_log__job__taskcluster_metadata",
    )
    # using a dict here to avoid duplicates due to multiple failure_lines for
    # each job.
    tests = {}
    all_failed_jobs = {}
    for failure_line in new_failure_lines:
        test_name = clean_test(failure_line.test, failure_line.signature, failure_line.message)
        if not test_name:
            continue
        job = failure_line.job_log.job
        config = clean_config(option_map[job.option_collection_hash])
        platform = clean_platform(job.machine_platform.platform)
        job_name = job.job_type.name
        job_symbol = job.job_type.symbol
        job_group = job.job_group.name
        job_group_symbol = job.job_group.symbol
        job.job_key = f"{config}{platform}{job_name}{job_group}"
        all_failed_jobs[job.id] = job
        # The 't' ensures the key starts with a character, as required for a query selector
        test_key = re.sub(r"\W+", "", f"t{test_name}{config}{platform}{job_name}{job_group}")
        is_classified_intermittent = any(
            job["failure_classification_id"] == 4 for job in jobs[job_name]
        )

        is_investigated = False
        investigated_test_id = None
        for investigated_test in investigated_tests:
            if (
                investigated_test.test == test_name
                and job.job_type.id == investigated_test.job_type.id
            ):
                is_investigated = True
                investigated_test_id = investigated_test.id
                break

        if test_key not in tests:
            line = {
                "testName": test_name,
                "action": failure_line.action.split("_")[0],
                "jobName": job_name,
                "jobSymbol": job_symbol,
                "jobGroup": job_group,
                "jobGroupSymbol": job_group_symbol,
                "platform": platform,
                "config": config,
                "key": test_key,
                "jobKey": job.job_key,
                "suggestedClassification": "New Failure",
                "confidence": 0,
                "tier": job.tier,
                "totalFailures": 0,
                "totalJobs": 0,
                "failedInParent": False,
                "isClassifiedIntermittent": is_classified_intermittent,
                "isInvestigated": is_investigated,
                "investigatedTestId": investigated_test_id,
            }
            tests[test_key] = line
        count_jobs = len(
            list(filter(lambda x: x["result"] in ["success", "testfailed"], jobs[job_name]))
        )
        tests[test_key]["totalFailures"] += 1
        tests[test_key]["totalJobs"] = count_jobs

    # Each line of the sorted list that is returned here represents one test file per platform/
    # config.  Each line will have at least one failing job, but may have several
    # passing/failing jobs associated with it.
    return sorted(tests.values(), key=lambda k: k["testName"])