in perfkitbenchmarker/linux_benchmarks/netperf_benchmark.py [0:0]
def RunNetperf(vm, benchmark_name, server_ips, num_streams, client_ips):
"""Spawns netperf on a remote VM, parses results.
Args:
vm: The VM that the netperf TCP_RR benchmark will be run upon.
benchmark_name: The netperf benchmark to run, see the documentation.
server_ips: A list of ips for a machine that is running netserver.
num_streams: The number of netperf client threads to run.
client_ips: A list of ips for a machine that is running netperf.
Returns:
A sample.Sample object with the result.
"""
enable_latency_histograms = FLAGS.netperf_enable_histograms or num_streams > 1
# Throughput benchmarks don't have latency histograms
enable_latency_histograms = enable_latency_histograms and (
benchmark_name not in ['TCP_STREAM', 'UDP_STREAM']
)
# Flags:
# -o specifies keys to include in CSV output.
# -j keeps additional latency numbers
# -v sets the verbosity level so that netperf will print out histograms
# -I specifies the confidence % and width - here 99% confidence that the true
# value is within +/- 2.5% of the reported value
# -i specifies the maximum and minimum number of iterations.
confidence = (
f'-I 99,5 -i {FLAGS.netperf_max_iter},3' if FLAGS.netperf_max_iter else ''
)
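# e.g. with --netperf_max_iter=10 this yields '-I 99,5 -i 10,3'.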
verbosity = '-v2 ' if enable_latency_histograms else ''
metadata = {
'netperf_test_length': FLAGS.netperf_test_length,
'sending_thread_count': num_streams,
'max_iter': FLAGS.netperf_max_iter or 1,
}
remote_cmd_list = []
assert server_ips, 'Server VM does not have an IP to use for netperf.'
if len(client_ips) != len(server_ips):
logging.warning('Number of client IPs does not match number of server IPs.')
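# Client IPs are reused round-robin below when there are fewer of them than
# server IPs.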
for server_ip_idx, server_ip in enumerate(server_ips):
client_ip = client_ips[server_ip_idx % len(client_ips)]
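# {command_port} and {data_port} are deliberately left as literal placeholders
# (the non-f-string pieces below) so the remote script can substitute a
# per-stream port pair.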
netperf_cmd = (
f'{netperf.NETPERF_PATH} '
'-p {command_port} '
f'-j {verbosity} '
f'-t {benchmark_name} '
f'-H {server_ip} -L {client_ip} '
f'-l {FLAGS.netperf_test_length} {confidence}'
' -- '
'-P ,{data_port} '
f'-o {OUTPUT_SELECTOR}'
)
if benchmark_name.upper() == 'UDP_STREAM':
send_size = FLAGS.netperf_udp_stream_send_size_in_bytes
netperf_cmd += f' -R 1 -m {send_size} -M {send_size} '
metadata['netperf_send_size_in_bytes'] = send_size
elif benchmark_name.upper() == 'TCP_STREAM':
send_size = FLAGS.netperf_tcp_stream_send_size_in_bytes
netperf_cmd += f' -m {send_size} -M {send_size} '
metadata['netperf_send_size_in_bytes'] = send_size
if FLAGS.netperf_thinktime != 0:
netperf_cmd += (
' -X '
f'{FLAGS.netperf_thinktime},'
f'{FLAGS.netperf_thinktime_array_size},'
f'{FLAGS.netperf_thinktime_run_length} '
)
if FLAGS.netperf_mss and 'TCP' in benchmark_name.upper():
netperf_cmd += f' -G {FLAGS.netperf_mss}b'
metadata['netperf_mss_requested'] = FLAGS.netperf_mss
# Run all of the netperf processes and collect their stdout
# TODO(dlott): Analyze process start delta of netperf processes on the
# remote machine.
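# Each stream needs a command port and a data port, so each server IP gets its
# own block of 2 * num_streams ports starting at PORT_START.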
remote_cmd = (
f'./{REMOTE_SCRIPT} --netperf_cmd="{netperf_cmd}" '
f'--num_streams={num_streams} '
f'--port_start={PORT_START + server_ip_idx * 2 * num_streams}'
)
if (
NETPERF_NUMACTL_MEMBIND.value
or NETPERF_NUMACTL_PHYSCPUBIND.value
or NETPERF_NUMACTL_CPUNODEBIND.value
):
numactl_prefix = 'numactl '
if NETPERF_NUMACTL_PHYSCPUBIND.value:
numactl_prefix += f'--physcpubind {NETPERF_NUMACTL_PHYSCPUBIND.value} '
metadata['netperf_physcpubind'] = NETPERF_NUMACTL_PHYSCPUBIND.value
if NETPERF_NUMACTL_CPUNODEBIND.value:
numactl_prefix += f'--cpunodebind {NETPERF_NUMACTL_CPUNODEBIND.value} '
metadata['netperf_cpunodebind'] = NETPERF_NUMACTL_CPUNODEBIND.value
if NETPERF_NUMACTL_MEMBIND.value:
numactl_prefix += f'--membind {NETPERF_NUMACTL_MEMBIND.value} '
metadata['netperf_membind'] = NETPERF_NUMACTL_MEMBIND.value
remote_cmd = f'{numactl_prefix} {remote_cmd}'
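# Optionally wrap the command in perf to capture call-graph samples (-g) at a
# 99 Hz sampling frequency (-F 99).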
if FLAG_NETPERF_PERF_RECORD.value:
remote_cmd = f'sudo perf record -g -F 99 -- {remote_cmd}'
remote_cmd_list.append(remote_cmd)
# Give the remote script the max possible test length plus 5 minutes to
# complete
remote_cmd_timeout = (
FLAGS.netperf_test_length * (FLAGS.netperf_max_iter or 1) + 300
)
start_time = time.time()
remote_stdout_stderr_threads = background_tasks.RunThreaded(
lambda cmd: vm.RobustRemoteCommand(cmd, timeout=remote_cmd_timeout),
remote_cmd_list,
)
end_time = time.time()
start_time_sample = sample.Sample('start_time', start_time, 'sec', metadata)
end_time_sample = sample.Sample('end_time', end_time, 'sec', metadata)
# Decode stdouts, stderrs, and return codes from remote command's stdout
json_outs = [
json.loads(remote_stdout_stderr[0])
for remote_stdout_stderr in remote_stdout_stderr_threads
]
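# Each json_out is expected to be [stdouts, stderrs, retcodes] from the remote
# script; only the per-stream stdouts are parsed below.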
stdouts_list = [json_out[0] for json_out in json_outs]
parsed_output = []
for stdouts in stdouts_list:
for stdout in stdouts:
parsed_output.append(
ParseNetperfOutput(
stdout, metadata, benchmark_name, enable_latency_histograms
)
)
samples = [start_time_sample, end_time_sample]
if len(parsed_output) == 1:
# Only 1 netperf thread
throughput_sample, latency_samples, histogram = parsed_output[0]
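# Normalize the histogram (a mapping of latency bucket -> count) into a
# Counter so percentiles are computed the same way as in the multi-stream path.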
latency_histogram = collections.Counter()
latency_histogram.update(histogram)
# Calculate all percentiles specified by flag.
latency_stats = _HistogramStatsCalculator(latency_histogram)
for stat, value in latency_stats.items():
# Netperf already outputs p50/p90/p99 in ParseNetperfOutput
if stat in ['p50', 'p90', 'p99']:
continue
latency_samples.append(
sample.Sample(
f'{benchmark_name}_Latency_{stat}',
float(value),
'us',
throughput_sample.metadata,
)
)
output_samples = samples + [throughput_sample] + latency_samples
# Also emit a '<metric>_1stream' variant of the throughput sample for TCP
# stream runs.
if benchmark_name.upper() == 'TCP_STREAM':
output_samples.append(
sample.Sample(
throughput_sample.metric + '_1stream',
throughput_sample.value,
throughput_sample.unit,
throughput_sample.metadata,
)
)
return output_samples
else:
# Multiple netperf threads
# Unzip parsed output
# Note that latency_samples are invalid with multiple threads because stats
# are computed per-thread by netperf, so we don't use them here.
throughput_samples, _, latency_histograms = (
list(t) for t in zip(*parsed_output)
)
# They should all have the same units
throughput_unit = throughput_samples[0].unit
# Extract the throughput values from the samples
throughputs = [s.value for s in throughput_samples]
# Compute some stats on the throughput values
throughput_stats = sample.PercentileCalculator(throughputs, [50, 90, 99])
throughput_stats['min'] = min(throughputs)
throughput_stats['max'] = max(throughputs)
# Calculate aggregate throughput
throughput_stats['total'] = throughput_stats['average'] * len(throughputs)
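# average * count is equal to the sum of the per-stream throughputs.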
# Create samples for throughput stats
for stat, value in throughput_stats.items():
samples.append(
sample.Sample(
f'{benchmark_name}_Throughput_{stat}',
float(value),
throughput_unit,
metadata,
)
)
# Emit a {benchmark_name}_Throughput_<N>streams sample for TCP stream runs.
if benchmark_name.upper() == 'TCP_STREAM':
netperf_mss = None
for throughput_sample in throughput_samples:
sample_netperf_mss = throughput_sample.metadata['netperf_mss']
if netperf_mss is None:
netperf_mss = sample_netperf_mss
elif netperf_mss != sample_netperf_mss:
raise ValueError(
'Netperf MSS values do not match for multiple netperf threads.'
)
metadata['netperf_mss'] = netperf_mss if netperf_mss else 'unknown'
samples.append(
sample.Sample(
f'{benchmark_name}_Throughput_{len(parsed_output)}streams',
throughput_stats['total'],
throughput_unit,
metadata,
)
)
if enable_latency_histograms:
# Combine all of the latency histogram dictionaries
latency_histogram = collections.Counter()
for histogram in latency_histograms:
latency_histogram.update(histogram)
# Create a sample for the aggregate latency histogram
hist_metadata = {'histogram': json.dumps(latency_histogram)}
hist_metadata.update(metadata)
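# The sample value is a placeholder (0); the aggregated histogram itself is
# carried in the sample's metadata as JSON.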
samples.append(
sample.Sample(
f'{benchmark_name}_Latency_Histogram', 0, 'us', hist_metadata
)
)
# Calculate stats on aggregate latency histogram
latency_stats = _HistogramStatsCalculator(latency_histogram)
# Create samples for the latency stats
for stat, value in latency_stats.items():
samples.append(
sample.Sample(
f'{benchmark_name}_Latency_{stat}', float(value), 'us', metadata
)
)
return samples