# perfkitbenchmarker/edw_benchmark_results_aggregator.py
"""Aggregates the performance results from a edw benchmark.
An edw benchmark, runs multiple iterations of a suite of queries.
Independent raw query performance is aggregated during the benchmark, and used
for generating:
a. Raw query performance samples
b. Aggregated query performance samples
c. Raw wall time for each stream in each iteration
d. Raw wall time for each iteration
e. Aggregated (average) iteration wall time
f. Raw geo mean performance for each iteration
g. Aggregated geo mean performance using the aggregated query performances
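
Example (illustrative sketch of the intended flow; iteration_perf stands in
for any concrete EdwBaseIterationPerformance built from client results):

  benchmark = EdwBenchmarkPerformance(
      total_iterations=2, expected_queries=['1', '2'])
  benchmark.add_iteration_performance(iteration_perf)
  samples = benchmark.get_all_query_performance_samples(metadata={})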
"""
import abc
import copy
import enum
import json
import logging
from typing import Any, Iterable
from absl import flags
import numpy as np
from perfkitbenchmarker import sample
flags.DEFINE_bool(
'edw_generate_aggregated_metrics',
True,
'Whether the benchmark generates aggregated_metrics such as '
'geomean. Query performance metrics are still generated.',
)
FLAGS = flags.FLAGS
class EdwPerformanceAggregationError(Exception):
"""Error encountered during aggregation of performance results."""
def geometric_mean(iterable: list[float]) -> float:
"""Function to compute the geo mean for a list of numeric values.
Args:
iterable: A list of Float performance values
Returns:
A float value equal to the geometric mean of the input performance values.
Raises:
EdwPerformanceAggregationError: If an invalid performance value was included
for aggregation.
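
Example (doctest-style sketch):
  >>> float(geometric_mean([2.0, 8.0]))
  4.0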
"""
if not iterable or any(perf <= 0.0 for perf in iterable):
raise EdwPerformanceAggregationError('Invalid values cannot be aggregated.')
a = np.array(iterable)
return a.prod() ** (1.0 / len(a))
class EdwQueryExecutionStatus(enum.Enum):
"""Enum class for potential status of query execution.
Potential values:
FAILED: Indicates that the query execution failed.
SUCCESSFUL: Indicates that the query execution succeeded.
"""
FAILED = 'query_execution_failed'
SUCCESSFUL = 'query_execution_successful'
class EdwQueryPerformance:
"""Class that represents the performance of an executed edw query.
Attributes:
name: A string name of the query that was executed
performance: A Float variable set to the query's completion time in secs.
-1.0 is used as a sentinel value implying the query failed. For a
successful query the value is expected to be positive.
execution_status: An EdwQueryExecutionStatus enum indicating success/failure
metadata: A dictionary of query execution attributes (job_id, etc.)
"""
def __init__(
self, query_name: str, performance: float, metadata: dict[str, Any]
):
# TODO(user): add query start and query end as attributes.
self.name = query_name
self.performance = performance
self.execution_status = (
EdwQueryExecutionStatus.FAILED
if performance == -1.0
else EdwQueryExecutionStatus.SUCCESSFUL
)
self.metadata = metadata
@classmethod
def from_json(cls, serialized_performance: str):
"""Process the serialized query performance from client jar.
Expected Performance format:
{"query_wall_time_in_secs":1.998,"query_end":1601695222108,"query":"1",
"query_start":1601695220110,
"details":{"job_id":"b66b5a8e-633f-4ee4-8632-4e3d0856172f"}}
Args:
serialized_performance: Stringified json performance.
Returns:
An instance of EdwQueryPerformance
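
Example (doctest-style sketch; the payload is a trimmed version of the
format documented above):
  >>> q = EdwQueryPerformance.from_json(
  ...     '{"query_wall_time_in_secs": 1.998, "query": "1",'
  ...     ' "details": {"job_id": "b66b5a8e"}}')
  >>> q.is_successful()
  True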
"""
results = json.loads(serialized_performance)
if 'details' in results:
metadata = results['details']
else:
metadata = {}
if results['query_wall_time_in_secs'] == -1:
logging.warning('Query %s failed.', results['query'])
return cls(
query_name=results['query'],
performance=results['query_wall_time_in_secs'],
metadata=metadata,
)
def get_performance_sample(self, metadata: dict[str, Any]) -> sample.Sample:
"""Method to generate a sample for the query performance.
Args:
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A sample for the edw query performance.
"""
query_metadata = copy.copy(metadata)
query_metadata['query'] = self.name
query_metadata['execution_status'] = self.execution_status
query_metadata.update(self.metadata)
return sample.Sample(
'edw_raw_query_time', self.performance, 'seconds', query_metadata
)
def get_performance_value(self) -> float:
"""Method to get the query's completion time in secs.
Returns:
A float value set to the query's completion time in secs.
"""
return self.performance
def get_performance_metadata(self) -> dict[str, Any]:
"""Method to get the query's execution attributes (job_id, etc.).
Returns:
A dictionary set to query's execution attributes (job_id, etc.)
"""
return self.metadata
def is_successful(self) -> bool:
"""Validates if the query was successful."""
return self.execution_status == EdwQueryExecutionStatus.SUCCESSFUL
class EdwBaseIterationPerformance(abc.ABC):
"""Class that represents the performance of an iteration of edw queries."""
id: str
@abc.abstractmethod
def has_query_performance(self, query_name: str) -> bool:
"""Returns whether the query was run at least once in the iteration."""
@abc.abstractmethod
def is_successful(self, expected_queries: list[str]) -> bool:
"""Check if all the expected queries ran and all succeeded."""
@abc.abstractmethod
def is_query_successful(self, query_name: str) -> bool:
"""Returns whether the query was successful in the iteration."""
@abc.abstractmethod
def get_query_performance(self, query_name: str) -> float:
"""Gets a query's execution performance generated during iteration execution."""
@abc.abstractmethod
def get_query_metadata(self, query_name: str) -> dict[str, Any]:
"""Gets the metadata of a query as executed in the current iteration."""
@abc.abstractmethod
def get_all_query_performance_samples(
self, metadata: dict[str, Any]
) -> list[sample.Sample]:
"""Gets a list of samples for all queries in the iteration."""
@abc.abstractmethod
def get_queries_geomean_performance_sample(
self, expected_queries: list[str], metadata: dict[str, Any]
) -> sample.Sample:
"""Gets a sample for geomean of all queries in the iteration."""
@abc.abstractmethod
def get_queries_geomean(self) -> float:
"""Gets the geometric mean of all queries in the iteration."""
@abc.abstractmethod
def get_wall_time(self) -> float:
"""Gets the total wall time, in seconds, for the iteration."""
@abc.abstractmethod
def get_wall_time_performance_sample(
self, metadata: dict[str, Any]
) -> sample.Sample:
"""Gets a sample for wall time performance of the iteration."""
class EdwPowerIterationPerformance(EdwBaseIterationPerformance):
"""Class that represents the performance of a power iteration of edw queries.
Attributes:
id: A unique string id for the iteration.
start_time: The start time of the iteration in milliseconds since epoch.
end_time: The end time of the iteration in milliseconds since epoch.
wall_time: The wall time in seconds as a double value.
performance: A dictionary of query name to its execution performance which
is a EdwQueryPerformance instance.
successful_count: An integer count of the successful queries in the
iteration.
total_count: An integer count of the total number of queries in the
iteration.
"""
def __init__(self, iteration_id: str, total_queries: int):
self.id = iteration_id
self.performance: dict[str, EdwQueryPerformance] = {}
self.total_count = total_queries
self.successful_count: int = 0
self.start_time = 0
self.end_time = -1
self.wall_time: float = 0.0
def add_query_performance(
self, query_name: str, performance: float, metadata: dict[str, Any]
):
"""Creates and populates a query performance from the input results.
Updates the iteration's performance map with the query performance.
The method also increments the successful query count for the
iteration.
Also updates the wall time of the iteration based on the query time.
Args:
query_name: A string name of the query that was executed
performance: A Float variable set to the query's completion time in secs.
-1.0 is used as a sentinel value implying the query failed. For a
successful query the value is expected to be positive.
metadata: Extra metadata to add to each performance.
Raises:
EdwPerformanceAggregationError: If the query has already been added.
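
Example (illustrative sketch):
  iteration = EdwPowerIterationPerformance('1', total_queries=2)
  iteration.add_query_performance('q1', 2.5, {})
  iteration.add_query_performance('q2', 1.5, {})
  # Now iteration.successful_count == 2 and iteration.wall_time == 4.0.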
"""
query_metadata = copy.copy(metadata)
self._log_and_strip_query_results(query_metadata)
query_performance = EdwQueryPerformance(
query_name=query_name, performance=performance, metadata=query_metadata
)
if query_performance.name in self.performance:
raise EdwPerformanceAggregationError(
'Attempting to aggregate a duplicate query: %s.'
% query_performance.name
)
self.performance[query_performance.name] = query_performance
if query_performance.is_successful():
self.successful_count += 1
self.wall_time = self.wall_time + performance
def _log_and_strip_query_results(
self, metadata: dict[str, str | dict[Any, Any]]
) -> None:
"""Logs first 100 characters of query output, then removes from metadata.
Args:
metadata: A metadata dict with query detail fields resulting of a query
run via an EDW driver. If a key named 'output' exists in the dict, then
the first 100 characters of the associated value are printed, and the
key is deleted from the dict.
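
Example (sketch): {'output': 'row1,row2', 'job_id': 'x'} is mutated in
place to {'job_id': 'x'}, and 'row1,row2' (truncated to its first 100
characters) is logged.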
"""
# Although normally only one of either 'output' or 'query_results' should be
# set, we err on the safe side and remove both keys if they ever exist
# together, then log whichever is present.
output = metadata.pop('output', None)
query_results = metadata.pop('query_results', None)
stripped = output or query_results
if stripped:
logging.info('query results (first 100 chars): %s', str(stripped)[:100])
def has_query_performance(self, query_name: str) -> bool:
"""Returns whether the query was run at least once in the iteration.
Args:
query_name: A String name of the query to check.
Returns:
A boolean value indicating if the query was executed in the iteration.
"""
return query_name in self.performance
def is_query_successful(self, query_name: str) -> bool:
"""Returns whether the query was successful in the iteration.
Args:
query_name: A String name of the query to check.
Returns:
A boolean value indicating if the query was successful in the iteration.
"""
return self.performance[query_name].is_successful()
def get_query_performance(self, query_name: str) -> float:
"""Gets a query's execution performance generated during iteration execution.
Args:
query_name: A String name of the query to retrieve details for
Returns:
A float value set to the query's completion time in secs.
"""
return self.performance[query_name].get_performance_value()
def get_query_metadata(self, query_name: str) -> dict[str, Any]:
"""Gets the metadata of a query as executed in the current iteration.
Args:
query_name: Name of the query whose performance is requested.
Returns:
A dictionary set to the query's metadata.
Raises:
EdwPerformanceAggregationError: If the query failed.
"""
if not self.is_query_successful(query_name):
raise EdwPerformanceAggregationError(
'Cannot aggregate invalid / failed query ' + query_name
)
return self.performance[query_name].metadata
def get_all_queries_in_iteration(self) -> Iterable[str]:
"""Gets a list of names of all queries in the iteration.
Returns:
A list of all queries in the iteration.
"""
return self.performance.keys()
def get_all_query_performance_samples(
self, metadata: dict[str, Any]
) -> list[sample.Sample]:
"""Gets a list of samples for all queries in the iteration.
Args:
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A list of samples of each query's performance
"""
return [
query_performance.get_performance_sample(metadata)
for query_performance in self.performance.values()
]
def add_start_time(self, start_time: int):
"""Sets the start time of the iteration.
Args:
start_time: The UNIX timestamp, in milliseconds, at which the iteration
was started
"""
self.start_time = start_time
def add_end_time(self, end_time: int):
"""Sets the end time of the iteration.
Args:
end_time: The UNIX timestamp, in milliseconds, at which the iteration
completed
"""
self.end_time = end_time
def get_wall_time(self) -> float:
"""Gets the total wall time, in seconds, for the iteration.
The wall time is the sum of the wall times of all individual queries.
Returns:
The wall time in seconds.
"""
return self.wall_time
def get_wall_time_performance_sample(
self, metadata: dict[str, Any]
) -> sample.Sample:
"""Gets a sample for wall time performance of the iteration.
Args:
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A sample of iteration wall time performance
"""
wall_time_metadata = copy.copy(metadata)
wall_time_metadata['iteration_start_time'] = self.start_time
wall_time_metadata['iteration_end_time'] = self.end_time
return sample.Sample(
'edw_iteration_wall_time', self.wall_time, 'seconds', wall_time_metadata
)
def is_successful(self, expected_queries: list[str]) -> bool:
"""Check if all the expected queries ran and all succeeded."""
all_queries_ran = set(self.get_all_queries_in_iteration()) == set(
expected_queries
)
all_queries_were_successful = self.total_count == self.successful_count
return all_queries_ran and all_queries_were_successful
def get_queries_geomean(self) -> float:
"""Gets the geometric mean of all queries in the iteration.
Returns:
The (float) geometric mean of all the queries run in the iteration.
Raises:
EdwPerformanceAggregationError: If the iteration contains unsuccessful
query executions.
"""
return geometric_mean([
query_performance.performance
for query_performance in self.performance.values()
])
def get_queries_geomean_performance_sample(
self, expected_queries: list[str], metadata: dict[str, Any]
) -> sample.Sample:
"""Gets a sample for geomean of all queries in the iteration.
Args:
expected_queries: A list of query names expected to have been executed in
an iteration.
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A sample of iteration geomean performance.
Raises:
EdwPerformanceAggregationError: If the iteration contains unsuccessful
query executions.
"""
if not self.is_successful(expected_queries):
raise EdwPerformanceAggregationError('Failed executions in iteration.')
raw_geo_mean = self.get_queries_geomean()
geo_mean_metadata = copy.copy(metadata)
return sample.Sample(
'edw_iteration_geomean_time', raw_geo_mean, 'seconds', geo_mean_metadata
)
class EdwSimultaneousIterationPerformance(EdwBaseIterationPerformance):
"""Class that represents the performance of a simultaneous iteration.
Attributes:
id: A unique string id for the iteration.
start_time: The start time of the iteration in milliseconds since epoch.
end_time: The end time of the iteration in milliseconds since epoch.
wall_time: The wall time in seconds as a double value.
performance: A dictionary of query name to its execution performance which
is an EdwQueryPerformance instance.
all_queries_succeeded: Whether all queries in the iteration were successful.
"""
def __init__(
self,
iteration_id: str,
iteration_start_time: int,
iteration_end_time: int,
iteration_wall_time: float,
iteration_performance: dict[str, EdwQueryPerformance],
all_queries_succeeded: bool,
):
self.id = iteration_id
self.start_time = iteration_start_time
self.end_time = iteration_end_time
self.wall_time = iteration_wall_time
self.performance = iteration_performance
self.all_queries_succeeded = all_queries_succeeded
@classmethod
def from_json(cls, iteration_id: str, serialized_performance: str):
"""Process the serialized simultaneous iteration performance from client jar.
Expected Performance format:
{"simultaneous_end":1601145943197,"simultaneous_start":1601145940113,
"all_queries_performance_array":[{"query_wall_time_in_secs":2.079,
"query_end":1601145942208,"job_id":"914682d9-4f64-4323-bad2-554267cbbd8d",
"query":"1","query_start":1601145940129},{"query_wall_time_in_secs":2.572,
"query_end":1601145943192,"job_id":"efbf93a1-614c-4645-a268-e3801ae994f1",
"query":"2","query_start":1601145940620}],
"simultaneous_wall_time_in_secs":3.084}
Args:
iteration_id: String identifier of the simultaneous iteration.
serialized_performance: Stringified json performance.
Returns:
An instance of EdwSimultaneousIterationPerformance
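
Example (doctest-style sketch of the failure path; per the format above,
'simultaneous_wall_time_in_secs' is expected even when a 'failure_reason'
is reported):
  >>> it = EdwSimultaneousIterationPerformance.from_json(
  ...     '1', '{"failure_reason": "timeout",'
  ...     ' "simultaneous_wall_time_in_secs": 0}')
  >>> it.is_successful(expected_queries=['1'])
  False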
"""
results = json.loads(serialized_performance)
query_performance_map = {}
all_queries_succeeded = 'failure_reason' not in results
if all_queries_succeeded:
for query_perf_json in results['all_queries_performance_array']:
query_perf = EdwQueryPerformance.from_json(
serialized_performance=(json.dumps(query_perf_json))
)
query_performance_map[query_perf.name] = query_perf
else:
logging.warning('Failure reported. Reason: %s', results['failure_reason'])
return cls(
iteration_id=iteration_id,
iteration_start_time=(
results['simultaneous_start'] if all_queries_succeeded else -1
),
iteration_end_time=(
results['simultaneous_end'] if all_queries_succeeded else -1
),
iteration_wall_time=results['simultaneous_wall_time_in_secs'],
iteration_performance=query_performance_map,
all_queries_succeeded=all_queries_succeeded,
)
def get_wall_time(self) -> float:
"""Gets the total wall time, in seconds, for the iteration.
The wall time is the time from the start of the first query to the end time
of the last query to finish.
Returns:
The wall time in seconds.
"""
return self.wall_time
def get_wall_time_performance_sample(
self, metadata: dict[str, Any]
) -> sample.Sample:
"""Gets a sample for wall time performance of the iteration.
Args:
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A sample of iteration wall time performance
"""
wall_time = self.wall_time
wall_time_metadata = copy.copy(metadata)
wall_time_metadata['iteration_start_time'] = self.start_time
wall_time_metadata['iteration_end_time'] = self.end_time
return sample.Sample(
'edw_iteration_wall_time', wall_time, 'seconds', wall_time_metadata
)
def get_all_query_performance_samples(
self, metadata: dict[str, Any]
) -> list[sample.Sample]:
"""Gets a list of samples for all queries in the iteration.
Args:
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A list of samples of each query's performance
"""
return [
query_performance.get_performance_sample(metadata)
for query_performance in self.performance.values()
]
def is_successful(self, expected_queries: list[str]) -> bool:
"""Check if all the expected queries ran and all succeeded."""
all_queries_ran = self.performance.keys() == set(expected_queries)
return all_queries_ran and self.all_queries_succeeded
def has_query_performance(self, query_name: str) -> bool:
"""Returns whether the query was run at least once in the iteration.
Args:
query_name: A String name of the query to check.
Returns:
A boolean value indicating if the query was executed in the iteration.
"""
return query_name in self.performance
def is_query_successful(self, query_name: str) -> bool:
"""Returns whether the query was successful in the iteration.
Args:
query_name: A String name of the query to check.
Returns:
A boolean value indicating if the query was successful in the iteration.
"""
if self.has_query_performance(query_name):
return self.performance[query_name].is_successful()
return False
def get_query_performance(self, query_name: str) -> float:
"""Gets a query's execution performance in the current iteration.
Args:
query_name: A String name of the query to retrieve details for
Returns:
A float value set to the query's completion time in secs.
"""
return self.performance[query_name].get_performance_value()
def get_query_metadata(self, query_name: str) -> dict[str, Any]:
"""Gets the metadata of a query in the current iteration.
Args:
query_name: Name of the query whose aggregated performance is requested
Returns:
A dictionary set to the query's aggregated metadata, accumulated from the
raw query run in the current iteration.
Raises:
EdwPerformanceAggregationError: If the query failed in the iteration.
"""
if not self.is_query_successful(query_name):
raise EdwPerformanceAggregationError(
'Cannot aggregate invalid / failed query ' + query_name
)
return self.performance[query_name].metadata
def get_queries_geomean(self) -> float:
"""Gets the geometric mean of all queries in the iteration.
Returns:
The (float) geometric mean of all the queries run in the iteration.
Raises:
EdwPerformanceAggregationError: If the iteration contains unsuccessful
query executions.
"""
return geometric_mean([
query_performance.performance
for query_performance in self.performance.values()
])
def get_queries_geomean_performance_sample(
self, expected_queries: list[str], metadata: dict[str, Any]
) -> sample.Sample:
"""Gets a sample for geomean of all queries in the iteration.
Args:
expected_queries: A list of query names expected to have been executed in
an iteration.
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A sample of iteration geomean performance.
Raises:
EdwPerformanceAggregationError: If the iteration contains unsuccessful
query executions.
"""
if not self.is_successful(expected_queries):
raise EdwPerformanceAggregationError('Failed executions in iteration.')
raw_geo_mean = self.get_queries_geomean()
geo_mean_metadata = copy.copy(metadata)
return sample.Sample(
'edw_iteration_geomean_time', raw_geo_mean, 'seconds', geo_mean_metadata
)
class EdwThroughputIterationPerformance(EdwBaseIterationPerformance):
"""Class that represents the performance of an iteration of edw queries.
Attributes:
id: A unique string id for the iteration.
start_time: The start time of the iteration execution.
end_time: The end time of the iteration execution.
wall_time: The wall time in seconds of the iteration execution.
performance: A dict of stream_id to stream performances, each of which is a
dictionary mapping query names to their execution performances, which are
EdwQueryPerformance instances.
"""
def __init__(
self,
iteration_id: str,
iteration_start_time: int,
iteration_end_time: int,
iteration_wall_time: float,
iteration_performance: dict[str, dict[str, EdwQueryPerformance]],
):
self.id = iteration_id
self.start_time = iteration_start_time
self.end_time = iteration_end_time
self.wall_time = iteration_wall_time
self.performance = iteration_performance
@classmethod
def from_json(cls, iteration_id: str, serialized_performance: str):
"""Process the serialized throughput iteration performance from client jar.
Expected Performance format:
{"throughput_start":1601666911596,"throughput_end":1601666916139,
"throughput_wall_time_in_secs":4.543,
"all_streams_performance_array":[
{"stream_start":1601666911597,"stream_end":1601666916139,
"stream_wall_time_in_secs":4.542,
"stream_performance_array":[
{"query_wall_time_in_secs":2.238,"query_end":1601666913849,
"query":"1","query_start":1601666911611,
"details":{"job_id":"438170b0-b0cb-4185-b733-94dd05b46b05"}},
{"query_wall_time_in_secs":2.285,"query_end":1601666916139,
"query":"2","query_start":1601666913854,
"details":{"job_id":"371902c7-5964-46f6-9f90-1dd00137d0c8"}}
]},
{"stream_start":1601666911597,"stream_end":1601666916018,
"stream_wall_time_in_secs":4.421,
"stream_performance_array":[
{"query_wall_time_in_secs":2.552,"query_end":1601666914163,
"query":"2","query_start":1601666911611,
"details":{"job_id":"5dcba418-d1a2-4a73-be70-acc20c1f03e6"}},
{"query_wall_time_in_secs":1.855,"query_end":1601666916018,
"query":"1","query_start":1601666914163,
"details":{"job_id":"568c4526-ae26-4e9d-842c-03459c3a216d"}}
]}
]}
Args:
iteration_id: String identifier of the throughput iteration.
serialized_performance: Stringified json performance.
Returns:
An instance of EdwThroughputIterationPerformance
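
Example (sketch): streams are keyed by their index in
'all_streams_performance_array', stringified, so the payload above yields
stream ids '0' and '1'.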
"""
results = json.loads(serialized_performance)
stream_performances = {}
all_queries_succeeded = 'failure_reason' not in results
if all_queries_succeeded:
for stream_id, stream_perf_json in enumerate(
results['all_streams_performance_array']
):
stream_id = str(stream_id)
stream_performance_map = {}
for query_perf_json in stream_perf_json['stream_performance_array']:
query_perf = EdwQueryPerformance.from_json(
serialized_performance=(json.dumps(query_perf_json))
)
stream_performance_map[query_perf.name] = query_perf
stream_performances.update({stream_id: stream_performance_map})
else:
logging.warning('Failure reported. Reason: %s', results['failure_reason'])
return cls(
iteration_id=iteration_id,
iteration_start_time=(
results['throughput_start'] if all_queries_succeeded else -1
),
iteration_end_time=(
results['throughput_end'] if all_queries_succeeded else -1
),
iteration_wall_time=results['throughput_wall_time_in_secs'],
iteration_performance=stream_performances,
)
def has_query_performance(self, query_name: str) -> bool:
"""Returns whether the query was run at least once in the iteration.
Args:
query_name: A String name of the query to check.
Returns:
A boolean value indicating if the query was executed in the iteration.
"""
for stream in self.performance.values():
if query_name in stream:
return True
return False
def is_query_successful(self, query_name: str) -> bool:
"""Returns whether the query was successful in the iteration.
Args:
query_name: A String name of the query to check.
Returns:
A boolean value indicating if the query was successful in the iteration.
"""
for stream in self.performance.values():
if query_name in stream:
if not stream[query_name].is_successful():
return False
return True
def get_query_performance(self, query_name: str) -> float:
"""Gets a query's execution performance aggregated across all streams in the current iteration.
Args:
query_name: A String name of the query to retrieve details for
Returns:
A float value set to the query's average completion time in secs.
"""
all_performances = []
for stream in self.performance.values():
if query_name in stream:
all_performances.append(stream[query_name].get_performance_value())
if not all_performances:
return -1.0
return sum(all_performances) / len(all_performances)
def get_query_metadata(self, query_name: str) -> dict[str, Any]:
"""Gets the metadata of a query aggregated across all streams in the current iteration.
Args:
query_name: Name of the query whose aggregated performance is requested
Returns:
A dictionary set to the query's aggregated metadata, accumulated from the
raw query runs in all streams of the current iteration.
Raises:
EdwPerformanceAggregationError: If the query failed in one or more streams
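
Example (sketch): if streams '0' and '1' both ran query 'q7', the result
resembles {'0_runtime': 2.1, '0_job_id': ..., '1_runtime': 1.9,
'1_job_id': ...}.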
"""
result = {}
for stream_id, stream_performance in self.performance.items():
if query_name in stream_performance:
q_performance = stream_performance[query_name]
result[stream_id + '_runtime'] = q_performance.get_performance_value()
result.update({
stream_id + '_' + k: v
for (k, v) in q_performance.get_performance_metadata().items()
})
return result
def get_all_query_performance_samples(
self, metadata: dict[str, Any]
) -> list[sample.Sample]:
"""Gets a list of samples for all queries in all streams of the iteration.
Args:
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A list of samples of each query's performance
"""
all_query_performances = []
for stream_id, stream_performance in self.performance.items():
stream_metadata = copy.copy(metadata)
stream_metadata['stream'] = stream_id
all_query_performances.extend([
query_perf.get_performance_sample(stream_metadata)
for query_perf in stream_performance.values()
])
return all_query_performances
def all_streams_ran_all_expected_queries(
self, expected_queries: list[str]
) -> bool:
"""Checks that the same set of expected queries ran in all streams."""
for stream in self.performance.values():
if set(stream.keys()) != set(expected_queries):
return False
return True
def no_duplicate_queries(self) -> bool:
"""Checks that no streams contain any duplicate queries."""
# Note: streams are stored as dicts keyed by query name, so duplicates
# would already have collapsed during parsing; this check is kept as a
# defensive invariant.
for stream in self.performance.values():
if len(stream.keys()) != len(set(stream.keys())):
return False
return True
def all_queries_succeeded(self) -> bool:
"""Checks if every query in every stream was successful."""
for stream_performance in self.performance.values():
for query_perf in stream_performance.values():
if query_perf.performance == -1:
return False
return True
def is_successful(self, expected_queries: list[str]) -> bool:
"""Check if the throughput run was successful.
A successful run meets the following conditions:
- There were more than 0 streams.
- Each stream ran the same set of expected queries (regardless of order)
- Each stream ran each query only once
- Every query in every stream succeeded
Args:
expected_queries: A list of query names expected to have been executed in
an iteration.
Returns:
True if all success conditions were met, False otherwise.
"""
non_zero_streams = len(self.performance) >= 1
all_streams_ran_all_queries = self.all_streams_ran_all_expected_queries(
expected_queries
)
no_duplicate_queries = self.no_duplicate_queries()
all_queries_succeeded = self.all_queries_succeeded()
return (
non_zero_streams
and all_streams_ran_all_queries
and no_duplicate_queries
and all_queries_succeeded
)
def get_queries_geomean(self) -> float:
"""Gets the geometric mean of all queries in all streams of the iteration.
Returns:
The (float) geometric mean of all the individual queries run in all
streams of the iteration.
Raises:
EdwPerformanceAggregationError: If the iteration contains unsuccessful
query executions.
"""
query_performances = []
for stream in self.performance.values():
for query in stream.values():
query_performances.append(query.get_performance_value())
return geometric_mean(query_performances)
def get_queries_geomean_performance_sample(
self, expected_queries: list[str], metadata: dict[str, Any]
) -> sample.Sample:
"""Gets a sample for geomean of all queries in all streams of the iteration.
Args:
expected_queries: A list of query names expected to have been executed in
an iteration.
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A sample of iteration geomean performance.
Raises:
EdwPerformanceAggregationError: If the iteration contains unsuccessful
query executions.
"""
if not self.is_successful(expected_queries):
raise EdwPerformanceAggregationError('Failed executions in iteration.')
raw_geo_mean = self.get_queries_geomean()
geo_mean_metadata = copy.copy(metadata)
return sample.Sample(
'edw_iteration_geomean_time', raw_geo_mean, 'seconds', geo_mean_metadata
)
def get_wall_time(self) -> float:
"""Gets the total wall time, in seconds, for the iteration.
The wall time is the time from the start of the first stream to the end time
of the last stream to finish.
Returns:
The wall time in seconds.
"""
return self.wall_time
def get_wall_time_performance_sample(
self, metadata: dict[str, Any]
) -> sample.Sample:
"""Gets a sample for total wall time performance of the iteration.
Args:
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A sample of iteration wall time performance
"""
wall_time_metadata = copy.copy(metadata)
wall_time_metadata['iteration_start_time'] = self.start_time
wall_time_metadata['iteration_end_time'] = self.end_time
return sample.Sample(
'edw_iteration_wall_time', self.wall_time, 'seconds', wall_time_metadata
)
class EdwBenchmarkPerformance:
"""Class that represents the performance of an edw benchmark.
Attributes:
total_iterations: An integer set to the total number of iterations.
expected_queries: A list of query names that are executed in an iteration of
the benchmark.
iteration_performances: A dictionary of iteration id (String value) to its
execution performance (an instance of EdwBaseIterationPerformance)
"""
def __init__(self, total_iterations: int, expected_queries: Iterable[str]):
self.total_iterations = total_iterations
self.expected_queries = list(expected_queries)
self.iteration_performances: dict[str, EdwBaseIterationPerformance] = {}
def add_iteration_performance(self, performance: EdwBaseIterationPerformance):
"""Add an iteration's performance to the benchmark results.
Args:
performance: An instance of EdwBaseIterationPerformance encapsulating the
iteration performance details.
Raises:
EdwPerformanceAggregationError: If the iteration has already been added.
"""
iteration_id = performance.id
if iteration_id in self.iteration_performances:
raise EdwPerformanceAggregationError(
'Attempting to aggregate a duplicate iteration: %s.' % iteration_id
)
self.iteration_performances[iteration_id] = performance
def is_successful(self) -> bool:
"""Check a benchmark's success, only if all the iterations succeed."""
return functools.reduce(
(lambda x, y: x and y),
[
iteration_performance.is_successful(self.expected_queries)
for iteration_performance in self.iteration_performances.values()
],
)
def aggregated_query_status(self, query_name: str) -> bool:
"""Gets the status of query aggregated across all iterations.
A query is considered successful only if
a. Query was executed in every iteration
b. Query was successful in every iteration
Args:
query_name: Name of the query whose aggregated success is requested
Returns:
A boolean value indicating if the query was successful in the benchmark.
"""
for performance in self.iteration_performances.values():
if not performance.has_query_performance(query_name):
return False
if not performance.is_query_successful(query_name):
return False
return True
def aggregated_query_execution_time(self, query_name: str) -> float:
"""Gets the execution time of query aggregated across all iterations.
Args:
query_name: Name of the query whose aggregated performance is requested
Returns:
A float value set to the query's aggregated execution time
Raises:
EdwPerformanceAggregationError: If the query failed in one or more
iterations
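
Example (sketch): per-iteration times of 2.0s and 4.0s with
total_iterations=2 aggregate to (2.0 + 4.0) / 2 = 3.0s.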
"""
if not self.aggregated_query_status(query_name):
raise EdwPerformanceAggregationError(
'Cannot aggregate invalid / failed query ' + query_name
)
query_performances = [
iteration_performance.get_query_performance(query_name)
for iteration_performance in self.iteration_performances.values()
]
return sum(query_performances) / self.total_iterations
def aggregated_query_metadata(self, query_name: str) -> dict[str, Any]:
"""Gets the metadata of a query aggregated across all iterations.
Args:
query_name: Name of the query whose aggregated performance is requested
Returns:
A dictionary set to the query's aggregated metadata, accumulated from the
raw query runs.
Raises:
EdwPerformanceAggregationError: If the query failed in one or more
iterations
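
Example (sketch): metadata {'job_id': 'x'} recorded in iteration '1'
surfaces as {'1_job_id': 'x'} in the aggregated result.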
"""
if not self.aggregated_query_status(query_name):
raise EdwPerformanceAggregationError(
'Cannot aggregate invalid / failed query ' + query_name
)
result = {}
for (
iteration_id,
iteration_performance,
) in self.iteration_performances.items():
result.update({
iteration_id + '_' + k: v
for (k, v) in iteration_performance.get_query_metadata(
query_name
).items()
})
return result
def get_aggregated_query_performance_sample(
self, query_name: str, metadata: dict[str, Any]
) -> sample.Sample:
"""Gets the performance of query aggregated across all iterations.
Args:
query_name: Name of the query whose aggregated performance is requested
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A sample of the query's aggregated execution time
"""
query_metadata = copy.copy(metadata)
query_metadata['query'] = query_name
query_metadata['aggregation_method'] = 'mean'
perf, exec_status, agg_md = -1.0, EdwQueryExecutionStatus.FAILED, {}
if self.aggregated_query_status(query_name):
perf = self.aggregated_query_execution_time(query_name=query_name)
exec_status = EdwQueryExecutionStatus.SUCCESSFUL
agg_md = self.aggregated_query_metadata(query_name=query_name)
query_metadata['execution_status'] = exec_status
query_metadata.update(agg_md)
return sample.Sample(
'edw_aggregated_query_time', perf, 'seconds', query_metadata
)
def get_all_query_performance_samples(
self, metadata: dict[str, Any]
) -> list[sample.Sample]:
"""Generates samples for all query performances.
Benchmark relies on iteration runs to generate the raw query performance
samples
Benchmark appends the aggregated query performance sample
Args:
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A list of samples (raw and aggregated)
"""
results = []
# Raw query performance samples
for iteration, performance in self.iteration_performances.items():
iteration_metadata = copy.copy(metadata)
iteration_metadata['iteration'] = iteration
results.extend(
performance.get_all_query_performance_samples(iteration_metadata)
)
# Aggregated query performance samples
for query in self.expected_queries:
results.append(
self.get_aggregated_query_performance_sample(
query_name=query, metadata=metadata
)
)
return results
def get_aggregated_wall_time_performance_sample(
self, metadata: dict[str, Any]
) -> sample.Sample:
"""Gets the wall time performance aggregated across all iterations.
Args:
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A sample of aggregated (averaged) wall time.
"""
wall_times = [
iteration.get_wall_time()
for iteration in self.iteration_performances.values()
]
aggregated_wall_time = sum(wall_times) / self.total_iterations
wall_time_metadata = copy.copy(metadata)
wall_time_metadata['aggregation_method'] = 'mean'
return sample.Sample(
'edw_aggregated_wall_time',
aggregated_wall_time,
'seconds',
wall_time_metadata,
)
def get_wall_time_performance_samples(
self, metadata: dict[str, Any]
) -> list[sample.Sample]:
"""Generates samples for all wall time performances.
The benchmark relies on iterations to generate the raw wall time performance
samples, and appends the aggregated wall time performance sample.
Args:
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A list of samples (raw and aggregated)
"""
results = []
for iteration, performance in self.iteration_performances.items():
iteration_metadata = copy.copy(metadata)
iteration_metadata['iteration'] = iteration
results.append(
performance.get_wall_time_performance_sample(iteration_metadata)
)
results.append(
self.get_aggregated_wall_time_performance_sample(metadata=metadata)
)
return results
def get_aggregated_geomean_performance_sample(
self, metadata: dict[str, Any]
) -> sample.Sample:
"""Gets the geomean performance aggregated across all iterations.
Args:
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A sample of aggregated geomean
Raises:
EdwPerformanceAggregationError: If the benchmark contains a failed query
execution.
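
Example (sketch): aggregated per-query times of 1.0s and 4.0s yield
sqrt(1.0 * 4.0) = 2.0 seconds.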
"""
if not self.is_successful():
raise EdwPerformanceAggregationError('Benchmark contains a failed query.')
aggregated_geo_mean = geometric_mean([
self.aggregated_query_execution_time(query_name=query)
for query in self.expected_queries
])
geomean_metadata = copy.copy(metadata)
geomean_metadata['intra_query_aggregation_method'] = 'mean'
geomean_metadata['inter_query_aggregation_method'] = 'geomean'
return sample.Sample(
'edw_aggregated_geomean',
aggregated_geo_mean,
'seconds',
geomean_metadata,
)
def get_queries_geomean_performance_samples(
self, metadata: dict[str, Any]
) -> list[sample.Sample]:
"""Generates samples for all geomean performances.
Benchmark relies on iteration runs to generate the raw geomean performance
samples
Benchmark appends the aggregated geomean performance sample
Args:
metadata: A dictionary of execution attributes to be merged with the query
execution attributes, e.g. tpc suite, scale of dataset, etc.
Returns:
A list of samples (raw and aggregated)
Raises:
EdwPerformanceAggregationError: If the benchmark contains a failed query
execution
"""
if not self.is_successful():
raise EdwPerformanceAggregationError('Benchmark contains a failed query.')
results = []
for iteration, performance in self.iteration_performances.items():
iteration_metadata = copy.copy(metadata)
iteration_metadata['iteration'] = iteration
results.append(
performance.get_queries_geomean_performance_sample(
self.expected_queries, iteration_metadata
)
)
results.append(
self.get_aggregated_geomean_performance_sample(metadata=metadata)
)
return results