def _ensure_mirror()

in src/buildstream_plugins/sources/bzr.py [0:0]


    def _ensure_mirror(self, skip_ref_check=False):
        """Create or refresh the local bzr mirror of the tracked branch.

        The mirror directory is initialized with ``bzr init-repo`` when it has
        no ``.bzr`` metadata directory yet.  The tracked branch is then either
        branched from upstream (first time) or pulled (already present).

        Args:
            skip_ref_check: When true, do not verify afterwards that
                ``self.ref`` is available in the mirror.

        Raises:
            SourceError: With reason ``ref-not-mirrored`` when the ref check
                is enabled and ``self._check_ref()`` returns a false value.
        """
        repo_root = self._get_mirror_dir()

        # Initialize the repository only if its metadata directory is absent
        if not os.path.exists(os.path.join(repo_root, ".bzr")):
            init_cmd = [self.host_bzr, "init-repo", "--no-trees", repo_root]
            self.call(init_cmd, fail="Failed to initialize bzr repository")

        local_branch = os.path.join(repo_root, self.tracking)
        upstream_branch = self.url + "/" + self.tracking

        if os.path.exists(local_branch):
            # The branch is already mirrored: `bzr pull` to pick up
            # whatever changed upstream since the last update
            sync_cmd = [
                self.host_bzr,
                "pull",
                "--directory={}".format(local_branch),
                upstream_branch,
            ]
            failure = "Failed to pull new changes for {}".format(local_branch)
        else:
            # Nothing mirrored yet: `bzr branch` to obtain the upstream code
            sync_cmd = [self.host_bzr, "branch", upstream_branch, local_branch]
            failure = "Failed to branch from {} to {}".format(upstream_branch, local_branch)

        # NOTE(review): presumably self.call reports `fail` when the command
        # exits unsuccessfully - confirm against the Source base class
        self.call(sync_cmd, fail=failure)

        if skip_ref_check or self._check_ref():
            return

        raise SourceError(
            "Failed to ensure ref '{}' was mirrored".format(self.ref),
            reason="ref-not-mirrored",
        )