pathology/dicom_proxy/redis_cache.py (404 lines of code) (raw):

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Interface to communicate with local in memory Redis cache."""
import dataclasses
import os
import subprocess
import tempfile
import threading
import time
from typing import Any, Mapping, Optional, Union

import google.cloud.storage
import psutil
import redis

from pathology.dicom_proxy import cache_enabled_type
from pathology.dicom_proxy import dicom_proxy_flags
from pathology.shared_libs.logging_lib import cloud_logging_client

# Sentinel bytes stored in Redis to represent a cached Python None.
_REDIS_NONE_VALUE = b'NONE'
# Values longer than this many bytes are not written to the cache.
_VALUE_REDIS_CACHE_SIZE_LIMIT_LIMIT = 512000000

# Log / error messages.
_SET_REDIS_CACHE_MAXMEMORY_CONFIG = 'Set Redis cache maxmemory config'
_REDIS_CACHE_MAXMEMORY_GREATER_THAN_MEM_AVAIL = (
    'REDIS_CACHE_MAXMEMORY set to value > available memory.'
)
_ERROR_SETTING_REDIS_CACHE_MAXMEMORY = 'Error setting Redis cache maxmemory.'
_SET_MAXMEMORY_NON_LOCAL_REDIS_ERROR = (
    'Tile-server cache configured to use non-local redis instance. Configure '
    'non-local redis instance max memory settings directly on the REDIS server.'
)
_REDIS_USERNAME_AND_PASSWORD_MUST_BE_NONE_RUNNING_FROM_LOCALHOST = (
    'Username and password must be None when running in from localhost.'
)
_REDIS_CACHE_CANNOT_BE_TLS_PROXY_WITH_LOCALHOST_IP = (
    'Redis cache cannot be configured to use TLS encryption with localhost.'
)
_INVALID_REDIS_LOCALHOST_PORT = (
    'Redis cache running on Localhost must be configured to run on port 6379.'
)

_LOCALHOST_IP = '127.0.0.1'
# Local port on which the stunnel proxy accepts connections.
_STUNNEL_PORT = 6378


class RedisConfigError(Exception):
  """Raised when the proxy server's Redis configuration is invalid."""


@dataclasses.dataclass
class RedisResult:
  """Wrapper returned by RedisCache.get for a key found in the cache."""

  # Bytes stored for the key; None when None was stored as the key's value.
  value: Optional[bytes]


def _redis_tls_certificate_authority_gcs_uri() -> str:
  """Returns the stripped TLS CA GCS URI flag value, or '' if the flag unset."""
  uri = dicom_proxy_flags.REDIS_TLS_CERTIFICATE_AUTHORITY_GCS_URI_FLG.value
  if uri is None:
    return ''
  return uri.strip()


def _redis_host_ip_config() -> str:
  """Returns the configured Redis host; localhost aliases map to 127.0.0.1.

  An unset flag yields the empty string.
  """
  configured = dicom_proxy_flags.REDIS_CACHE_HOST_IP_FLG.value
  if configured is None:
    return ''
  configured = configured.strip()
  if configured.lower() in (_LOCALHOST_IP, 'localhost'):
    return _LOCALHOST_IP
  return configured


def _redis_host_ip() -> str:
  """Returns the IP the Redis client connects to.

  When a TLS certificate authority URI is configured and a host is defined,
  the localhost IP is returned instead of the configured host.
  """
  configured_ip = _redis_host_ip_config()
  tls_ca_uri = _redis_tls_certificate_authority_gcs_uri()
  if tls_ca_uri and configured_ip:
    return _LOCALHOST_IP
  return configured_ip


def _redis_host_port() -> int:
  """Returns the stunnel port if a TLS CA is configured, else the port flag."""
  return (
      _STUNNEL_PORT
      if _redis_tls_certificate_authority_gcs_uri()
      else dicom_proxy_flags.REDIS_CACHE_HOST_PORT_FLG.value
  )


def _log_and_raise_critical_error(
    msg: str, *args: Union[Mapping[str, Any], Exception, None]
) -> None:
  """Writes a critical log entry and then raises RedisConfigError.

  Args:
    msg: Error message; logged and used as the exception message.
    *args: Extra structured values or exceptions forwarded to the logger.

  Raises:
    RedisConfigError: Always raised, carrying msg.
  """
  cloud_logging_client.critical(msg, *args)
  raise RedisConfigError(msg)


# Path to the file recording when this instance last performed a Redis cache
# flush.
_redis_db_last_flushed_filepath = os.path.join(
    tempfile.gettempdir(), 'redis_db_last_flushed.txt'
)
# Redis client shared by RedisCache instances created in this process, and the
# lock guarding its creation.
_redis_host_instance_lock = threading.Lock()
_redis_host_instance: Optional[redis.Redis] = None


def _init_fork_module_state() -> None:
  """Resets the module-level lock and shared Redis client to initial state."""
  global _redis_host_instance_lock
  global _redis_host_instance
  _redis_host_instance_lock = threading.Lock()
  _redis_host_instance = None


def _min_redis_cache_flush_interval() -> int:
  """Returns the minimum time between Redis cache flushes (never negative)."""
  return max(dicom_proxy_flags.REDIS_MIN_CACHE_FLUSH_INTERVAL_FLG.value, 0)


def _get_redis_db_last_flushed() -> float:
  """Returns the time Redis was last flushed by this instance.

  Returns:
    Timestamp read from the tracking file. If the file cannot be read, or does
    not hold a valid float, the current time is returned instead.
  """
  try:
    with open(_redis_db_last_flushed_filepath, 'rt') as infile:
      return float(infile.read())
  except OSError as os_exp:
    cloud_logging_client.warning(
        f'A OSError occured reading: {_redis_db_last_flushed_filepath}.',
        os_exp,
    )
  except ValueError as value_exp:
    # The writer truncates the file when it opens it, so a concurrent reader
    # can observe empty or partial content; float() then raises ValueError.
    cloud_logging_client.warning(
        'Invalid timestamp read from:'
        f' {_redis_db_last_flushed_filepath}.',
        value_exp,
    )
  # If the read fails return the current time. At most this will delay a
  # cache reset. Server will continue to run.
  return time.time()


def _write_redis_db_last_flushed(update: float) -> None:
  """Writes the time this instance last flushed the Redis DB.

  Args:
    update: Timestamp to record in the tracking file.
  """
  try:
    with open(_redis_db_last_flushed_filepath, 'wt') as outfile:
      outfile.write(str(update))
  except OSError as os_exp:
    cloud_logging_client.warning(
        f'A OSError occured writing: {_redis_db_last_flushed_filepath}.',
        os_exp,
    )
    # A write error is deliberately ignored. At most this results in an
    # additional cache reset. Server will continue to run.
class RedisCache:
  """Interface to communicate with local in memory Redis cache."""

  def __init__(
      self,
      cache_enabled: cache_enabled_type.CachingEnabled = cache_enabled_type.CachingEnabled(
          True
      ),
  ):
    """Constructor.

    Args:
      cache_enabled: Requested caching state. Forced to disabled when no Redis
        server is configured, or when the server is localhost and the max
        memory flag is 0.
    """
    if not self.server_defined or (
        dicom_proxy_flags.REDIS_CACHE_MAXMEMORY_FLG.value == 0
        and self.is_localhost
    ):
      cloud_logging_client.warning(
          'Redis cache disabled.',
          {
              'server defined': self.server_defined,
              'max_memory': dicom_proxy_flags.REDIS_CACHE_MAXMEMORY_FLG.value,
              'localhost': self.is_localhost,
          },
      )
      cache_enabled = cache_enabled_type.CachingEnabled(False)
    self._cache_enabled = cache_enabled
    if not self.is_enabled:
      # No client is created when disabled; methods must check is_enabled
      # before touching self._redis.
      return
    disable_redis_host_cache = (
        dicom_proxy_flags.DISABLE_REDIS_INSTANCE_PROCESS_CACHE_IN_DEBUG_FLG.value
    )
    global _redis_host_instance
    # Reuse the module-level client when one already exists.
    if _redis_host_instance is not None and not disable_redis_host_cache:
      self._redis = _redis_host_instance
      return
    with _redis_host_instance_lock:
      # Re-check under the lock; another thread may have created the client.
      if _redis_host_instance is not None and not disable_redis_host_cache:
        self._redis = _redis_host_instance
        return
      self._redis = redis.Redis(
          host=_redis_host_ip(),
          port=_redis_host_port(),
          db=dicom_proxy_flags.REDIS_CACHE_DB_FLG.value,
          username=dicom_proxy_flags.REDIS_USERNAME_FLG.value,
          password=dicom_proxy_flags.REDIS_AUTH_PASSWORD_FLG.value,
      )
      _redis_host_instance = self._redis

  def config_set(self, name: str, value: str) -> bool:
    """Sets redis configuration.

    Only applied when the cache is enabled and running on localhost. A
    connection error disables the cache for this instance.

    Args:
      name: String name of config to set.
      value: Configuration value.

    Returns:
      True if configuration set.
    """
    if not self.is_enabled or not self.is_localhost:
      return False
    try:
      old_value = self._redis.config_get(name)
      result = self._redis.config_set(name, value)
      new_value = self._redis.config_get(name)
      # In unit tests config_set did not throw connection errors when no
      # Redis store was available; log the change so the configuration
      # parameter update can be verified.
      cloud_logging_client.info(
          'Redis config changed.',
          {
              'config': name,
              'old_value': old_value,
              'value_set': value,
              'new_value': new_value,
          },
      )
      return result
    except redis.exceptions.ConnectionError:
      self._cache_enabled = cache_enabled_type.CachingEnabled(False)
      return False

  @property
  def server_defined(self) -> bool:
    """Returns True if a Redis host is configured."""
    return bool(_redis_host_ip_config())

  @property
  def is_localhost(self) -> bool:
    """Returns True if the configured Redis host is the localhost IP."""
    return bool(_redis_host_ip_config() == _LOCALHOST_IP)

  @property
  def is_enabled(self) -> bool:
    """Returns True if Redis cache is enabled."""
    return self._cache_enabled

  def ping(self) -> bool:
    """Returns True if the cache is enabled and Redis answers a ping."""
    if not self.is_enabled:
      return False
    try:
      self._redis.ping()
      return True
    except redis.exceptions.ConnectionError as exp:
      cloud_logging_client.warning('Error connecting to Redis Cache.', exp)
      return False

  def get(self, cache_key: str) -> Optional[RedisResult]:
    """Returns value stored for key in redis cache.

    Args:
      cache_key: Key to look up in Redis store.

    Returns:
      Value associated with key or None if value is not found (also None when
      the cache is disabled or a connection error occurs). If
      RedisResult.value == None then None was stored for key's value.
    """
    if not self.is_enabled:
      return None
    try:
      result = self._redis.get(cache_key)
      if result is None:
        return None
      if result == _REDIS_NONE_VALUE:
        return RedisResult(None)
      return RedisResult(result)
    except redis.exceptions.ConnectionError as exp:
      cloud_logging_client.warning('Error getting value from Redis Cache.', exp)
      return None

  def set(
      self,
      cache_key: str,
      value: Optional[Union[bytes, str]],
      allow_overwrite: bool = True,
      ttl_sec: Optional[int] = None,
  ) -> bool:
    """Sets key value in redis cache.

    Args:
      cache_key: Key to look up in Redis store.
      value: Key's value. None is stored as a sentinel; str is UTF-8 encoded.
      allow_overwrite: Allow set to overwrite pre-existing cache value
      ttl_sec: TTL (sec) for key: value pair to expire

    Returns:
      True if key:value set in redis cache. False if the cache is disabled,
      the value exceeds the size limit, Redis did not set the key, or a Redis
      error occurred.
    """
    if not self.is_enabled:
      return False
    nx = not allow_overwrite  # Redis param: set the key if it does not exist.
    ex = ttl_sec  # Redis Parameter TTL sec
    try:
      if value is None:
        return (
            self._redis.set(cache_key, _REDIS_NONE_VALUE, nx=nx, ex=ex)
            is not None
        )
      if isinstance(value, str):
        set_value = value.encode('utf-8')
      else:
        set_value = value
      if len(set_value) > _VALUE_REDIS_CACHE_SIZE_LIMIT_LIMIT:
        return False
      return self._redis.set(cache_key, set_value, nx=nx, ex=ex) is not None
    except redis.exceptions.ConnectionError as exp:
      cloud_logging_client.warning(
          f'Error setting value in Redis Cache. {exp}', exp
      )
      return False
    except redis.exceptions.ResponseError as exp:
      if 'command not allowed under OOM prevention' not in str(exp):
        cloud_logging_client.warning('Unexpected Redis cache error.', exp)
        return False
      # Redis cache should be configured to avoid OOM conditions.
      # https://cloud.google.com/memorystore/docs/redis/memory-management-best-practices
      # If Redis cache returns OOM then attempt to recover by erasing the
      # currently cached data. The proxy server runs on multiple threads,
      # processes, and GKE instances. As a result a race condition exists
      # here where multiple OOM events could simultaneously try to empty the
      # cache. Fully protecting against this is not necessary; execution will
      # eventually result in a usable cache. Two mechanisms reduce the
      # likelihood of the race condition:
      # 1) Within a server instance, each proxy instance writes and reads a
      #    temporary file which tracks the time the instance last performed a
      #    redis.flushall operation. The min time delta between cache flush
      #    operations is configurable using the REDIS_MIN_CACHE_FLUSH_INTERVAL
      #    environmental variable.
      # 2) Across instances, Redis is queried for lazyfree_pending_objects; if
      #    the Redis database reports pending objects the Redis instance is
      #    not flushed.
      cloud_logging_client.warning(
          'Redis cache OOM; See:'
          ' https://cloud.google.com/memorystore/docs/redis/memory-management-best-practices',
          exp,
      )
      redis_last_flushed = _get_redis_db_last_flushed()
      if time.time() - redis_last_flushed < _min_redis_cache_flush_interval():
        return False
      try:
        try:
          lazyfree_pending_objects = self._redis.info('memory').get(
              'lazyfree_pending_objects', 0
          )
          if lazyfree_pending_objects > 0:
            cloud_logging_client.info(
                f'Redis cache has N={lazyfree_pending_objects} '
                'lazyfree_pending_objects. Redis not flushed.'
            )
            return False
        except AttributeError as attribute_exp:
          # info() did not return a mapping; log and proceed with the flush.
          cloud_logging_client.warning(
              'Redis info returned unexpected result.', attribute_exp
          )
        # Record the flush time before flushing.
        _write_redis_db_last_flushed(time.time())
        self._redis.flushall(asynchronous=True)
        cloud_logging_client.info('Flushed Redis cache.')
      except (
          redis.exceptions.ConnectionError,
          redis.exceptions.ResponseError,
      ) as redis_exp:
        # Flush failed; restore the previously recorded flush time.
        _write_redis_db_last_flushed(redis_last_flushed)
        cloud_logging_client.warning(
            'Unexpected error flushing Redis cache.', redis_exp
        )
      # The triggering value was not stored regardless of the flush outcome.
      return False

  def delete(self, cache_key: str) -> int:
    """Deletes a key from the Redis cache.

    Args:
      cache_key: Key to remove from the Redis store.

    Returns:
      Result of the Redis delete call; 0 if the cache is disabled or a
      connection error occurs.
    """
    # self._redis is never assigned when the cache is disabled at
    # construction, so guard before using it (as get/set/ping do).
    if not self.is_enabled:
      return 0
    try:
      return self._redis.delete(cache_key)
    except redis.exceptions.ConnectionError:
      return 0


def _init_config_defrag(cache: RedisCache) -> None:
  """Attempts to enable activedefrag on an enabled localhost Redis cache.

  Failure is logged as a warning and otherwise ignored.

  Args:
    cache: Redis cache to configure.
  """
  if not cache.is_enabled or not cache.is_localhost:
    return
  try:
    if cache.config_set('activedefrag', 'yes'):
      return
    raised_exception = None
  except redis.exceptions.ResponseError as exp:
    raised_exception = exp
  cloud_logging_client.warning(
      'Could not enable activedefrag on localhost redis.', raised_exception
  )


def _init_config_maxmemory(cache: RedisCache) -> None:
  """Called at program initialization to init redis LRU maxmemory setting.

  If the cache is disabled or not running on localhost, nothing is configured;
  a critical error is raised if REDIS_CACHE_MAXMEMORY is > 0 in that case,
  since max memory must be configured directly on the remote server.

  For an enabled localhost cache, maxmemory is read from the
  REDIS_CACHE_MAXMEMORY flag (in gigabytes). A negative flag value selects
  1/2 of the available memory in whole gigabytes. The value must not exceed
  the available memory. The memory config is then set on Redis.

  Args:
    cache: Redis cache to init.

  Returns:
    None

  Raises:
    RedisConfigError: Proxy server Redis configuration error.
  """
  if not cache.is_enabled or not cache.is_localhost:
    if dicom_proxy_flags.REDIS_CACHE_MAXMEMORY_FLG.value > 0:
      _log_and_raise_critical_error(
          _SET_MAXMEMORY_NON_LOCAL_REDIS_ERROR,
          {
              'REDIS_CACHE_MAXMEMORY': (
                  dicom_proxy_flags.REDIS_CACHE_MAXMEMORY_FLG.value
              )
          },
      )
    return
  gigabyte = pow(1024, 3)
  vm_available = psutil.virtual_memory().available
  if dicom_proxy_flags.REDIS_CACHE_MAXMEMORY_FLG.value < 0:
    memory_config = int(vm_available / (2 * gigabyte))
  else:
    memory_config = dicom_proxy_flags.REDIS_CACHE_MAXMEMORY_FLG.value
  free_memory = int(vm_available / gigabyte)
  if memory_config > free_memory:
    _log_and_raise_critical_error(
        _REDIS_CACHE_MAXMEMORY_GREATER_THAN_MEM_AVAIL,
        {
            'redis set_config maxmemory': f'{memory_config}GB',
            'total_available_memory': f'{free_memory}GB',
        },
    )
  memory_config_str = f'{memory_config}GB'
  if not cache.config_set('maxmemory', memory_config_str):
    _log_and_raise_critical_error(
        _ERROR_SETTING_REDIS_CACHE_MAXMEMORY,
        {
            'redis set_config maxmemory': memory_config_str,
            'total_available_memory': f'{free_memory}GB',
        },
    )
  cloud_logging_client.info(
      _SET_REDIS_CACHE_MAXMEMORY_CONFIG,
      {'redis set_config maxmemory': memory_config_str},
  )


def _validate_flags(cache: RedisCache) -> None:
  """Validates proxy server redis configuration flags.

  For a localhost cache: username and password must both be None, no TLS
  certificate authority may be configured, and the port must be 6379.

  Args:
    cache: Redis cache to validate with flags.

  Raises:
    RedisConfigError: Proxy server Redis configuration error.
  """
  if cache.is_localhost:
    username = dicom_proxy_flags.REDIS_USERNAME_FLG.value
    password = dicom_proxy_flags.REDIS_AUTH_PASSWORD_FLG.value
    if username is not None or password is not None:
      # NOTE(review): this writes the configured password to the log in clear
      # text; consider redacting.
      _log_and_raise_critical_error(
          _REDIS_USERNAME_AND_PASSWORD_MUST_BE_NONE_RUNNING_FROM_LOCALHOST,
          {'username': username, 'password': password},
      )
    if _redis_tls_certificate_authority_gcs_uri():
      _log_and_raise_critical_error(
          _REDIS_CACHE_CANNOT_BE_TLS_PROXY_WITH_LOCALHOST_IP,
          {
              'redis_host_ip': dicom_proxy_flags.REDIS_CACHE_HOST_IP_FLG.value,
              'redis_tls_certificate': (
                  _redis_tls_certificate_authority_gcs_uri()
              ),
          },
      )
    if _redis_host_port() != 6379:
      _log_and_raise_critical_error(
          _INVALID_REDIS_LOCALHOST_PORT, {'redis_port': _redis_host_port()}
      )


def _start_stunnel_proxy() -> None:
  """Init stunnel local proxy if external Redis configured with TLS CA.

  Downloads the certificate authority from GCS, writes a stunnel config that
  accepts on the local stunnel port and connects to the configured Redis
  host/port, then launches stunnel.

  Raises:
    Exception: Any error downloading the certificate, writing the config, or
      starting stunnel is logged as critical and re-raised.
  """
  # mkdtemp creates the directory itself and leaves it in place; the files
  # written below must outlive this function for the stunnel process.
  temp_dir = tempfile.mkdtemp()
  config_file = os.path.join(temp_dir, 'stunnel.config')
  ca_file = os.path.join(temp_dir, 'server_ca.pem')
  stunnel_pid = os.path.join(temp_dir, 'stunnel.pid')
  try:
    client = google.cloud.storage.Client()
    blob = google.cloud.storage.Blob.from_string(
        _redis_tls_certificate_authority_gcs_uri(), client
    )
    with open(ca_file, 'wb') as outfile:
      outfile.write(blob.download_as_bytes())
  except Exception as exp:
    cloud_logging_client.critical(
        'Error downloading Redis TLS certifcate.', exp
    )
    raise
  try:
    server_ip = dicom_proxy_flags.REDIS_CACHE_HOST_IP_FLG.value
    server_ip = server_ip.strip() if server_ip is not None else ''
    server_port = dicom_proxy_flags.REDIS_CACHE_HOST_PORT_FLG.value
    with open(config_file, 'wt') as outfile:
      lines = [
          f'CAfile={ca_file}',
          'client=yes',
          f'pid={stunnel_pid}',
          'verifyChain=yes',
          'sslVersion=TLSv1.2',
          '[redis]',
          f'accept={_LOCALHOST_IP}:{_STUNNEL_PORT}',
          f'connect={server_ip}:{server_port}',
      ]
      outfile.write('\n'.join(lines))
  except Exception as exp:
    cloud_logging_client.critical('Error saving Redis CA.', exp)
    raise
  try:
    cloud_logging_client.info('Starting stunnel.')
    subprocess.Popen(['/usr/bin/stunnel', config_file])
    cloud_logging_client.info('stunnel started.')
  except Exception as exp:
    cloud_logging_client.critical('Error starting stunnel.', exp)
    raise


def setup() -> None:
  """Initializes and Validates Redis cache; call once at startup.

  Returns:
    None

  Raises:
    RedisConfigError: Proxy server Redis configuration error.
  """
  # Back-date the recorded flush time by the minimum flush interval.
  _write_redis_db_last_flushed(time.time() - _min_redis_cache_flush_interval())
  cache = RedisCache()
  _validate_flags(cache)
  if cache.is_localhost:
    subprocess.Popen(['/usr/bin/redis-server', '/redis.conf'])
    time.sleep(30)  # Give time for redis to start before configuring.
  _init_config_defrag(cache)
  _init_config_maxmemory(cache)
  if not cache.is_enabled:
    cloud_logging_client.warning('Redis cache disabled.')
    return
  if cache.is_localhost:
    cache_status = {'running': 'locally'}
  elif _redis_tls_certificate_authority_gcs_uri():
    _start_stunnel_proxy()
    cache_status = {'running': 'remote_instance_tls'}
  else:
    cache_status = {
        'running': 'remote_instance',
        'redis_host_ip': dicom_proxy_flags.REDIS_CACHE_HOST_IP_FLG.value,
    }
  cloud_logging_client.info('Redis cache setup.', cache_status)


# Reset the module-level lock and shared client in forked child processes.
os.register_at_fork(after_in_child=_init_fork_module_state)