sync/lock.py (172 lines of code) (raw):
from __future__ import annotations

import abc
import functools
import inspect
import os
from typing import Any, MutableMapping, TYPE_CHECKING

import filelock
from git.repo.base import Repo

from . import log
from .env import Environment

if TYPE_CHECKING:
    from sync.base import ProcessName
# Shared environment handle; its ``config`` mapping is read below to locate
# the directory that holds the lock files.
env = Environment()
# Module-level logger, named after this module.
logger = log.get_logger(__name__)
"""
Locking system for the wpt sync.
There are two principal kinds of lock defined here; the RepoLock that's
used to protect against concurrent writes to the underlying storage, and
the SyncLock, which is used to ensure that multiple processes don't try
to update the same sync at the same time.
Since the sync is run from multiple processes without shared memory,
and each process is single threaded, the locks must be based on files.
A RepoLock is pretty straightforward; only one process is allowed to
acquire the RepoLock at a time. Like all locks it's intended to be used
as a context manager, and there's a repo_lock decorator for functions
and methods that must acquire the lock for their entire body.
For the SyncLock, things are more complex. Processes are put into the
following groups:
* All upstream syncs
* All landing syncs
* Downstream syncs for each pr
Within each group operations that update data must be sequenced i.e.
there is a lock that must be obtained for each group when writing.
That lock is represented by the SyncLock object and it has an
associated "subtype" which is the kind of sync (upstream, downstream,
or landing) and object id which is the PR id for downstream syncs
or None otherwise.
In order to allow for read-only access to sync objects and to ensure
that we don't have data races, functions and methods that cause mutation
to the underlying data are annotated with the @mut() decorator. When
applied to a class method this does the following
* Ensures that the instance has a property called _lock
* Ensures that the _lock property is an acquired SyncLock with the
right attributes for mutation of this class.
In order to set the properties, classes with mutable state must provide
an as_mut() method that takes the current lock as an argument. This returns
a context manager object that ensures mutation methods are accessible for
the defined context. This is clearer with an example:
# Ensure we have the lock for upstream syncs
with SyncLock("upstream", None) as lock:
sync = UpstreamSync.for_bug(git_gecko, git_wpt, bug)
# Reading the sync is possible here
print(sync.status)
# But writing will fail unless we are in a mut block
with sync.as_mut(lock):
sync.update_wpt_commits()
The @mut decorator can also be used on functions to check that arguments
passed into those functions are available for mutation; in this case the
decorator takes the names of the arguments to check e.g.
@mut("sync")
def some_update_fn(git_gecko, git_wpt, sync):
# Sync must be mutable here or we would fail
sync.update_wpt_commits()
Constructing new objects is also a form of mutation, but because an object
doesn't exist yet a slightly different approach is required. The
@constructor decorator marks a classmethod that creates new instances and
can only be called with an appropriate lock. But in this case the lock is
passed as the first argument to the function, and the lock checking is done
by providing a function argument to the decorator that takes the arguments
to the constructor and returns a (subtype, obj_id) pair used to check for
lock validity e.g.
@classmethod
@constructor(lambda kwargs: (kwargs["process_name"].subtype,
kwargs["process_name"].obj_id))
def create(cls, lock, git_gecko, git_wpt, process_name):
pass
All objects that can cause mutation of the underlying sync data must
implement this locking system. In order to do so, the object must
provide the following methods and properties:
property _lock - None or a SyncLock representing the currently held lock
method lock_key - Returns the (subtype, obj_id) pair for the current
instance, used to ensure the lock is valid
method as_mut - Used to make the object mutable. Returns a MutGuard
wrapping the current object.
"""
class LockError(Exception):
    """Error type reserved for failures in the locking system."""
class Lock(metaclass=abc.ABCMeta):
    """File-backed lock that is re-entrant within the current process.

    Each instance wraps a ``filelock.FileLock`` on the path returned by
    :meth:`lock_path`. The class-level ``locks`` registry records which
    instance currently holds each path in this process, so entering a second
    lock for a path that is already held is a no-op that hands back the
    holder instead of blocking on the file.
    """

    # Registry of the locks held by this process, keyed by lock file path.
    # Subclasses may rebind this to keep a separate registry.
    locks: MutableMapping[str, Lock] = {}

    def __init__(self, *args: Any) -> None:
        """Create the lock; ``args`` are forwarded to :meth:`lock_path`."""
        self.path = self.lock_path(*args)
        self.lock = filelock.FileLock(self.path)

    def __enter__(self) -> Lock:
        """Acquire the lock and return the instance that holds it.

        If the path is already locked by this process the existing holder is
        returned and the file lock is not touched.
        """
        holder = self.locks.get(self.path)
        if holder is not None:
            if holder is self:
                # Same instance entered again: count the nesting so that only
                # the outermost exit releases the file lock.
                self._depth += 1
            return holder
        # Acquire before registering: if acquire() raises, no stale registry
        # entry is left behind pretending that the lock is held.
        self.lock.acquire()
        self._depth = 1
        self.locks[self.path] = self
        return self

    def __exit__(self, *args: Any, **kwargs: Any) -> None:
        """Release the lock if this instance is its outermost holder."""
        if self.locks.get(self.path) is not self:
            # Either another instance holds the path (our enter was a no-op)
            # or nothing is registered; in both cases there is nothing to do.
            return
        self._depth -= 1
        if self._depth > 0:
            return
        del self.locks[self.path]
        self.lock.release()

    @staticmethod
    @abc.abstractmethod
    def lock_path(*args: Any) -> str:
        """Return a path to the file representing the current lock"""
class RepoLock(Lock):
    """Lock keyed on a repository's working directory."""

    def __init__(self, repo: Repo) -> None:
        """Create the lock for ``repo``."""
        super().__init__(repo)

    @staticmethod
    def lock_path(*args: Any) -> str:
        """Return the lock file path for the single ``repo`` argument.

        The file name is the repository working directory with every path
        separator replaced by an underscore.
        """
        # The variadic signature mirrors the base class so mypy accepts the
        # override; exactly one argument is expected.
        (repo,) = args
        root = env.config["root"]
        locks_dir = env.config["paths"]["locks"]
        flattened = repo.working_dir.replace(os.path.sep, "_")
        return os.path.join(root, locks_dir, f"{flattened}.lock")
class ProcessLock(Lock):
    """Lock identified by a sync type and, for some types, an object id.

    Subclasses configure the class attributes below to declare which sync
    types exist and whether each one is locked as a whole or per object.
    """

    # Values of ``ProcessName.obj_type`` accepted by ``for_process``.
    obj_types: tuple[str, ...] | None = None
    # Prefix used for the lock file name.
    lock_type: str | None = None
    # Sync types with one lock covering every object (obj_id must be None).
    lock_per_type: set[str] = set()
    # Sync types with one lock per object (obj_id is required).
    lock_per_obj: set[str] = set()
    # Registry of held locks, kept separate from the base class registry.
    locks: MutableMapping[str, Lock] = {}

    def __init__(self, sync_type: str, obj_id: str | None) -> None:
        """Create a lock for ``sync_type`` and ``obj_id``.

        Raises ValueError when ``obj_id`` is given for a type that is locked
        as a whole, or omitted for a type that is locked per object.
        """
        assert sync_type in self.lock_per_obj | self.lock_per_type
        whole_type = sync_type in self.lock_per_type
        if whole_type and obj_id is not None:
            raise ValueError("%s must be locked over all objects" % sync_type)
        if not whole_type and obj_id is None:
            raise ValueError("%s must be locked over each object" % sync_type)
        self.sync_type = sync_type
        self.obj_id = obj_id
        super().__init__(self.lock_type, sync_type, obj_id)

    @classmethod
    def for_process(cls, process_name: ProcessName) -> ProcessLock:
        """Build the lock of this class that matches ``process_name``."""
        # Knowing about the consumer here is something of an antipattern, but
        # it enforces that each process ends up with the right kind of lock.
        accepted = cls.obj_types
        assert accepted is not None and process_name.obj_type in accepted
        subtype = process_name.subtype
        if subtype in cls.lock_per_obj:
            return cls(subtype, process_name.obj_id)
        return cls(subtype, None)

    def check(self, sync_type: str, obj_id: str | None) -> None:
        """Raise ValueError unless this lock is held and matches the request.

        For sync types that are locked as a whole the supplied ``obj_id`` is
        ignored.
        """
        expected_id = None if sync_type in self.lock_per_type else obj_id
        valid = (
            sync_type == self.sync_type
            and expected_id == self.obj_id
            and self.lock.is_locked
        )
        if valid:
            return
        raise ValueError(
            "Got wrong lock, expected sync_type:%s obj_id:%s locked:True,\n"
            "got: sync_type:%s obj_id:%s locked:%s"
            % (
                sync_type,
                expected_id,
                self.sync_type,
                self.obj_id,
                self.lock.is_locked,
            )
        )

    @staticmethod
    def lock_path(*args: Any) -> str:
        """Return the lock file path for ``(obj_type, sync_type, obj_id)``.

        The ``obj_id`` component is left out of the file name when it is None.
        """
        obj_type, sync_type, obj_id = args
        filename = (
            f"{obj_type}_{sync_type}.lock"
            if obj_id is None
            else f"{obj_type}_{sync_type}_{obj_id}.lock"
        )
        root = env.config["root"]
        locks_dir = env.config["paths"]["locks"]
        return os.path.join(root, locks_dir, filename)
class SyncLock(ProcessLock):
    """Process lock for sync data.

    Landing and upstream syncs share one lock per sync type; downstream
    syncs take one lock per object id.
    """

    lock_type = "sync"
    obj_types = ("sync", "try")
    # Locked per object id.
    lock_per_obj: set[str] = {"downstream"}
    # Locked as a whole, with no object id.
    lock_per_type: set[str] = {"upstream", "landing"}
    # Registry of held locks for this class only.
    locks: MutableMapping[str, Lock] = {}
class ProcLock(ProcessLock):
    """Process lock for ``proc`` objects; ``bugzilla`` is locked as a whole."""

    lock_type = "proc"
    obj_types = ("proc",)
    # No sync type is locked per object id.
    lock_per_obj: set[str] = set()
    # Locked as a whole, with no object id.
    lock_per_type: set[str] = {"bugzilla"}
    # Registry of held locks for this class only.
    locks: MutableMapping[str, Lock] = {}
class MutGuard:
    """Context manager that makes an object mutable while a lock is held."""

    def __init__(self,
                 lock: SyncLock,
                 instance: Any,
                 props: list[Any] | None = None,
                 ) -> None:
        """Context Manager wrapping an object that is to be accessed for mutation.

        Mutability is re-entrant in the sense that if we already have a certain object
        in a mutable state, an attempt to make the same object mutable using the same
        lock will succeed.

        :param lock: Lock to attach to the instance. It is validated against
                     ``instance.lock_key`` immediately, so a wrong or
                     unacquired lock raises here (ValueError from ``check``).
        :param instance: Object to make mutable. It must expose ``_lock`` and
                         ``lock_key``.
        :param props: Optional dependent objects that are made mutable along
                      with the instance; each must expose ``_lock`` and
                      ``as_mut``.
        """
        self.instance = instance
        self.lock = lock
        self.props = props or []
        # Guards for props that this guard made mutable itself.
        self.owned_guards: list[Any] = []
        lock.check(*instance.lock_key)
        # True if this guard set the instance's lock, False for a re-entrant
        # enter, None while the guard is not active.
        self.took_lock: bool | None = None

    def __enter__(self) -> Any:
        """Attach the lock to the instance and its props; return the instance.

        Raises ValueError if the instance is already mutable under a
        different lock.
        """
        # Wrap in a tuple so instances that are themselves tuples format safely.
        logger.debug("Making object mutable %r" % (self.instance,))
        if self.instance._lock is not None:
            if self.instance._lock is not self.lock:
                raise ValueError("Tried to re-lock %s with a different lock" %
                                 (self.instance,))
            # Already mutable under this lock: leave ownership with the outer
            # guard, but still hand back the instance for ``with ... as``.
            self.took_lock = False
            return self.instance

        self.took_lock = True
        self.instance._lock = self.lock
        try:
            for prop in self.props:
                if prop._lock is None:
                    guard = prop.as_mut(self.lock)
                    guard.__enter__()
                    # Track the guard only once it has really been entered.
                    self.owned_guards.append(guard)
        except BaseException as exc:
            # __exit__ is not called when __enter__ raises, so undo the
            # partial setup here rather than leaving objects mutable.
            exc_info = (type(exc), exc, exc.__traceback__)
            try:
                while self.owned_guards:
                    self.owned_guards.pop().__exit__(*exc_info)
            finally:
                self.instance._lock = None
                self.took_lock = None
            raise
        return self.instance

    def __exit__(self, *args: Any, **kwargs: Any) -> None:
        """Make the instance and any props owned by this guard immutable again."""
        try:
            if not self.took_lock:
                return
            try:
                while self.owned_guards:
                    guard = self.owned_guards.pop()
                    guard.__exit__(*args, **kwargs)
                # Optional hook on the instance, run while it is still mutable.
                if hasattr(self.instance, "exit_mut"):
                    self.instance.exit_mut()
            finally:
                # Always drop the lock, even if a guard or the hook raised.
                self.instance._lock = None
        finally:
            self.took_lock = None
class mut:
    """Decorator that requires named arguments to be locked for mutation."""

    def __init__(self, *args):
        """Mark a function as requiring given arguments are mutable.

        When entering the function the decorator checks that the specified
        arguments are locked for mutation with an appropriate lock.

        When no arguments are specified, self is used as a default,
        appropriate for marking instance methods that require the
        object to be locked for mutation."""
        if not args:
            args = ("self",)
        self.args = args

    def __call__(self, f):
        """Wrap ``f`` so that each call first validates the named arguments.

        The wrapper raises ValueError when a named argument has no lock
        attached, and propagates whatever the lock's ``check`` raises when the
        lock does not match the argument's ``lock_key``.
        """
        # Resolve the signature once, at decoration time, not on every call.
        signature = inspect.signature(f)

        @functools.wraps(f)
        def inner(*args: Any, **kwargs: Any) -> Any:
            bound = signature.bind(*args, **kwargs)
            # Include parameters that were left at their default values.
            bound.apply_defaults()
            for arg in self.args:
                arg_value = bound.arguments[arg]
                if arg_value._lock is None:
                    # Tuple-wrap so values that are tuples format safely.
                    raise ValueError("Tried to use %r as mutable without locking" %
                                     (arg_value,))
                arg_value._lock.check(*arg_value.lock_key)
            return f(*args, **kwargs)

        return inner
class constructor:
    """Decorator for classmethods that create lock-protected objects."""

    def __init__(self, arg_func):
        """Mark a classmethod as a constructor for an object which uses the
        mutation system.

        The decorator takes a single function as an argument. This function
        is run against the arguments provided to the decorated classmethod
        (as a dict mapping parameter names to values) and returns a
        tuple of (subtype, obj_id) representing the kind of lock that needs
        to be held to construct the object."""
        self.arg_func = arg_func

    def __call__(self, f):
        """Wrap ``f`` so the lock passed as its first argument is validated.

        The wrapper raises ValueError when the lock is None, and propagates
        whatever the lock's ``check`` raises when the lock does not match the
        (subtype, obj_id) pair computed by ``arg_func``.
        """
        # Resolve the signature once, at decoration time, not on every call.
        signature = inspect.signature(f)

        @functools.wraps(f)
        def inner(cls: Any, lock: SyncLock, *args: Any, **kwargs: Any) -> Any:
            if lock is None:
                raise ValueError("Tried to access constructor %s without locking" %
                                 f.__name__)
            bound = signature.bind(cls, lock, *args, **kwargs)
            # Include parameters that were left at their default values.
            bound.apply_defaults()
            sync_type, obj_id = self.arg_func(dict(bound.arguments))
            lock.check(sync_type, obj_id)
            return f(cls, lock, *args, **kwargs)

        return inner