infrastructure/beat-the-load-balancer/backend/lib/app_cache.py (202 lines of code) (raw):

#!/usr/bin/env python # Copyright 2024 Google LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import datetime import logging import time import lib.job_reporter from lib import base_cache from lib import config class GameCacheVariables(base_cache.RedisHelper): """ Class for all Game related cached related variable's (redis) keys names and their default values. """ def __init__(self): super().__init__() self.wh_vms = config.WH_VM_NAMES self.lb_wh_vms = config.LB_WH_VM_NAME self.all_wh_vms = self.wh_vms + self.lb_wh_vms # Warehouse variables - keys & default values self.score_counter_key_default = 0 self.score_counter_key = f"{config.HOSTNAME}_score".lower() self.crash_start_time = str(datetime.datetime.now()) self.crash_start_time_key = f"{config.HOSTNAME}_crash_start_time".lower() self.crash_start_time_key_counter = 0 self.crash_start_time_key_counter_key = ( f"{config.HOSTNAME}_crashes_count".lower() ) # Player self.player_name = "-no-name-" self._default_player_name = "-no-name-" self.player_name_key = "game_player_name" self.game_start_time_key = "game_start_time" self.game_start_time_key_default_value = "0.0" # str(time.time() - 60) self.player_task_score = config.GAME_WORKER_JOB_SCORE self.player_task_score_key = "game_task_score" self.selection_mgr = base_cache.RedisSelectionManager() self.player_selected_vm_key = self.selection_mgr.selected_vm_key self.player_selected_vm_counter_key = self.selection_mgr.selected_vm_counter_key # Warehouse & Player stats self.all_vms_crash_counts = self.all_vms_ip_keys = [ f"{_hostname}_crashes_count".lower() for _hostname in self.all_wh_vms ] self.all_vms_ip_keys = [ f"{_hostname}_IP".lower() for _hostname in self.all_wh_vms ] self.all_vms_ip_internal_keys = [ f"{_hostname}_internal_ip".lower() for _hostname in self.all_wh_vms ] # SystemHealth self.all_vms_cpu_keys = [ f"{_hostname}_load_cpu_500ms".lower() for _hostname in self.all_wh_vms ] self.all_vms_mem_keys = [ f"{_hostname}_load_mem_500ms".lower() for _hostname in self.all_wh_vms ] self.all_vms_score_keys = [ f"{_hostname}_score".lower() for _hostname in self.all_wh_vms ] self.all_celery_request_task_keys = [ f"{_hostname}-tasks-registered".lower() for _hostname in self.all_wh_vms ] self.all_celery_completed_task_keys = [ f"{_hostname}-tasks-completed".lower() for _hostname in self.all_wh_vms ] def is_game_in_progress(self) -> bool: game_start_time = float(self.get(self.game_start_time_key)) time_diff = time.time() - game_start_time return time_diff <= config.GAME_COMPLETION_TIME_LIMIT class PlayerCache(GameCacheVariables): def reset(self): """Reset player game cache""" # safety check # if self.is_game_in_progress(): # return self.reset_during_game_in_progress() # reset player name self.player_name = self._default_player_name self.set(self.player_name_key, self.player_name) # rest game start time self.set(self.game_start_time_key, self.game_start_time_key_default_value) # reset player score self.player_task_score = 10 self.set(self.player_task_score_key, self.player_task_score) response1 = dict( { self.player_task_score_key: self.player_task_score, self.player_name_key: self.player_name, } ) # response2 = base_cache.RedisSelectionManager(config.WH_VM_NAMES).reset() response2 = self.selection_mgr.reset() response = {**response1, **response2} logging.warning("---") logging.warning(response) return response def reset_player_name(self): self.player_name = self._default_player_name self.set(self.player_name_key, self.player_name) def start_the_game(self, player_name): self.player_name = player_name self.set(self.player_name_key, player_name) # update game time flag self.set(self.game_start_time_key, str(time.time())) def stop_the_game(self): self.set(self.game_start_time_key, self.game_start_time_key_default_value) def get_player_name(self): return self.get(self.player_name_key) # def set_task_score(self, score=10): # self.player_task_score = score # self.set(self.player_task_score_key, self.player_task_score) def _prepare_dict_(self, keys, redis_value_keys): values = self.mget(redis_value_keys) return dict(zip(keys, values)) def get_vm_all_ips(self): keys = self.all_wh_vms values_keys = self.all_vms_ip_internal_keys return self._prepare_dict_(keys, values_keys) def get_vm_all_scores(self): keys = self.all_wh_vms values_keys = self.all_vms_score_keys return self._prepare_dict_(keys, values_keys) def get_all_vm_health_stats(self): keys = self.all_wh_vms cpu_values = self.mget(self.all_vms_cpu_keys) mem_values = self.mget(self.all_vms_mem_keys) tasks_reported = self.mget(self.all_celery_request_task_keys) tasks_completed = self.mget(self.all_celery_completed_task_keys) response = list() for x, y, z, p, q in zip( keys, cpu_values, mem_values, tasks_reported, tasks_completed ): response.append( dict( { "HOSTNAME": x, "CPU": y, "MEMORY": z, "TASK_REGISTERED": p, "TASK_COMPLETED": q, "QUEUE": int(p) - int(q), } ) ) return response def get_all_vm_stats(self): game_stats = dict( { "ALL_VMS": self.all_wh_vms, "ALL_SCORES": self.mget( self.all_vms_score_keys ), # self.get_vm_all_scores(), "ALL_CPU": self.mget(self.all_vms_cpu_keys), "ALL_MEM": self.mget(self.all_vms_mem_keys), "ALL_TASKS_REGISTERED": self.mget(self.all_celery_request_task_keys), "ALL_TASKS_COMPLETED": self.mget(self.all_celery_completed_task_keys), "ALL_VMS_CRASHES_COUNT": self.mget(self.all_vms_crash_counts), } ) # inner function def get_list_diff(items1, items2): return [str(int(key1) - int(key2)) for key1, key2 in zip(items1, items2)] game_talley = dict( { "QUEUE": get_list_diff( game_stats["ALL_TASKS_REGISTERED"], game_stats["ALL_TASKS_COMPLETED"], ), "GAME_VM_SELECTION_UPDATES": self.selection_mgr.get_vm_selections_count(), "GAME_VM_SELECTION": self.selection_mgr.get_selected_vm(), "GAME_PLAYER_NAME": self.get_player_name(), "GAME_START_TIME": self.get(self.game_start_time_key), "GAME_IS_IN_PROGRESS": self.is_game_in_progress(), } ) # Add total scores _scores = [int(x) for x in game_stats["ALL_SCORES"]] game_stats["GAME_SCORE_PLAYER"] = sum(_scores[:4]) game_stats["GAME_SCORE_GCLB"] = sum(_scores[-4:]) game_stats["GAME_CURRENT_TIME"] = round( time.time() - float(game_talley["GAME_START_TIME"]), 4 ) # combining the stats & tally response = {**game_stats, **game_talley} return response class WarehouseCache(GameCacheVariables): def __init__(self): super().__init__() self.job_reporter = lib.job_reporter.JobReporter() def celery_queue_status(self): return self.job_reporter.job_queue_status() def reset(self) -> dict: # safety check # if self.is_game_in_progress(): # return self.reset_during_game_in_progress() # score update self.score_counter = 0 self.set(self.score_counter_key, self.score_counter) self.crash_start_time = str(datetime.datetime.now()) self.set(self.crash_start_time_key, self.crash_start_time) self.crash_start_time_key_counter = 0 self.set( self.crash_start_time_key_counter_key, self.crash_start_time_key_counter ) response_dict = dict( { self.score_counter_key: self.score_counter, self.crash_start_time_key: self.crash_start_time, self.crash_start_time_key_counter_key: self.crash_start_time_key_counter, } ) # # response_dict1 = self.reset_warehouse_data() response_dict2 = self.job_reporter.reset() response = { f"{config.HOSTNAME}-WH-RESET": response_dict, f"{config.HOSTNAME}-CELERY-RESET": response_dict2, } logging.warning("---") logging.warning(response) return response def set_crash_start_time(self): self.set(self.crash_start_time_key, str(datetime.datetime.now())) def reset_crash_start_time(self): self.set_crash_start_time() def get_crash_start_time(self): self.get(self.crash_start_time_key) def incr_crash_counter(self): self.incr(self.crash_start_time_key_counter_key, 1) # def set_score_counter(self, value=0): # self.set(self.score_counter_key, value) def reset_score_counter(self): self.set(self.score_counter_key, self.score_counter_key_default) def get_score_counter(self) -> int: return self.get(self.score_counter_key) def incr_score_counter(self, value): self.incr(self.score_counter_key, value)