def _push()

in sync/trypush.py [0:0]


    def _push(self) -> tuple[int, str]:
        """Reset the worktree and run a ``fuzzy`` try push through mach.

        The argument list is assembled from this object's settings
        (``queries``, ``rebuild``, ``full``, ``disable_target_task_filter``,
        ``artifact`` and ``tests_by_type``) and passed to ``mach.try_``.

        Returns:
            A ``(returncode, output)`` pair. ``returncode`` is 0 when the
            command succeeded, otherwise the exit status carried by the
            ``CalledProcessError``. ``output`` is always text: the command's
            combined stdout/stderr decoded as UTF-8 with undecodable bytes
            replaced.
        """
        # Discard any local modifications before pushing.
        self.worktree.git.reset("--hard")

        working_dir = self.worktree.working_dir
        assert working_dir is not None

        mach = Mach(working_dir)
        # Gross hack to create a objdir until we figure out why this is failing
        # from here but not from the shell
        try:
            if not os.path.exists(os.path.join(working_dir,
                                               "obj-x86_64-pc-linux-gnu")):
                mach.python("-c", "")
        except OSError:
            # Deliberately best-effort: a failure here must not block the push.
            pass

        query_args = []
        for query in self.queries:
            query_args.extend(["-q", query])
        logger.info("Pushing to try with fuzzy query: %s" % " ".join(query_args))

        # Only pass --route when the help text of the fuzzy subcommand
        # advertises it.
        can_push_routes = b"--route " in mach.try_("fuzzy", "--help")

        args = ["fuzzy"] + query_args
        if self.rebuild:
            args.append("--rebuild")
            args.append(str(self.rebuild))
        if self.full:
            args.append("--full")
        if self.disable_target_task_filter:
            args.append("--disable-target-task-filter")
        if can_push_routes:
            args.append("--route=notify.pulse.wptsync.try-task.on-any")
        if self.artifact:
            args.append("--artifact")
        else:
            args.append("--no-artifact")

        # --push-to-vcs is required to push directly to hgmo
        args.append("--push-to-vcs")

        if self.tests_by_type is not None:
            # Collect each path once, in first-seen order, keeping only those
            # that exist in the worktree.
            paths = []
            all_paths = set()
            for values in self.tests_by_type.values():
                for item in values:
                    if (item not in all_paths and
                        os.path.exists(os.path.join(working_dir,
                                                    item))):
                        paths.append(item)
                    all_paths.add(item)
            max_tests = env.config["gecko"]["try"].get("max-tests")
            if max_tests and len(paths) > max_tests:
                logger.warning("Capping number of affected tests at %d" % max_tests)
                paths = paths[:max_tests]
            args.extend(paths)

        try:
            output = mach.try_(*args, stderr=subprocess.STDOUT)
            return 0, output.decode("utf8", "replace")
        except subprocess.CalledProcessError as e:
            # The captured output is bytes, like the success path; decode it so
            # callers always receive text as the return annotation promises.
            failure_output = e.output
            if failure_output is None:
                failure_output = ""
            elif isinstance(failure_output, bytes):
                failure_output = failure_output.decode("utf8", "replace")
            return e.returncode, failure_output