http_service/bugbug_http/readthrough_cache.py (49 lines of code) (raw):
# -*- coding: utf-8 -*-
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
import datetime
import logging
import threading
import time
from datetime import timedelta
from typing import Callable, Generic, TypeVar
LOGGER = logging.getLogger()

# A simple TTL cache to use with models. Because we expect the number of models
# in the service to not be very large, simplicity of implementation is
# preferred to algorithmic efficiency of operations.
#
# Called an 'Idle' TTL cache because the TTL of an item is reset after every get.

Key = TypeVar("Key")
Value = TypeVar("Value")

# Sentinel marking "key not in storage"; keeps a cached ``None`` value
# distinguishable from a cache miss.
_MISSING = object()


class ReadthroughTTLCache(Generic[Key, Value]):
    """Read-through cache whose entries expire after ``ttl`` without access.

    ``items_last_accessed`` records when each key was last requested through
    ``get``; ``items_storage`` holds the values that are actually cached. A
    value is only stored once its key is requested again while a previous
    access is still recorded (or when the caller forces it), so a key that
    is requested a single time is loaded but never kept.
    """

    def __init__(self, ttl: timedelta, load_item_function: Callable[[Key], Value]):
        """Create an empty cache.

        Args:
            ttl: How long a key may go without a ``get`` before
                ``purge_expired_entries`` drops it.
            load_item_function: Called with the key to produce the value
                whenever the key is not in storage.
        """
        self.ttl = ttl
        self.load_item_function = load_item_function
        self.items_last_accessed: dict[Key, datetime.datetime] = {}
        self.items_storage: dict[Key, Value] = {}

    def __contains__(self, key: object) -> bool:
        """Return whether a value for ``key`` is currently stored."""
        return key in self.items_storage

    def get(self, key: Key, force_store: bool = False) -> Value:
        """Return the value for ``key``, loading it if it is not stored.

        Every call refreshes the key's last-accessed timestamp.

        Args:
            key: Key to look up.
            force_store: Store the value even if the key has no recorded
                previous access.

        Returns:
            The stored value, or the result of ``load_item_function(key)``.
        """
        store_item = force_store

        # Single lookup instead of "check membership, then index": the purge
        # thread may remove the key between those two steps.
        item = self.items_storage.get(key, _MISSING)
        if item is _MISSING:
            item = self.load_item_function(key)
            # Cache the item only if it was last accessed within the past TTL
            # seconds. Entries in items_last_accessed are purged once they
            # have not been accessed for TTL seconds, so presence there means
            # a recent access.
            if key in self.items_last_accessed:
                store_item = True

        self.items_last_accessed[key] = datetime.datetime.now()
        if store_item:
            LOGGER.info(
                "Storing item with the following key in readthroughcache: %s", key
            )
            self.items_storage[key] = item

        return item

    def purge_expired_entries(self) -> None:
        """Drop every key whose last access is older than ``ttl``."""
        purge_entries_before = datetime.datetime.now() - self.ttl
        # Iterate over a snapshot so entries can be removed while looping.
        for key, time_last_touched in list(self.items_last_accessed.items()):
            if time_last_touched < purge_entries_before:
                LOGGER.info(
                    "Evicting item with the following key from readthroughcache: %s",
                    key,
                )
                # pop() rather than del: a key that was requested only once
                # has a timestamp but no stored value, and an exception here
                # would terminate the background purge loop.
                self.items_last_accessed.pop(key, None)
                self.items_storage.pop(key, None)

    def start_ttl_thread(self) -> None:
        """Start a daemon thread that purges expired entries every ``ttl``."""

        def purge_expired_entries_with_wait() -> None:
            while True:
                time.sleep(self.ttl.total_seconds())
                self.purge_expired_entries()

        thread = threading.Thread(
            target=purge_expired_entries_with_wait, daemon=True
        )
        thread.start()