# perfkitbenchmarker/linux_benchmarks/copy_throughput_benchmark.py
# Copyright 2014 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Runs copy benchmarks.
cp and dd between two attached disks on the same vm.
scp copy across different vms using external and/or internal IP addresses.
"""
import functools
import logging
import posixpath
from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import configs
from perfkitbenchmarker import data
from perfkitbenchmarker import errors
from perfkitbenchmarker import linux_virtual_machine
from perfkitbenchmarker import sample
from perfkitbenchmarker import vm_util
from perfkitbenchmarker.configs import benchmark_config_spec
from perfkitbenchmarker.configs import option_decoders
class CopyThroughputBenchmarkSpec(benchmark_config_spec.BenchmarkConfigSpec):
    """Config spec for the copy_throughput benchmark.

    Extends the base spec's options with a single extra one.

    Attributes:
        data_size_in_mb: Optional float. Decoded with FloatDecoder and left as
            None when the option is not given.
    """

    def __init__(self, component_full_name, **kwargs):
        # Set before the base initializer runs, so the attribute exists with
        # its default no matter what the base class does with the options.
        self.data_size_in_mb = None
        super().__init__(component_full_name, **kwargs)

    @classmethod
    def _GetOptionDecoderConstructions(cls):
        """Gets decoder classes and constructor args for each option.

        Returns:
            dict. Maps option name string to a (ConfigOptionDecoder class,
            dict) pair: the decoder class and the __init__() keyword
            arguments used to construct it for that option.
        """
        constructions = super()._GetOptionDecoderConstructions()
        constructions['data_size_in_mb'] = (
            option_decoders.FloatDecoder,
            {'default': None},
        )
        return constructions
# Which copy tool the benchmark exercises.
flags.DEFINE_enum(
    name='copy_benchmark_mode',
    default='cp',
    enum_values=['cp', 'dd', 'scp'],
    help='Runs either cp, dd or scp tests.',
)
# Optional size of a single data file; GetConfig rejects it in dd mode.
flags.DEFINE_integer(
    name='copy_benchmark_single_file_mb',
    default=None,
    help=(
        'If set, a single file of the specified number of MB is used instead '
        'of the normal cloud-storage-workload.sh basket of files. '
        'Not supported when copy_benchmark_mode is dd'
    ),
)
# absl's global flags object; the flags defined above are read through it.
FLAGS = flags.FLAGS
# Name passed to configs.LoadConfig when loading this benchmark's config.
BENCHMARK_NAME = 'copy_throughput'
# Default config: one vm group with one vm and two disks. GetConfig changes
# vm_count and disk_count when copy_benchmark_mode is scp.
# NOTE(review): the nested keys in this literal carry no indentation as
# written here -- confirm the intended YAML nesting.
BENCHMARK_CONFIG = """
copy_throughput:
description: Get cp and scp performance between vms.
vm_groups:
default:
vm_spec: *default_dual_core
disk_spec: *default_500_gb
disk_count: 2
vm_count: 1
"""
# Config spec class for this benchmark; it adds the data_size_in_mb option.
BENCHMARK_CONFIG_SPEC_CLASS = CopyThroughputBenchmarkSpec
# Preferred SCP ciphers, in order of preference:
CIPHERS = ['aes128-cbc', 'aes128-ctr']
# Script that PrepareDataFile pushes to each vm and runs to create the data.
DATA_FILE = 'cloud-storage-workload.sh'
# Data size in MB that Run uses for throughput math when no data_size_in_mb
# is configured.
DEFAULT_DATA_SIZE_IN_MB = 256.1
# Unit for all benchmarks
UNIT = 'MB/sec'
def GetConfig(user_config):
    """Loads the benchmark config and adjusts it for the selected copy mode.

    Args:
        user_config: User-supplied config, passed through to
            configs.LoadConfig.

    Returns:
        The loaded config. In scp mode the default vm group is set to two vms
        with one disk each. When copy_benchmark_single_file_mb is set, its
        value is stored under 'data_size_in_mb'.

    Raises:
        errors.Setup.InvalidFlagConfigurationError: If
            copy_benchmark_single_file_mb is set while copy_benchmark_mode
            is dd.
    """
    mode = FLAGS.copy_benchmark_mode
    single_file_mb = FLAGS.copy_benchmark_single_file_mb

    if single_file_mb and mode == 'dd':
        raise errors.Setup.InvalidFlagConfigurationError(
            'Flag copy_benchmark_single_file_mb is not supported when flag '
            'copy_benchmark_mode is dd.'
        )

    config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)

    if mode == 'scp':
        # scp copies between machines: two vms, one scratch disk on each.
        default_group = config['vm_groups']['default']
        default_group['vm_count'] = 2
        default_group['disk_count'] = 1

    if single_file_mb:
        config['data_size_in_mb'] = single_file_mb

    return config
def CheckPrerequisites(benchmark_config):
    """Checks that the workload script used by this benchmark can be found.

    Args:
        benchmark_config: Unused.

    Raises:
        perfkitbenchmarker.data.ResourceNotFound: On missing resource.
    """
    # Only the lookup matters here; the resolved path is discarded.
    data.ResourcePath(DATA_FILE)
    del benchmark_config  # unused
def PrepareDataFile(vm, data_size_in_mb):
    """Pushes the workload script to the vm and runs it to generate data.

    The script is copied into scratch directory 0 and executed from there.

    Args:
        vm: The VM which needs the data file.
        data_size_in_mb: The size of the data file in MB. When falsy, the
            script is run without arguments; otherwise it is run in
            'single_file' mode with this size.
    """
    local_script = data.ResourcePath(DATA_FILE)
    scratch_dir = vm.GetScratchDir(0)
    vm.PushFile(local_script, '%s/' % scratch_dir)

    if data_size_in_mb:
        generate_cmd = (
            'cd %s/; bash cloud-storage-workload.sh single_file %s'
            % (scratch_dir, data_size_in_mb)
        )
    else:
        generate_cmd = 'cd %s/; bash cloud-storage-workload.sh' % scratch_dir
    vm.RemoteCommand(generate_cmd)
def PreparePrivateKey(vm):
    """Calls AuthenticateVm() on the given vm.

    NOTE(review): presumably this installs the key that
    RunScpSingleDirection later hands to scp via `-i` -- confirm against
    AuthenticateVm.

    Args:
        vm: The VM to authenticate.
    """
    vm.AuthenticateVm()
def Prepare(benchmark_spec):
    """Authenticates every vm, then generates the data file on each of them.

    Both steps are fanned out over all vms with background_tasks.RunThreaded.

    Args:
        benchmark_spec: The benchmark specification. Contains all data that
            is required to run the benchmark.
    """
    vms = benchmark_spec.vms
    background_tasks.RunThreaded(PreparePrivateKey, vms)

    data_size_in_mb = benchmark_spec.config.data_size_in_mb
    # One entry per vm: a tuple of positional arguments for PrepareDataFile
    # paired with an empty dict of keyword arguments.
    data_file_calls = [((vm, data_size_in_mb), {}) for vm in vms]
    background_tasks.RunThreaded(PrepareDataFile, data_file_calls)
def RunCp(vms, data_size_in_mb, metadata):
    """Times a cp between two scratch directories on the first vm.

    The remote command empties the destination, syncs, drops caches, and then
    times `cp` of everything under <scratch 0>/data into <scratch 1>.

    Args:
        vms: The VMs running cp benchmarks. Only vms[0] is used.
        data_size_in_mb: The size of the data file in MB.
        metadata: The base metadata to attach to the sample.

    Returns:
        A list with one sample.Sample named 'cp throughput', whose value is
        data_size_in_mb divided by the measured time.
    """
    vm = vms[0]
    source_dir = vm.GetScratchDir(0)
    dest_dir = vm.GetScratchDir(1)
    cmd = (
        'rm -rf %s/*; sudo sync; sudo sysctl vm.drop_caches=3; '
        'time cp %s/data/* %s/; '
    ) % (dest_dir, source_dir, dest_dir)

    # The second element of the result is what gets parsed for the timing.
    _, time_output = vm.RemoteCommand(cmd)
    logging.info(time_output)
    elapsed = vm_util.ParseTimeCommandResult(time_output)
    throughput = data_size_in_mb / elapsed
    return [
        sample.Sample('cp throughput', throughput, UNIT, metadata=metadata)
    ]
def RunDd(vms, data_size_in_mb, metadata):
    """Times a series of dd copies between scratch directories on one vm.

    The remote command empties the destination, syncs, drops caches, and then
    times a loop that runs dd (bs=262144) for file-0.dat through file-99.dat
    from <scratch 0>/data into <scratch 1>.

    Args:
        vms: The VMs running dd benchmarks. Only vms[0] is used.
        data_size_in_mb: The size of the data file in MB.
        metadata: The metadata to attach to the sample.

    Returns:
        A list with one sample.Sample named 'dd throughput', whose value is
        data_size_in_mb divided by the measured time.
    """
    vm = vms[0]
    source_dir = vm.GetScratchDir(0)
    dest_dir = vm.GetScratchDir(1)
    cmd = (
        'rm -rf %s/*; sudo sync; sudo sysctl vm.drop_caches=3; '
        'time for i in {0..99}; do dd if=%s/data/file-$i.dat '
        'of=%s/file-$i.dat bs=262144; done'
    ) % (dest_dir, source_dir, dest_dir)

    # The second element of the result is what gets parsed for the timing.
    _, time_output = vm.RemoteCommand(cmd)
    logging.info(time_output)
    elapsed = vm_util.ParseTimeCommandResult(time_output)
    throughput = data_size_in_mb / elapsed
    return [
        sample.Sample('dd throughput', throughput, UNIT, metadata=metadata)
    ]
def AvailableCiphers(vm):
    """Returns the set of ciphers accepted by the vm's SSH server.

    Args:
        vm: The VM whose sshd configuration is queried.

    Returns:
        A set of cipher name strings taken from the `ciphers` line of
        `sshd -T`.
    """
    sshd_output, _ = vm.RemoteCommand('sshd -T | grep ^ciphers ')
    # The line reads "ciphers <a>,<b>,...": take the second token and split
    # it on commas.
    cipher_csv = sshd_output.split()[1]
    return set(cipher_csv.split(','))
def ChooseSshCipher(vms):
    """Returns the most-preferred cipher that's available to all vms.

    Args:
        vms: The VMs that must all accept the chosen cipher.

    Returns:
        The first entry of CIPHERS that every vm reports as available.

    Raises:
        Exception: If no entry of CIPHERS is available on every vm.
    """
    per_vm_ciphers = [AvailableCiphers(vm) for vm in vms]
    available = set.intersection(*per_vm_ciphers)

    # CIPHERS is ordered by preference, so the first match wins.
    chosen = next((name for name in CIPHERS if name in available), None)
    if chosen is None:
        raise Exception(
            'None of the preferred ciphers (%s) are available (%s).'
            % (CIPHERS, available)
        )
    return chosen
def RunScp(vms, data_size_in_mb, metadata):
    """Runs the scp benchmark in both directions between the first two vms.

    A single cipher is chosen for all vms and used for both directions.

    Args:
        vms: The vms running scp commands.
        data_size_in_mb: The size of the data file in MB.
        metadata: The metadata to attach to the sample.

    Returns:
        A list of sample.Sample objects: those for vms[0] -> vms[1] followed
        by those for vms[1] -> vms[0].
    """
    first_vm, second_vm = vms[0], vms[1]
    cipher = ChooseSshCipher(vms)
    forward = RunScpSingleDirection(
        first_vm, second_vm, cipher, data_size_in_mb, metadata
    )
    backward = RunScpSingleDirection(
        second_vm, first_vm, cipher, data_size_in_mb, metadata
    )
    return forward + backward
# Maps each copy_benchmark_mode value to the function that runs that mode.
MODE_FUNCTION_DICTIONARY = {
    'cp': RunCp,
    'dd': RunDd,
    'scp': RunScp,
}
def RunScpSingleDirection(
    sending_vm, receiving_vm, cipher, data_size_in_mb, base_metadata
):
    """Run scp from sending_vm to receiving_vm and parse results.

    If 'receiving_vm' is accessible via internal IP from 'sending_vm',
    throughput over internal IP addresses will be tested in addition to
    external IP addresses.

    Each measurement copies into its own directory on the receiving vm
    (<scratch 0>/<ip_type>). That directory is removed again whether or not
    the copy succeeded, so a failed run does not block the next one.

    Args:
        sending_vm: The originating VM for the scp command.
        receiving_vm: The destination VM for the scp command.
        cipher: Name of the SSH cipher to use.
        data_size_in_mb: The size of the data file in MB.
        base_metadata: The base metadata to attach to the sample. It is
            copied, not modified.

    Returns:
        A list of sample.Sample objects, one per IP type that was tested.
    """
    # Prefix each vm's resource metadata with its role so both fit in one
    # dict.
    metadata = base_metadata.copy()
    for vm_specifier, vm in (
        ('receiving', receiving_vm),
        ('sending', sending_vm),
    ):
        for k, v in vm.GetResourceMetadata().items():
            metadata['{}_{}'.format(vm_specifier, k)] = v

    # Two-stage template: key path, cipher, source glob and user are filled
    # in now; the escaped %%s pair (destination address and directory) is
    # filled in per IP type below.
    cmd_template = (
        'sudo sync; sudo sysctl vm.drop_caches=3; '
        'time /usr/bin/scp -o StrictHostKeyChecking=no -i %s -c %s '
        '%s %s@%%s:%%s/;'
    ) % (
        linux_virtual_machine.REMOTE_KEY_PATH,
        cipher,
        '%s/data/*' % sending_vm.GetScratchDir(0),
        receiving_vm.user_name,
    )

    def RunForIpAddress(ip_address, ip_type):
        """Run SCP benchmark against a destination IP address."""
        target_dir = posixpath.join(receiving_vm.GetScratchDir(0), ip_type)
        cmd = cmd_template % (ip_address, target_dir)
        meta = metadata.copy()
        meta['ip_type'] = ip_type
        # -p tolerates a directory left over from an earlier interrupted run;
        # plain mkdir would fail on it.
        receiving_vm.RemoteCommand('mkdir -p %s' % target_dir)
        try:
            _, res = sending_vm.RemoteCommand(cmd)
            time_used = vm_util.ParseTimeCommandResult(res)
        finally:
            # Always remove the copied data, even when scp or the time
            # parsing raised, so the scratch disk is left clean.
            receiving_vm.RemoteCommand('rm -rf %s' % target_dir)
        result = data_size_in_mb / time_used
        return sample.Sample('scp throughput', result, UNIT, meta)

    results = []
    if vm_util.ShouldRunOnExternalIpAddress():
        results.append(RunForIpAddress(receiving_vm.ip_address, 'external'))
    if vm_util.ShouldRunOnInternalIpAddress(sending_vm, receiving_vm):
        results.append(RunForIpAddress(receiving_vm.internal_ip, 'internal'))
    return results
def Run(benchmark_spec):
    """Run cp/dd/scp on target vms, as selected by copy_benchmark_mode.

    Args:
        benchmark_spec: The benchmark specification. Contains all data that
            is required to run the benchmark.

    Returns:
        A list of throughput samples. Each sample contains the sample metric
        (string), value (float), unit (string), and metadata (dict).
    """
    configured_size = benchmark_spec.config.data_size_in_mb
    # Without a configured size, throughput is computed against the size of
    # the default data set.
    size_for_calculation = configured_size or DEFAULT_DATA_SIZE_IN_MB
    run_mode = MODE_FUNCTION_DICTIONARY[FLAGS.copy_benchmark_mode]
    return run_mode(
        benchmark_spec.vms,
        size_for_calculation,
        {'copy_benchmark_single_file_mb': configured_size},
    )
def Cleanup(benchmark_spec):
    """Does nothing; the body of this cleanup hook is empty.

    Args:
        benchmark_spec: The benchmark specification. Unused.
    """
    del benchmark_spec  # unused