infrastructure/scripts/aws/submit_benchmark.py:

#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import csv
import datetime
import glob
import hashlib
import json
import os
import re
import sys

import psycopg2

import creds

parser = argparse.ArgumentParser()
parser.add_argument('--benchmark_dir', '-b',
                    help='Directory containing the benchmark to be submitted')
parser.add_argument('--identifier', '-i',
                    help='Unique identifier for this benchmark result')
parser.add_argument('--instance_id', '-I',
                    help='Instance id to use if not present in metadata')
args = parser.parse_args()

build_identifier = ""
benchmark_dir = args.benchmark_dir.rstrip('/')

print("***************************")
print(f"Processing benchmark data set in {benchmark_dir}.")

if args.identifier is not None:
    build_identifier = args.identifier

metadata_file = f"{benchmark_dir}/metadata.json"
with open(metadata_file, "r") as read_file:
    metadata_string = read_file.read()
data = json.loads(metadata_string)

# The fields we need to create a benchmark_build entry.
ci_sha = ""
benchmark_sha = ""
build_version = ""
instance_id = ""
benchmarks_raw_results_uri = ""
notes = ""
build_sha = ""

if data.get("testMetadata") is not None:
    testmetadata = data["testMetadata"]
    if testmetadata.get("instance_id") is not None:
        instance_id = testmetadata["instance_id"]
    if instance_id == "" and args.instance_id is not None:
        instance_id = args.instance_id
    if testmetadata.get("source_version") is not None:
        build_version = testmetadata["source_version"]
    if testmetadata.get("source_revision") is not None:
        build_sha = testmetadata["source_revision"]
    if testmetadata.get("benchmark_sha") is not None:
        benchmark_sha = testmetadata["benchmark_sha"]
    if build_identifier == "" and testmetadata.get("build_identifier") is not None:
        build_identifier = testmetadata["build_identifier"]

# Fall back to a digest of the metadata contents plus the file's mtime, so
# re-running against the same data set yields the same identifier.
if build_identifier == "":
    m = hashlib.sha1()
    m.update(metadata_string.encode('utf-8'))
    m.update(f"{os.path.getmtime(metadata_file):.9f}".encode('utf-8'))
    build_identifier = m.hexdigest()

print(f"The build identifier for this benchmark dataset is {build_identifier}")

# If the metadata did not carry an instance id, try to recover it from the
# archive layout: the grandparent directory is named Benchmark-<id>-<n>.
if instance_id == "":
    possible_benchmark_archive_dir = benchmark_dir + "/../.."
    possible_instance_id = os.path.basename(
        os.path.abspath(possible_benchmark_archive_dir))
    if re.search(r'Benchmark-\d+-\d+', possible_instance_id) is not None:
        instance_id = possible_instance_id

print(f"The instance id for this benchmark dataset is {instance_id}")
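
# For reference, a minimal sketch of the metadata.json shape this script
# reads, inferred from the fields consumed above. All concrete values below
# are illustrative placeholders, not taken from a real run:
#
#   {
#     "testMetadata": {
#       "instance_id": "Benchmark-1234-1",
#       "source_version": "1.15.0",
#       "source_revision": "<sha of the source tree>",
#       "benchmark_sha": "<sha of the benchmark tree>",
#       "build_identifier": "<optional stable identifier>"
#     },
#     "testNames": ["put-benchmark"]
#   }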
conn_string = "host=" + creds.PGHOST + \ " port=5432" + \ " dbname=" + creds.PGDATABASE + \ " user=" + creds.PGUSER + \ " password=" + creds.PGPASSWORD conn = psycopg2.connect(conn_string) print("Connected to database!") # Create a cursor object cursor = conn.cursor() # figure out if we've already submitted the data identifier_command = f"select build_id from public.benchmark_build where build_identifier = %s" cursor.execute(identifier_command, (build_identifier,)) rows = cursor.fetchall() if len(rows) > 0: print("* This build data has already been submitted to the database.") exit(1) table_columns = [ "ci_sha", "benchmark_sha", "build_version", "instance_id", "benchmarks_raw_results_uri", "notes", "build_sha", "build_identifier" ] table_values = [] for junk in table_columns: table_values.append("%s") sql_command = f"INSERT INTO public.benchmark_build({','.join(table_columns)}) " \ f"values ({','.join(table_values)}) returning build_id" cursor.execute(sql_command, (ci_sha, benchmark_sha, build_version, instance_id, benchmarks_raw_results_uri, notes, build_sha, build_identifier)) build_id = cursor.fetchone()[0] conn.commit() if data["testNames"] is not None: testnames = data["testNames"] for testname in testnames: testdir = f"{benchmark_dir}/{testname}" clientdirs = glob.glob(f"{testdir}/client-*") for clientdir in clientdirs: latencyfilename = f"{clientdir}/latency_csv.hgrm" sql_command = f"INSERT INTO " \ f"public.latency_result(build_id, benchmark_test, value, percentile, " \ f"total_count, one_by_one_minus_percentile) values({build_id}, '{testname}'," \ f" %s, %s, %s, %s)" print(f"Submitting latency data for {testname}") with open(latencyfilename) as f: reader = csv.DictReader(filter(lambda row: row[0] != '#', f)) data = [r for r in reader] for datum in data: if datum['1/(1-Percentile)'] != 'Infinity': cursor.execute(sql_command, (datum['Value'], datum['Percentile'], datum['TotalCount'], datum['1/(1-Percentile)'])) # conn.commit() yardstickdirs = glob.glob(f"{clientdir}/*-yardstick-output") yardstickdir = yardstickdirs[0] if (yardstickdirs is not None) else None if yardstickdir is not None: throughputfilename = f"{yardstickdir}/ThroughputLatencyProbe.csv" sql_command = f"INSERT INTO public.throughput_result(build_id, benchmark_test, " \ f"timestamp, ops_per_sec) values({build_id}, '{testname}', %s, %s)" print(f"Submitting throughput data for {testname}") with open(throughputfilename) as f: reader = csv.DictReader(filter(lambda row: row[0] != '#' and row[0] != '-' and row[0] != '@' and row[0] != '*', f), fieldnames=('time', 'operations', 'latency')) data = [r for r in reader] for datum in data: cursor.execute(sql_command, (datetime.datetime.fromtimestamp(int(datum['time'])), int(float(datum['operations'])))) conn.commit() cursor.close() conn.close()