# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Python module for setting up the dataset and tables in BigQuery.
This python module creates the dataset and the tables that will store experiment
configurations and metrics data in BigQuery. It can also be used to upload data to the tables.
Note:
Make sure BigQuery API is enabled for the project
"""
import uuid
import time
from google.cloud import bigquery
from google.cloud.bigquery.job import QueryJob
from bigquery import constants
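

# The queries in ExperimentsGCSFuseBQ below interpolate values directly into
# SQL text with str.format(). As an illustrative sketch (not used by the
# class), the BigQuery client also supports query parameters, which avoids
# quoting and injection issues for values; table identifiers still have to be
# interpolated. The function name and its arguments here are hypothetical.
def _count_rows_for_config_sketch(client, project_id, dataset_id, table_id,
                                  exp_config_id):
  query = (
      'SELECT COUNT(*) AS row_count '
      'FROM `{}.{}.{}` '
      'WHERE configuration_id = @config_id'
  ).format(project_id, dataset_id, table_id)
  job_config = bigquery.QueryJobConfig(
      query_parameters=[
          bigquery.ScalarQueryParameter('config_id', 'STRING', exp_config_id),
      ]
  )
  # result() blocks until the query completes and returns a RowIterator.
  result = client.query(query, job_config=job_config).result()
  return next(iter(result)).row_count
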
class ExperimentsGCSFuseBQ:
"""
Class to create and interact with Bigquery dataset and tables for storing
experiments configurations and their results.
Attributes:
project_id (str): The GCP project in which dataset and tables will be created
dataset_id (str): The name of the dataset in the project that will store the tables
bq_client (Optional[google.cloud.bigquery.client.Client]): The client for interacting with Bigquery.
Default value is bigquery.Client(project=project_id).
"""
def __init__(self, project_id, dataset_id, bq_client=None):
if bq_client is None:
self.client = bigquery.Client(project=project_id)
else:
self.client = bq_client
self.project_id = project_id
self.dataset_id = dataset_id
@property
def dataset_ref(self):
"""Gets the reference of the dataset
Returns:
google.cloud.bigquery.dataset.Dataset: The retrieved dataset object
"""
return self.client.get_dataset(self.dataset_id)
def _get_table_from_table_id(self, table_id):
"""Gets the table from BigQuery from table ID
Args:
table_id (str): String representing the ID or name of the table
Returns:
google.cloud.bigquery.table.Table: The table in BigQuery
"""
table_ref = self.dataset_ref.table(table_id)
table = self.client.get_table(table_ref)
return table
def _execute_query(self, query) -> QueryJob:
"""Executes the query in BigQuery and raises an exception if query
execution could not be completed.
Args:
query (str): Query that will be executed in BigQuery.
Raises:
Exception: If query execution failed.
"""
job = self.client.query(query)
if job.errors:
for error in job.errors:
raise Exception(f"Error message: {error['message']}")
return job
def _check_if_config_valid(self, exp_config_id) -> bool:
"""Checks if exp_config_id exists in the experiment_configuration table.
Args:
exp_config_id (str): An id that uniquely identifies an experiment
Returns:
      bool: True if exp_config_id exists, False otherwise.
"""
query_check_if_config_valid = """
SELECT *
FROM `{}.{}.{}`
WHERE configuration_id = '{}'
""".format(self.project_id, self.dataset_id, constants.CONFIGURATION_TABLE_ID, exp_config_id)
job = self._execute_query(query_check_if_config_valid)
row_count = job.result().total_rows
    return row_count > 0
  def _delete_rows_incomplete_transaction(self, table_id=None, config_id=None, start_time_build=None):
    """Helper for _insert_rows. If insertion of the nth row fails, this method
    deletes the (n-1) rows that were inserted before it.
    Args:
      table_id (str): ID of the table to which results are being uploaded.
      config_id (str): config_id of the experiment for which results are being uploaded.
      start_time_build (timestamp): Start epoch time of the build.
    """
if config_id:
query_delete_if_row_exists = """
DELETE FROM `{}.{}.{}`
WHERE configuration_id = '{}'
AND start_time_build = '{}'
""".format(self.project_id, self.dataset_id, table_id, config_id, start_time_build)
      self._execute_query(query_delete_if_row_exists)
  def _insert_rows(self, table, rows_to_insert, table_id=None, config_id=None, start_time_build=None):
    """Inserts rows into the table. If insertion of the nth row fails, deletes
    the (n-1) rows that were inserted before it and raises an exception.
    Args:
      table (google.cloud.bigquery.table.Table): Table into which rows are inserted.
      rows_to_insert (list): Rows to insert into the table.
      table_id (str): ID of the table to which results are being uploaded.
      config_id (str): config_id of the experiment for which results are being uploaded.
      start_time_build (timestamp): Start epoch time of the build.
    Raises:
      Exception: If some row insertion failed.
    """
try:
result = self.client.insert_rows(table, rows_to_insert)
if result:
raise Exception(f'{result}')
except Exception as e:
self._delete_rows_incomplete_transaction(table_id, config_id, start_time_build)
raise Exception(f'Error inserting data to BigQuery table: {e}')
def setup_dataset_and_tables(self):
f"""
Creates the dataset to store the tables and the experiment configuration table
to store the configuration details and creates the {constants.LS_TABLE_ID},
{constants.FIO_TABLE_ID}, {constants.VM_TABLE_ID} tables to store the metrics
"""
# Create dataset if not exists
dataset = bigquery.Dataset(f"{self.project_id}.{self.dataset_id}")
self.client.create_dataset(dataset, exists_ok=True)
# Wait for the dataset to be created and ready to be referenced
time.sleep(120)
# Query for creating experiment_configuration table if it does not exist
query_create_table_experiment_configuration = """
CREATE TABLE IF NOT EXISTS {}.{}.{}(
configuration_id STRING,
configuration_name STRING,
gcsfuse_flags STRING,
config_file_flags_as_json STRING,
branch STRING,
end_date TIMESTAMP,
PRIMARY KEY (configuration_id) NOT ENFORCED
) OPTIONS (description = 'Table for storing Job Configurations and respective VM instance name on which the job was run');
""".format(self.project_id, self.dataset_id, constants.CONFIGURATION_TABLE_ID)
# Query for creating fio_metrics table
query_create_table_fio_metrics = """
CREATE TABLE IF NOT EXISTS {}.{}.{}(
configuration_id STRING,
start_time_build INT64,
        operation STRING,
num_threads INT64,
file_size_kb INT64,
start_time INT64,
end_time INT64,
iops FLOAT64,
bandwidth_bytes_per_sec INT64,
io_bytes INT64,
min_latency FLOAT64,
max_latency FLOAT64,
mean_latency FLOAT64,
percentile_latency_20 FLOAT64,
percentile_latency_50 FLOAT64,
percentile_latency_90 FLOAT64,
percentile_latency_95 FLOAT64,
FOREIGN KEY(configuration_id) REFERENCES {}.{} (configuration_id) NOT ENFORCED
) OPTIONS (description = 'Table for storing FIO metrics extracted from experiments.');
""".format(self.project_id, self.dataset_id, constants.FIO_TABLE_ID, self.dataset_id, constants.CONFIGURATION_TABLE_ID)
# Query for creating vm_metrics table
query_create_table_vm_metrics = """
CREATE TABLE IF NOT EXISTS {}.{}.{}(
configuration_id STRING,
start_time_build INT64,
end_time INT64,
cpu_utilization_peak_percentage FLOAT64,
cpu_utilization_mean_percentage FLOAT64,
received_bytes_peak_per_sec FLOAT64,
received_bytes_mean_per_sec FLOAT64,
read_bytes_count INT64,
ops_error_count INT64,
ops_mean_latency_sec FLOAT64,
FOREIGN KEY(configuration_id) REFERENCES {}.{} (configuration_id) NOT ENFORCED
) OPTIONS (description = 'Table for storing VM metrics extracted from experiments.');
""".format(self.project_id, self.dataset_id, constants.VM_TABLE_ID, self.dataset_id, constants.CONFIGURATION_TABLE_ID)
# Query for creating ls_metrics table
query_create_table_ls_metrics = """
CREATE TABLE IF NOT EXISTS {}.{}.{}(
configuration_id STRING,
start_time_build INT64,
mount_type STRING,
        test_description STRING,
command STRING,
num_files INT64,
num_folders INT64,
num_samples INT64,
mean_latency_msec FLOAT64,
median_latency_msec FLOAT64,
standard_dev_latency_msec FLOAT64,
percentile_latency_0 FLOAT64,
percentile_latency_20 FLOAT64,
percentile_latency_50 FLOAT64,
percentile_latency_90 FLOAT64,
percentile_latency_95 FLOAT64,
percentile_latency_98 FLOAT64,
percentile_latency_99 FLOAT64,
percentile_latency_995 FLOAT64,
percentile_latency_999 FLOAT64,
percentile_latency_100 FLOAT64,
FOREIGN KEY(configuration_id) REFERENCES {}.{} (configuration_id) NOT ENFORCED
) OPTIONS (description = 'Table for storing GCSFUSE metrics extracted from list experiments.');
""".format(self.project_id, self.dataset_id, constants.LS_TABLE_ID, self.dataset_id, constants.CONFIGURATION_TABLE_ID)
self._execute_query(query_create_table_experiment_configuration)
self._execute_query(query_create_table_fio_metrics)
self._execute_query(query_create_table_vm_metrics)
self._execute_query(query_create_table_ls_metrics)
def get_experiment_configuration_id(self, gcsfuse_flags, config_file_flags_as_json, branch, end_date, config_name) -> str:
"""Gets the configuration ID of the experiment from experiment details
If experiment configuration exists: Check if end date needs update and
then return the configuration ID
Else: Insert new experiment configuration and return the configuration ID
Args:
gcsfuse_flags (str): Set of flags the gcsfuse flags used for experiment.
config_file_flags_as_json (str): Config file flag value that gcsfuse --config-file flag used for experiment.
branch (str): GCSFuse repo branch used for building GCSFuse.
end_date (timestamp): Date till when experiments of this configuration are run.
Format: 'YYYY-MM-DD HH:MM:SS'
config_name (str): Name of the experiment configuration.
Returns:
str: Configuration ID of the experiment
"""
# Check if the experiment configuration is already present in table
query_check_config_name_exists = """
SELECT configuration_id, gcsfuse_flags, config_file_flags_as_json, branch, end_date
FROM `{}.{}.{}`
WHERE configuration_name = '{}'
""".format(self.project_id, self.dataset_id, constants.CONFIGURATION_TABLE_ID, config_name)
job = self._execute_query(query_check_config_name_exists)
result_count = job.result().total_rows
# If more than 1 result -> duplicate experiment configuration present -> throw error
if result_count > 1:
raise Exception("Duplicate experiment configurations exist. Data corrupted")
# If result empty, then experiment configuration not present -> insert new experiment configuration -> return configuration ID
elif result_count == 0:
table = self._get_table_from_table_id(constants.CONFIGURATION_TABLE_ID)
uuid_str = str(uuid.uuid4())
rows_to_insert = [(uuid_str, config_name, gcsfuse_flags, config_file_flags_as_json, branch, end_date)]
self._insert_rows(table, rows_to_insert)
return uuid_str
# If exactly one result -> check if config valid -> If valid and end date not same, update end date -> return configuration ID
else:
row = list(job)[0]
# If the configuration name exists, but GCSFuse flags and branch don't match then raise an exception
if (row.get('gcsfuse_flags') != gcsfuse_flags) or (row.get('branch') != branch):
raise Exception("Configuration name already exists. GCSFuse flags and branch don't match")
config_id = row.get('configuration_id')
      # If the configuration name exists and GCSFuse flags and branch match,
      # but the end date does not match, then update the end date
      if row.get('end_date') != end_date:
query_update_end_date = """
UPDATE `{}.{}.{}`
SET end_date = '{}'
WHERE configuration_id = '{}'
""".format(self.project_id, self.dataset_id, constants.CONFIGURATION_TABLE_ID, end_date, config_id)
self._execute_query(query_update_end_date)
return config_id
def upload_metrics_to_table(self, table_id, config_id, start_time_build, metrics_data):
"""Uploads metrics_data to the table corresponding to 'table_name'.
Args:
table_id (str): ID of table to which results are being uploaded
config_id (str): config_id of the experiment for which results are being uploaded
start_time_build (int): Start epoch time of the build
metrics_data (list): A 2D list containing the experiment results
For example: metrics data for fio jobs will look like:
[['read', 40, 256, 1687928088, 1687928159, 27032.61141, 443600529,
26647527424, 0.000126831, 0.323205657, 0.09454765585, 0.08650752,
0.0917504, 0.106430464, 0.113770496],
['write', 40, 256, 1687928278, 1687928364, 87.361631, 1979988,
149176320, 0.032581924, 45.73434076, 20.26386098, 13.75731712,
17.11276032, 17.11276032, 17.11276032]]
"""
# Check if the configuration ID of the experiment is valid
config_valid = self._check_if_config_valid(config_id)
if not config_valid:
raise Exception("Invalid configuration ID")
table = self._get_table_from_table_id(table_id)
    rows_to_insert = []
    for row in metrics_data:
      rows_to_insert.append((config_id, start_time_build) + tuple(row))
self._insert_rows(table, rows_to_insert, table_id, config_id, start_time_build)
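

# A minimal usage sketch, assuming a hypothetical project ID
# ('my-gcp-project') and dataset ID ('gcsfuse_experiments'), and that
# bigquery/constants.py defines the table IDs used above. The metrics row
# layout mirrors the fio example in the upload_metrics_to_table docstring;
# all flag values and names below are illustrative, not prescribed.
if __name__ == '__main__':
  bq = ExperimentsGCSFuseBQ('my-gcp-project', 'gcsfuse_experiments')
  bq.setup_dataset_and_tables()
  config_id = bq.get_experiment_configuration_id(
      gcsfuse_flags='--implicit-dirs',
      config_file_flags_as_json='{}',
      branch='master',
      end_date='2024-12-31 00:00:00',
      config_name='example-config',
  )
  fio_metrics = [['read', 40, 256, 1687928088, 1687928159, 27032.61141,
                  443600529, 26647527424, 0.000126831, 0.323205657,
                  0.09454765585, 0.08650752, 0.0917504, 0.106430464,
                  0.113770496]]
  bq.upload_metrics_to_table(constants.FIO_TABLE_ID, config_id,
                             int(time.time()), fio_metrics)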