def Run()

in perfkitbenchmarker/linux_benchmarks/aerospike_benchmark.py [0:0]


def Run(benchmark_spec: bm_spec.BenchmarkSpec) -> List[sample.Sample]:
  """Runs a read/update load test on Aerospike.

  For each client thread count in the configured range, every configured
  workload is run against every namespace. Each run starts one asbench process
  per (client VM, instance index) pair in parallel, and the parsed results are
  tagged with the run's metadata.

  Args:
    benchmark_spec: The benchmark specification. Contains all data that is
      required to run the benchmark.

  Returns:
    A list of sample.Sample objects.

  Raises:
    ValueError: If aerospike_test_workload_extra_args is set and its length
      differs from the number of entries in aerospike_test_workload_types.
  """
  clients = benchmark_spec.vm_groups['clients']
  servers = benchmark_spec.vm_groups['workers']
  num_client_vms = len(clients)
  num_instances = FLAGS.aerospike_instances
  seed_ips = ','.join(str(vm.internal_ip) for vm in servers)

  # The workload configuration does not depend on the thread count, so it is
  # parsed and validated once, before any load is generated.
  workload_types = AEROSPIKE_TEST_WORKLOAD_TYPES.value.split(';')
  extra_args = AEROSPIKE_TEST_WORKLOAD_EXTRA_ARGS.value or [None] * len(
      workload_types
  )
  if len(extra_args) != len(workload_types):
    raise ValueError(
        'aerospike_test_workload_extra_args must be the same length as '
        'aerospike_test_workload_types'
    )
  workloads = list(zip(workload_types, extra_args))

  samples = []
  for threads in range(
      AEROSPIKE_MIN_CLIENT_THREADS.value,
      AEROSPIKE_MAX_CLIENT_THREADS.value + 1,
      AEROSPIKE_CLIENT_THREADS_STEP_SIZE.value,
  ):
    # Filled by every _Run invocation of this thread count.
    stdout_samples = []

    # _Run closes over `threads` and `stdout_samples` of the current
    # iteration. NOTE(review): this relies on RunThreaded returning only after
    # all tasks have finished, so the closure never observes a later
    # iteration's values - confirm against background_tasks.
    def _Run(namespace, client_idx, process_idx, op, extra_arg):
      """Runs one asbench process on a client VM and parses its stdout."""
      extra_arg_str = f'{extra_arg} ' if extra_arg else ''
      # NOTE(review): the output file name carries neither the workload nor
      # the namespace, so successive runs within one thread count write to the
      # same path on the client - confirm this is intended.
      run_command = (
          f'asbench '
          f'--threads {threads} --namespace {namespace} '  # pylint: disable=cell-var-from-loop
          f'--workload "{op}" '
          f'{extra_arg_str} '
          f'--object-spec {AEROSPIKE_TEST_WORKLOAD_OBJECT_SPEC.value} '
          f'--keys {AEROSPIKE_NUM_KEYS.value} '
          # Instance i is addressed on port (3 + i)000, i.e. 3000, 4000, ...
          f'--hosts {seed_ips} --port {3 + process_idx}000 '
          f'--duration {AEROSPIKE_BENCHMARK_DURATION.value} '
          '--latency --percentiles 50,90,99,99.9,99.99 '
          '--output-file '
          f'result.{client_idx}.{process_idx}.{threads} '
      )
      stdout, _ = clients[client_idx].RobustRemoteCommand(run_command)
      stdout_samples.extend(aerospike_client.ParseAsbenchStdout(stdout))  # pylint: disable=cell-var-from-loop

    for op, extra_arg in workloads:
      for namespace in AEROSPIKE_NAMESPACES.value:
        # One task per (client VM, instance index) pair; each entry is an
        # (args, kwargs) tuple for _Run.
        run_params = [
            ((namespace, client_idx, process_idx, op, extra_arg), {})
            for client_idx in range(num_client_vms)
            for process_idx in range(num_instances)
        ]
        background_tasks.RunThreaded(_Run, run_params)
        # Run the asadm summary on every server after each workload run.
        for server in servers:
          server.RemoteCommand('sudo asadm -e summary')

    # With a single asbench process there is nothing to aggregate.
    if num_client_vms * num_instances == 1:
      detailed_samples = stdout_samples
    else:
      detailed_samples = aerospike_client.AggregateAsbenchSamples(
          stdout_samples
      )

    temp_samples = aerospike_client.CreateTimeSeriesSample(detailed_samples)

    # Result files are always pulled, even when the histogram samples below
    # are not published.
    result_files = []
    for client_idx, client in enumerate(clients):
      for process_idx in range(num_instances):
        filename = f'result.{client_idx}.{process_idx}.{threads}'
        client.PullFile(vm_util.GetTempDir(), filename)
        result_files.append(filename)
    if (
        FLAGS.aerospike_publish_detailed_samples
        or _PUBLISH_PERCENTILE_TIME_SERIES.value
    ):
      detailed_samples.extend(
          aerospike_client.ParseAsbenchHistogram(result_files)
      )
      temp_samples.extend(detailed_samples)

    # Built fresh for every thread count so that no state leaks between
    # iterations; each sample receives its own copy via update().
    metadata = {
        'num_clients_vms': AEROSPIKE_CLIENT_VMS.value,
        'num_aerospike_vms': len(servers),
        'num_aerospike_instances': num_instances,
        'storage_type': FLAGS.aerospike_storage_type,
        'memory_size': int(servers[0].total_memory_kb * 0.8),
        'service_threads': FLAGS.aerospike_service_threads,
        'replication_factor': FLAGS.aerospike_replication_factor,
        'client_threads': threads,
        'read_percent': AEROSPIKE_READ_PERCENT.value,
        'aerospike_edition': FLAGS.aerospike_edition.value,
        'aerospike_enable_strong_consistency': (
            FLAGS.aerospike_enable_strong_consistency
        ),
        'aerospike_test_workload_types': AEROSPIKE_TEST_WORKLOAD_TYPES.value,
        'aerospike_test_workload_extra_args': (
            AEROSPIKE_TEST_WORKLOAD_EXTRA_ARGS.value
        ),
        'aerospike_skip_db_prepopulation': (
            AEROSPIKE_SKIP_DB_PREPOPULATION.value
        ),
        'aerospike_test_workload_object_spec': (
            AEROSPIKE_TEST_WORKLOAD_OBJECT_SPEC.value
        ),
    }
    if FLAGS.aerospike_edition == aerospike_server.AerospikeEdition.ENTERPRISE:
      metadata['aerospike_version'] = FLAGS.aerospike_enterprise_version
    for s in temp_samples:
      s.metadata.update(metadata)
    samples.extend(temp_samples)

  return samples