perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py
def _RunQueries(benchmark_spec) -> tuple[str, dpb_service.JobResult]:
"""Runs queries. Returns storage path with metrics and JobResult object."""
cluster = benchmark_spec.dpb_service
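  # Unique, timestamped directory under the cluster's base directory where
  # the runner script writes per-query metric reports.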
report_dir = '/'.join([cluster.base_dir, f'report-{int(time.time()*1000)}'])
args = []
if FLAGS.dpb_sparksql_simultaneous:
    # Exactly one stream exists here because --dpb_sparksql_simultaneous and
    # --dpb_sparksql_streams are mutually exclusive. Each query is passed as
    # its own --sql-queries argument so the runner can execute them
    # concurrently.
assert len(benchmark_spec.query_streams) == 1
for query in benchmark_spec.query_streams[0]:
args += ['--sql-queries', query]
else:
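    # One --sql-queries argument per stream, with each stream's queries
    # joined into a single comma-separated list.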
for stream in benchmark_spec.query_streams:
args += ['--sql-queries', ','.join(stream)]
if _FETCH_RESULTS_FROM_LOGS.value:
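    # Results are emitted into the job's logs and fetched from there, so no
    # report directory is needed.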
args += ['--log-results', 'True']
else:
args += ['--report-dir', report_dir]
if FLAGS.dpb_sparksql_database:
args += ['--database', FLAGS.dpb_sparksql_database]
if FLAGS.dpb_sparksql_create_hive_tables:
    # Note: Hive tables that were created beforehand can still be read even
    # without --dpb_sparksql_create_hive_tables.
args += ['--enable-hive', 'True']
else:
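    # Not using Hive: point the runner at the table data directly, either a
    # BigQuery dataset or a table base directory.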
table_names = []
if _BIGQUERY_DATASET.value:
args += ['--bigquery-dataset', _BIGQUERY_DATASET.value]
table_names = _BIGQUERY_TABLES.value
elif benchmark_spec.data_dir:
args += ['--table-base-dir', benchmark_spec.data_dir]
table_names = benchmark_spec.table_subdirs or []
if table_names:
args += ['--table-names', *table_names]
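  # Pass through table I/O options: data format, CSV delimiter, BigQuery
  # read format, and table caching.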
if FLAGS.dpb_sparksql_data_format:
args += ['--table-format', FLAGS.dpb_sparksql_data_format]
if (
FLAGS.dpb_sparksql_data_format == 'csv'
and FLAGS.dpb_sparksql_csv_delimiter
):
args += ['--csv-delim', FLAGS.dpb_sparksql_csv_delimiter]
if FLAGS.bigquery_record_format:
args += ['--bigquery-read-data-format', FLAGS.bigquery_record_format]
if FLAGS.dpb_sparksql_table_cache:
args += ['--table-cache', FLAGS.dpb_sparksql_table_cache]
if dpb_sparksql_benchmark_helper.DUMP_SPARK_CONF.value:
args += ['--dump-spark-conf', os.path.join(cluster.base_dir, 'spark-conf')]
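  # No additional jars are submitted with the job.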
jars = []
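  # Submit the runner script from the cluster's base directory as a
  # PySpark job.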
job_result = cluster.SubmitJob(
pyspark_file='/'.join([
cluster.base_dir,
dpb_sparksql_benchmark_helper.SPARK_SQL_RUNNER_SCRIPT,
]),
job_arguments=args,
job_jars=jars,
job_type=dpb_constants.PYSPARK_JOB_TYPE,
)
return report_dir, job_result
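
# Illustrative usage sketch (hypothetical caller; in PKB the Run phase
# supplies the benchmark_spec):
#   report_dir, job_result = _RunQueries(benchmark_spec)
#   # When _FETCH_RESULTS_FROM_LOGS is unset, per-query metrics are
#   # collected from report_dir; otherwise they are parsed from the logs.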