perfmetrics/scripts/vm_metrics/vm_metrics.py (247 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Extracts required Google Cloud VM metrics using Monitoring V3 API call, parses
the API response into a list and writes to google sheet.
Takes VM instance name, interval start time, interval end time, alignment
period, fio test type and google worksheet name as command line inputs.
The supported test types are: read, randread, write, randwrite, rename
Metrics extracted:
1.Peak Cpu Utilization(%)
2.Mean Cpu Utilization(%)
3.Peak Network Bandwidth(Bytes/s)
4.Mean Network Bandwidth(Bytes/s)
5.Read Bytes Count(Bytes)
6.Opencensus Error Count
7.Opencensus Mean Latency(s)
Usage:
>>python3 vm_metrics.py {instance} {start time in epoch sec} {end time in epoch sec} {period in sec} {test_type} {worksheet_name}
"""
import dataclasses
from dataclasses import dataclass, field
import os
import sys
import google.api_core
from google.api_core.exceptions import GoogleAPICallError
import google.cloud
from google.cloud import monitoring_v3
from typing import List
import subprocess
sys.path.insert(0, '..')
from gsheet import gsheet
# Resource name passed as `name` in the list_time_series request.
PROJECT_NAME = 'projects/gcs-fuse-test-ml'

# Metric types with the "compute" prefix.
CPU_UTI_METRIC_TYPE = 'compute.googleapis.com/instance/cpu/utilization'
RECEIVED_BYTES_COUNT_METRIC_TYPE = (
    'compute.googleapis.com/instance/network/received_bytes_count')
SENT_BYTES_COUNT_METRIC_TYPE = (
    'compute.googleapis.com/instance/network/sent_bytes_count')

# Metric types with the "custom" prefix (gcsfuse metrics).
OPS_LATENCY_METRIC_TYPE = 'custom.googleapis.com/gcsfuse/fs/ops_latency'
READ_BYTES_COUNT_METRIC_TYPE = (
    'custom.googleapis.com/gcsfuse/gcs/read_bytes_count')
OPS_ERROR_COUNT_METRIC_TYPE = (
    'custom.googleapis.com/gcsfuse/fs/ops_error_count')

# Metric types with the "agent" prefix; these are filtered by instance ID
# rather than by instance name.
MEMORY_UTIL_METRIC_TYPE = 'agent.googleapis.com/processes/rss_usage'
LOAD_AVG_OS_THREADS_MEAN_METRIC_TYPE = 'agent.googleapis.com/cpu/load_1m'
@dataclass
class MetricPoint:
  """One aligned sample of a metric.

  Attributes:
    value: The sample value after it has been divided by the metric's factor.
    start_time_sec: Start of the sample interval, in epoch seconds.
    end_time_sec: End of the sample interval, in epoch seconds.
  """
  value: float
  start_time_sec: int
  end_time_sec: int
@dataclass
class Metric:
  """Describes how one metric is queried and scaled.

  See https://cloud.google.com/monitoring/custom-metrics/reading-metrics for
  the values each attribute may take.

  Attributes:
    metric_type: Full metric type string used in the request filter.
    factor: Divisor applied to every value returned by the API.
    aligner: Name of a monitoring_v3.Aggregation.Aligner member.
    extra_filter: Optional filter clause joined to the base filter with AND.
    reducer: Name of a monitoring_v3.Aggregation.Reducer member.
    group_fields: Fields passed as group_by_fields of the aggregation.
    metric_point_list: Points fetched for this metric; empty until filled.
  """
  metric_type: str
  factor: float
  aligner: str
  extra_filter: str = ''
  reducer: str = 'REDUCE_NONE'
  # Mutable defaults need a factory so instances do not share one list.
  group_fields: List[str] = dataclasses.field(default_factory=list)
  metric_point_list: List[MetricPoint] = dataclasses.field(
      default_factory=list)
# `factor` is a divisor: each API value is reported as value / factor.

# Dividing by 1/100 multiplies by 100, i.e. reports a percentage.
CPU_UTI_PEAK = Metric(
    metric_type=CPU_UTI_METRIC_TYPE,
    factor=1 / 100,
    aligner='ALIGN_MAX',
)
CPU_UTI_MEAN = Metric(
    metric_type=CPU_UTI_METRIC_TYPE,
    factor=1 / 100,
    aligner='ALIGN_MEAN',
)

# Network byte counts are divided by 60; presumably this converts a
# per-minute count to Bytes/s -- TODO confirm the sampling period.
REC_BYTES_PEAK = Metric(
    metric_type=RECEIVED_BYTES_COUNT_METRIC_TYPE,
    factor=60,
    aligner='ALIGN_MAX',
)
REC_BYTES_MEAN = Metric(
    metric_type=RECEIVED_BYTES_COUNT_METRIC_TYPE,
    factor=60,
    aligner='ALIGN_MEAN',
)
SENT_BYTES_PEAK = Metric(
    metric_type=SENT_BYTES_COUNT_METRIC_TYPE,
    factor=60,
    aligner='ALIGN_MAX',
)
SENT_BYTES_MEAN = Metric(
    metric_type=SENT_BYTES_COUNT_METRIC_TYPE,
    factor=60,
    aligner='ALIGN_MEAN',
)

READ_BYTES_COUNT = Metric(
    metric_type=READ_BYTES_COUNT_METRIC_TYPE,
    factor=1,
    aligner='ALIGN_DELTA',
)

# Errors of the GetXattr operation are excluded from the error count.
OPS_ERROR_COUNT_FILTER = 'metric.labels.fs_op != "GetXattr"'
OPS_ERROR_COUNT = Metric(
    metric_type=OPS_ERROR_COUNT_METRIC_TYPE,
    factor=1,
    aligner='ALIGN_DELTA',
    extra_filter=OPS_ERROR_COUNT_FILTER,
    reducer='REDUCE_SUM',
    group_fields=['metric.labels'],
)

# NOTE(review): the memory metrics reuse the CPU factor of 1/100; confirm
# this scaling is intended for rss_usage.
MEMORY_USAGE_PEAK = Metric(
    metric_type=MEMORY_UTIL_METRIC_TYPE,
    factor=1 / 100,
    aligner='ALIGN_MAX',
)
MEMORY_USAGE_MEAN = Metric(
    metric_type=MEMORY_UTIL_METRIC_TYPE,
    factor=1 / 100,
    aligner='ALIGN_MEAN',
)

LOAD_AVG_OS_THREADS_MEAN = Metric(
    metric_type=LOAD_AVG_OS_THREADS_MEAN_METRIC_TYPE,
    factor=1,
    aligner='ALIGN_MEAN',
)

# Base metrics for the read/write test types; an ops-latency metric is
# appended per test type at fetch time.
METRICS_LIST = [
    CPU_UTI_PEAK,
    CPU_UTI_MEAN,
    REC_BYTES_PEAK,
    REC_BYTES_MEAN,
    READ_BYTES_COUNT,
    OPS_ERROR_COUNT,
]

# Metrics reported for the rename test type.
RENAME_METRICS_LIST = [
    CPU_UTI_PEAK,
    CPU_UTI_MEAN,
    REC_BYTES_PEAK,
    REC_BYTES_MEAN,
    SENT_BYTES_PEAK,
    SENT_BYTES_MEAN,
    OPS_ERROR_COUNT,
    MEMORY_USAGE_PEAK,
    MEMORY_USAGE_MEAN,
    LOAD_AVG_OS_THREADS_MEAN,
]
class NoValuesError(Exception):
  """Raised when the API response contains no values for a metric."""
def _parse_metric_value_by_type(value, value_type) -> float:
"""Parses the value from a value object in API response.
Args:
value (object): The value object from API response
value_type (int) : Integer representing the value type of the object, refer
https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TypedValue.
"""
if value_type == 1:
return value.bool_value
elif value_type == 2:
return value.int64_value
elif value_type == 3:
return value.double_value
elif value_type == 4:
return value.string_value
elif value_type == 5:
return value.distribution_value.mean
else:
raise Exception('Unhandled Value type')
def _get_instance_id():
  """Returns the instance ID of the VM this script is running on.

  Metrics whose type starts with "agent" are filtered by instance ID, so the
  ID is read from the metadata server of the VM running the benchmark.

  Returns:
    The instance ID as a string, or None if it could not be fetched (the
    error is printed).
  """
  command = [
      'curl',
      # Without --fail curl exits 0 on an HTTP error and the error page body
      # would be returned as if it were the instance ID.
      '--fail',
      # Drop the progress meter but still report errors on stderr.
      '--silent',
      '--show-error',
      'http://metadata.google.internal/computeMetadata/v1/instance/id',
      '-H',
      'Metadata-Flavor:Google',
  ]
  try:
    return subprocess.check_output(command, text=True).strip()
  except (subprocess.CalledProcessError, OSError) as e:
    # OSError covers a missing curl binary, keeping the lookup best-effort.
    print(f"Error fetching instance ID: {e}")
    return None
def _get_gcsfuse_pid():
  """Returns the PID of the first process whose `ps` entry mentions gcsfuse.

  The PID is needed to fetch memory metrics specific to the gcsfuse process.

  The process table is filtered in Python instead of through a
  `ps | grep | head` shell pipeline: the shell and grep processes of such a
  pipeline contain "gcsfuse" in their own command lines and can be returned
  in place of the real process. This script's own process is skipped for the
  same reason.

  Returns:
    The PID as a string.

  Raises:
    RuntimeError: If no matching process is found.
    subprocess.CalledProcessError: If `ps` exits with a non-zero status.
  """
  own_pid = str(os.getpid())
  ps_output = subprocess.check_output(['ps', '-aux'], text=True)
  for line in ps_output.splitlines():
    # Case-insensitive substring match, like the former `grep -i`.
    if 'gcsfuse' not in line.lower():
      continue
    columns = line.split()
    # The PID is the second column of `ps -aux` output.
    if len(columns) > 1 and columns[1] != own_pid:
      return columns[1]
  raise RuntimeError('No running gcsfuse process was found.')
def _get_metric_filter(type, metric_type, instance, extra_filter):
"""Getting the metrics filter string from metric type, instance name and extra filter.
Args:
metric_type (str): The type of metric, Metric.metric_type
instance (str): VM instance name
extra_filter(str): Metric.extra_filter
"""
if (type == 'compute'):
metric_filter = (
'metric.type = "{metric_type}" AND metric.label.instance_name '
'={instance_name}').format(
metric_type=metric_type, instance_name=instance)
elif (type == 'custom'):
metric_filter = (
'metric.type = "{metric_type}" AND metadata.system_labels.name = '
'"{instance_name}"').format(
metric_type=metric_type, instance_name=instance)
elif (type == 'agent'):
# Fetch the instance ID here
instance_id = _get_instance_id()
metric_filter = (
'metric.type = "{metric_type}" AND resource.labels.instance_id '
'={instance_id}').format(
metric_type=metric_type, instance_id=instance_id)
if (extra_filter == ''):
return metric_filter
return '{} AND {}'.format(metric_filter, extra_filter)
def _create_metric_points_from_response(metrics_response, factor):
"""Parses the given metrics API response and returns a list of MetricPoint.
Args:
metrics_response (object): The metrics API response
factor (float) : Converting the API response values into appropriate unit
Returns:
list[MetricPoint]
"""
metric_point_list = []
for metric in metrics_response:
for point in metric.points:
value = _parse_metric_value_by_type(point.value, metric.value_type)
metric_point = MetricPoint(value / factor,
point.interval.start_time.timestamp(),
point.interval.end_time.timestamp())
metric_point_list.append(metric_point)
metric_point_list.reverse()
return metric_point_list
class VmMetrics:
  """Fetches VM and gcsfuse metrics and writes them to a google sheet."""

  def _validate_start_end_times(self, start_time_sec, end_time_sec):
    """Checks whether the start time is less than end time.

    Args:
      start_time_sec (int) : Epoch seconds
      end_time_sec (int) : Epoch seconds

    Returns:
      True when the start time is strictly before the end time.

    Raises:
      ValueError : When start time is equal to or after end time.
    """
    if start_time_sec < end_time_sec:
      return True
    raise ValueError('Start time should be before end time')

  def _get_api_response(self, start_time_sec, end_time_sec, instance, period,
                        metric):
    """Fetches the API response for the requested metrics.

    Args:
      start_time_sec (int): Epoch seconds
      end_time_sec (int): Epoch seconds
      instance (str): VM instance name
      period (float): Period over which the values are aligned
      metric: Metric object

    Returns:
      metrics API response (object)

    Raises:
      GoogleAPICallError: When the list_time_series request fails; the
        original error is chained as its cause.
      Exception: When the metric type prefix is not compute, custom or agent.
    """
    client = monitoring_v3.MetricServiceClient()
    interval = monitoring_v3.TimeInterval(
        end_time={'seconds': int(end_time_sec)},
        start_time={'seconds': int(start_time_sec)})
    aggregation = monitoring_v3.Aggregation(
        alignment_period={'seconds': period},
        per_series_aligner=getattr(monitoring_v3.Aggregation.Aligner,
                                   metric.aligner),
        cross_series_reducer=getattr(monitoring_v3.Aggregation.Reducer,
                                     metric.reducer),
        group_by_fields=metric.group_fields)

    metric_type = metric.metric_type
    # Kept in a local so the shared module-level Metric object is not
    # modified as a side effect of fetching.
    extra_filter = metric.extra_filter

    # The prefix of the metric type decides how the VM is identified in the
    # filter.
    if metric_type.startswith('compute'):
      filter_kind = 'compute'
    elif metric_type.startswith('custom'):
      filter_kind = 'custom'
    elif metric_type.startswith('agent'):
      filter_kind = 'agent'
      if metric_type == MEMORY_UTIL_METRIC_TYPE:
        # Memory usage is restricted to the gcsfuse process.
        extra_filter = 'metric.labels.pid = {}'.format(_get_gcsfuse_pid())
    else:
      raise Exception('Unhandled metric type')
    metric_filter = _get_metric_filter(filter_kind, metric_type, instance,
                                       extra_filter)

    try:
      metrics_response = client.list_time_series({
          'name': PROJECT_NAME,
          'filter': metric_filter,
          'interval': interval,
          'view': monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
          'aggregation': aggregation,
      })
    except Exception as err:
      # `except Exception` lets KeyboardInterrupt/SystemExit propagate, and
      # `from err` keeps the underlying failure visible in the traceback.
      raise GoogleAPICallError(('The request for API response of {} failed.'
                               ).format(metric_type)) from err
    return metrics_response

  def _get_metrics(self, start_time_sec, end_time_sec, instance, period,
                   metric):
    """Returns the MetricPoint list for requested metric type.

    Args:
      start_time_sec (int): Epoch seconds
      end_time_sec (int): Epoch seconds
      instance (str): VM instance name
      period (float): Period over which the values are aligned
      metric: Metric Object

    Returns:
      list[MetricPoint]

    Raises:
      NoValuesError: When no values were retrieved for a metric other than
        OPS_ERROR_COUNT.
    """
    metrics_response = self._get_api_response(start_time_sec, end_time_sec,
                                              instance, period, metric)
    metrics_data = _create_metric_points_from_response(metrics_response,
                                                       metric.factor)
    if metrics_data:
      return metrics_data

    # An empty OPS_ERROR_COUNT response is treated as "no errors": one zero
    # point per alignment period in the interval.
    if metric == OPS_ERROR_COUNT:
      num_points = int((end_time_sec - start_time_sec) / period) + 1
      return [MetricPoint(0, 0, 0) for _ in range(num_points)]

    # Every other metric is expected to have data.
    raise NoValuesError('No values were retrieved from the call for ' +
                        metric.metric_type)

  def _add_new_metric_using_test_type(self, test_type):
    """Returns the list of metrics to fetch for the given test type.

    For 'rename' a copy of RENAME_METRICS_LIST is returned. For the read and
    write test types a copy of METRICS_LIST is returned with an ops latency
    metric appended that is filtered on the matching file system operation.

    Args:
      test_type(str): The type of test for which metrics are taken; one of
        'rename', 'read', 'randread', 'write', 'randwrite'

    Returns:
      list[Metric]

    Raises:
      ValueError: When the test type is not supported.
    """
    if test_type == 'rename':
      return list(RENAME_METRICS_LIST)

    # Getting the fs_op type from test_type:
    if test_type in ('read', 'randread'):
      fs_op = 'ReadFile'
    elif test_type in ('write', 'randwrite'):
      fs_op = 'WriteFile'
    else:
      # Previously an unknown test type surfaced as an UnboundLocalError.
      raise ValueError('Unsupported test type: {}'.format(test_type))

    # Copy so the module-level METRICS_LIST is not extended.
    updated_metrics_list = list(METRICS_LIST)
    ops_latency_mean = Metric(
        metric_type=OPS_LATENCY_METRIC_TYPE,
        extra_filter='metric.labels.fs_op = "{}"'.format(fs_op),
        factor=1000,
        aligner='ALIGN_DELTA')
    updated_metrics_list.append(ops_latency_mean)
    return updated_metrics_list

  def fetch_metrics(self, start_time_sec, end_time_sec, instance, period,
                    test_type):
    """Fetches the metrics data for all types and returns a list of lists to be written in google sheet.

    Args:
      start_time_sec (int): Epoch seconds
      end_time_sec (int): Epoch seconds
      instance (str): VM instance
      period (float): Period over which the values are taken
      test_type(str): The type of load test for which metrics are taken

    Returns:
      One row per metric point. Each row starts with the interval end time
      and is followed by one value per metric:
      list[[interval end time, CPU_UTI_PEAK, CPU_UTI_MEAN, REC_BYTES_PEAK,
        REC_BYTES_MEAN, SENT_BYTES_PEAK, SENT_BYTES_MEAN, OPS_ERROR_COUNT,
        MEMORY_USAGE_PEAK, MEMORY_USAGE_MEAN, LOAD_AVG_OS_THREADS_MEAN]]
        in case of rename
      list[[interval end time, CPU_UTI_PEAK, CPU_UTI_MEAN, REC_BYTES_PEAK,
        REC_BYTES_MEAN, READ_BYTES_COUNT, OPS_ERROR_COUNT,
        OPS_MEAN_LATENCY]] otherwise

    Raises:
      ValueError: When start time is not before end time.
    """
    self._validate_start_end_times(start_time_sec, end_time_sec)

    updated_metrics_list = self._add_new_metric_using_test_type(test_type)

    # Extracting MetricPoint list for every metric in the updated_metrics_list:
    for metric in updated_metrics_list:
      metric.metric_point_list = self._get_metrics(start_time_sec,
                                                   end_time_sec, instance,
                                                   period, metric)

    # The first metric decides the number of rows. Points are indexed rather
    # than zipped so a metric with fewer points still raises an IndexError
    # instead of silently truncating the output.
    num_points = len(updated_metrics_list[0].metric_point_list)
    metrics_data = []
    for i in range(num_points):
      row = [end_time_sec]
      row.extend(
          metric.metric_point_list[i].value for metric in updated_metrics_list)
      metrics_data.append(row)
    return metrics_data

  def fetch_metrics_and_write_to_google_sheet(self, start_time_sec,
                                              end_time_sec, instance, period,
                                              test_type, worksheet_name):
    """Fetches the metrics data for all types and writes to a google sheet.

    Args:
      start_time_sec (int): Epoch seconds
      end_time_sec (int): Epoch seconds
      instance (str): VM instance
      period (float): Period over which the values are taken
      test_type(str): The type of load test for which metrics are taken
      worksheet_name(str): The name of the google worksheet you want to write to

    Returns:
      None

    Raises:
      ValueError: When start time is not before end time.
    """
    # Validated here as well so bad input fails before any fetching starts.
    self._validate_start_end_times(start_time_sec, end_time_sec)

    metrics_data = self.fetch_metrics(start_time_sec, end_time_sec, instance,
                                      period, test_type)

    # Writing data into google sheet
    gsheet.write_to_google_sheet(worksheet_name, metrics_data)
def main() -> None:
  """Parses the command line and writes the fetched metrics to a google sheet.

  Expects exactly six arguments after the script name: instance, start time
  and end time in epoch seconds, period in seconds, test type and worksheet
  name.

  Raises:
    Exception: When the number of command line arguments is wrong.
    ValueError: When a time or the period is not an integer.
  """
  if len(sys.argv) != 7:
    raise Exception('Invalid arguments.')

  (instance, start_arg, end_arg, period_arg, test_type,
   worksheet_name) = sys.argv[1:]
  start_time_sec = int(start_arg)
  end_time_sec = int(end_arg)
  period = int(period_arg)

  VmMetrics().fetch_metrics_and_write_to_google_sheet(
      start_time_sec, end_time_sec, instance, period, test_type,
      worksheet_name)


if __name__ == '__main__':
  main()