def run_push_test_selection_data()

in mozci/data/sources/taskcluster/__init__.py [0:0]


    def run_push_test_selection_data(self, branch, rev):
        """Return the bugbug push schedules published by a push's decision task.

        The decision task is looked up by the name
        ``gecko.v2.{branch}.revision.{rev}.taskgraph.decision`` and its
        ``public/bugbug-push-schedules.json`` artifact is fetched.

        Args:
            branch: Branch name interpolated into the decision task name.
            rev: Revision interpolated into the decision task name.

        Returns:
            Whatever ``taskcluster.get_artifact`` returns for the schedules
            artifact.

        Raises:
            ContractNotFilled: If either the decision task lookup or the
                artifact download fails with an HTTP error. The original
                HTTP error is attached as the exception's cause.
        """
        task_name = f"gecko.v2.{branch}.revision.{rev}.taskgraph.decision"
        try:
            decision_task_id = taskcluster.find_task_id(task_name)
        except requests.exceptions.HTTPError as e:
            # A 404 means the decision task was not indexed, which suggests it
            # was broken; log that specifically to ease diagnosis. The contract
            # is reported as unfilled for every HTTP failure, 404 or not.
            # `e.response` may be None, so read the status code defensively
            # rather than letting the handler itself blow up.
            if getattr(e.response, "status_code", None) == 404:
                logger.warning(f"Decision task broken in {rev} on {branch}")

            raise ContractNotFilled(
                self.name,
                "push_test_selection_data",
                f"could not retrieve decision task '{task_name}'",
            ) from e

        try:
            results = taskcluster.get_artifact(
                decision_task_id, "public/bugbug-push-schedules.json"
            )
        except requests.exceptions.HTTPError as e:
            # Keep the underlying HTTP error as the explicit cause so the
            # reason for the missing artifact stays visible in tracebacks.
            raise ContractNotFilled(
                self.name,
                "push_test_selection_data",
                "could not retrieve schedules from cache",
            ) from e

        return results