perfkitbenchmarker/traces/dstat.py (130 lines of code) (raw):
# Copyright 2015 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Records system performance counters during benchmark runs using pcp dstat.
https://github.com/performancecopilot/pcp
"""
import copy
import logging
import os
import re
from absl import flags
import numpy as np
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import events
from perfkitbenchmarker import sample
from perfkitbenchmarker import stages
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.linux_packages import dstat
from perfkitbenchmarker.traces import base_collector
# Master switch: nothing in this module is registered unless --dstat is set.
flags.DEFINE_boolean(
    name='dstat',
    default=False,
    help=(
        'Run dstat (http://dag.wiee.rs/home-made/dstat/) on each VM '
        'to collect system performance metrics during each '
        'benchmark run.'
    ),
)
# Sampling period handed to dstat; None leaves the interval argument empty.
flags.DEFINE_integer(
    name='dstat_interval',
    default=None,
    help=(
        'dstat sample collection frequency, in seconds. '
        'Only applicable when --dstat is specified.'
    ),
)
# Where collected dstat files are stored locally.
flags.DEFINE_string(
    name='dstat_output',
    default=None,
    help=(
        'Output directory for dstat output. Only applicable when '
        '--dstat is specified. Default: run temporary directory.'
    ),
)
# Enables turning the collected output into published samples.
flags.DEFINE_boolean(
    name='dstat_publish',
    default=False,
    help='Whether to publish average dstat statistics.',
)
# Optional filter selecting statistics that are also published row by row.
flags.DEFINE_string(
    name='dstat_publish_regex',
    default=None,
    help=(
        'Requires setting dstat_publish to true. If specified, any dstat '
        'statistic matching this regular expression will be published '
        'such that each individual statistic will be in a sample with the '
        'time since the epoch in the metadata. Examples. Use ".*" to '
        'record all samples. Use "net" to record networking statistics.'
    ),
)
# Extra command-line options inserted verbatim into the dstat invocation.
_DSTAT_FLAGS = flags.DEFINE_string(
    name='dstat_flags',
    default='',
    help='Flags to pass to dstat.',
)

FLAGS = flags.FLAGS
class _DStatCollector(base_collector.BaseCollector):
  """Trace collector backed by `pcp dstat`.

  Installs and runs dstat on a collection of VMs, and can convert the
  collected output files into samples (see Analyze).
  """

  def _CollectorName(self):
    """Returns the short name identifying this collector."""
    return 'dstat'

  def _InstallCollector(self, vm):
    """Installs the 'dstat' package on `vm`."""
    vm.Install('dstat')

  def _CollectorRunCommand(self, vm, collector_file):
    """Builds the shell command that launches dstat in the background.

    Args:
      vm: The VM the command is built for; not used by this method.
      collector_file: Path passed to dstat's --output option.

    Returns:
      A shell command string that backgrounds dstat, discards its
      stdout/stderr and echoes the PID of the background process.
    """
    template = (
        'pcp dstat --epoch {flags} --noheaders --output {output} '
        '{dstat_interval} > /dev/null 2>&1 & echo $!'
    )
    # A falsy interval (e.g. None) leaves the positional slot empty.
    sampling_interval = self.interval or ''
    return template.format(
        flags=_DSTAT_FLAGS.value,
        output=collector_file,
        dstat_interval=sampling_interval,
    )

  def Analyze(self, unused_sender, benchmark_spec, samples):
    """Parses the dstat output files and adds samples to `samples`.

    For every (role, file) pair in self._role_mapping, the file with the same
    base name under self.output_directory is parsed. Each event in
    events.TracingEvent.events then contributes one averaged sample per dstat
    column; when --dstat_publish_regex is set, columns whose label matches it
    are additionally published one sample per row.

    Args:
      unused_sender: Ignored.
      benchmark_spec: Not used by this method.
      samples: List that the new sample.Sample objects are added to in place.
    """

    def _AnalyzeEvent(vm_role, column_labels, table, traced_event):
      """Adds the samples derived from a single tracing event."""
      timestamps = table[:, 0]
      # Rows whose timestamp lies strictly inside the event's time window.
      in_window = (timestamps > traced_event.start_timestamp) & (
          timestamps < traced_event.end_timestamp
      )
      if not in_window.any():
        # No dstat row was recorded during this event; nothing to report.
        return

      # The boolean mask doubles as 0/1 weights, so only in-window rows
      # contribute to each column's mean.
      column_means = np.average(table[:, 1:], weights=in_window, axis=0)

      event_metadata = copy.deepcopy(traced_event.metadata)
      event_metadata['event'] = traced_event.event
      event_metadata['vm_role'] = vm_role
      averaged = [
          sample.Sample(name, column_means[pos], '', event_metadata)
          for pos, name in enumerate(column_labels[1:])
      ]
      samples.extend(averaged)

      publish_pattern = FLAGS.dstat_publish_regex
      if not publish_pattern:
        return

      assert column_labels[0] == 'epoch__epoch'
      # NOTE(review): every row of the file is published below, not only the
      # rows inside the event window -- confirm this is intended.
      # Column 0 holds the epoch, so metric columns are numbered from 1.
      for column, name in enumerate(column_labels[1:], start=1):
        if not re.search(publish_pattern, name):
          continue
        for epoch, reading in zip(timestamps, table[:, column]):
          point_metadata = copy.deepcopy(event_metadata)
          point_metadata['dstat_epoch'] = epoch
          samples.append(sample.Sample(name, reading, '', point_metadata))

    def _Analyze(vm_role, collector_file):
      """Parses one role's dstat file and analyzes every tracing event."""
      local_path = os.path.join(
          self.output_directory, os.path.basename(collector_file)
      )
      with open(local_path) as dstat_file:
        column_labels, table = dstat.ParseCsvFile(iter(dstat_file))
        per_event_args = [
            ((vm_role, column_labels, table, traced_event), {})
            for traced_event in events.TracingEvent.events
        ]
        background_tasks.RunThreaded(_AnalyzeEvent, per_event_args)

    per_role_args = [
        ((vm_role, collector_file), {})
        for vm_role, collector_file in self._role_mapping.items()
    ]
    background_tasks.RunThreaded(_Analyze, per_role_args)
def Register(parsed_flags):
  """Registers the dstat collector if FLAGS.dstat is set.

  When enabled, creates the output directory if needed, builds a
  _DStatCollector and hooks its Start/Stop methods to the RUN stage. If
  dstat_publish is set, the collector's Analyze method is also connected to
  the benchmark_samples_created event.

  Args:
    parsed_flags: Parsed flag values; the dstat, dstat_output, dstat_interval
      and dstat_publish flags are read.
  """
  if not parsed_flags.dstat:
    return

  # Fall back to the run's temp dir unless --dstat_output was given explicitly.
  output_directory = (
      parsed_flags.dstat_output
      if parsed_flags['dstat_output'].present
      else vm_util.GetTempDir()
  )

  logging.debug(
      'Registering dstat collector with interval %s, output to %s.',
      parsed_flags.dstat_interval,
      output_directory,
  )

  # exist_ok avoids the check-then-create race of isdir() + makedirs(): if the
  # directory appears in between (e.g. created by a concurrent run), this no
  # longer fails. A non-directory at that path still raises FileExistsError.
  os.makedirs(output_directory, exist_ok=True)

  collector = _DStatCollector(
      interval=parsed_flags.dstat_interval, output_directory=output_directory
  )
  # weak=False keeps the bound methods (and thus the collector) referenced by
  # the signal, since no other reference to `collector` outlives this call.
  events.before_phase.connect(collector.Start, stages.RUN, weak=False)
  events.after_phase.connect(collector.Stop, stages.RUN, weak=False)
  if parsed_flags.dstat_publish:
    events.benchmark_samples_created.connect(collector.Analyze, weak=False)