def RunNetperf()

in perfkitbenchmarker/linux_benchmarks/netperf_benchmark.py


def RunNetperf(vm, benchmark_name, server_ips, num_streams, client_ips):
  """Spawns netperf on a remote VM, parses results.

  Args:
    vm: The VM that the netperf benchmark will be run upon.
    benchmark_name: The netperf benchmark to run, see the documentation.
    server_ips: A list of IP addresses of the machine running netserver.
    num_streams: The number of netperf client threads to run.
    client_ips: A list of IP addresses of the machine running netperf.

  Returns:
    A list of sample.Sample objects with the results.
  """
  enable_latency_histograms = FLAGS.netperf_enable_histograms or num_streams > 1
  # Throughput benchmarks don't have latency histograms
  enable_latency_histograms = enable_latency_histograms and (
      benchmark_name not in ['TCP_STREAM', 'UDP_STREAM']
  )
  # Flags:
  # -o specifies keys to include in CSV output.
  # -j keeps additional latency numbers
  # -v sets the verbosity level so that netperf will print out histograms
  # -I specifies the confidence % and width - here 99% confidence that the true
  #    value is within +/- 2.5% of the reported value
  # -i specifies the maximum and minimum number of iterations.
  confidence = (
      f'-I 99,5 -i {FLAGS.netperf_max_iter},3' if FLAGS.netperf_max_iter else ''
  )
  verbosity = '-v2 ' if enable_latency_histograms else ''

  metadata = {
      'netperf_test_length': FLAGS.netperf_test_length,
      'sending_thread_count': num_streams,
      'max_iter': FLAGS.netperf_max_iter or 1,
  }

  remote_cmd_list = []
  assert server_ips, 'Server VM does not have an IP to use for netperf.'
  if len(client_ips) != len(server_ips):
    logging.warning(
        'Number of client IPs does not match the number of server IPs.'
    )
  for server_ip_idx, server_ip in enumerate(server_ips):
    client_ip = client_ips[server_ip_idx % len(client_ips)]
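    # {command_port} and {data_port} are left as literal placeholders; the
    # remote script substitutes a distinct port pair for each stream.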
    netperf_cmd = (
        f'{netperf.NETPERF_PATH} '
        '-p {command_port} '
        f'-j {verbosity} '
        f'-t {benchmark_name} '
        f'-H {server_ip} -L {client_ip} '
        f'-l {FLAGS.netperf_test_length} {confidence}'
        ' -- '
        '-P ,{data_port} '
        f'-o {OUTPUT_SELECTOR}'
    )

    if benchmark_name.upper() == 'UDP_STREAM':
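      # -R 1 allows the UDP traffic to be routed off the local network
      # (netperf sets SO_DONTROUTE by default for UDP_STREAM); -m/-M apply the
      # configured send size on the local and remote sides.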
      send_size = FLAGS.netperf_udp_stream_send_size_in_bytes
      netperf_cmd += f' -R 1 -m {send_size} -M {send_size} '
      metadata['netperf_send_size_in_bytes'] = (
          FLAGS.netperf_udp_stream_send_size_in_bytes
      )

    elif benchmark_name.upper() == 'TCP_STREAM':
      send_size = FLAGS.netperf_tcp_stream_send_size_in_bytes
      netperf_cmd += f' -m {send_size} -M {send_size} '
      metadata['netperf_send_size_in_bytes'] = (
          FLAGS.netperf_tcp_stream_send_size_in_bytes
      )

    if FLAGS.netperf_thinktime != 0:
      netperf_cmd += (
          ' -X '
          f'{FLAGS.netperf_thinktime},'
          f'{FLAGS.netperf_thinktime_array_size},'
          f'{FLAGS.netperf_thinktime_run_length} '
      )

    if FLAGS.netperf_mss and 'TCP' in benchmark_name.upper():
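      # -G requests the TCP maximum segment size, in bytes, given by
      # FLAGS.netperf_mss.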
      netperf_cmd += f' -G {FLAGS.netperf_mss}b'
      metadata['netperf_mss_requested'] = FLAGS.netperf_mss

    # Run all of the netperf processes and collect their stdout
    # TODO(dlott): Analyze process start delta of netperf processes on the
    # remote machine.

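    # Each stream uses its own command and data port, so offset this flow's
    # port range by 2 * num_streams per server IP.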
    remote_cmd = (
        f'./{REMOTE_SCRIPT} --netperf_cmd="{netperf_cmd}" '
        f'--num_streams={num_streams} --port_start={PORT_START+(server_ip_idx*2*num_streams)}'
    )

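    # Optionally pin the netperf client processes to specific CPUs and/or NUMA
    # memory nodes with numactl.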
    if (
        NETPERF_NUMACTL_MEMBIND.value
        or NETPERF_NUMACTL_PHYSCPUBIND.value
        or NETPERF_NUMACTL_CPUNODEBIND.value
    ):
      numactl_prefix = 'numactl '
      if NETPERF_NUMACTL_PHYSCPUBIND.value:
        numactl_prefix += f'--physcpubind {NETPERF_NUMACTL_PHYSCPUBIND.value} '
        metadata['netperf_physcpubind'] = NETPERF_NUMACTL_PHYSCPUBIND.value
      if NETPERF_NUMACTL_CPUNODEBIND.value:
        numactl_prefix += f'--cpunodebind {NETPERF_NUMACTL_CPUNODEBIND.value} '
        metadata['netperf_cpunodebind'] = NETPERF_NUMACTL_CPUNODEBIND.value
      if NETPERF_NUMACTL_MEMBIND.value:
        numactl_prefix += f'--membind {NETPERF_NUMACTL_MEMBIND.value} '
        metadata['netperf_membind'] = NETPERF_NUMACTL_MEMBIND.value
      remote_cmd = f'{numactl_prefix} {remote_cmd}'

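    # Optionally wrap the run in perf to sample call graphs at 99 Hz.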
    if FLAG_NETPERF_PERF_RECORD.value:
      remote_cmd = f'sudo perf record -g -F 99 -- {remote_cmd}'

    remote_cmd_list.append(remote_cmd)

  # Give the remote script the max possible test length plus 5 minutes to
  # complete
  remote_cmd_timeout = (
      FLAGS.netperf_test_length * (FLAGS.netperf_max_iter or 1) + 300
  )
  start_time = time.time()
  remote_stdout_stderr_threads = background_tasks.RunThreaded(
      lambda cmd: vm.RobustRemoteCommand(cmd, timeout=remote_cmd_timeout),
      remote_cmd_list,
  )
  end_time = time.time()
  start_time_sample = sample.Sample('start_time', start_time, 'sec', metadata)
  end_time_sample = sample.Sample('end_time', end_time, 'sec', metadata)

  # Decode stdouts, stderrs, and return codes from remote command's stdout
  json_outs = [
      json.loads(remote_stdout_stderr[0])
      for remote_stdout_stderr in remote_stdout_stderr_threads
  ]
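  # Each JSON payload is [stdouts, stderrs, return codes]; only the netperf
  # stdouts are parsed below.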
  stdouts_list = [json_out[0] for json_out in json_outs]

  parsed_output = []
  for stdouts in stdouts_list:
    for stdout in stdouts:
      parsed_output.append(
          ParseNetperfOutput(
              stdout, metadata, benchmark_name, enable_latency_histograms
          )
      )

  samples = [start_time_sample, end_time_sample]
  if len(parsed_output) == 1:
    # Only 1 netperf thread
    throughput_sample, latency_samples, histogram = parsed_output[0]
    latency_histogram = collections.Counter()
    latency_histogram.update(histogram)
    # Calculate all percentiles specified by flag.
    latency_stats = _HistogramStatsCalculator(latency_histogram)
    for stat, value in latency_stats.items():
      # Netperf already outputs p50/p90/p99 in ParseNetperfOutput
      if stat in ['p50', 'p90', 'p99']:
        continue
      latency_samples.append(
          sample.Sample(
              f'{benchmark_name}_Latency_{stat}',
              float(value),
              'us',
              throughput_sample.metadata,
          )
      )
    output_samples = samples + [throughput_sample] + latency_samples
    # Create formatted output for TCP stream throughput metrics
    if benchmark_name.upper() == 'TCP_STREAM':
      output_samples.append(
          sample.Sample(
              f'{throughput_sample.metric}_1stream',
              throughput_sample.value,
              throughput_sample.unit,
              throughput_sample.metadata,
          )
      )
    return output_samples
  else:
    # Multiple netperf threads
    # Unzip parsed output
    # Note that latency_samples are invalid with multiple threads because stats
    # are computed per-thread by netperf, so we don't use them here.
    throughput_samples, _, latency_histograms = (
        list(t) for t in zip(*parsed_output)
    )
    # They should all have the same units
    throughput_unit = throughput_samples[0].unit
    # Extract the throughput values from the samples
    throughputs = [s.value for s in throughput_samples]
    # Compute some stats on the throughput values
    throughput_stats = sample.PercentileCalculator(throughputs, [50, 90, 99])
    throughput_stats['min'] = min(throughputs)
    throughput_stats['max'] = max(throughputs)
    # Aggregate throughput across all streams (average * count == sum)
    throughput_stats['total'] = throughput_stats['average'] * len(throughputs)
    # Create samples for throughput stats
    for stat, value in throughput_stats.items():
      samples.append(
          sample.Sample(
              f'{benchmark_name}_Throughput_{stat}',
              float(value),
              throughput_unit,
              metadata,
          )
      )
    # Create a {benchmark_name}_Throughput_<num_streams>streams sample for TCP
    # stream throughput metrics.
    if benchmark_name.upper() == 'TCP_STREAM':
      netperf_mss = None
      for throughput_sample in throughput_samples:
        sample_netperf_mss = throughput_sample.metadata['netperf_mss']
        if netperf_mss is None:
          netperf_mss = sample_netperf_mss
        elif netperf_mss != sample_netperf_mss:
          raise ValueError(
              'Netperf MSS values do not match for multiple netperf threads.'
          )
      metadata['netperf_mss'] = netperf_mss if netperf_mss else 'unknown'
      samples.append(
          sample.Sample(
              f'{benchmark_name}_Throughput_{len(parsed_output)}streams',
              throughput_stats['total'],
              throughput_unit,
              metadata,
          )
      )
    if enable_latency_histograms:
      # Combine all of the latency histogram dictionaries
      latency_histogram = collections.Counter()
      for histogram in latency_histograms:
        latency_histogram.update(histogram)
      # Create a sample for the aggregate latency histogram
      hist_metadata = {'histogram': json.dumps(latency_histogram)}
      hist_metadata.update(metadata)
      samples.append(
          sample.Sample(
              f'{benchmark_name}_Latency_Histogram', 0, 'us', hist_metadata
          )
      )
      # Calculate stats on aggregate latency histogram
      latency_stats = _HistogramStatsCalculator(latency_histogram)
      # Create samples for the latency stats
      for stat, value in latency_stats.items():
        samples.append(
            sample.Sample(
                f'{benchmark_name}_Latency_{stat}', float(value), 'us', metadata
            )
        )
    return samples
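
For orientation, here is a minimal sketch of how a benchmark's Run phase might
call this helper. The names client_vm and server_vm are illustrative stand-ins
for already-provisioned PKB VM objects, not identifiers from this module:

# Hypothetical caller, not part of netperf_benchmark.py.
samples = RunNetperf(
    vm=client_vm,
    benchmark_name='TCP_RR',
    server_ips=[server_vm.internal_ip],
    num_streams=1,
    client_ips=[client_vm.internal_ip],
)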