aws_advanced_python_wrapper/utils/cache_map.py (63 lines of code) (raw):

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). # You may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import annotations import threading import time from typing import Dict, Generic, Optional, TypeVar K = TypeVar('K') V = TypeVar('V') class CacheMap(Generic[K, V]): def __init__(self): self._cache: Dict[K, CacheItem[V]] = {} self._cleanup_interval_ns: int = 600_000_000_000 # 10 minutes self._cleanup_time_ns: int = time.perf_counter_ns() + self._cleanup_interval_ns self._lock = threading.RLock() def __len__(self): return len(self._cache) def get(self, key: K) -> Optional[V]: with self._lock: value = self._cache.get(key) if not value: return None if value.is_expired(): self._cache.pop(key) return None return value.item def get_with_default(self, key: K, value_if_absent: V, item_expiration_ns: int) -> V: with self._lock: old_value = self._cache.get(key) if not old_value or old_value.is_expired(): new_value = CacheItem(value_if_absent, time.perf_counter_ns() + item_expiration_ns) else: new_value = old_value if new_value is not None: self._cache[key] = new_value return new_value.item if key in self._cache: self._cache.pop(key) return None def put(self, key: K, item: V, item_expiration_ns: int): self._cache[key] = CacheItem(item, time.perf_counter_ns() + item_expiration_ns) self._cleanup() def remove(self, key: K): self._cache.pop(key, None) self._cleanup() def clear(self): self._cache.clear() def get_dict(self) -> Dict[K, V]: with self._lock: return {key: self._cache[key].item for key in self._cache.keys()} def _cleanup(self): if self._cleanup_time_ns > time.perf_counter_ns(): return self._cleanup_time_ns = time.perf_counter_ns() + self._cleanup_interval_ns with self._lock: removal_keys = [key for key, cache_item in self._cache.items() if not cache_item or cache_item.is_expired()] for key in removal_keys: self._cache.pop(key) class CacheItem(Generic[V]): def __init__(self, item: V, expiration_time: int): self.item = item self._expiration_time = expiration_time def __str__(self): return f"CacheItem [item={str(self.item)}, expiration_time={self._expiration_time}]" def is_expired(self) -> bool: return time.perf_counter_ns() > self._expiration_time