def _locked()

in src/buildstream_plugins/sources/bzr.py [0:0]


    def _locked(self):
        """Hold an exclusive ``flock`` on this source's lock file around a single yield.

        The lock file lives in a ``locks`` subdirectory of the mirror
        directory and is named after ``utils.url_directory_name()`` applied
        to ``self.original_url``, with a ``.lock`` suffix.  The ``locks``
        directory is created if it does not exist yet.

        This is a generator that yields exactly once, with no value, while
        the lock is held; the lock is released when the generator is resumed
        or closed, including on error.
        NOTE(review): presumably wrapped with ``contextlib.contextmanager``
        where it is defined -- the decorator is not visible here, confirm.
        """
        locks_root = os.path.join(self.get_mirror_directory(), "locks")
        lock_name = utils.url_directory_name(self.original_url) + ".lock"
        lock_path = os.path.join(locks_root, lock_name)
        os.makedirs(locks_root, exist_ok=True)

        # Nothing is ever written to the file; it exists only as a handle
        # for flock() to lock on.
        with open(lock_path, "wb") as handle:
            fcntl.flock(handle, fcntl.LOCK_EX)
            try:
                yield
            finally:
                # Unlock explicitly before the ``with`` closes the file.
                fcntl.flock(handle, fcntl.LOCK_UN)