infrastructure/beat-the-load-balancer/backend/player.py (200 lines of code) (raw):

#!/usr/bin/env python # Copyright 2024 Google LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import random import time import requests from flask import render_template, redirect from lib import config from lib.app_cache import PlayerCache from lib.base_app import app, APP_URLS, create_index_page from lib.base_cache import RedisSelectionManager from lib.helper import JobFileManager from metrics.metrics import send_game_record_message # Settings for Index Page random.seed(42) APP_URLS += """ /game/start /game/stop /vm/all/reset /vm/all/stats /vm/active """.strip().splitlines() OLD_LOG = """ /process /rr/process /load/start /vm/all /reset /process/options /game /vm/all /vm/all/score /vm/all/load /vm/all/stats /vm/active /vm/score /vm/active/cache /vm/active/cache/show /vm/active/cache/reset /process /process/options /process/auto /rr/process """ # Warehouse VM to redirect the requests WH_VM_MGR = RedisSelectionManager(config.WH_VM_NAMES) _DEFAULT_INSTANCE = config.WH_VM_NAMES[0] WH_VM_MGR.set_selected_vm(_DEFAULT_INSTANCE) PLAYER_APP_CACHE = PlayerCache() LOADER_FLAG_FILE = "/tmp/flaskapp_loader_flag_file" RESET_FLAG_FILE = "/tmp/flaskapp_game_reset_flag_file" # Additional Settings for Index Page for _VM in config.WH_VM_NAMES: APP_URLS.append(f"/vm/select/{_VM}") @app.route("/") def new_index_page(): return create_index_page(config.HOSTNAME, __file__, APP_URLS) @app.route("/game") def game(): return render_template("game.html", vm_wh_ips=config.VM_IPS) @app.route("/vm/all", methods=["GET"]) def get_vms_details(): """For all registered VM details""" return PLAYER_APP_CACHE.get_vm_all_ips() @app.route("/vm/all/score", methods=["GET"]) def get_vms_scores(): """For all VM provide score details""" return PLAYER_APP_CACHE.get_vm_all_scores() @app.route("/vm/all/load", methods=["GET"]) def get_vms_health(): """For all VM provide system health details""" return PLAYER_APP_CACHE.get_all_vm_health_stats() @app.route("/vm/all/stats", methods=["GET"]) def get_vms_stats(): """For selected VM provide all stats(score, health & other)""" return PLAYER_APP_CACHE.get_all_vm_stats() @app.route("/vm/active", methods=["GET"]) def show_active_vm(): return { "active-vm": WH_VM_MGR.get_selected_vm(), "active-vm-update-count": WH_VM_MGR.get_vm_selections_count(), } @app.route("/reset", methods=["GET"]) def reset_player_game() -> dict: response = dict({"hostname": config.HOSTNAME}) job_mgr = JobFileManager(RESET_FLAG_FILE, expiry_time=time.time()) if job_mgr.is_job_running(): response["status"] = "not ok" response["details"] = ( f"Flag file {RESET_FLAG_FILE} says a job seem to be running" ) else: response["status"] = "ok" response2 = PLAYER_APP_CACHE.reset() response.update(response2) return response @app.route("/vm/select/<vm_name>", methods=["GET"]) def update_vm_select(vm_name) -> dict: global WH_VM_MGR if vm_name in config.WH_VM_NAMES: if vm_name != WH_VM_MGR.get_selected_vm(): logging.warning( f" ===> vm-selection is updated! ({WH_VM_MGR.get_selected_vm()}" f" --> {vm_name})" ) WH_VM_MGR.set_selected_vm(vm_name) WH_VM_MGR.incr_vm_selection_count() # return f"Done! Selected VM:{vm_name}!" return {"active-vm": vm_name, "status": "updated"} else: return {"active-vm": vm_name, "status": "no changes!"} else: return { "active-vm": vm_name, "status": "no changes due to invalid input", "valid-inputs": config.WH_VM_NAMES, } @app.route("/process") def automatic_routing(): ip_address = config.VM_IPS[WH_VM_MGR.get_selected_vm()] url = f"http://{ip_address}:8000/process" return redirect(url, code=302) @app.route("/process/options") def automatic_routing_options() -> dict: url_config = { vm_name: f"http://{config.VM_IPS[vm_name]}:8000/process" for vm_name in config.WH_VM_NAMES } logging.debug(f"==> /process options {url_config}") return url_config # @app.route("/process/auto") @app.route("/rr/process") def automatic_round_robin_routing(): selected_vm = random.choice(config.WH_VM_NAMES) selected_vm_address = config.VM_IPS[selected_vm] url = f"http://{selected_vm_address}:8000/process" logging.info( f"/rr/process - selected vm : {selected_vm} ==> redirect request to {url}" ) return redirect(url, code=302) @app.route("/vm/reset/<string:vm_name>") def reset_vm_stats(vm_name=None) -> dict: ip_address = config.VM_IPS[vm_name] url = f"http://{ip_address}:8000/reset" response = send_internal_http_request_to(url) return response @app.route("/vm/all/reset") @app.route("/game/reset") def reset_all_vms_stats() -> dict: response = dict() logging.warning("player: requested reset for Player App Cache") response[config.HOSTNAME] = PLAYER_APP_CACHE.reset() # reset worker VMS for vm_name in config.WH_VM_NAMES + config.LB_WH_VM_NAME: logging.warning(f"player: requested reset for Worker({vm_name}) App Cache") ip_address = config.VM_IPS[vm_name] # purge celery queue reset_url = f"http://{ip_address}:8000/purge_celery_queue" response[vm_name + "-celery-reset"] = send_internal_http_request_to(reset_url) # reset flaskapp cache url = f"http://{ip_address}:8000/reset" response[vm_name] = send_internal_http_request_to(url) logging.warning(f"reset results! {response}") return {"status": "ok"} def send_internal_http_request_to(url) -> dict: logging.warning(f"Sending outbound request to {url}") response = dict() response["url"] = url response["request"] = "GET" logging.debug(f"==> sending load request to {url}") try: response["response"] = requests.get(url).json() response["status"] = "ok" except Exception as err: response["status"] = "not ok" response["error"] = str(err) logging.warning("send_request_to_loader: Found error") logging.warning(err) return response def send_request_to_loader(player_name) -> dict: logging.warning("Sending request to loader!!!") job_mgr = JobFileManager(LOADER_FLAG_FILE, expiry_time=time.time()) response = dict({"player_name": player_name, "status": "not ok"}) if job_mgr.is_job_running(): logging.debug("Locust is recently Triggered. Try again later.") response["status"] = "not ok" response["details"] = f"A job seem to be running. Found - {LOADER_FLAG_FILE}" else: try: job_mgr.create_timestamp_file() # ip_address = config.VM_IPS["vm-loader"] url = f"http://{ip_address}:8000/load/start" response["loader1"] = send_internal_http_request_to(url) # ip_address = config.VM_IPS["vm-loader2"] url = f"http://{ip_address}:8000/load/start" response["loader2"] = send_internal_http_request_to(url) except Exception as err: logging.warning("send_request_to_loader: Found error") logging.warning(err) return response def get_player_name(player_name) -> str: if player_name == "player": # Get the current UTC/GMT time current_time = time.gmtime() # Format the time as "DD_HH_MM" formatted_time = time.strftime("%m%d_%H%M%S", current_time) player_name = f"{player_name}_{formatted_time}" logging.warning(f"Using player name as `{player_name}`") return player_name @app.route("/game/start") @app.route("/game/start/<string:player_name>") def game_start(player_name="player") -> dict: # generate unique player id player_name = get_player_name(player_name) # reset the game stats response1 = reset_all_vms_stats() # start the game PLAYER_APP_CACHE.start_the_game(player_name) # start the loader response2 = send_request_to_loader(player_name) # send the start message to Pubsub send_game_record_message("start", uniqueid=player_name) return {**response1, **response2} def send_local_http_call(url): try: return requests.get(url) except Exception: logging.exception("failed to API call request") return "job failed" @app.route("/game/stop") def stop_the_player_game() -> dict: send_game_record_message("stop", uniqueid=PLAYER_APP_CACHE.get_player_name()) try: PLAYER_APP_CACHE.stop_the_game() except Exception: logging.warning("Failed to reset Celery jobs") return { "status": "ok", "message": "Game Timer is reset. Call `/vm/all/reset` before the next game!", } if __name__ == "__main__": logging.basicConfig( format="%(asctime)s - %(message)s", filemode="w", level=logging.DEBUG, ) app.run(debug=True, port=config.VM_APP_PORTS["vm-main"])