in src/buildstream_plugins/sources/bzr.py [0:0]
def _locked(self):
lockdir = os.path.join(self.get_mirror_directory(), "locks")
lockfile = os.path.join(lockdir, utils.url_directory_name(self.original_url) + ".lock")
os.makedirs(lockdir, exist_ok=True)
with open(lockfile, "wb") as lock:
fcntl.flock(lock, fcntl.LOCK_EX)
try:
yield
finally:
fcntl.flock(lock, fcntl.LOCK_UN)