infrastructure/load-balancing-blitz/app/sys_health_server.py (71 lines of code) (raw):

#!/usr/bin/env python # Copyright 2024 Google LLC. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import os import time from multiprocessing import Process, Queue import psutil from lib.system_health import SystemLoad logging.info(f"CronJob: Running {__file__} script!") flag_file = "/tmp/flaskapp_hc_flag_file" def get_system_data(): logging.debug("CronJob: running get_system_data()") cpu_percent = psutil.cpu_percent(interval=None) # Instantaneous CPU usage cpu_load_avg = psutil.getloadavg()[0] # 5 seconds load average memory_usage = psutil.virtual_memory().percent return cpu_percent, cpu_load_avg, memory_usage def update_system_data(data_queue, lag=None, time_interval=5): time.sleep(lag) logging.debug(f"CronJob: lag time {lag}") start_time = time.time() sys = SystemLoad() if lag: time.sleep(lag) while os.path.isfile(flag_file): logging.debug("CronJob: running update_system_data()") time_diff = time.time() - start_time # collect sys load cpu_instant_load, cpu_load, mem_load = sys.get_os_system_health(time_interval) # report the sys load sys.update_system_health_to_redis(cpu_load, mem_load) sys.update_system_health_to_pubsub() data_queue.put((time_diff, cpu_instant_load, cpu_load, mem_load)) time.sleep(5) def main(): if os.path.isfile(flag_file): logging.warning("CronJob: Found flag file! skipping executions") return with open(flag_file, "w") as fp: fp.write("Delete this file to stop processing!") logging.debug("CronJob: running main script") # single thread # data_updater = Process(target=update_system_data, args=(data_queue, 1)) # data_updater.start() # multi thread round_trip_time_sec = 5 time_interval = 5 updates_per_sec = 10 process_count = round_trip_time_sec * updates_per_sec # ==> 50 process_lag = (round_trip_time_sec * 1.0) / process_count # ==> 0.100 seconds # starting the threads data_queue = Queue() running_jobs = [] for i in range(process_count): _data_updater = Process( target=update_system_data, args=(data_queue, i * process_lag, time_interval) ) _data_updater.start() running_jobs.append(_data_updater) while os.path.isfile(flag_file): if not data_queue.empty(): time_diff, cpu_percent, cpu_load_avg, memory_usage = data_queue.get() print( "Time", time_diff, "CPU Instant:", cpu_percent, "CPU Load Avg (5s):", cpu_load_avg, "Memory:", memory_usage, ) time.sleep(process_lag) if __name__ == "__main__": logging.basicConfig( filename=f"/tmp/flaskapp_hc.log", format="%(asctime)s - %(message)s", filemode="w", level=logging.DEBUG, ) main()