in perfkitbenchmarker/linux_benchmarks/dpb_wordcount_benchmark.py [0:0]
def Run(benchmark_spec):
  # Configure the input location and output for the word count job.
  if FLAGS.dpb_wordcount_input is None:
    input_location = gcp_dpb_dataflow.DATAFLOW_WC_INPUT
  else:
    input_location = '{}://{}'.format(
        FLAGS.dpb_wordcount_fs, FLAGS.dpb_wordcount_input
    )
  # Get a handle to the dpb service.
  dpb_service_instance = benchmark_spec.dpb_service
  # Select the submit-job parameters for the specific dpb service.
  job_arguments = []
  classname, job_type = _GetJobArguments(dpb_service_instance.SERVICE_TYPE)
  if FLAGS.dpb_job_classname:
    classname = FLAGS.dpb_job_classname
  if FLAGS.dpb_job_type:
    job_type = FLAGS.dpb_job_type
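  # Beam-style runners (Dataflow and Flink) take named --inputFile/--output
  # arguments; otherwise the input location is passed positionally to the
  # Spark examples jar.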
  if (
      dpb_service_instance.SERVICE_TYPE
      in [
          dpb_constants.DATAFLOW,
          dpb_constants.DATAPROC_FLINK,
          dpb_constants.KUBERNETES_FLINK_CLUSTER,
      ]
      or FLAGS.dpb_wordcount_force_beam_style_job_args
  ):
    jarfile = benchmark_spec.dpb_wordcount_jarfile
    job_arguments.append('--inputFile={}'.format(input_location))
    # Add the output argument for Dataflow.
    if dpb_service_instance.SERVICE_TYPE == dpb_constants.DATAFLOW:
      if not FLAGS.dpb_wordcount_out_base:
        base_out = dpb_service_instance.GetStagingLocation()
      else:
        base_out = f'gs://{FLAGS.dpb_wordcount_out_base}'
      job_arguments.append(f'--output={os.path.join(base_out, "output")}')
  else:
    # Use the user-provided jar file if present; otherwise fall back to the
    # default Spark examples jar.
    if not benchmark_spec.dpb_wordcount_jarfile:
      jarfile = dpb_service_instance.GetExecutionJar('spark', 'examples')
    else:
      jarfile = benchmark_spec.dpb_wordcount_jarfile
    job_arguments = [input_location]
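  # Append any additional user-supplied job arguments.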
  job_arguments.extend(_GetJobAdditionalArguments(dpb_service_instance))
  # TODO (saksena): Finalize more stats to gather
  results = []
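  # Measure the client-observed wall-clock time around job submission.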
  start_time = datetime.datetime.now()
  job_result = dpb_service_instance.SubmitJob(
      jarfile=jarfile,
      classname=classname,
      job_arguments=job_arguments,
      job_type=job_type,
  )
  end_time = datetime.datetime.now()
  # Update metadata after the job run so it includes the job id.
  metadata = copy.copy(dpb_service_instance.GetResourceMetadata())
  metadata.update({
      'input_location': input_location,
      'dpb_wordcount_additional_args': ','.join(
          FLAGS.dpb_wordcount_additional_args
      ),
      'dpb_wordcount_force_beam_style_job_args': (
          FLAGS.dpb_wordcount_force_beam_style_job_args
      ),
      'dpb_job_type': job_type,
  })
  dpb_service_instance.metadata.update(metadata)
  run_time = (end_time - start_time).total_seconds()
  results.append(sample.Sample('run_time', run_time, 'seconds', metadata))
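  # The service may also report its own timings: the run time it measured and
  # any time the job spent pending before it started running.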
  if job_result is not None:
    results.append(
        sample.Sample(
            'reported_run_time', job_result.run_time, 'seconds', metadata
        )
    )
    if job_result.pending_time is not None:
      results.append(
          sample.Sample(
              'reported_pending_time',
              job_result.pending_time,
              'seconds',
              metadata,
          )
      )
  # TODO(odiego): Refactor to avoid explicit service type checks.
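  # Optionally export per-job stats: Dataflow-specific CPU utilization and job
  # stats, plus the costs of the last job.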
  if FLAGS.dpb_export_job_stats:
    if dpb_service_instance.SERVICE_TYPE == dpb_constants.DATAFLOW:
      avg_cpu_util = dpb_service_instance.GetAvgCpuUtilization(
          start_time, end_time
      )
      results.append(sample.Sample('avg_cpu_util', avg_cpu_util, '%', metadata))
      stats = dpb_service_instance.job_stats
      for name, value in stats.items():
        results.append(sample.Sample(name, value, 'number', metadata))
    costs = dpb_service_instance.CalculateLastJobCosts()
    results += costs.GetSamples(metadata=metadata)
  else:
    logging.info(
        '--dpb_export_job_stats flag is False (which is the new default). Not '
        'exporting job stats.'
    )
  return results
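

# A minimal invocation sketch (hypothetical: the benchmark name and the
# bucket/path values are assumptions; the flags are read by this module):
#   ./pkb.py --benchmarks=dpb_wordcount_benchmark \
#       --dpb_wordcount_fs=gs \
#       --dpb_wordcount_input=<bucket>/path/to/input \
#       --dpb_export_job_stats=true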