perfmetrics/scripts/ls_metrics/listing_benchmark.py (341 lines of code) (raw):

# Copyright 2023 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Python script for benchmarking listing operation. This python script benchmarks and compares the latency of listing operation in persistent disk vs GCS bucket. It creates the necessary directory structure, containing files and folders, needed to test the listing operation. Furthermore, it can optionally upload the results of the test to a Google Sheet or Bigquery. It takes input a JSON config file which contains the info regrading directory structure and also through which multiple tests of different configurations can be performed in a single run. Typical usage example: $ python3 listing_benchmark.py [-h] [--keep_files] [--upload_gs] [--upload_bq] [--num_samples NUM_SAMPLES] [--config_id CONFIG_ID] [--start_time_build START_TIME_BUILD] [--message MESSAGE] --gcsfuse_flags GCSFUSE_FLAGS --command COMMAND config_file Flag -h: Typical help interface of the script. Flag --keep_files: Do not delete the generated directory structure from the persistent disk after running the tests. Flag --upload_gs: Uploads the results of the test to the Google Sheet. Flag --upload_bq: Uploads the results of the test to the BigQuery. Flag --num_samples: Runs each test for NUM_SAMPLES times. Flag --message: Takes input a message string, which describes/titles the test. Flag --config_id: Configuration id of the experiment in BigQuery tables. Flag --start_time_build: Time at which KOKORO triggered the build scripts Flag --num_samples: Runs each test for NUM_SAMPLES times. Flag --message: Takes input a message string, which describes/titles the test. Flag --gcsfuse_flags (required): GCSFUSE flags with which the list tests bucket will be mounted. Flag --command (required): Takes as input a string, which is the command to run the tests on. config_file (required): Path to the JSON config file which contains the details of the tests. Note: This python script is dependent on generate_files.py. Note: This script currently skips folder with 1000000 files to facilitate periodic kokoro tests without timeout .To run that test case, run script with --run_1m_test flag. """ import argparse import json import logging import os import statistics as stat import subprocess import sys import time import directory_pb2 as directory_proto sys.path.insert(0, '..') import generate_files from google.protobuf.json_format import ParseDict from gsheet import gsheet import numpy as np from bigquery import experiments_gcsfuse_bq from bigquery import constants from utils.mount_unmount_util import mount_gcs_bucket,unmount_gcs_bucket from utils.checks_util import check_dependencies logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s', handlers=[logging.StreamHandler(sys.stdout)], ) log = logging.getLogger() RUN_1M_TEST=False WORKSHEET_NAME_GCS = 'ls_metrics_gcsfuse' WORKSHEET_NAME_PD = 'ls_metrics_persistent_disk' MOUNT_TYPE_GCS = 'gcs_bucket' MOUNT_TYPE_PD = 'persistent_disk' def _count_number_of_files_and_folders(directory, files, folders): """Count the number of files and folders in the given directory recursively. Args: directory: protobuf of the directory. files: Number of files. folders: Number of folders. Returns: Number of files and folders encountered till now. """ files += directory.num_files folders += directory.num_folders for folder in directory.folders: child_files, child_folders = _count_number_of_files_and_folders(folder, 0, 0) files += child_files folders += child_folders return files, folders def _get_values_to_export(folders, metrics, command) -> list: """This function takes in extracted metrics data, filters it, rearranges it, and returns the modified data to export to Google Sheet and BigQuery. Args: folders: List containing protobufs of testing folders. metrics: A dictionary containing all the result metrics for each testing folder. command: Command to run the tests on. Returns: list: List of results to upload to GSheet or BigQuery """ list_metrics_data = [] for testing_folder in folders: if not RUN_1M_TEST and testing_folder.name == "1KB_1000000files_0subdir": # Excluding test case with 1m files from HNS in daily periodic tests. continue num_files, num_folders = _count_number_of_files_and_folders( testing_folder, 0, 0) row = [ metrics[testing_folder.name]['Test Desc.'], command, num_files, num_folders, metrics[testing_folder.name]['Number of samples'], metrics[testing_folder.name]['Mean'], metrics[testing_folder.name]['Median'], metrics[testing_folder.name]['Standard Dev'], metrics[testing_folder.name]['Quantiles']['0 %ile'], metrics[testing_folder.name]['Quantiles']['20 %ile'], metrics[testing_folder.name]['Quantiles']['50 %ile'], metrics[testing_folder.name]['Quantiles']['90 %ile'], metrics[testing_folder.name]['Quantiles']['95 %ile'], metrics[testing_folder.name]['Quantiles']['98 %ile'], metrics[testing_folder.name]['Quantiles']['99 %ile'], metrics[testing_folder.name]['Quantiles']['99.5 %ile'], metrics[testing_folder.name]['Quantiles']['99.9 %ile'], metrics[testing_folder.name]['Quantiles']['100 %ile'] ] list_metrics_data.append(row) return list_metrics_data def _parse_results(folders, results_list, message, num_samples) -> dict: """Outputs the results on the console. This function takes in dictionary containing the list of results (for all samples) for each testing folder, for both the gcs bucket and persistent disk. Then it generates various metrics out of these lists and outputs them into the console. The metrics present in the output are (in msec): Mean, Median, Standard Dev, 0th %ile, 20th %ile, 50th %ile, 90th %ile, 95th %ile, 98th %ile, 99th %ile, 99.5th %ile, 99.9th %ile, 100th %ile. Args: folders: List containing protobufs of testing folders. results_list: Dictionary containing the list of results (for all samples) for each testing folder. message: String which describes/titles the test. num_samples: Number of samples to collect for each test. Returns: A dictionary containing the various metrics in a JSON format. """ metrics = dict() for testing_folder in folders: if not RUN_1M_TEST and testing_folder.name == "1KB_1000000files_0subdir": # Excluding test case with 1m files from HNS in daily periodic tests. continue metrics[testing_folder.name] = dict() metrics[testing_folder.name]['Test Desc.'] = message metrics[testing_folder.name]['Number of samples'] = num_samples # Sorting based on time. results_list[testing_folder.name] = sorted( results_list[testing_folder.name]) metrics[testing_folder.name]['Mean'] = round( stat.mean(results_list[testing_folder.name]), 3) metrics[testing_folder.name]['Median'] = round( stat.median(results_list[testing_folder.name]), 3) metrics[testing_folder.name]['Standard Dev'] = round( stat.stdev(results_list[testing_folder.name]), 3) metrics[testing_folder.name]['Quantiles'] = dict() sample_set = [0, 20, 50, 90, 95, 98, 99, 99.5, 99.9, 100] for percentile in sample_set: metrics[testing_folder.name]['Quantiles'][ '{} %ile'.format(percentile)] = round( np.percentile(results_list[testing_folder.name], percentile), 3) print(metrics) return metrics def _record_time_of_operation(command, path, num_samples) -> list: """Runs the command on the given path for given num_samples times. Args: command: Command to run. path: Path at which to run the command. num_samples: Number of times to run the command. Returns: A list containing the latencies of operations in milisecond. """ result_list = [] for _ in range(num_samples): start_time_sec = time.time() subprocess.call('{} {}'.format(command, path), shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) end_time_sec = time.time() result_list.append((end_time_sec - start_time_sec) * 1000) return result_list def _perform_testing( folders, gcs_bucket, persistent_disk, num_samples, command): """This function tests the listing operation on the testing folders. Going through all the testing folders one by one for both GCS bucket and persistent disk, we calculate the latency (in msec) of listing operation and store the results in a list of that particular testing folder. Reading are taken multiple times as specified by num_samples argument. Args: folders: List of protobufs containing the testing folders. gcs_bucket: Name of the directory to which GCS bucket is mounted to. persistent_disk: Name of the directory in persistent disk containing all the testing folders. num_samples: Number of times to run each test. command: Command to run the test on. Returns: gcs_bucket_results: A dictionary containing the list of results (all samples) for each testing folder. persistent_disk_results: A dictionary containing the list of results (all samples) for each testing folder. """ gcs_bucket_results = {} persistent_disk_results = {} for testing_folder in folders: if not RUN_1M_TEST and testing_folder.name == "1KB_1000000files_0subdir": # Excluding test case with 1m files from HNS in daily periodic tests. continue log.info('Testing started for testing folder: %s\n', testing_folder.name) local_dir_path = './{}/{}/'.format(persistent_disk, testing_folder.name) gcs_bucket_path = './{}/{}/'.format(gcs_bucket, testing_folder.name) persistent_disk_results[testing_folder.name] = _record_time_of_operation( command, local_dir_path, num_samples) gcs_bucket_results[testing_folder.name] = _record_time_of_operation( command, gcs_bucket_path, num_samples) log.info('Testing completed. Generating output.\n') return gcs_bucket_results, persistent_disk_results def _create_directory_structure( gcs_bucket_url, persistent_disk_url, directory_structure, create_files_in_gcs) -> int: """Creates new directory structure using generate_files.py as a library. This function creates new directory structure in persistent disk. If create_files_in_gcs is True, then it also creates the same structure in GCS bucket. For more info regarding how the generation of files is happening, please read the generate_files.py. Args: gcs_bucket_url: Path of the directory in GCS bucket in which to create the files. persistent_disk_url: Path in the persistent disk in which to create the files in. directory_structure: Protobuf of the current directory. create_files_in_gcs: Bool value which is True if we have to create files in GCS bucket (similar directory structure not present). Otherwise, it is False, means that we will not create files in GCS bucket from scratch. Returns: 1 if an error is encountered while uploading files to the GCS bucket. 0 if no error is encountered. """ # Create a directory in the persistent disk. subprocess.call('mkdir {}'.format(persistent_disk_url), shell=True) if directory_structure.num_files != 0: file_size = int(directory_structure.file_size[:-2]) file_size_unit = directory_structure.file_size[-2:] exit_code = generate_files.generate_files_and_upload_to_gcs_bucket( gcs_bucket_url, directory_structure.num_files, file_size_unit, file_size, directory_structure.file_name_prefix, persistent_disk_url, create_files_in_gcs) if exit_code != 0: return exit_code result = 0 for folder in directory_structure.folders: if not RUN_1M_TEST and folder.name == "1KB_1000000files_0subdir": # Excluding test case with 1m files from HNS in daily periodic tests. continue result += _create_directory_structure(gcs_bucket_url + folder.name + '/', persistent_disk_url + folder.name + '/', folder, create_files_in_gcs) return int(result > 0) def _list_directory(path) -> list: """Returns the list containing path of all the contents present in the current directory. Args: path: Path of the directory. Returns: A list containing path of all contents present in the input path. """ contents = subprocess.check_output( 'gsutil -m ls {}'.format(path), shell=True) contents_url = contents.decode('utf-8').split('\n')[:-1] return contents_url def _compare_directory_structure(url, directory_structure) -> bool: """Compares the directory structure present in the GCS bucket with the structure present in the JSON config file. Args: url: Path of the directory to compare in the GCS bucket. directory_structure: Protobuf of the current directory. Returns: True if GCS bucket contents matches the directory structure. """ contents_url = _list_directory(url) # gsutil in some cases return the contents_url list with the current # directory in the first index. We dont want the current directory so # we remove it manually. if contents_url and contents_url[0] == url: contents_url = contents_url[1:] files = [] folders = [] for content in contents_url: if content[-1] == '/': folders.append(content) else: files.append(content) if len(folders) != directory_structure.num_folders: return False if len(files) != directory_structure.num_files: return False result = True for folder in directory_structure.folders: if not RUN_1M_TEST and folder.name == "1KB_1000000files_0subdir": # Excluding test case with 1m files from HNS in daily periodic tests. continue new_url = url + folder.name + '/' if new_url not in folders: return False result = result and _compare_directory_structure(new_url, folder) return result def _parse_arguments(argv): """Parses the arguments provided to the script via command line. Args: argv: List of arguments received by the script. Returns: A class containing the parsed arguments. """ argv = sys.argv parser = argparse.ArgumentParser() parser.add_argument( 'config_file', help='Provide path of the config file.', action='store' ) parser.add_argument( '--keep_files', help='Does not delete the directory structure in persistent disk.', action='store_true', default=False, required=False, ) parser.add_argument( '--upload_gs', help='Upload the results to the Google Sheet.', action='store_true', default=False, required=False, ) parser.add_argument( '--upload_bq', help='Upload the results to the BigQuery.', action='store_true', default=False, required=False, ) parser.add_argument( '--message', help='Puts a message/title describing the test.', action='store', nargs=1, default=['Performance Listing Benchmark'], required=False, ) parser.add_argument( '--config_id', help='Configuration ID of the experiment.', action='store', nargs=1, required=False, ) parser.add_argument( '--start_time_build', help='Start time of the build.', action='store', nargs=1, required=False, ) parser.add_argument( '--num_samples', help='Number of samples to collect of each test.', action='store', nargs=1, default=[10], required=False, ) parser.add_argument( '--command', help='Command to run the tests on.', action='store', nargs=1, default=['ls -R'], required=False, ) parser.add_argument( '--gcsfuse_flags', help='Gcsfuse flags for mounting the list tests bucket. Example set of flags - "--implicit-dirs --max-conns-per-host 100 --debug_fuse --debug_gcs --log-file gcsfuse-list-logs.txt --log-format \"text\" --stackdriver-export-interval=30s"', action='store', nargs=1, required=True, ) parser.add_argument( '--spreadsheet_id', help='Provide id of spreadsheet', action='store', required=False, ) parser.add_argument( '--run_1m_test', help='Perform listing benchmark on 1m files directory? [True/False]', action='store_true', default=False, required=False, ) # Ignoring the first parameter, as it is the path of this python # script itself. return parser.parse_args(argv[1:]) def _export_to_gsheet(worksheet, ls_data, spreadsheet_id=""): """Writes list results to Google Spreadsheets Args: worksheet (str): Google sheet name to which results will be uploaded ls_data (list): List results to be uploaded """ # Changing directory to comply with "cred.json" path in "gsheet.py". os.chdir('..') if spreadsheet_id != "": gsheet.write_to_google_sheet(worksheet, ls_data, spreadsheet_id) else: gsheet.write_to_google_sheet(worksheet, ls_data) os.chdir('./ls_metrics') # Changing the directory back to current directory. return def _export_to_bigquery(test_type, config_id, start_time_build, ls_data): """Writes list results to BigQuery Args: test_type (str): Table name to which results will be uploaded config_id (str): Configuration ID of the experiment start_time_build (str): Start time of the build ls_data (list): List results to be uploaded """ bigquery_obj = experiments_gcsfuse_bq.ExperimentsGCSFuseBQ( constants.PROJECT_ID, constants.DATASET_ID) ls_data_upload = [[test_type] + row for row in ls_data] bigquery_obj.upload_metrics_to_table(constants.LS_TABLE_ID, config_id, start_time_build, ls_data_upload) return if __name__ == '__main__': argv = sys.argv if len(argv) < 4: raise TypeError('Incorrect number of arguments.\n' 'Usage: ' 'python3 listing_benchmark.py [--keep_files] [--upload_gs] [--num_samples NUM_SAMPLES] [--message MESSAGE] --gcsfuse_flags GCSFUSE_FLAGS --command COMMAND config_file --spreadsheet_id SPREADSHEETID') args = _parse_arguments(argv) check_dependencies(['gsutil', 'gcsfuse'],log) with open(os.path.abspath(args.config_file)) as file: config_json = json.load(file) directory_structure = ParseDict(config_json, directory_proto.Directory()) log.info('Started checking the directory structure in the bucket.\n') directory_structure_present = _compare_directory_structure( 'gs://{}/'.format(directory_structure.name), directory_structure) # Removing the already present folder in persistent disk so as to create the # files from scratch. persistent_disk = 'persistent_disk' if os.path.exists('./{}'.format(persistent_disk)): subprocess.call('rm -rf {}'.format(persistent_disk), shell=True) # If similar directory structure not found in the GCS bucket then delete all # the files in the bucket and make it from scratch. if not directory_structure_present: log.info( """Similar directory structure not found in the GCS bucket. Creating a new one.\n""") log.info('Deleting previously present directories in the GCS bucket.\n') subprocess.call( 'gsutil -m rm -r gs://{}/*'.format(directory_structure.name), shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) # Creating a temp directory which will be needed by the generate_files # method to create files in batches. temp_dir = generate_files.TEMPORARY_DIRECTORY if os.path.exists(os.path.dirname(temp_dir)): subprocess.call('rm -rf {}'.format(os.path.dirname(temp_dir)), shell=True) subprocess.call('mkdir -p {}'.format(temp_dir), shell=True) exit_code = _create_directory_structure( 'gs://{}/'.format(directory_structure.name), './{}/'.format(persistent_disk), directory_structure, not directory_structure_present) # Deleting the temp folder after the creation of files is done. subprocess.call('rm -rf {}'.format(os.path.dirname(temp_dir)), shell=True) if exit_code != 0: log.error('Cannot create files in the GCS bucket. Error encountered.\n') subprocess.call('bash', shell=True) log.info('Directory Structure Created.\n') gcs_bucket = mount_gcs_bucket(directory_structure.name, args.gcsfuse_flags[0],log) RUN_1M_TEST=args.run_1m_test gcs_bucket_results, persistent_disk_results = _perform_testing( directory_structure.folders, gcs_bucket, persistent_disk, int(args.num_samples[0]), args.command[0]) gcs_parsed_metrics = _parse_results( directory_structure.folders, gcs_bucket_results, args.message[0], int(args.num_samples[0])) pd_parsed_metrics = _parse_results( directory_structure.folders, persistent_disk_results, args.message[0], int(args.num_samples[0])) upload_values_gcs = _get_values_to_export(directory_structure.folders, gcs_parsed_metrics, args.command[0]) upload_values_pd = _get_values_to_export(directory_structure.folders, pd_parsed_metrics, args.command[0]) if args.upload_gs: log.info('Uploading files to the Google Sheet.\n') _export_to_gsheet(WORKSHEET_NAME_GCS, upload_values_gcs, args.spreadsheet_id) _export_to_gsheet(WORKSHEET_NAME_PD, upload_values_pd, args.spreadsheet_id) if args.upload_bq: if not args.config_id or not args.start_time_build: raise Exception( "Pass required arguments experiments configuration ID and start time of build for uploading to BigQuery") log.info('Uploading results to the BigQuery.\n') _export_to_bigquery(MOUNT_TYPE_GCS, args.config_id[0], args.start_time_build[0], upload_values_gcs) _export_to_bigquery(MOUNT_TYPE_PD, args.config_id[0], args.start_time_build[0], upload_values_pd) if not args.keep_files: log.info('Deleting files from persistent disk.\n') subprocess.call('rm -rf {}'.format(persistent_disk), shell=True) unmount_gcs_bucket(gcs_bucket,log)