benchmarks/benchmark/tools/locust-load-inference/locust-runner/app/main.py (106 lines of code) (raw):

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime
import os
import time
from typing import List

from fastapi import FastAPI, BackgroundTasks
from google.cloud import monitoring_v3
from google.cloud import storage
import requests
import yaml

from .data_model import Metric, MetricType, LocustRun

app = FastAPI()


@app.get("/run")
async def root(background_tasks: BackgroundTasks,
               duration=os.environ["DURATION"],
               users=os.environ["USERS"],
               rate=os.environ["RATE"],
               namespace=os.environ["NAMESPACE"]):
    """Schedule a Locust run as a background task and return immediately.

    The parameter defaults are read from the environment once, when this
    module is imported; a missing DURATION, USERS, RATE or NAMESPACE
    variable therefore raises KeyError at import time.

    Args:
        background_tasks: FastAPI task queue the run is added to.
        duration: Test duration in seconds (converted with int() later).
        users: Value sent to Locust as "user_count".
        rate: Value sent to Locust as "spawn_rate".
        namespace: Namespace used to build the locust-master host name.

    Returns:
        A dict with a single "message" key confirming the run was queued.
    """
    run: LocustRun = LocustRun(
        duration=duration, users=users, rate=rate, namespace=namespace)
    background_tasks.add_task(call_locust, run)
    return {"message": "Swarming started"}


def _load_metrics(path: str = 'metrics.yaml') -> List[Metric]:
    """Build the list of metrics to collect from the YAML file at *path*.

    Every entry under the top-level "metrics" mapping must provide the
    "filter" and "aggregation" keys; all metrics are created as GAUGE.
    """
    with open(path, "r") as f:
        metric_map = yaml.safe_load(f)
    return [
        Metric(
            name=metric_name,
            filter=spec['filter'],
            aggregate=spec['aggregation'],
            type=MetricType.GAUGE)
        for metric_name, spec in metric_map['metrics'].items()
    ]


def call_locust(run: LocustRun):
    """Run one load test end to end and store its report.

    Starts the swarm on the locust-master service, blocks for
    ``run.duration`` seconds, stops the swarm, downloads the request
    statistics as CSV, collects every metric listed in ``metrics.yaml``
    for the test window and hands everything to ``save_to_gss``.

    ``run.start_time`` and ``run.end_time`` are set as a side effect.
    The HTTP status of the Locust calls is not checked.
    """
    locust_url = f"http://locust-master.{run.namespace}.svc.cluster.local:8089"

    run.start_time = time.time()
    requests.post(
        f"{locust_url}/swarm",
        data={"user_count": run.users, "spawn_rate": run.rate})
    time.sleep(int(run.duration))
    requests.get(f"{locust_url}/stop")
    run.end_time = time.time()

    stats = requests.get(f"{locust_url}/stats/requests/csv")

    # read metric list
    metric_list: List[Metric] = _load_metrics()

    for metric in metric_list:
        # TODO: run this in parallel in coroutine
        metric.results = grab_metrics(
            run.start_time, run.end_time, metric.filter, metric.type)

    save_to_gss(run, stats.text, metric_list)


def _to_timestamp(epoch_seconds: float) -> dict:
    """Split a float epoch time into whole seconds and leftover nanoseconds."""
    seconds = int(epoch_seconds)
    return {
        "seconds": seconds,
        "nanos": int((epoch_seconds - seconds) * 10**9),
    }


def grab_metrics(start_time: float, end_time: float, filter: str, type: MetricType):
    """Fetch the time series matching *filter* between two epoch times.

    The project is taken from the PROJECT_ID environment variable.

    Args:
        start_time: Start of the interval, seconds since the epoch.
        end_time: End of the interval, seconds since the epoch.
        filter: Cloud Monitoring filter string selecting the time series.
        type: Metric type; currently not used by the query.

    Returns:
        A list of the matching time series, or an empty list if the
        query failed.
    """
    client = monitoring_v3.MetricServiceClient()
    project_id = os.environ["PROJECT_ID"]
    project_name = f"projects/{project_id}"
    # Each bound gets its own fractional part; reusing the start time's
    # nanoseconds for the end bound would shift the end of the window.
    interval = monitoring_v3.TimeInterval(
        {
            "end_time": _to_timestamp(end_time),
            "start_time": _to_timestamp(start_time),
        }
    )

    try:
        results = client.list_time_series(
            request={
                "name": project_name,
                "filter": filter,
                "interval": interval,
                "view": monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
                # TODO: add agregation as specified in yaml
            }
        )
        # Consume the pager here so that a failure while fetching a later
        # page is handled below instead of surfacing in the report writer.
        return list(results)
    except Exception as exc:
        # Best effort: a failed query must not abort the whole report.
        print(f"No metrics found: {exc}")
        return []


def save_to_gss(run: LocustRun, stats, metrics_list: List[Metric]):
    """Write the run summary, Locust statistics and metrics to a GCS object.

    The bucket name is read from the BUCKET environment variable and the
    object name is built from the current local time.

    Args:
        run: The finished run; start_time and end_time must be set.
        stats: Locust request statistics as text, written verbatim.
        metrics_list: Metrics whose ``results`` hold the fetched series.
    """
    storage_client = storage.Client()
    bucket_name = os.environ["BUCKET"]
    bucket = storage_client.bucket(bucket_name)

    now = datetime.now()
    timestamp = now.strftime('stats_and_metrics%Y-%m-%d%H:%M:%S')
    blob = bucket.blob(timestamp)

    start_time_formatted = time.strftime(
        "%Y-%m-%d %H:%M:%S", time.gmtime(run.start_time))
    end_time_formatted = time.strftime(
        "%Y-%m-%d %H:%M:%S", time.gmtime(run.end_time))

    with blob.open("w") as f:
        f.write("*********** AI on GKE Benchmarking Tool ******************\n")
        f.write("**********************************************************\n")
        f.write(f"Test duration: {run.duration} [s] \n")
        f.write(f"Test start: {start_time_formatted} \n")
        f.write(f"Test end: {end_time_formatted} \n")
        f.write(f"Users: {run.users} \n")
        f.write(f"Rate: {run.rate} \n")
        f.write("\n**********************************************************\n\n")
        f.write("Locust statistics\n")
        f.write(stats)
        f.write("\n**********************************************************\n\n")
        f.write("\n\nMetrics\n")
        for m in metrics_list:
            f.write(m.name)
            f.write("\n")
            for result in m.results:
                resource_labels = result.resource.labels
                # metadata = result.metadata.system_labels.fields
                metric_labels = result.metric.labels
                for key in resource_labels:
                    f.write(f"""{key}: {resource_labels[key]}\n""")
                for key in metric_labels:
                    f.write(f"""{key}: {metric_labels[key]}\n""")
                points = result.points
                f.write(f"""Number of points: {len(points)}\n""")
                for point in points:
                    # Only double_value is read from each point.
                    p_point = (
                        point.value.double_value) if point.value.double_value is not None else 0
                    f.write(f""" {p_point},""")
                f.write("\n ")

# if __name__ == "__main__":
#     app.run(main)