def evaluation()

in bugbug/models/testselect.py


    def evaluation(self) -> None:
        # Get a test set of pushes on which to test the model.
        pushes, train_push_len = self.get_pushes(False)

        # To evaluate the model with reductions enabled, we need to regenerate the failing together DB using
        # only failure data from the training pushes (otherwise, information from the test pushes would leak
        # into the data used for selection).
        logger.info("Generate failing together DB (restricted to training pushes)")
        push_data_iter, push_data_count, _ = test_scheduling.get_push_data(
            "label" if self.granularity == "label" else "config_group"
        )
        test_scheduling.generate_failing_together_probabilities(
            "label" if self.granularity == "label" else "config_group",
            push_data_iter(),
            push_data_count,
            pushes[train_push_len - 1]["revs"][0],
        )

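        # The pushes after the training cut-off form the test set.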
        test_pushes_list = pushes[train_push_len:]

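        # Consider every task that either failed or passed in the last 28 test pushes as schedulable.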
        all_tasks = reduce(
            lambda x, y: x | y,
            (
                set(push["failures"]) | set(push["passes"])
                for push in test_pushes_list[-28:]
            ),
        )

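        # All revisions belonging to the test pushes, used below to retrieve their commits.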
        all_revs = set(sum((push["revs"] for push in test_pushes_list), []))

        test_pushes_failures = sum(
            1 for push in test_pushes_list if len(push["failures"]) > 0
        )

        test_pushes = {push["revs"][0]: push for push in test_pushes_list}

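        # At group granularity, also record each test push's config/group failures, so that
        # config selection (reduction) can be evaluated further below.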
        if self.granularity == "group":
            for (
                revisions,
                fix_revision,
                push_runnables,
                possible_regressions,
                likely_regressions,
            ) in tqdm(push_data_iter(), total=push_data_count):
                if revisions[0] not in test_pushes:
                    continue

                test_pushes[revisions[0]]["config_group_failures"] = (
                    possible_regressions + likely_regressions
                )

            missing_config_group_failures = sum(
                1
                for push in test_pushes.values()
                if "config_group_failures" not in push
            )
            logger.info(
                "%d pushes without config_group failures", missing_config_group_failures
            )

        logger.info(
            "Testing on %d (%d with failures) out of %d. %d schedulable tasks.",
            len(test_pushes),
            test_pushes_failures,
            len(pushes),
            len(all_tasks),
        )

        del pushes

        commit_map = get_commit_map(all_revs)

        past_failures_data = test_scheduling.PastFailures(self.granularity, True)
        last_push_num = past_failures_data.push_num
        past_failures_data.close()

        # Select tests for all the pushes in the test set.
        for i, push in enumerate(tqdm(test_pushes.values())):
            commits = tuple(
                commit_map.pop(revision)
                for revision in push["revs"]
                if revision in commit_map
            )
            if len(commits) == 0:
                push["all_possibly_selected"] = {}
                continue

            push_num = last_push_num - (len(test_pushes) - (i + 1))

            # Note: we subtract 100 from the push number to make sure we don't use
            # past failure data for the push itself.
            # The number 100 comes from the fact that in the past failure data
            # generation we store past failures in batches of 100 pushes.
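            # For example, for the most recent test push (where push_num == last_push_num),
            # only failure data accumulated at least 100 pushes earlier is used.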
            push["all_possibly_selected"] = self.select_tests(
                commits, 0.5, push_num - 100
            )

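        # Evaluate a single scheduling policy (confidence threshold, reduction, cap, minimum):
        # compute per-push results in parallel, then aggregate them into summary statistics.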
        def do_eval(
            executor: concurrent.futures.ProcessPoolExecutor,
            confidence_threshold: float,
            reduction: float | None,
            cap: int | None,
            minimum: int | None,
        ) -> None:
            futures: dict[concurrent.futures.Future, dict[str, Any]] = {}
            for push in test_pushes.values():
                futures[
                    executor.submit(
                        eval_apply_transforms,
                        self.granularity,
                        push,
                        confidence_threshold,
                        reduction,
                        cap,
                        minimum,
                    )
                ] = push

            for future in concurrent.futures.as_completed(futures):
                exc = future.exception()
                if exc is not None:
                    logger.error(
                        "Exception %s while running %s", exc, futures[future]["revs"][0]
                    )
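                    # Cancel any pending futures; the result() call below will then re-raise
                    # the exception and abort the evaluation.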
                    for f in futures:
                        f.cancel()

                push = futures[future]
                selected, group_configs = future.result()

                if reduction is not None and self.granularity == "group":
                    push["number_configs"] = len(
                        set(
                            sum(
                                group_configs.values(),
                                [],
                            )
                        )
                    )
                    selected_config_groups = {
                        (config, group)
                        for group, configs in group_configs.items()
                        for config in configs
                    }
                    if "config_group_failures" in push:
                        caught_config_groups = selected_config_groups & set(
                            push["config_group_failures"]
                        )
                        push["caught_one_config_group"] = (
                            len(caught_config_groups) > 0
                            if len(push["config_group_failures"]) != 0
                            else None
                        )
                        push["caught_percentage_config_group"] = (
                            len(caught_config_groups)
                            / len(push["config_group_failures"])
                            if len(push["config_group_failures"]) != 0
                            else None
                        )

                caught = selected & set(push["failures"])

                push["number_scheduled"] = len(selected)
                push["caught_one"] = (
                    len(caught) > 0 if len(push["failures"]) != 0 else None
                )
                push["some_didnt_run"] = (
                    not selected.issubset(set(push["passes"]) | set(push["failures"])),
                )
                push["caught_percentage"] = (
                    len(caught) / len(push["failures"])
                    if len(push["failures"]) != 0
                    else None
                )

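            # Aggregate the per-push results into summary statistics for this policy.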
            min_scheduled = min(
                result["number_scheduled"] for result in test_pushes.values()
            )
            max_scheduled = max(
                result["number_scheduled"] for result in test_pushes.values()
            )
            average_scheduled = statistics.mean(
                result["number_scheduled"] for result in test_pushes.values()
            )
            num_failing_pushes = sum(
                1 for result in test_pushes.values() if result["caught_one"] is not None
            )
            num_caught_one = sum(
                1 for result in test_pushes.values() if result["caught_one"]
            )
            num_caught_one_or_some_didnt_run = sum(
                1
                for result in test_pushes.values()
                if result["caught_one"]
                or (result["caught_one"] is not None and result["some_didnt_run"])
            )
            percentage_caught_one = 100 * num_caught_one / num_failing_pushes
            percentage_caught_one_or_some_didnt_run = (
                100 * num_caught_one_or_some_didnt_run / num_failing_pushes
            )
            average_caught_percentage = 100 * statistics.mean(
                result["caught_percentage"]
                for result in test_pushes.values()
                if result["caught_percentage"] is not None
            )

            reduction_str = (
                f"enabled at {reduction * 100}%"
                if reduction is not None
                else "disabled"
            )

            message = f"For confidence threshold {confidence_threshold}, with reduction {reduction_str}, cap at {cap}, and minimum at {minimum}: scheduled {average_scheduled} tasks on average (min {min_scheduled}, max {max_scheduled}). In {percentage_caught_one}% of pushes we caught at least one failure ({percentage_caught_one_or_some_didnt_run}% ignoring misses when some of our selected tasks didn't run). On average, we caught {average_caught_percentage}% of all seen failures."

            if reduction is not None and self.granularity == "group":
                average_configs = statistics.mean(
                    result["number_configs"] for result in test_pushes.values()
                )
                median_configs = statistics.median(
                    result["number_configs"] for result in test_pushes.values()
                )
                message += f" On average, we selected {average_configs} configs (a median of {median_configs} configs)."

                num_failing_pushes_with_config_group = sum(
                    1
                    for result in test_pushes.values()
                    if "caught_one_config_group" in result
                    and result["caught_one_config_group"] is not None
                )
                num_caught_one_config_group = sum(
                    1
                    for result in test_pushes.values()
                    if "caught_one_config_group" in result
                    and result["caught_one_config_group"]
                )
                percentage_caught_one_config_group = (
                    100
                    * num_caught_one_config_group
                    / num_failing_pushes_with_config_group
                )
                average_caught_percentage_config_group = 100 * statistics.mean(
                    result["caught_percentage_config_group"]
                    for result in test_pushes.values()
                    if "caught_percentage_config_group" in result
                    and result["caught_percentage_config_group"] is not None
                )

                message += f" In {percentage_caught_one_config_group}% of pushes we caught at least one config/group failure. On average, we caught {average_caught_percentage_config_group}% of all seen config/group failures."

            logger.info(message)

        with concurrent.futures.ProcessPoolExecutor(
            max_workers=utils.get_physical_cpu_count(),
            # Fixing https://github.com/mozilla/bugbug/issues/3131
            mp_context=mp.get_context("fork"),
        ) as executor:
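            # Each scenario is a (minimum, cap, reduction) tuple, matching the unpacking in
            # the loop below.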
            scenarios = [
                (None, None, None),
                (10, None, None),
                (None, 300, None),
                (None, None, 0.9),
                (None, None, 1.0),
            ]
            for minimum, cap, reduction in scenarios:
                # Pre-generate equivalence sets, so when we run the config selection in multiple processes
                # we don't risk concurrent writes to the equivalence sets file.
                if reduction is not None and self.granularity == "group":
                    _get_equivalence_sets(reduction)

                for confidence_threshold in [0.5, 0.7, 0.8, 0.85, 0.9, 0.95]:
                    do_eval(executor, confidence_threshold, reduction, cap, minimum)
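
The per-push bookkeeping inside do_eval reduces to plain set arithmetic. Below is a minimal, self-contained sketch of those metrics on hypothetical data (the test names are made up and the snippet is not part of testselect.py):

    # Hypothetical push: what the model selected vs. what actually ran and failed.
    selected = {"test-a", "test-b"}
    failures = {"test-a", "test-c"}
    passes = {"test-b"}

    caught = selected & failures
    number_scheduled = len(selected)                                         # 2
    caught_one = len(caught) > 0 if failures else None                       # True
    some_didnt_run = not selected.issubset(passes | failures)                # False
    caught_percentage = len(caught) / len(failures) if failures else None    # 0.5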