def Run()

in perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py [0:0]


def Run(benchmark_spec):
  # Configure the input location for the word count job.
  if FLAGS.dpb_wordcount_input is None:
    input_location = gcp_dpb_dataflow.DATAFLOW_WC_INPUT
  else:
    input_location = '{}://{}'.format(
        FLAGS.dpb_wordcount_fs, FLAGS.dpb_wordcount_input
    )

  # Get a handle to the DPB service.
  dpb_service_instance = benchmark_spec.dpb_service

  # Pick the main class and job type that this DPB service's SubmitJob call
  # expects; flags can override either one.
  job_arguments = []
  classname, job_type = _GetJobArguments(dpb_service_instance.SERVICE_TYPE)
  if FLAGS.dpb_job_classname:
    classname = FLAGS.dpb_job_classname
  if FLAGS.dpb_job_type:
    job_type = FLAGS.dpb_job_type
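  # Dataflow and the Flink services take Beam-style named pipeline arguments
  # (--inputFile, --output); every other service gets the positional argument
  # built in the else branch below.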
  if (
      dpb_service_instance.SERVICE_TYPE
      in [
          dpb_constants.DATAFLOW,
          dpb_constants.DATAPROC_FLINK,
          dpb_constants.KUBERNETES_FLINK_CLUSTER,
      ]
      or FLAGS.dpb_wordcount_force_beam_style_job_args
  ):
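    # Beam-style runs use the word count jar recorded on the benchmark spec.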
    jarfile = benchmark_spec.dpb_wordcount_jarfile
    job_arguments.append('--inputFile={}'.format(input_location))
    # Add the output argument for Dataflow
    if dpb_service_instance.SERVICE_TYPE == dpb_constants.DATAFLOW:
      if not FLAGS.dpb_wordcount_out_base:
        base_out = dpb_service_instance.GetStagingLocation()
      else:
        base_out = f'gs://{FLAGS.dpb_wordcount_out_base}'
      job_arguments.append(f'--output={os.path.join(base_out, "output")}')
  else:
    # Use the user-provided jar file if present; otherwise fall back to the
    # Spark examples jar.
    if not benchmark_spec.dpb_wordcount_jarfile:
      jarfile = dpb_service_instance.GetExecutionJar('spark', 'examples')
    else:
      jarfile = benchmark_spec.dpb_wordcount_jarfile

    job_arguments = [input_location]
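  # Append any extra job arguments (e.g. those passed via
  # --dpb_wordcount_additional_args).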
  job_arguments.extend(_GetJobAdditionalArguments(dpb_service_instance))

  # TODO(saksena): Finalize more stats to gather
  results = []

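  # Time the submission end to end from the client side.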
  start_time = datetime.datetime.now()
  job_result = dpb_service_instance.SubmitJob(
      jarfile=jarfile,
      classname=classname,
      job_arguments=job_arguments,
      job_type=job_type,
  )
  end_time = datetime.datetime.now()

  # Collect resource metadata after the job run so it includes the job id.
  metadata = copy.copy(dpb_service_instance.GetResourceMetadata())
  metadata.update({
      'input_location': input_location,
      'dpb_wordcount_additional_args': ','.join(
          FLAGS.dpb_wordcount_additional_args
      ),
      'dpb_wordcount_force_beam_style_job_args': (
          FLAGS.dpb_wordcount_force_beam_style_job_args
      ),
      'dpb_job_type': job_type,
  })
  dpb_service_instance.metadata.update(metadata)

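  # Emit the client-measured wall-clock time, plus the run and pending times
  # reported by the service when available.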
  run_time = (end_time - start_time).total_seconds()
  results.append(sample.Sample('run_time', run_time, 'seconds', metadata))
  if job_result is not None:
    results.append(
        sample.Sample(
            'reported_run_time', job_result.run_time, 'seconds', metadata
        )
    )
    if job_result.pending_time is not None:
      results.append(
          sample.Sample(
              'reported_pending_time',
              job_result.pending_time,
              'seconds',
              metadata,
          )
      )

  # TODO(odiego): Refactor to avoid explicit service type checks.
  if FLAGS.dpb_export_job_stats:
    if dpb_service_instance.SERVICE_TYPE == dpb_constants.DATAFLOW:
      avg_cpu_util = dpb_service_instance.GetAvgCpuUtilization(
          start_time, end_time
      )
      results.append(sample.Sample('avg_cpu_util', avg_cpu_util, '%', metadata))
      stats = dpb_service_instance.job_stats
      for name, value in stats.items():
        results.append(sample.Sample(name, value, 'number', metadata))

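    # Export cost estimates for the job that just ran.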
    costs = dpb_service_instance.CalculateLastJobCosts()
    results += costs.GetSamples(metadata=metadata)
  else:
    logging.info(
        '--dpb_export_job_stats flag is False (which is the new default). Not '
        'exporting job stats.'
    )

  return results
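
For illustration, a sketch of the job arguments each branch builds. The paths
are hypothetical and assume --dpb_wordcount_fs=gs with
--dpb_wordcount_input=my-bucket/shakespeare.txt:

# Beam-style branch (Dataflow, the Flink services, or
# --dpb_wordcount_force_beam_style_job_args):
#   ['--inputFile=gs://my-bucket/shakespeare.txt',
#    '--output=gs://<staging_location>/output']  # --output only for Dataflow
# Spark-examples branch:
#   ['gs://my-bucket/shakespeare.txt']
# Whatever _GetJobAdditionalArguments() returns for the service is appended
# to either list.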