# Copyright 2022 PerfKitBenchmarker Authors. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Executes a series of queries using Apache Spark SQL and records latencies.
Queries:
This benchmark uses TPC-DS and TPC-H queries from
https://github.com/databricks/spark-sql-perf, because spark SQL doesn't support
all the queries that using dialect netezza.
Data:
The Data (TPCDS or TPCH) needs be generated first by user and loaded into object
storage.
TPCDS and TPCH tools.
TPCDS: https://github.com/databricks/tpcds-kit
TPCH: https://github.com/databricks/tpch-dbgen
Spark SQL can either read from Hive tables where the data is stored in a series
of files on a Hadoop Compatible File System (HCFS) or it can read from temporary
views registered from Spark SQL data sources:
https://spark.apache.org/docs/latest/sql-data-sources.html.
Spark SQL queries are run using custom pyspark runner.
This benchmark can create and replace external Hive tables using
`--dpb_sparksql_create_hive_tables=true` Alternatively you could pre-provision
a Hive metastore with data before running the benchmark.
If you do not want to use a Hive Metastore, the custom pyspark runner can
register data as temporary views during job submission. This supports the
entire Spark datasource API and is the default.
One data soruce of note is Google BigQuery using
https://github.com/GoogleCloudPlatform/spark-bigquery-connector.
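
An illustrative invocation (flag values are placeholders; see the flag
definitions in this module and in dpb_sparksql_benchmark_helper for details):

  ./pkb.py --benchmarks=dpb_sparksql_benchmark \
      --dpb_sparksql_data=<object storage path of generated data> \
      --dpb_sparksql_order=1,2,3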
"""
from collections.abc import MutableMapping
import json
import logging
import os
import re
import time
from typing import Any
from absl import flags
from perfkitbenchmarker import configs
from perfkitbenchmarker import dpb_constants
from perfkitbenchmarker import dpb_service
from perfkitbenchmarker import dpb_sparksql_benchmark_helper
from perfkitbenchmarker import errors
from perfkitbenchmarker import object_storage_service
from perfkitbenchmarker import sample
from perfkitbenchmarker import temp_dir
from perfkitbenchmarker import vm_util
BENCHMARK_NAME = 'dpb_sparksql_benchmark'
BENCHMARK_CONFIG = """
dpb_sparksql_benchmark:
description: Run Spark SQL on dataproc and emr
dpb_service:
service_type: dataproc
worker_group:
vm_spec:
GCP:
machine_type: n1-standard-4
num_local_ssds: 0
AWS:
machine_type: m5.xlarge
disk_spec:
GCP:
disk_size: 1000
disk_type: pd-standard
# Only used by unmanaged
mount_point: /scratch
AWS:
disk_size: 1000
disk_type: gp2
# Only used by unmanaged
mount_point: /scratch
Azure:
disk_size: 1000
disk_type: Standard_LRS
# Only used by unmanaged
mount_point: /scratch
worker_count: 2
"""
_BIGQUERY_DATASET = flags.DEFINE_string(
'dpb_sparksql_bigquery_dataset',
None,
'BigQuery dataset with the tables to load as Temporary Spark SQL views'
' instead of reading from external Hive tables.',
)
_BIGQUERY_TABLES = flags.DEFINE_list(
'dpb_sparksql_bigquery_tables',
None,
'BigQuery table names (unqualified) to load as Temporary Spark SQL views'
' instead of reading from external Hive tables.',
)
flags.DEFINE_string(
'bigquery_record_format',
None,
'The record format to use when connecting to BigQuery storage. See: '
'https://github.com/GoogleCloudDataproc/spark-bigquery-connector#properties',
)
_FETCH_RESULTS_FROM_LOGS = flags.DEFINE_bool(
    'dpb_sparksql_fetch_results_from_logs',
    False,
    'Make the query runner script log query timings to stdout/stderr instead'
    ' of writing them to an object storage location. This reduces runner'
    ' latency (and hence its total wall time), but is not supported by all'
    ' DPB services.',
)
FLAGS = flags.FLAGS
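# Sentinel markers emitted by the PySpark runner script around the JSON
# results payload when --dpb_sparksql_fetch_results_from_logs is used.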
LOG_RESULTS_PATTERN = (
r'----@spark_sql_runner:results_start@----'
r'(.*)'
r'----@spark_sql_runner:results_end@----'
)
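# Cadence and deadline (in seconds) for polling job logs when fetching
# results from logs.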
POLL_LOGS_INTERVAL = 60
POLL_LOGS_TIMEOUT = 6 * 60
RESULTS_FROM_LOGS_SUPPORTED_DPB_SERVICES = (
dpb_constants.DATAPROC_SERVERLESS,
dpb_constants.EMR_SERVERLESS,
dpb_constants.GLUE,
)
class QueryResultsNotReadyError(Exception):
"""Used to signal a job is still running."""
def GetConfig(user_config):
return configs.LoadConfig(BENCHMARK_CONFIG, user_config, BENCHMARK_NAME)
def CheckPrerequisites(benchmark_config):
"""Verifies that the required resources are present.
Args:
    benchmark_config: Config needed to run the Spark SQL benchmark.
Raises:
Config.InvalidValue: On encountering invalid configuration.
"""
if not FLAGS.dpb_sparksql_data and FLAGS.dpb_sparksql_create_hive_tables:
raise errors.Config.InvalidValue(
'You must pass dpb_sparksql_data with dpb_sparksql_create_hive_tables'
)
if FLAGS.dpb_sparksql_database and FLAGS.dpb_sparksql_create_hive_tables:
raise errors.Config.InvalidValue(
'You cannot create hive tables in a custom database.'
)
if bool(_BIGQUERY_DATASET.value) != bool(_BIGQUERY_TABLES.value):
raise errors.Config.InvalidValue(
'--dpb_sparksql_bigquery_dataset and '
'--dpb_sparksql_bigquery_tables must be passed together.'
)
if not (
FLAGS.dpb_sparksql_data
or _BIGQUERY_TABLES.value
or FLAGS.dpb_sparksql_database
):
# In the case of a static dpb_service, data could pre-exist
    logging.warning(
        'You did not specify --dpb_sparksql_data,'
        ' --dpb_sparksql_bigquery_tables, or --dpb_sparksql_database. You will'
        ' probably not have data to query!'
    )
if (
sum([
bool(FLAGS.dpb_sparksql_data),
bool(_BIGQUERY_TABLES.value),
bool(FLAGS.dpb_sparksql_database),
])
> 1
):
logging.warning(
'You should only pass one of them: --dpb_sparksql_data,'
' --dpb_sparksql_bigquery_tables, or --dpb_sparksql_database.'
)
if bool(FLAGS.dpb_sparksql_order) == bool(FLAGS.dpb_sparksql_streams):
raise errors.Config.InvalidValue(
'You must specify the queries to run with either --dpb_sparksql_order '
'or --dpb_sparksql_streams (but not both).'
)
if FLAGS.dpb_sparksql_simultaneous and FLAGS.dpb_sparksql_streams:
raise errors.Config.InvalidValue(
'--dpb_sparksql_simultaneous is not compatible with '
'--dpb_sparksql_streams.'
)
if (
_FETCH_RESULTS_FROM_LOGS.value
and benchmark_config.dpb_service.service_type
not in RESULTS_FROM_LOGS_SUPPORTED_DPB_SERVICES
):
raise errors.Config.InvalidValue(
f'Current dpb service {benchmark_config.dpb_service.service_type!r} is'
' not supported for --dpb_sparksql_fetch_results_from_logs. Supported'
f' dpb services are: {RESULTS_FROM_LOGS_SUPPORTED_DPB_SERVICES!r}'
)
def Prepare(benchmark_spec):
"""Installs and sets up dataset on the Spark clusters.
Copies scripts and all the queries to cloud.
Creates external Hive tables for data (unless BigQuery is being used).
Args:
benchmark_spec: The benchmark specification
"""
cluster = benchmark_spec.dpb_service
storage_service = cluster.storage_service
start_time = time.time()
dpb_sparksql_benchmark_helper.Prepare(benchmark_spec)
end_time = time.time()
benchmark_spec.queries_and_script_upload_time = end_time - start_time
# Copy to HDFS
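  # Staging the generated data into the cluster's HDFS lets queries read from
  # cluster-local storage instead of object storage.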
if FLAGS.dpb_sparksql_data and FLAGS.dpb_sparksql_copy_to_hdfs:
job_arguments = []
copy_dirs = {
'source': benchmark_spec.data_dir,
'destination': 'hdfs:/tmp/spark_sql/',
}
for flag, data_dir in copy_dirs.items():
staged_file = os.path.join(cluster.base_dir, flag + '-metadata.json')
extra = {}
if flag == 'destination' and FLAGS.dpb_sparksql_data_compression:
extra['compression'] = FLAGS.dpb_sparksql_data_compression
metadata = _GetDistCpMetadata(
data_dir, benchmark_spec.table_subdirs, extra_metadata=extra
)
dpb_sparksql_benchmark_helper.StageMetadata(
metadata, storage_service, staged_file
)
job_arguments += ['--{}-metadata'.format(flag), staged_file]
try:
result = cluster.SubmitJob(
pyspark_file='/'.join([
cluster.base_dir,
dpb_sparksql_benchmark_helper.SPARK_SQL_DISTCP_SCRIPT,
]),
job_type=dpb_constants.PYSPARK_JOB_TYPE,
job_arguments=job_arguments,
)
logging.info(result)
# Tell the benchmark to read from HDFS instead.
benchmark_spec.data_dir = copy_dirs['destination']
except dpb_service.JobSubmissionError as e:
raise errors.Benchmarks.PrepareException(
'Copying tables into HDFS failed'
) from e
# Create external Hive tables
benchmark_spec.hive_tables_creation_time = None
if FLAGS.dpb_sparksql_create_hive_tables:
start_time = time.time()
try:
result = cluster.SubmitJob(
pyspark_file='/'.join([
cluster.base_dir,
dpb_sparksql_benchmark_helper.SPARK_TABLE_SCRIPT,
]),
job_type=dpb_constants.PYSPARK_JOB_TYPE,
job_arguments=[
benchmark_spec.data_dir,
','.join(benchmark_spec.table_subdirs),
],
)
logging.info(result)
except dpb_service.JobSubmissionError as e:
raise errors.Benchmarks.PrepareException(
'Creating tables from {}/* failed'.format(benchmark_spec.data_dir)
) from e
end_time = time.time()
benchmark_spec.hive_tables_creation_time = end_time - start_time
def Run(benchmark_spec):
"""Runs a sequence of Spark SQL Query.
Args:
benchmark_spec: Spec needed to run the Spark SQL.
Returns:
    A list of samples, comprising the detailed run times of individual queries.
Raises:
Benchmarks.RunError if no query succeeds.
"""
cluster = benchmark_spec.dpb_service
storage_service = cluster.storage_service
metadata = _GetSampleMetadata(benchmark_spec)
# Run PySpark Spark SQL Runner
report_dir, job_result = _RunQueries(benchmark_spec)
results = _GetQuerySamples(storage_service, report_dir, job_result, metadata)
results += _GetGlobalSamples(results, cluster, job_result, metadata)
results += _GetPrepareSamples(benchmark_spec, metadata)
return results
def _GetSampleMetadata(benchmark_spec):
"""Gets metadata dict to be attached to exported benchmark samples/metrics."""
metadata = benchmark_spec.dpb_service.GetResourceMetadata()
metadata['benchmark'] = dpb_sparksql_benchmark_helper.BENCHMARK_NAMES[
FLAGS.dpb_sparksql_query
]
if FLAGS.bigquery_record_format:
    # This takes priority because, for BigQuery, dpb_sparksql_data_format
    # actually holds a fully qualified Java class/package name.
metadata['data_format'] = FLAGS.bigquery_record_format
elif FLAGS.dpb_sparksql_data_format:
metadata['data_format'] = FLAGS.dpb_sparksql_data_format
if FLAGS.dpb_sparksql_data_compression:
metadata['data_compression'] = FLAGS.dpb_sparksql_data_compression
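  # Run type naming loosely mirrors TPC terminology: POWER runs a single
  # sequential query stream, SIMULTANEOUS submits every query at once, and
  # THROUGHPUT runs multiple concurrent query streams.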
if FLAGS.dpb_sparksql_simultaneous:
metadata['run_type'] = 'SIMULTANEOUS'
elif FLAGS.dpb_sparksql_streams:
metadata['run_type'] = 'THROUGHPUT'
else:
metadata['run_type'] = 'POWER'
return metadata
def _RunQueries(benchmark_spec) -> tuple[str, dpb_service.JobResult]:
"""Runs queries. Returns storage path with metrics and JobResult object."""
cluster = benchmark_spec.dpb_service
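  # Each run writes its query timing JSON to a unique, millisecond-timestamped
  # report directory.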
report_dir = '/'.join([cluster.base_dir, f'report-{int(time.time()*1000)}'])
args = []
if FLAGS.dpb_sparksql_simultaneous:
    # The assertion below holds because --dpb_sparksql_simultaneous and
    # --dpb_sparksql_streams are mutually exclusive.
assert len(benchmark_spec.query_streams) == 1
for query in benchmark_spec.query_streams[0]:
args += ['--sql-queries', query]
else:
for stream in benchmark_spec.query_streams:
args += ['--sql-queries', ','.join(stream)]
if _FETCH_RESULTS_FROM_LOGS.value:
args += ['--log-results', 'True']
else:
args += ['--report-dir', report_dir]
if FLAGS.dpb_sparksql_database:
args += ['--database', FLAGS.dpb_sparksql_database]
if FLAGS.dpb_sparksql_create_hive_tables:
    # Note that Hive tables can still be read without
    # --dpb_sparksql_create_hive_tables if they were pre-created.
args += ['--enable-hive', 'True']
else:
table_names = []
if _BIGQUERY_DATASET.value:
args += ['--bigquery-dataset', _BIGQUERY_DATASET.value]
table_names = _BIGQUERY_TABLES.value
elif benchmark_spec.data_dir:
args += ['--table-base-dir', benchmark_spec.data_dir]
table_names = benchmark_spec.table_subdirs or []
if table_names:
args += ['--table-names', *table_names]
if FLAGS.dpb_sparksql_data_format:
args += ['--table-format', FLAGS.dpb_sparksql_data_format]
if (
FLAGS.dpb_sparksql_data_format == 'csv'
and FLAGS.dpb_sparksql_csv_delimiter
):
args += ['--csv-delim', FLAGS.dpb_sparksql_csv_delimiter]
if FLAGS.bigquery_record_format:
args += ['--bigquery-read-data-format', FLAGS.bigquery_record_format]
if FLAGS.dpb_sparksql_table_cache:
args += ['--table-cache', FLAGS.dpb_sparksql_table_cache]
if dpb_sparksql_benchmark_helper.DUMP_SPARK_CONF.value:
args += ['--dump-spark-conf', os.path.join(cluster.base_dir, 'spark-conf')]
jars = []
job_result = cluster.SubmitJob(
pyspark_file='/'.join([
cluster.base_dir,
dpb_sparksql_benchmark_helper.SPARK_SQL_RUNNER_SCRIPT,
]),
job_arguments=args,
job_jars=jars,
job_type=dpb_constants.PYSPARK_JOB_TYPE,
)
return report_dir, job_result
def _GetQuerySamples(
storage_service: object_storage_service.ObjectStorageService,
report_dir: str,
job_result: dpb_service.JobResult,
base_metadata: MutableMapping[str, str],
) -> list[sample.Sample]:
"""Get Sample objects from job's logs."""
if _FETCH_RESULTS_FROM_LOGS.value:
query_results = _FetchResultsFromLogs(job_result)
else:
query_results = _FetchResultsFromStorage(storage_service, report_dir)
samples = []
for result in query_results:
logging.info('Timing: %s', result)
query_id = result['query_id']
assert query_id
metadata_copy = dict(base_metadata)
metadata_copy['query'] = query_id
if FLAGS.dpb_sparksql_streams:
metadata_copy['stream'] = result['stream']
samples.append(
sample.Sample(
'sparksql_run_time',
result['duration'],
'seconds',
metadata_copy,
)
)
return samples
def _GetGlobalSamples(
query_samples: list[sample.Sample],
cluster: dpb_service.BaseDpbService,
job_result: dpb_service.JobResult,
metadata: MutableMapping[str, str],
) -> list[sample.Sample]:
"""Gets samples that summarize the whole benchmark run."""
run_times = {}
passing_queries = {}
for s in query_samples:
query_id = s.metadata['query']
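    # Bucket passing queries by stream; non-throughput runs all fall into
    # stream 0.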
passing_queries.setdefault(s.metadata.get('stream', 0), set()).add(query_id)
run_times[query_id] = s.value
samples = []
if FLAGS.dpb_sparksql_streams:
for i, stream in enumerate(dpb_sparksql_benchmark_helper.GetStreams()):
metadata[f'failing_queries_stream_{i}'] = ','.join(
sorted(set(stream) - passing_queries.get(i, set()))
)
else:
all_passing_queries = set()
for stream_passing_queries in passing_queries.values():
all_passing_queries.update(stream_passing_queries)
metadata['failing_queries'] = ','.join(
sorted(set(FLAGS.dpb_sparksql_order) - all_passing_queries)
)
cumulative_run_time = sum(run_times.values())
cluster.cluster_duration = cumulative_run_time + (
cluster.GetClusterCreateTime() or 0
)
cluster.metadata.update(metadata)
# TODO(user): Compute aggregated time for each query across streams and
# iterations.
# Wall time of the DPB service job submitted as reported by the DPB service.
# Should include sparksql_cumulative_run_time. Doesn't include
# dpb_sparksql_job_pending.
samples.append(
sample.Sample(
'sparksql_total_wall_time', job_result.wall_time, 'seconds', metadata
)
)
# Geomean of all the passing queries' run time.
samples.append(
sample.Sample(
'sparksql_geomean_run_time',
sample.GeoMean(run_times.values()),
'seconds',
metadata,
)
)
# Sum of all the passing queries' run time.
samples.append(
sample.Sample(
'sparksql_cumulative_run_time',
cumulative_run_time,
'seconds',
metadata,
)
)
# Time the DPB service job (AKA Spark application) was queued before running,
# as reported by the DPB service.
samples.append(
sample.Sample(
'dpb_sparksql_job_pending',
job_result.pending_time,
'seconds',
metadata,
)
)
if FLAGS.dpb_export_job_stats:
    # Run cost of the last DPB service job (non-empty only for serverless DPB
    # service implementations).
costs = cluster.CalculateLastJobCosts()
samples += costs.GetSamples(
prefix='sparksql_',
renames={'total_cost': 'run_cost'},
metadata=metadata,
)
return samples
def _GetPrepareSamples(
benchmark_spec, metadata: MutableMapping[str, str]
) -> list[sample.Sample]:
"""Gets samples for Prepare stage statistics."""
samples = []
if benchmark_spec.queries_and_script_upload_time is not None:
samples.append(
sample.Sample(
'queries_and_script_upload_time',
benchmark_spec.queries_and_script_upload_time,
'seconds',
metadata,
)
)
if benchmark_spec.hive_tables_creation_time is not None:
samples.append(
sample.Sample(
'hive_tables_creation_time',
benchmark_spec.hive_tables_creation_time,
'seconds',
metadata,
)
)
return samples
def _FetchResultsFromStorage(
storage_service: object_storage_service.ObjectStorageService,
report_dir: str,
) -> list[dict[str, Any]]:
"""Get Sample objects from metrics storage path."""
# Spark can only write data to directories not files. So do a recursive copy
# of that directory and then search it for the collection of JSON files with
# the results.
temp_run_dir = temp_dir.GetRunDirPath()
storage_service.Copy(report_dir, temp_run_dir, recursive=True)
report_files = []
for dir_name, _, files in os.walk(
os.path.join(temp_run_dir, os.path.basename(report_dir))
):
for filename in files:
if filename.endswith('.json'):
report_file = os.path.join(dir_name, filename)
report_files.append(report_file)
logging.info("Found report file '%s'.", report_file)
if not report_files:
raise errors.Benchmarks.RunError('Job report not found.')
results = []
for report_file in report_files:
with open(report_file) as file:
for line in file:
results.append(json.loads(line))
return results
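# Job logs may lag job completion, so poll until the runner's result markers
# show up in the fetched output.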
@vm_util.Retry(
timeout=POLL_LOGS_TIMEOUT,
poll_interval=POLL_LOGS_INTERVAL,
fuzz=0,
retryable_exceptions=(QueryResultsNotReadyError,),
)
def _FetchResultsFromLogs(job_result: dpb_service.JobResult):
"""Get samples from job results logs."""
job_result.FetchOutput()
  logs = '\n'.join([job_result.stdout or '', job_result.stderr or ''])
query_results = _ParseResultsFromLogs(logs)
if query_results is None:
raise QueryResultsNotReadyError
return query_results
def _ParseResultsFromLogs(logs: str) -> list[dict[str, Any]] | None:
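  """Extracts the runner's JSON results payload from raw job logs.

  Returns None when the sentinel markers are not (yet) present in the logs.
  """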
json_str_match = re.search(LOG_RESULTS_PATTERN, logs, re.DOTALL)
if not json_str_match:
return None
try:
results = json.loads(json_str_match.group(1))
except ValueError as e:
raise errors.Benchmarks.RunError(
'Corrupted results from logs cannot be deserialized.'
) from e
return results
def _GetDistCpMetadata(base_dir: str, subdirs: list[str], extra_metadata=None):
"""Compute list of table metadata for spark_sql_distcp metadata flags."""
metadata = []
if not extra_metadata:
extra_metadata = {}
for subdir in subdirs or []:
metadata += [(
FLAGS.dpb_sparksql_data_format or 'parquet',
{'path': '/'.join([base_dir, subdir]), **extra_metadata},
)]
return metadata
def Cleanup(benchmark_spec):
"""Cleans up the Benchmark."""
del benchmark_spec # unused