infrastructure/load-balancing-blitz/app/warehouse.py (128 lines of code) (raw):

#!/usr/bin/env python # Copyright 2024 Google LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """ Warehouse VM - Flask App Author: SampathM@google.com Description: This is a simple flask back-end application to receive message sent by user VM. Web Server Features: 1. Receives all REST API calls (get/post) at `/message` 2. At `/health` provides health-check status 3. At `/load` provides os load status 4. At `/crash` provide time before crashing 5. At `/crash/now`, crashes(i.e.) pauses the processing work """ import datetime import logging import time from lib import config from lib.app_cache import WarehouseCache from lib.base_app import app, APP_URLS, create_index_page from lib.celery_task import delayed_generate_controlled_load from lib.job_reporter import JobReporter # Settings HOSTNAME = config.HOSTNAME FILE = __file__ APP_URLS += """ /process /crash /crash/status /crash/now /score /reset """.strip().splitlines() # crash settings CRASH_TIME_DURATION = 5.0 # seconds MESSAGE_PROCESS_DURATION = 0.02 # seconds CRASH_TIME = -1 * CRASH_TIME_DURATION WH_APP_CACHE = WarehouseCache() WH_APP_CACHE.reset_score_counter() WH_APP_CACHE.set_crash_start_time() JOB_REPORTER = JobReporter() # WH_APP_CACHE.reset() @app.route("/") def warehouse_index_page(): return create_index_page(HOSTNAME, FILE, APP_URLS) def crash_checker(function): def inner_function(*args, **kwargs): if CRASH_TIME < time.perf_counter(): return function(*args, **kwargs) else: # return f"System Crashed, Wait {CRASH_TIME - time.perf_counter()}" # https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/429 # 429: Too many requests # return abort(429) return { "status": "not ok!", "message": "request blocker is enabled! please wait", } return inner_function @app.route("/process") @crash_checker def send_message() -> dict: # WH_APP_CACHE.incr_score_counter() # time.sleep(MESSAGE_PROCESS_DURATION) # # send run the job request to background queue response = delayed_generate_controlled_load.delay( duration_ms=800, cpu_percent=2, memory_percent=2, start_time_lag_ms=0, end_time_lag_ms=0, ) # report a job queue_size = JOB_REPORTER.get_job_queue_size() if queue_size > config.GAME_WH_JOB_LIMIT: return { "status": "not ok", "hostname": HOSTNAME, "description": f"Queue size limit reached {queue_size}, limit is {config.GAME_WH_JOB_LIMIT}", } # register a new task JOB_REPORTER.register_new_task() # logging.debug(f"delayed job response {response}") return { "status": "ok", "total-score": WH_APP_CACHE.get_score_counter(), "hostname": HOSTNAME, } @app.route("/score") def process_score() -> dict: return {"score": WH_APP_CACHE.get_score_counter(), "hostname": HOSTNAME} @app.route("/reset") def reset_warehouse_stats() -> dict: logging.warning("Reset JobReporter") response1 = JobReporter().reset() logging.warning("Reset Warehouse App Cache") response2 = WH_APP_CACHE.reset() response = {**response1, **response2} response["hostname"] = HOSTNAME response["status"] = "ok" return response @app.route("/crash/now") def crash_scene_simulator() -> dict: global CRASH_TIME global CRASH_TIME_DURATION if CRASH_TIME < time.perf_counter(): CRASH_TIME = time.perf_counter() + CRASH_TIME_DURATION logging.warning(f"updates CRASH_TIME to {CRASH_TIME}") message = { "start-crash-simulation": "ok", "time-now": datetime.datetime.now(), "crash-recovery-duration(sec)": CRASH_TIME_DURATION, } else: message = { "start-crash-simulation": "not ok!", "crash-recover-start-time(local)": CRASH_TIME, "crash-recover-duration(secs)": CRASH_TIME_DURATION, "crash-recover-wait-time(secs)": CRASH_TIME - time.perf_counter(), "time-now": datetime.datetime.now(), } return message @app.route("/crash") @app.route("/crash/status") def crash_scene_test() -> dict: global CRASH_TIME if CRASH_TIME < time.perf_counter(): message = {"status": "ok!"} else: message = { "status": "ok!", "recovery-wait(seconds)": CRASH_TIME - time.perf_counter(), } # return make_response(jsonify(message), 200) return message def is_port_free(port) -> bool: """Checks if a port is available for use. Args: port (int): The port number to check. Returns: bool: True if the port is free, False otherwise. Note: Written by Gemini """ import socket with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: try: s.bind(("localhost", int(port))) return True except OSError: return False def get_free_port() -> int: """To find an unused port to dev port to run""" logging.info(f"Check free ports") all_port = "8000 8080 8081 8082 8083 8084 8085".split() for app_port in all_port: if is_port_free(app_port): logging.warning(f"using port {app_port}") print(f"using port {app_port}") return app_port else: logging.info(f"Failing with port: {app_port}") raise Exception("Failed to find a valid port!") if __name__ == "__main__": logging.basicConfig( format="%(asctime)s - %(message)s", filemode="w", level=logging.DEBUG, ) app.run(debug=True, port=get_free_port())