pathology/transformation_pipeline/ingestion_lib/redis_client.py (152 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== """Wrapper for cloud memorystore redis operations.""" from __future__ import annotations import contextlib import os import threading import time from typing import Dict, Mapping, Optional import redis from pathology.shared_libs.logging_lib import cloud_logging_client from pathology.transformation_pipeline import ingest_flags from pathology.transformation_pipeline.ingestion_lib import ingest_const class RedisServerIPUndefinedError(Exception): """Redis server IP is undefined or empty.""" class CouldNotAcquireNonBlockingLockError(Exception): """Exception raised when nonblocking lock cannot be acquired.""" def __init__(self, lock_name: str, log: Optional[Mapping[str, str]] = None): super().__init__( f'Could not acquire lock: {lock_name}.', ingest_const.ErrorMsgs.COULD_NOT_ACQUIRE_LOCK, ) self._lock_name = lock_name self._log = log @property def log(self) -> Mapping[str, str]: return self._log if self._log is not None else {} @property def lock_name(self) -> str: return self._lock_name class _AutoLockUnlocker(contextlib.AbstractContextManager): """Automatically unlocks redis lock when lock exits a context block.""" def __init__(self, name: str, log: Optional[Mapping[str, str]] = None): super().__init__() self._lock_name = name self._start_time = time.time() self._unlock_log = dict(log) if log is not None else {} def __exit__(self, exc_type, exc_value, traceback): super().__exit__(exc_type, exc_value, traceback) r_client = RedisClient.get_singleton() if not r_client.has_redis_client(): return r_client.release_lock(self._lock_name, ignore_redis_exception=True) self._unlock_log[ingest_const.LogKeywords.LOCK_HELD_SEC] = ( time.time() - self._start_time ) cloud_logging_client.info( f'Released transformation lock: {self._lock_name}', self._unlock_log ) def _extend_rlock(redis_lock: redis.lock.Lock, ttl: int) -> None: try: redis_lock.extend(ttl, replace_ttl=True) except redis.exceptions.LockError as exp: cloud_logging_client.error('Error occured extending redis lock TTL.', exp) class RedisClient: """Wrapper for cloud memorystore redis operations.""" _instance_creation_lock = threading.Lock() _instance: Optional[RedisClient] = None def __init__(self): if RedisClient._instance is not None: raise ValueError('Singleton already initialized.') self._lock = threading.Lock() self._redis_lock_dict: Dict[str, redis.lock.Lock] = {} self._redis_ip = ingest_flags.REDIS_SERVER_IP_FLG.value if self._redis_ip is None: self._client_instance = None else: self._redis_ip = self._redis_ip.strip() if not self._redis_ip: raise RedisServerIPUndefinedError() self._redis_port = ingest_flags.REDIS_SERVER_PORT_FLG.value self._client_instance = redis.Redis( host=self._redis_ip, port=self._redis_port, db=ingest_flags.REDIS_DB_FLG.value, username=ingest_flags.REDIS_USERNAME_FLG.value, password=ingest_flags.REDIS_AUTH_PASSWORD_FLG.value, ) RedisClient._instance = self @classmethod def init_fork_module_state(cls) -> None: cls._instance_creation_lock = threading.Lock() cls._instance = None def has_redis_client(self) -> bool: return self._client_instance is not None @classmethod def get_singleton(cls) -> RedisClient: """Returns RedisClient instance.""" if cls._instance is None: with cls._instance_creation_lock: if cls._instance is None: RedisClient() return cls._instance @property def redis_ip(self) -> str: return self._redis_ip @property def redis_port(self) -> int: return self._redis_port @property def client(self) -> redis.Redis: return self._client_instance def extend_lock_timeouts(self, amount: int): """Interface for thread to call to extend TTL on Redis keys. Args: amount: time in seconds to extend timeout. """ with self._lock: if not self.has_redis_client() or not self._redis_lock_dict: return for redis_lock in self._redis_lock_dict.values(): _extend_rlock(redis_lock, amount) def ping(self) -> bool: """Pings the redis server. Returns: True if success """ return self._client_instance.ping() def acquire_non_blocking_lock( self, name: str, value: str, expiry_seconds: int, context_block: contextlib.ExitStack, unlock_log: Optional[Mapping[str, str]] = None, ): """Sets the value at key with an expiry time only if a key with name does not already exist or the current key value pair already exists. Args: name: Name of key value: Value to be stored for key expiry_seconds: Expiry time in seconds context_block: Context block to auto-unlock lock. unlock_log: Optional log to report when lock is released, only applies to newly acquired locks. Raises: CouldNotAcquireNonBlockingLockError: Lock cannot be acquired. """ with self._lock: rlock = self._redis_lock_dict.get(name) if rlock is not None: new_lock = False else: # Disable thread local storage, lock will be extended in background # thread. Within process thread safty handled by this class. new_lock = True rlock = redis.lock.Lock( self._client_instance, name, timeout=expiry_seconds, thread_local=False, ) acquired = rlock.acquire(blocking=False, token=value) if not acquired: raise CouldNotAcquireNonBlockingLockError(name, unlock_log) self._redis_lock_dict[name] = rlock if new_lock: context_block.enter_context(_AutoLockUnlocker(name, unlock_log)) else: _extend_rlock(rlock, expiry_seconds) def is_lock_owned(self, name: str) -> bool: with self._lock: rlock = self._redis_lock_dict.get(name) if rlock is None: return False return rlock.owned() def release_lock(self, name: str, ignore_redis_exception: bool) -> None: """Releases lock associated with key.""" with self._lock: rlock = self._redis_lock_dict.get(name) if rlock is not None: try: rlock.release() except ( redis.exceptions.LockError, redis.exceptions.ConnectionError, ) as _: if not ignore_redis_exception: raise del self._redis_lock_dict[name] def redis_client() -> RedisClient: return RedisClient.get_singleton() os.register_at_fork(after_in_child=RedisClient.init_fork_module_state)