perfkitbenchmarker/linux_benchmarks/mesh_network_benchmark.py (109 lines of code) (raw):
# Copyright 2014 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Runs mesh network benchmarks.
Runs TCP_RR, TCP_STREAM benchmarks from netperf and compute total throughput
and average latency inside mesh network.
"""
import logging
import re
import threading
from absl import flags
from perfkitbenchmarker import background_tasks
from perfkitbenchmarker import configs
from perfkitbenchmarker import errors
from perfkitbenchmarker import sample
from perfkitbenchmarker.linux_packages import netperf
flags.DEFINE_integer(
'num_connections', 1, 'Number of connections between each pair of vms.'
)
flags.DEFINE_integer('num_iterations', 1, 'Number of iterations for each run.')
FLAGS = flags.FLAGS
BENCHMARK_NAME = 'mesh_network'
BENCHMARK_CONFIG = """
mesh_network:
description: >
Measures VM to VM cross section bandwidth in
a mesh network. Specify the number of VMs in the network
with --num_vms.
vm_groups:
default:
vm_spec: *default_dual_core
"""
NETPERF_BENCHMARKSS = ['TCP_RR', 'TCP_STREAM']
VALUE_INDEX = 1
RESULT_LOCK = threading.Lock()
def GetConfig(user_config):
config = configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
config['vm_groups']['default']['vm_count'] = FLAGS.num_vms
if FLAGS.num_vms < 2: # Needs at least 2 vms to run the benchmark.
config['vm_groups']['default']['vm_count'] = 2
return config
def PrepareVM(vm):
"""Prepare netperf on a single VM.
Args:
vm: The VM that needs to install netperf package.
"""
vm.RemoteCommand('./netserver')
def Prepare(benchmark_spec):
"""Install vms with necessary softwares.
Args:
benchmark_spec: The benchmark specification. Contains all data that is
required to run the banchmark.
"""
vms = benchmark_spec.vms
logging.info('Preparing netperf on %s', vms[0])
vms[0].Install('netperf')
for vm in vms:
vms[0].MoveFile(vm, netperf.NETPERF_PATH)
vms[0].MoveFile(vm, netperf.NETSERVER_PATH)
background_tasks.RunThreaded(PrepareVM, vms, len(vms))
def RunNetperf(vm, benchmark_name, servers, result):
"""Spawns netperf on a remote VM, parses results.
Args:
vm: The VM running netperf.
benchmark_name: The netperf benchmark to run.
servers: VMs running netserver.
result: The result variable shared by all threads.
"""
cmd = ''
if FLAGS.duration_in_seconds:
cmd_duration_suffix = '-l %s' % FLAGS.duration_in_seconds
else:
cmd_duration_suffix = ''
for server in servers:
if vm != server:
cmd += (
'./netperf -t '
'{benchmark_name} -H {server_ip} -i {iterations} '
'{cmd_suffix} & '
).format(
benchmark_name=benchmark_name,
server_ip=server.internal_ip,
iterations=FLAGS.num_iterations,
cmd_suffix=cmd_duration_suffix,
)
netperf_cmd = ''
for _ in range(FLAGS.num_connections):
netperf_cmd += cmd
netperf_cmd += 'wait'
output, _ = vm.RemoteCommand(netperf_cmd)
logging.info(output)
match = re.findall(r'(\d+\.\d+)\s+\n', output)
value = 0
expected_num_match = (len(servers) - 1) * FLAGS.num_connections
if len(match) != expected_num_match:
raise errors.Benchmarks.RunError(
'Netserver not reachable. Expecting %s results, got %s.'
% (expected_num_match, len(match))
)
for res in match:
if benchmark_name == 'TCP_RR':
value += 1.0 / float(res) * 1000.0
else:
value += float(res)
with RESULT_LOCK:
result[VALUE_INDEX] += value
def Run(benchmark_spec):
"""Run netperf on target vms.
Args:
benchmark_spec: The benchmark specification. Contains all data that is
required to run the benchmark.
Returns:
Total throughput, average latency in the form of tuple. The tuple contains
the sample metric (string), value (float), unit (string).
"""
vms = benchmark_spec.vms
num_vms = len(vms)
results = []
for netperf_benchmark in NETPERF_BENCHMARKSS:
args = []
metadata = {
'number_machines': num_vms,
'number_connections': FLAGS.num_connections,
}
if netperf_benchmark == 'TCP_STREAM':
metric = 'TCP_STREAM_Total_Throughput'
unit = 'Mbits/sec'
value = 0.0
else:
metric = 'TCP_RR_Average_Latency'
unit = 'ms'
value = 0.0
result = [metric, value, unit, metadata]
args = [((source, netperf_benchmark, vms, result), {}) for source in vms]
background_tasks.RunThreaded(RunNetperf, args, num_vms)
result = sample.Sample(*result)
if netperf_benchmark == 'TCP_RR':
denom = (num_vms - 1) * num_vms * FLAGS.num_connections
result = result._replace(value=result.value / denom)
results.append(result)
logging.info(results)
return results
def Cleanup(benchmark_spec):
"""Cleanup netperf on the target vm (by uninstalling).
Args:
benchmark_spec: The benchmark specification. Contains all data that is
required to run the benchmark.
"""
vms = benchmark_spec.vms
for vm in vms:
logging.info('uninstalling netperf on %s', vm)
vm.RemoteCommand('pkill -9 netserver')
vm.RemoteCommand('rm netserver')
vm.RemoteCommand('rm netperf')