infrastructure/beat-the-load-balancer/backend/lib/base_cache.py (73 lines of code) (raw):
#!/usr/bin/env python
# Copyright 2024 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
import redis
from lib import config
from lib import vm_selection_manager
# Global config
redis_client = redis.StrictRedis(config.REDIS_IP_ADDRESS, port="6379")
class RedisHelper:
def __init__(self):
logging.info(
f"redis({self.__class__.__name__}): creating class -> ({self.__class__.__name__})"
)
self.redis_client = redis_client
def incr(self, key_name, value):
"""
Increments a key by a value.
"""
self.redis_client.incr(key_name, value)
def set(self, key_name, key_value):
self.redis_client.set(key_name, key_value)
logging.info(f"redis({self.__class__.__name__}): set {key_name} : {key_value}")
def get(self, key_name):
key_value = self.redis_client.get(key_name)
return key_value.decode("utf")
def mget(self, key_names):
key_values = self.redis_client.mget(key_names)
for i in range(len(key_values)):
if key_values[i]:
key_values[i] = key_values[i].decode("utf")
return key_values
class RedisSelectionManager(RedisHelper, vm_selection_manager.LocalSelectionManager):
"""
Handles VM selection and statistics.
Uses Redis as backend memory store.
"""
def __init__(self, vm_options=None):
# inherit parent features
RedisHelper.__init__(self)
if not vm_options:
vm_options = config.WH_VM_NAMES
vm_selection_manager.LocalSelectionManager.__init__(self, vm_options)
self.selected_vm = config.WH_VM_NAMES[0]
self.selected_vm_key = "game_selected_instance"
self.selected_vm_counter = 0
self.selected_vm_counter_key = "game_selected_instance_counter"
# vm expected
self.vm_options = vm_options
# Temp cache variables to stop VM
self.selected_vm = None
self.last_updated_time = 0
# cache time limit in seconds
self._cache_time_limit = 1
logging.info(f"Started {__class__}")
# cache time limit is 100 ms or 0.1 seconds
self._cache_time_limit = 0.1
def reset(self):
self.selected_vm = config.WH_VM_NAMES[0]
self.set(self.selected_vm_key, self.selected_vm)
self.selected_vm_counter = 0
self.set(self.selected_vm_counter_key, self.selected_vm_counter)
response = dict(
{
self.selected_vm_key: self.selected_vm,
self.selected_vm_counter_key: self.selected_vm_counter,
}
)
logging.warning("---")
logging.warning(response)
return response
def incr_vm_selection_count(self) -> None:
"""
Increments the instance selection counter.
"""
self.incr(self.selected_vm_counter_key, 1)
logging.info(
f"redis({self.__class__.__name__}): (Reset) Selection counter is {self.get(self.selected_vm_key)}"
)
def get_vm_selections_count(self):
"""
Returns the instance selection counter.
"""
value = self.get(self.selected_vm_counter_key)
# logging.info(f"redis({self.__class__.__name__}): Selection Counter: {value}")
return value
def _read_selected_vm_from_storage(self, vm_name=None):
"""
Reads the selected VM from storage.
"""
# write to redis
vm_name = self.get(self.selected_vm_key)
# vm_name = vm_name.decode("utf-8")
# update config
self.selected_vm = vm_name
self.last_updated_time = time.time()
def _write_selected_vm_to_storage(self, vm_name):
"""
Writes the selected VM to storage.
"""
# write to redis
self.set(self.selected_vm_key, vm_name)
logging.info(
f"redis({self.__class__.__name__}): Update VM Selection to {vm_name}"
)