gui/backend/gui_plugin/core/Cache.py (77 lines of code) (raw):
# Copyright (c) 2025, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms, as
# designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See
# the GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import threading
import time
import gui_plugin as gui
class AutoTTLConnectionCache:
def __init__(self, ttl_seconds=3600, max_size=1000, cleanup_interval=1):
self.clean_func = None
self.ttl = ttl_seconds
self.max_size = max_size
self.cleanup_interval = cleanup_interval
self._cache = {} # key: (value, timestamp)
self._lock = threading.Lock()
self._stop_event = threading.Event()
self._cleaner_thread = threading.Thread(
target=self._cleaner, daemon=True)
self._cleaner_thread.start()
def _cleaner(self):
while not self._stop_event.is_set():
self._evict_expired()
time.sleep(self.cleanup_interval)
def _evict_expired(self):
now = time.time()
with self._lock:
keys_to_delete = [
key for key, (_, ts) in self._cache.items() if now - ts > self.ttl]
for key in keys_to_delete:
profile_id, connection_id = self._cache[key]
gui.db_connections.remove_db_connection( # type: ignore
profile_id, connection_id)
del self._cache[key]
def __setitem__(self, key, value):
with self._lock:
if len(self._cache) >= self.max_size:
oldest_key = min(self._cache.items(),
key=lambda item: item[1][1])[0]
del self._cache[oldest_key]
self._cache[key] = (value, time.time())
def __getitem__(self, key):
with self._lock:
item = self._cache.get(key)
if not item:
raise KeyError(key)
value, ts = item
if time.time() - ts > self.ttl:
del self._cache[key]
raise KeyError(key)
return value
def __delitem__(self, key):
with self._lock:
if key in self._cache:
del self._cache[key]
else:
raise KeyError(key)
def __contains__(self, key):
with self._lock:
item = self._cache.get(key)
if not item:
return False
_, ts = item
if time.time() - ts > self.ttl:
del self._cache[key]
return False
return True
def __len__(self):
with self._lock:
now = time.time()
return sum(1 for _, ts in self._cache.values() if now - ts <= self.ttl)
def set_clean_func(self, func):
"""
Set a function to be called when the cache is cleared.
The function should take no arguments and return nothing.
"""
self.clean_func = func
def clear(self):
"""
Clear the cache and call the clean function if set.
"""
with self._lock:
self._cache.clear()
def stop(self):
"""
Stop the cleaner thread and call the clean function if set.
"""
self._stop_event.set()
self._cleaner_thread.join()
def clean(self):
if self.clean_func is not None:
self.clean_func()