def _RunQueries()

in perfkitbenchmarker/linux_benchmarks/dpb_sparksql_benchmark.py [0:0]


def _RunQueries(benchmark_spec) -> tuple[str, dpb_service.JobResult]:
  """Runs queries. Returns storage path with metrics and JobResult object."""
  cluster = benchmark_spec.dpb_service
  report_dir = '/'.join([cluster.base_dir, f'report-{int(time.time()*1000)}'])
  args = []
  if FLAGS.dpb_sparksql_simultaneous:
    # Assertion true bc of --dpb_sparksql_simultaneous and
    # --dpb_sparksql_streams being mutually exclusive.
    assert len(benchmark_spec.query_streams) == 1
    for query in benchmark_spec.query_streams[0]:
      args += ['--sql-queries', query]
  else:
    for stream in benchmark_spec.query_streams:
      args += ['--sql-queries', ','.join(stream)]
  if _FETCH_RESULTS_FROM_LOGS.value:
    args += ['--log-results', 'True']
  else:
    args += ['--report-dir', report_dir]
  if FLAGS.dpb_sparksql_database:
    args += ['--database', FLAGS.dpb_sparksql_database]
  if FLAGS.dpb_sparksql_create_hive_tables:
    # Note you can even read from Hive without --create_hive_tables if they
    # were precreated.
    args += ['--enable-hive', 'True']
  else:
    table_names = []
    if _BIGQUERY_DATASET.value:
      args += ['--bigquery-dataset', _BIGQUERY_DATASET.value]
      table_names = _BIGQUERY_TABLES.value
    elif benchmark_spec.data_dir:
      args += ['--table-base-dir', benchmark_spec.data_dir]
      table_names = benchmark_spec.table_subdirs or []
    if table_names:
      args += ['--table-names', *table_names]
    if FLAGS.dpb_sparksql_data_format:
      args += ['--table-format', FLAGS.dpb_sparksql_data_format]
    if (
        FLAGS.dpb_sparksql_data_format == 'csv'
        and FLAGS.dpb_sparksql_csv_delimiter
    ):
      args += ['--csv-delim', FLAGS.dpb_sparksql_csv_delimiter]
    if FLAGS.bigquery_record_format:
      args += ['--bigquery-read-data-format', FLAGS.bigquery_record_format]
  if FLAGS.dpb_sparksql_table_cache:
    args += ['--table-cache', FLAGS.dpb_sparksql_table_cache]
  if dpb_sparksql_benchmark_helper.DUMP_SPARK_CONF.value:
    args += ['--dump-spark-conf', os.path.join(cluster.base_dir, 'spark-conf')]
  jars = []
  job_result = cluster.SubmitJob(
      pyspark_file='/'.join([
          cluster.base_dir,
          dpb_sparksql_benchmark_helper.SPARK_SQL_RUNNER_SCRIPT,
      ]),
      job_arguments=args,
      job_jars=jars,
      job_type=dpb_constants.PYSPARK_JOB_TYPE,
  )
  return report_dir, job_result