perfkitbenchmarker/linux_benchmarks/iperf_benchmark.py (455 lines of code) (raw):
# Copyright 2014 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Runs plain Iperf.
Docs:
http://iperf.fr/
Runs Iperf to collect network throughput.
"""
import logging
import re
import time
from absl import flags
from perfkitbenchmarker import configs
from perfkitbenchmarker import flag_util
from perfkitbenchmarker import sample
from perfkitbenchmarker import vm_util
# Sweep of client-side parallelism: each value is passed to the iperf client
# as `--parallel <value>`, and the benchmark runs once per value.
flag_util.DEFINE_integerlist(
    'iperf_sending_thread_count',
    flag_util.IntegerList([1]),
    (
        'Number of threads the iperf client uses for sending traffic. '
        'Iperf will run once for each value in the list.'
    ),
    module_name=__name__,
)
# Value passed to the iperf client as `--time`.
flags.DEFINE_integer(
    name='iperf_runtime_in_seconds',
    default=60,
    help='Number of seconds to run iperf.',
    lower_bound=1,
)

# Extra allowance added to the runtime when computing the client command
# timeout. When unset, _RunIperf falls back to 30 + thread_count.
flags.DEFINE_integer(
    name='iperf_timeout',
    default=None,
    help=(
        'Number of seconds to wait in addition to iperf runtime '
        'before killing iperf client command.'
    ),
    lower_bound=1,
)

# Per-stream `--bandwidth` for UDP runs; an 'M' suffix is appended.
flags.DEFINE_float(
    name='iperf_udp_per_stream_bandwidth',
    default=None,
    help=(
        'In Mbits. Iperf will attempt to send at this bandwidth for UDP '
        'tests. If using multiple streams, each stream will attempt to send '
        'at this bandwidth'
    ),
)

# Per-stream `--bandwidth` for TCP runs; an 'M' suffix is appended.
flags.DEFINE_float(
    name='iperf_tcp_per_stream_bandwidth',
    default=None,
    help=(
        'In Mbits. Iperf will attempt to send at this bandwidth for TCP '
        'tests. If using multiple streams, each stream will attempt to send '
        'at this bandwidth'
    ),
)

# Passed to the TCP client as `--interval`; also switches TCP parsing into
# per-interval mode.
flags.DEFINE_float(
    name='iperf_interval',
    default=None,
    help=(
        'The number of seconds between periodic bandwidth reports. Currently '
        'only for TCP tests'
    ),
)

# Pause inserted by Run after every individual iperf measurement.
flags.DEFINE_integer(
    name='iperf_sleep_time',
    default=5,
    help='number of seconds to sleep after each iperf test',
)
# Transport protocols this benchmark can run and parse.
TCP = 'TCP'
UDP = 'UDP'
IPERF_BENCHMARKS = [TCP, UDP]

flags.DEFINE_list('iperf_benchmarks', [TCP], 'Run TCP, UDP or both')

# Reject an empty list as well as any protocol name outside IPERF_BENCHMARKS,
# and say so explicitly instead of relying on the generic validator message.
flags.register_validator(
    'iperf_benchmarks',
    lambda benchmarks: bool(benchmarks)
    and set(benchmarks).issubset(IPERF_BENCHMARKS),
    message=(
        '--iperf_benchmarks must be a non-empty list containing only: '
        + ', '.join(IPERF_BENCHMARKS)
    ),
)
# Passed verbatim to the iperf client as `--len` for both TCP and UDP runs.
flags.DEFINE_string(
    'iperf_buffer_length',
    None,
    (
        'set read/write buffer size (TCP) or length (UDP) to n[kmKM]Bytes. '
        '1kB= 10^3, 1mB= 10^6, 1KB=2^10, 1MB=2^20'
    ),
)
FLAGS = flags.FLAGS

BENCHMARK_NAME = 'iperf'
# Default configuration: two single-VM groups, matching the "exactly two
# machines" requirement enforced in Prepare. The YAML nesting is significant:
# everything belongs under the top-level `iperf` key, and each VM group carries
# its own `vm_spec`.
BENCHMARK_CONFIG = """
iperf:
  description: Run iperf
  vm_groups:
    vm_1:
      vm_spec: *default_dual_core
    vm_2:
      vm_spec: *default_dual_core
"""

# Port the TCP iperf server listens on (and the TCP client connects to).
IPERF_PORT = 20000
# Port the UDP iperf server listens on (and the UDP client connects to).
IPERF_UDP_PORT = 25000
# Passed as max_retries to the vm_util.Retry decorator on _RunIperf.
IPERF_RETRIES = 5
def GetConfig(user_config):
  """Builds the configuration for the iperf benchmark.

  Args:
    user_config: User-supplied configuration, forwarded unchanged to
      configs.LoadConfig.

  Returns:
    Whatever configs.LoadConfig returns for BENCHMARK_CONFIG, user_config and
    BENCHMARK_NAME.
  """
  loaded_config = configs.LoadConfig(
      BENCHMARK_CONFIG, user_config, BENCHMARK_NAME
  )
  return loaded_config
def Prepare(benchmark_spec):
  """Installs iperf on both VMs and launches the requested iperf servers.

  For every VM: installs the 'iperf' package, opens the iperf ports when the
  benchmark is going to use external IP addresses, then starts a background
  server per selected protocol and records its PID on the VM object
  (`iperf_tcp_server_pid` / `iperf_udp_server_pid`) so Cleanup can kill it.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Raises:
    ValueError: If the spec does not contain exactly two VMs.
  """
  machines = benchmark_spec.vms
  if len(machines) != 2:
    raise ValueError(
        f'iperf benchmark requires exactly two machines, found {len(machines)}'
    )

  for machine in machines:
    machine.Install('iperf')

    # TODO(user): maybe indent this block one
    if vm_util.ShouldRunOnExternalIpAddress():
      for wanted_protocol, port in ((TCP, IPERF_PORT), (UDP, IPERF_UDP_PORT)):
        if wanted_protocol in FLAGS.iperf_benchmarks:
          machine.AllowPort(port)

    if TCP in FLAGS.iperf_benchmarks:
      # `echo $!` prints the PID of the backgrounded server.
      launch_tcp_server = (
          f'nohup iperf --server --port {IPERF_PORT} &> /dev/null & echo $!'
      )
      pid_output, _ = machine.RemoteCommand(launch_tcp_server)
      # TODO(user): store this in a better place
      machine.iperf_tcp_server_pid = pid_output.strip()
      # Check that the server is actually running
      machine.RemoteCommand(f'ps -p {machine.iperf_tcp_server_pid}')

    if UDP in FLAGS.iperf_benchmarks:
      # Unlike the TCP server, the UDP server is bound to the internal IP.
      launch_udp_server = (
          f'nohup iperf --server --bind {machine.internal_ip} --udp '
          f'--port {IPERF_UDP_PORT} &> /dev/null & echo $!'
      )
      pid_output, _ = machine.RemoteCommand(launch_udp_server)
      # TODO(user): store this in a better place
      machine.iperf_udp_server_pid = pid_output.strip()
      # Check that the server is actually running
      machine.RemoteCommand(f'ps -p {machine.iperf_udp_server_pid}')
def _Mean(values, description):
  """Returns the arithmetic mean of `values`, failing loudly when empty.

  Args:
    values: A list of numbers parsed out of the iperf output.
    description: Human-readable name of the quantity, used in the error text.

  Returns:
    sum(values) / len(values).

  Raises:
    ValueError: If `values` is empty, i.e. nothing could be parsed.
  """
  if not values:
    raise ValueError(f'iperf output contained no {description} values.')
  return sum(values) / len(values)


@vm_util.Retry(max_retries=IPERF_RETRIES)
def _RunIperf(
    sending_vm,
    receiving_vm,
    receiving_ip_address,
    thread_count,
    ip_type,
    protocol,
    interval_size=None,
):
  """Runs one iperf client session on `sending_vm` and parses its output.

  The client command is built from the iperf_* flags, executed on
  `sending_vm` against the server at `receiving_ip_address`, and its stdout is
  scraped with regular expressions. TCP output is parsed either per report
  interval (when `interval_size` is set) or from the end-of-run summary.

  Args:
    sending_vm: The VM sending traffic (runs the iperf client).
    receiving_vm: The VM receiving traffic; only used for metadata here.
    receiving_ip_address: The IP address of the iperf server (ie the receiver).
    thread_count: Number of parallel client streams (iperf `--parallel`).
    ip_type: The IP type of 'receiving_ip_address' (e.g. 'internal',
      'external'); recorded in the sample metadata.
    protocol: The protocol for Iperf to use. Either 'TCP' or 'UDP'.
    interval_size: Reporting interval in seconds. When truthy, TCP output is
      parsed per interval. Note the `--interval` command-line option itself is
      driven by FLAGS.iperf_interval, so the two are expected to agree.

  Returns:
    A sample.Sample named 'Throughput' (TCP) or 'UDP Throughput' (UDP) in
    Mbits/sec. Falls through and returns None for any other protocol value.

  Raises:
    ValueError: If the number of per-thread throughput values does not match
      `thread_count`, or if expected statistics are missing from the output.
      Other exceptions (e.g. AttributeError from an unmatched header regex)
      can still surface when the output has an unexpected shape.
  """
  metadata = {
      # The meta data defining the environment
      'receiving_machine_type': receiving_vm.machine_type,
      'receiving_zone': receiving_vm.zone,
      'sending_machine_type': sending_vm.machine_type,
      'sending_thread_count': thread_count,
      'sending_zone': sending_vm.zone,
      'runtime_in_seconds': FLAGS.iperf_runtime_in_seconds,
      'ip_type': ip_type,
  }

  if protocol == TCP:
    iperf_cmd = (
        f'iperf --enhancedreports --client {receiving_ip_address} --port '
        f'{IPERF_PORT} --format m --time {FLAGS.iperf_runtime_in_seconds} '
        f'--parallel {thread_count}'
    )
    if FLAGS.iperf_interval:
      iperf_cmd += f' --interval {FLAGS.iperf_interval}'
    if FLAGS.iperf_tcp_per_stream_bandwidth:
      iperf_cmd += f' --bandwidth {FLAGS.iperf_tcp_per_stream_bandwidth}M'
    if FLAGS.iperf_buffer_length:
      iperf_cmd += f' --len {FLAGS.iperf_buffer_length}'

    # the additional time on top of the iperf runtime is to account for the
    # time it takes for the iperf process to start and exit
    timeout_buffer = FLAGS.iperf_timeout or (30 + thread_count)
    stdout, _ = sending_vm.RemoteCommand(
        iperf_cmd, timeout=FLAGS.iperf_runtime_in_seconds + timeout_buffer
    )

    window_size_match = re.search(
        r'TCP window size: (?P<size>\d+\.?\d+) (?P<units>\S+)', stdout
    )
    window_size = float(window_size_match.group('size'))

    buffer_size = float(
        re.search(
            r'Write buffer size: (?P<buffer_size>\d+\.?\d+) \S+', stdout
        ).group('buffer_size')
    )

    if interval_size:
      interval_throughput_list = []
      interval_start_time_list = []
      rtt_avg_list = []
      cwnd_avg_list = []
      netpwr_sum_list = []
      retry_sum_list = []
      cwnd_scale = None
      total_stats = {}

      # One enhanced-report line of a single stream, e.g. "[  3] 0.00-1.00 sec
      # ...". Used directly for one thread and for per-thread detail otherwise.
      per_thread_report = re.compile(
          r'\[\s*(?P<thread_num>\d+)\]\s+(?P<interval>\d+\.\d+-\d+\.\d+)\s+sec\s+'
          r'(?P<transfer>\d+\.?\d*)\s\w+\s+(?P<throughput>\d+\.?\d*)\sMbits\/sec\s+'
          r'(?P<write>\d+)\/(?P<err>\d+)\s+(?P<retry>\d+)\s+(?P<cwnd>-?\d+)(?P<cwnd_scale>\w*)\/'
          r'(?P<rtt>\d+)\s+(?P<rtt_unit>\w+)\s+(?P<netpwr>\d+\.?\d*)'
      )

      if thread_count == 1:
        interval_stats = [
            m.groupdict() for m in per_thread_report.finditer(stdout)
        ]
        if not interval_stats:
          raise ValueError('No iperf interval reports found in the output.')

        # Every report but the last is an interval; the last one is treated
        # as the whole-run total.
        for report in interval_stats[:-1]:
          interval_start = float(report['interval'].split('-')[0])
          interval_start_time_list.append(interval_start)
          interval_throughput_list.append(float(report['throughput']))
          rtt_avg_list.append(float(report['rtt']))
          cwnd_avg_list.append(int(report['cwnd']))
          cwnd_scale = report['cwnd_scale']
          netpwr_sum_list.append(float(report['netpwr']))
          retry_sum_list.append(int(report['retry']))

        total_stats = interval_stats[-1]

      elif thread_count > 1:
        # Parse aggregates of multiple sending threads for each report interval
        sum_report = re.compile(
            r'\[SUM\]\s+(?P<interval>\d+\.\d+-\d+\.\d+)\s\w+\s+(?P<transfer>\d+\.?\d*)'
            r'\s\w+\s+(?P<throughput>\d+\.?\d*)\sMbits\/sec\s+(?P<write>\d+)'
            r'\/(?P<err>\d+)\s+(?P<retry>\d+)'
        )
        interval_sums = [m.groupdict() for m in sum_report.finditer(stdout)]
        if not interval_sums:
          raise ValueError(
              'No iperf [SUM] interval reports found in the output.'
          )

        # Parse output for each individual thread for each report interval
        interval_threads = [
            m.groupdict() for m in per_thread_report.finditer(stdout)
        ]

        rtt_unit = None
        # sum and average across threads for each interval report
        for interval in interval_sums[:-1]:
          interval_start = float(interval['interval'].split('-')[0])
          interval_start_time_list.append(interval_start)
          interval_throughput_list.append(float(interval['throughput']))

          thread_results_for_interval = [
              thread_result
              for thread_result in interval_threads
              if thread_result['interval'] == interval['interval']
          ]
          if len(thread_results_for_interval) != thread_count:
            logging.warning(
                "iperf thread results don't match sending_thread argument"
            )
          if not thread_results_for_interval:
            # Averaging below would otherwise divide by zero.
            raise ValueError(
                'No per-thread iperf reports found for interval '
                f"{interval['interval']}."
            )

          rtt_sum = 0.0
          cwnd_sum = 0.0
          netpwr_sum = 0.0
          retry_sum = 0
          for thread_result in thread_results_for_interval:
            rtt_sum += float(thread_result['rtt'])
            cwnd_sum += float(thread_result['cwnd'])
            netpwr_sum += float(thread_result['netpwr'])
            retry_sum += int(thread_result['retry'])
            cwnd_scale = thread_result['cwnd_scale']

          # RTT and cwnd are averaged over threads; netpwr and retries summed.
          rtt_avg_list.append(rtt_sum / len(thread_results_for_interval))
          cwnd_avg_list.append(cwnd_sum / len(thread_results_for_interval))
          netpwr_sum_list.append(netpwr_sum)
          retry_sum_list.append(retry_sum)
          rtt_unit = thread_results_for_interval[0]['rtt_unit']

        # The last [SUM] line is the whole-run total; it carries no
        # rtt/cwnd/netpwr, so derive them from the per-interval values.
        total_stats = interval_sums[-1]
        total_stats['rtt'] = _Mean(rtt_avg_list, 'per-interval RTT')
        total_stats['cwnd'] = _Mean(
            cwnd_avg_list, 'per-interval congestion window'
        )
        total_stats['netpwr'] = _Mean(netpwr_sum_list, 'per-interval netpwr')
        total_stats['rtt_unit'] = rtt_unit

      total_throughput = float(total_stats['throughput'])

      tcp_metadata = {
          'buffer_size': buffer_size,
          'tcp_window_size': window_size,
          'write_packet_count': int(total_stats['write']),
          'err_packet_count': int(total_stats['err']),
          'retry_packet_count': int(total_stats['retry']),
          'congestion_window': float(total_stats['cwnd']),
          'congestion_window_scale': cwnd_scale,
          'interval_length_seconds': float(interval_size),
          'rtt': float(total_stats['rtt']),
          'rtt_unit': total_stats['rtt_unit'],
          'netpwr': float(total_stats['netpwr']),
          # The capture may contain a decimal point (e.g. "11.2"), which
          # int() alone rejects; go through float and truncate.
          'transfer_mbytes': int(float(total_stats['transfer'])),
          'interval_throughput_list': interval_throughput_list,
          'interval_start_time_list': interval_start_time_list,
          'interval_rtt_list': rtt_avg_list,
          'interval_congestion_window_list': cwnd_avg_list,
          'interval_retry_list': retry_sum_list,
          'interval_netpwr_list': netpwr_sum_list,
      }
      metadata_tmp = metadata.copy()
      metadata_tmp.update(tcp_metadata)
      return sample.Sample(
          'Throughput', total_throughput, 'Mbits/sec', metadata_tmp
      )

    # if interval_size == None
    else:
      multi_thread = re.search(
          (
              r'\[SUM\]\s+\d+\.\d+-\d+\.\d+\s\w+\s+(?P<transfer>\d+)\s\w+\s+(?P<throughput>\d+)'
              r'\s\w+/\w+\s+(?P<write>\d+)/(?P<err>\d+)\s+(?P<retry>\d+)\s*'
          ),
          stdout,
      )
      # Iperf output is formatted differently when running with multiple threads
      # vs a single thread
      if multi_thread:
        # Write, error, retry
        write = int(multi_thread.group('write'))
        err = int(multi_thread.group('err'))
        retry = int(multi_thread.group('retry'))
      # if single thread
      else:
        # Write, error, retry
        match = re.search(
            r'\d+ Mbits/sec\s+(?P<write>\d+)/(?P<err>\d+)\s+(?P<retry>\d+)',
            stdout,
        )
        write = int(match.group('write'))
        err = int(match.group('err'))
        retry = int(match.group('retry'))

      # Per-stream lines carry cwnd/rtt/netpwr; average them over all streams.
      r = re.compile(
          r'\d+ Mbits\/sec\s+'
          r' \d+\/\d+\s+\d+\s+(?P<cwnd>-*\d+)(?P<cwnd_unit>\w+)\/(?P<rtt>\d+)'
          r'\s+(?P<rtt_unit>\w+)\s+(?P<netpwr>\d+\.\d+)'
      )
      stream_stats = [m.groupdict() for m in r.finditer(stdout)]
      cwnd = _Mean(
          [float(i['cwnd']) for i in stream_stats], 'congestion window'
      )
      rtt = round(_Mean([float(i['rtt']) for i in stream_stats], 'RTT'), 2)
      netpwr = round(
          _Mean([float(i['netpwr']) for i in stream_stats], 'netpwr'), 2
      )
      rtt_unit = stream_stats[0]['rtt_unit']

      thread_values = re.findall(r'\[SUM].*\s+(\d+\.?\d*).Mbits/sec', stdout)
      if not thread_values:
        # If there is no sum you have try and figure out an estimate
        # which happens when threads start at different times.  The code
        # below will tend to overestimate a bit.
        thread_values = re.findall(
            r'\[.*\d+\].*\s+(\d+\.?\d*).Mbits/sec', stdout
        )
        if len(thread_values) != thread_count:
          raise ValueError(
              f'Only {len(thread_values)} out of {thread_count}'
              ' iperf threads reported a throughput value.'
          )

      total_throughput = sum(float(value) for value in thread_values)

      tcp_metadata = {
          'buffer_size': buffer_size,
          'tcp_window_size': window_size,
          'write_packet_count': write,
          'err_packet_count': err,
          'retry_packet_count': retry,
          'congestion_window': cwnd,
          'rtt': rtt,
          'rtt_unit': rtt_unit,
          'netpwr': netpwr,
      }
      metadata.update(tcp_metadata)
      return sample.Sample(
          'Throughput', total_throughput, 'Mbits/sec', metadata
      )

  elif protocol == UDP:
    iperf_cmd = (
        f'iperf --enhancedreports --udp --client {receiving_ip_address} --port'
        f' {IPERF_UDP_PORT} --format m --time {FLAGS.iperf_runtime_in_seconds}'
        f' --parallel {thread_count} '
    )
    if FLAGS.iperf_udp_per_stream_bandwidth:
      iperf_cmd += f' --bandwidth {FLAGS.iperf_udp_per_stream_bandwidth}M'
    if FLAGS.iperf_buffer_length:
      iperf_cmd += f' --len {FLAGS.iperf_buffer_length}'

    # the additional time on top of the iperf runtime is to account for the
    # time it takes for the iperf process to start and exit
    timeout_buffer = FLAGS.iperf_timeout or (30 + thread_count)
    stdout, _ = sending_vm.RemoteCommand(
        iperf_cmd, timeout=FLAGS.iperf_runtime_in_seconds + timeout_buffer
    )

    match = re.search(
        r'UDP buffer size: (?P<buffer_size>\d+\.?\d+)\s+(?P<buffer_unit>\w+)',
        stdout,
    )
    buffer_size = float(match.group('buffer_size'))

    datagram_size = int(
        re.findall(r'(?P<datagram_size>\d+)\sbyte\sdatagrams', stdout)[0]
    )
    ipg_target = float(re.findall(r'IPG\starget:\s(\d+.?\d+)', stdout)[0])
    ipg_target_unit = str(
        re.findall(r'IPG\starget:\s\d+.?\d+\s(\S+)\s', stdout)[0]
    )

    multi_thread = re.search(
        (
            r'\[SUM\]\s\d+\.?\d+-\d+\.?\d+\ssec\s+\d+\.?\d+\s+MBytes\s+\d+\.?\d+'
            r'\s+Mbits/sec\s+(?P<write>\d+)/(?P<err>\d+)\s+(?P<pps>\d+)\s+pps'
        ),
        stdout,
    )
    if multi_thread:
      # Write, Err, PPS
      write = int(multi_thread.group('write'))
      err = int(multi_thread.group('err'))
      pps = int(multi_thread.group('pps'))
    else:
      # Write, Err, PPS
      match = re.search(
          r'\d+\s+Mbits/sec\s+(?P<write>\d+)/(?P<err>\d+)\s+(?P<pps>\d+)\s+pps',
          stdout,
      )
      write = int(match.group('write'))
      err = int(match.group('err'))
      pps = int(match.group('pps'))

    # Jitter
    jitter_array = re.findall(
        r'Mbits/sec\s+(?P<jitter>\d+\.?\d+)\s+[a-zA-Z]+', stdout
    )
    jitter_avg = _Mean([float(x) for x in jitter_array], 'jitter')
    jitter_unit = str(
        re.search(
            r'Mbits/sec\s+\d+\.?\d+\s+(?P<jitter_unit>[a-zA-Z]+)', stdout
        ).group('jitter_unit')
    )

    # total and lost datagrams
    match = re.findall(
        r'(?P<lost_datagrams>\d+)/\s*(?P<total_datagrams>\d+)\s+\(', stdout
    )
    lost_datagrams_sum = sum(int(i[0]) for i in match)
    total_datagrams_sum = sum(int(i[1]) for i in match)

    # out of order datagrams
    out_of_order_array = re.findall(
        r'(\d+)\s+datagrams\sreceived\sout-of-order', stdout
    )
    out_of_order_sum = sum(int(x) for x in out_of_order_array)

    thread_values = re.findall(r'\[SUM].*\s+(\d+\.?\d*).Mbits/sec', stdout)
    if not thread_values:
      # If there is no sum you have try and figure out an estimate
      # which happens when threads start at different times.  The code
      # below will tend to overestimate a bit.
      thread_values = re.findall(
          r'\[.*\d+\].*\s+(\d+\.?\d*).Mbits/sec\s+\d+/\d+', stdout
      )
      if len(thread_values) != thread_count:
        raise ValueError(
            f'Only {len(thread_values)} out of {thread_count} iperf threads'
            ' reported a throughput value.'
        )

    total_throughput = sum(float(value) for value in thread_values)

    udp_metadata = {
        'buffer_size': buffer_size,
        'datagram_size_bytes': datagram_size,
        'write_packet_count': write,
        'err_packet_count': err,
        'pps': pps,
        'ipg_target': ipg_target,
        'ipg_target_unit': ipg_target_unit,
        'jitter': jitter_avg,
        'jitter_unit': jitter_unit,
        'lost_datagrams': lost_datagrams_sum,
        'total_datagrams': total_datagrams_sum,
        'out_of_order_datagrams': out_of_order_sum,
        'udp_per_stream_bandwidth_mbit': FLAGS.iperf_udp_per_stream_bandwidth,
    }
    metadata.update(udp_metadata)
    return sample.Sample(
        'UDP Throughput', total_throughput, 'Mbits/sec', metadata
    )
def Run(benchmark_spec):
  """Measures iperf throughput between the two VMs in both directions.

  For every selected protocol and every sending-thread count of at least 1,
  traffic is sent VM1 -> VM2 and then VM2 -> VM1. Each direction is measured
  over the external address and/or the internal address, depending on what
  vm_util reports should be used, with a pause of FLAGS.iperf_sleep_time
  seconds after each measurement.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    A list of sample.Sample objects.
  """
  vms = benchmark_spec.vms
  results = []

  def _MeasureThenPause(sender, receiver, target_ip, ip_type, threads, proto):
    """Appends one _RunIperf result to `results`, then sleeps."""
    results.append(
        _RunIperf(
            sender,
            receiver,
            target_ip,
            threads,
            ip_type,
            proto,
            interval_size=FLAGS.iperf_interval,
        )
    )
    time.sleep(FLAGS.iperf_sleep_time)

  logging.info('Iperf Results:')

  for protocol in FLAGS.iperf_benchmarks:
    # Thread counts below 1 are silently skipped.
    usable_thread_counts = (
        count for count in FLAGS.iperf_sending_thread_count if count >= 1
    )
    for thread_count in usable_thread_counts:
      # Send traffic in both directions: each element unpacks into a
      # (sender, receiver) pair, which relies on there being exactly two VMs.
      both_directions = (vms, reversed(vms))
      for sending_vm, receiving_vm in both_directions:
        # Send using external IP addresses
        if vm_util.ShouldRunOnExternalIpAddress():
          _MeasureThenPause(
              sending_vm,
              receiving_vm,
              receiving_vm.ip_address,
              vm_util.IpAddressMetadata.EXTERNAL,
              thread_count,
              protocol,
          )

        # Send using internal IP addresses. Evaluated only after the external
        # measurement (if any) has finished.
        if vm_util.ShouldRunOnInternalIpAddress(sending_vm, receiving_vm):
          _MeasureThenPause(
              sending_vm,
              receiving_vm,
              receiving_vm.internal_ip,
              vm_util.IpAddressMetadata.INTERNAL,
              thread_count,
              protocol,
          )

  return results
def Cleanup(benchmark_spec):
  """Stops the iperf servers that Prepare started on each VM.

  Sends `kill -9` to the recorded TCP and/or UDP server PID on every VM,
  ignoring command failures. iperf itself is not uninstalled here.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.
  """
  pid_attribute_by_protocol = (
      (TCP, 'iperf_tcp_server_pid'),
      (UDP, 'iperf_udp_server_pid'),
  )
  for vm in benchmark_spec.vms:
    for protocol, pid_attribute in pid_attribute_by_protocol:
      if protocol not in FLAGS.iperf_benchmarks:
        continue
      # Prepare may have failed before recording a PID on this VM; skip it
      # rather than raising AttributeError and abandoning the remaining
      # servers.
      server_pid = getattr(vm, pid_attribute, None)
      if not server_pid:
        continue
      vm.RemoteCommand(f'kill -9 {server_pid}', ignore_failure=True)