in source/neuropod/backends/python_bridge/_neuropod_native_bootstrap/filelock.py [0:0]
def acquire(self, timeout=None, poll_intervall=0.05):
    """
    Acquires the file lock or fails with a :exc:`Timeout` error.

    .. code-block:: python

        # You can use this method in the context manager (recommended)
        with lock.acquire():
            pass

        # Or use an equivalent try-finally construct:
        lock.acquire()
        try:
            pass
        finally:
            lock.release()

    :arg float timeout:
        The maximum time waited for the file lock.
        If ``timeout < 0``, there is no timeout and this method will
        block until the lock could be acquired.
        If ``timeout`` is None, the default :attr:`~timeout` is used.

    :arg float poll_intervall:
        We check once in *poll_intervall* seconds if we can acquire the
        file lock.

    :raises Timeout:
        if the lock could not be acquired in *timeout* seconds.

    :returns:
        An ``_Acquire_ReturnProxy`` constructed with ``lock=self``.

    .. versionchanged:: 2.0.0

        This method returns now a *proxy* object instead of *self*,
        so that it can be used in a with statement without side effects.
    """
    # Use the default timeout, if no timeout is provided.
    if timeout is None:
        timeout = self.timeout

    # Increment the number right at the beginning.
    # We can still undo it, if something fails.
    with self._thread_lock:
        self._lock_counter += 1

    lock_id = id(self)
    lock_filename = self._lock_file

    # Measure the elapsed time with a monotonic clock so that wall-clock
    # adjustments (NTP steps, manual changes) can neither trigger the
    # timeout prematurely nor postpone it. Interpreters without
    # ``time.monotonic`` fall back to ``time.time``.
    clock = getattr(time, "monotonic", time.time)
    start_time = clock()

    try:
        while True:
            # Only the acquisition attempt itself runs under the thread
            # lock; the status check and the sleep below happen outside it.
            with self._thread_lock:
                if not self.is_locked:
                    logger().debug(
                        "Attempting to acquire lock %s on %s",
                        lock_id,
                        lock_filename,
                    )
                    self._acquire()

            if self.is_locked:
                logger().info("Lock %s acquired on %s", lock_id, lock_filename)
                break
            elif timeout >= 0 and clock() - start_time > timeout:
                logger().debug(
                    "Timeout on acquiring lock %s on %s", lock_id, lock_filename
                )
                raise Timeout(self._lock_file)
            else:
                logger().debug(
                    "Lock %s not acquired on %s, waiting %s seconds ...",
                    lock_id,
                    lock_filename,
                    poll_intervall,
                )
                time.sleep(poll_intervall)
    except:  # noqa
        # Something did go wrong, so decrement the counter. The bare except
        # is deliberate: the increment above must be undone for every kind
        # of failure (including the Timeout raised in the loop) before the
        # exception is re-raised unchanged.
        with self._thread_lock:
            self._lock_counter = max(0, self._lock_counter - 1)

        raise

    return _Acquire_ReturnProxy(lock=self)