# esrally/reporter.py
# Licensed to Elasticsearch B.V. under one or more contributor
# license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright
# ownership. Elasticsearch B.V. licenses this file to you under
# the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import csv
import io
import logging
import sys
from functools import partial
import tabulate
from esrally import exceptions, metrics, types
from esrally.utils import console, convert
from esrally.utils import io as rio
FINAL_SCORE = r"""
------------------------------------------------------
_______ __ _____
/ ____(_)___ ____ _/ / / ___/_________ ________
/ /_ / / __ \/ __ `/ / \__ \/ ___/ __ \/ ___/ _ \
/ __/ / / / / / /_/ / / ___/ / /__/ /_/ / / / __/
/_/ /_/_/ /_/\__,_/_/ /____/\___/\____/_/ \___/
------------------------------------------------------
"""
def summarize(results, cfg: types.Config):
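    """Reports the results of a single race on the console and, if configured, in a report file."""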
SummaryReporter(results, cfg).report()
def compare(cfg: types.Config, baseline_id, contender_id):
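    """Compares the races identified by baseline_id and contender_id and reports the differences between them."""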
if not baseline_id or not contender_id:
        raise exceptions.SystemSetupError("compare needs a baseline and a contender")
race_store = metrics.race_store(cfg)
ComparisonReporter(cfg).report(race_store.find_by_race_id(baseline_id), race_store.find_by_race_id(contender_id))
def print_internal(message):
console.println(message, logger=logging.getLogger(__name__).info)
def print_header(message):
print_internal(console.format.bold(message))
def write_single_report(report_file, report_format, cwd, numbers_align, headers, data_plain, data_rich):
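    """
    Renders the metrics table in the requested format ("markdown" or "csv"), prints the rich variant to the
    console and, if report_file is non-empty, appends the plain variant to that file (resolved relative to cwd).
    """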
if report_format == "markdown":
formatter = partial(format_as_markdown, numbers_align=numbers_align)
elif report_format == "csv":
formatter = format_as_csv
else:
raise exceptions.SystemSetupError("Unknown report format '%s'" % report_format)
print_internal(formatter(headers, data_rich))
if len(report_file) > 0:
normalized_report_file = rio.normalize_path(report_file, cwd)
        # ensure that the parent folder exists before we try to write the file
rio.ensure_dir(rio.dirname(normalized_report_file))
with open(normalized_report_file, mode="a+", encoding="utf-8") as f:
f.writelines(formatter(headers, data_plain))
def format_as_markdown(headers, data, numbers_align):
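    """Renders the table as a Markdown ("pipe") table via tabulate, with a trailing newline appended."""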
rendered = tabulate.tabulate(data, headers=headers, tablefmt="pipe", numalign=numbers_align, stralign="right")
return rendered + "\n"
def format_as_csv(headers, data):
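    """
    Renders the table in CSV format. For example (hypothetical row values), the headers
    ["Metric", "Task", "Value", "Unit"] and the single row ["Segment count", "", 86, ""] produce
    the lines "Metric,Task,Value,Unit" and "Segment count,,86,".
    """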
with io.StringIO() as out:
writer = csv.writer(out)
writer.writerow(headers)
for metric_record in data:
writer.writerow(metric_record)
return out.getvalue()
def disk_usage_fields(stats):
return {
"inverted index": stats.disk_usage_inverted_index,
"stored fields": stats.disk_usage_stored_fields,
"doc values": stats.disk_usage_doc_values,
"points": stats.disk_usage_points,
"norms": stats.disk_usage_norms,
"term vectors": stats.disk_usage_term_vectors,
"total": stats.disk_usage_total,
}
def collate_disk_usage_stats(stats):
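    """
    Groups the per-field disk usage stats into a nested dict keyed by index, field and stat name, e.g.
    (index and field names are illustrative): {"logs": {"message": {"stored fields": 2048, "total": 4096}}}.
    """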
collated = {}
for stat, field_stats in disk_usage_fields(stats).items():
for field_stat in field_stats:
collated.setdefault(field_stat["index"], {}).setdefault(field_stat["field"], {})[stat] = field_stat["value"]
return collated
def total_disk_usage_per_field(stats):
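    """Returns one [index, total disk usage, field] triple per field, based on the "total" disk usage stat."""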
totals = []
for field_stat in stats.disk_usage_total:
totals.append([field_stat["index"], field_stat["value"], field_stat["field"]])
return totals
class SummaryReporter:
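    """Reports the results of a single race as a table with "Metric", "Task", "Value" and "Unit" columns."""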
def __init__(self, results, config: types.Config):
self.results = results
self.report_file = config.opts("reporting", "output.path")
self.report_format = config.opts("reporting", "format")
self.numbers_align = config.opts("reporting", "numbers.align", mandatory=False, default_value="decimal")
reporting_values = config.opts("reporting", "values")
self.report_all_values = reporting_values == "all"
self.report_all_percentile_values = reporting_values == "all-percentiles"
self.show_processing_time = convert.to_bool(config.opts("reporting", "output.processingtime", mandatory=False, default_value=False))
self.cwd = config.opts("node", "rally.cwd")
def report(self):
print_header(FINAL_SCORE)
stats = self.results
warnings = []
metrics_table = []
metrics_table.extend(self._report_totals(stats))
metrics_table.extend(self._report_ml_processing_times(stats))
metrics_table.extend(self._report_gc_metrics(stats))
metrics_table.extend(self._report_disk_usage(stats))
metrics_table.extend(self._report_segment_memory(stats))
metrics_table.extend(self._report_segment_counts(stats))
metrics_table.extend(self._report_transform_stats(stats))
metrics_table.extend(self._report_ingest_pipeline_stats(stats))
metrics_table.extend(self._report_disk_usage_per_field(stats))
for record in stats.op_metrics:
task = record["task"]
metrics_table.extend(self._report_throughput(record, task))
metrics_table.extend(self._report_latency(record, task))
metrics_table.extend(self._report_service_time(record, task))
            # this is mostly needed for debugging and is not usually relevant to end users
if self.show_processing_time:
metrics_table.extend(self._report_processing_time(record, task))
metrics_table.extend(self._report_error_rate(record, task))
self.add_warnings(warnings, record, task)
self.write_report(metrics_table)
if warnings:
for warning in warnings:
console.warn(warning)
def add_warnings(self, warnings, values, op):
if values["error_rate"] > 0:
warnings.append(f"Error rate is {round(values['error_rate'] * 100, 2)} for operation '{op}'. Please check the logs.")
if values["throughput"]["median"] is None:
error_rate = values["error_rate"]
if error_rate:
warnings.append(
"No throughput metrics available for [%s]. Likely cause: Error rate is %.1f%%. Please check the logs."
% (op, error_rate * 100)
)
else:
warnings.append("No throughput metrics available for [%s]. Likely cause: The benchmark ended already during warmup." % op)
def write_report(self, metrics_table):
write_single_report(
self.report_file,
self.report_format,
self.cwd,
self.numbers_align,
headers=["Metric", "Task", "Value", "Unit"],
data_plain=metrics_table,
data_rich=metrics_table,
)
def _report_throughput(self, values, task):
throughput = values["throughput"]
unit = throughput["unit"]
return self._join(
self._line("Min Throughput", task, throughput["min"], unit, lambda v: "%.2f" % v),
self._line("Mean Throughput", task, throughput["mean"], unit, lambda v: "%.2f" % v),
self._line("Median Throughput", task, throughput["median"], unit, lambda v: "%.2f" % v),
self._line("Max Throughput", task, throughput["max"], unit, lambda v: "%.2f" % v),
)
def _report_latency(self, values, task):
return self._report_percentiles("latency", task, values["latency"])
def _report_service_time(self, values, task):
return self._report_percentiles("service time", task, values["service_time"])
def _report_processing_time(self, values, task):
return self._report_percentiles("processing time", task, values["processing_time"])
def _report_percentiles(self, name, task, value):
lines = []
if value:
for percentile in metrics.percentiles_for_sample_size(sys.maxsize):
percentile_value = value.get(metrics.encode_float_key(percentile))
a_line = self._line(
"%sth percentile %s" % (percentile, name), task, percentile_value, "ms", force=self.report_all_percentile_values
)
self._append_non_empty(lines, a_line)
return lines
def _report_error_rate(self, values, task):
return self._join(self._line("error rate", task, values["error_rate"], "%", lambda v: "%.2f" % (v * 100.0)))
def _report_totals(self, stats):
lines = []
lines.extend(self._report_total_time("indexing time", stats.total_time))
lines.extend(self._report_total_time_per_shard("indexing time", stats.total_time_per_shard))
lines.extend(self._report_total_time("indexing throttle time", stats.indexing_throttle_time))
lines.extend(self._report_total_time_per_shard("indexing throttle time", stats.indexing_throttle_time_per_shard))
lines.extend(self._report_total_time("merge time", stats.merge_time))
lines.extend(self._report_total_count("merge count", stats.merge_count))
lines.extend(self._report_total_time_per_shard("merge time", stats.merge_time_per_shard))
lines.extend(self._report_total_time("merge throttle time", stats.merge_throttle_time))
lines.extend(self._report_total_time_per_shard("merge throttle time", stats.merge_throttle_time_per_shard))
lines.extend(self._report_total_time("refresh time", stats.refresh_time))
lines.extend(self._report_total_count("refresh count", stats.refresh_count))
lines.extend(self._report_total_time_per_shard("refresh time", stats.refresh_time_per_shard))
lines.extend(self._report_total_time("flush time", stats.flush_time))
lines.extend(self._report_total_count("flush count", stats.flush_count))
lines.extend(self._report_total_time_per_shard("flush time", stats.flush_time_per_shard))
return lines
def _report_total_time(self, name, total_time):
unit = "min"
return self._join(
self._line(f"Cumulative {name} of primary shards", "", total_time, unit, convert.ms_to_minutes),
)
def _report_total_time_per_shard(self, name, total_time_per_shard):
unit = "min"
return self._join(
self._line(f"Min cumulative {name} across primary shards", "", total_time_per_shard.get("min"), unit, convert.ms_to_minutes),
self._line(
f"Median cumulative {name} across primary shards",
"",
total_time_per_shard.get("median"),
unit,
convert.ms_to_minutes,
),
self._line(f"Max cumulative {name} across primary shards", "", total_time_per_shard.get("max"), unit, convert.ms_to_minutes),
)
def _report_total_count(self, name, total_count):
return self._join(
self._line(f"Cumulative {name} of primary shards", "", total_count, ""),
)
def _report_ml_processing_times(self, stats):
lines = []
for processing_time in stats.ml_processing_time:
job_name = processing_time["job"]
unit = processing_time["unit"]
lines.append(self._line("Min ML processing time", job_name, processing_time["min"], unit))
lines.append(self._line("Mean ML processing time", job_name, processing_time["mean"], unit))
lines.append(self._line("Median ML processing time", job_name, processing_time["median"], unit))
lines.append(self._line("Max ML processing time", job_name, processing_time["max"], unit))
return lines
def _report_gc_metrics(self, stats):
return self._join(
self._line("Total Young Gen GC time", "", stats.young_gc_time, "s", convert.ms_to_seconds),
self._line("Total Young Gen GC count", "", stats.young_gc_count, ""),
self._line("Total Old Gen GC time", "", stats.old_gc_time, "s", convert.ms_to_seconds),
self._line("Total Old Gen GC count", "", stats.old_gc_count, ""),
self._line("Total ZGC Cycles GC time", "", stats.zgc_cycles_gc_time, "s", convert.ms_to_seconds),
self._line("Total ZGC Cycles GC count", "", stats.zgc_cycles_gc_count, ""),
self._line("Total ZGC Pauses GC time", "", stats.zgc_pauses_gc_time, "s", convert.ms_to_seconds),
self._line("Total ZGC Pauses GC count", "", stats.zgc_pauses_gc_count, ""),
)
def _report_disk_usage(self, stats):
return self._join(
self._line("Dataset size", "", stats.dataset_size, "GB", convert.bytes_to_gb),
self._line("Store size", "", stats.store_size, "GB", convert.bytes_to_gb),
self._line("Translog size", "", stats.translog_size, "GB", convert.bytes_to_gb),
)
def _report_segment_memory(self, stats):
unit = "MB"
return self._join(
self._line("Heap used for segments", "", stats.memory_segments, unit, convert.bytes_to_mb),
self._line("Heap used for doc values", "", stats.memory_doc_values, unit, convert.bytes_to_mb),
self._line("Heap used for terms", "", stats.memory_terms, unit, convert.bytes_to_mb),
self._line("Heap used for norms", "", stats.memory_norms, unit, convert.bytes_to_mb),
self._line("Heap used for points", "", stats.memory_points, unit, convert.bytes_to_mb),
self._line("Heap used for stored fields", "", stats.memory_stored_fields, unit, convert.bytes_to_mb),
)
def _report_segment_counts(self, stats):
return self._join(self._line("Segment count", "", stats.segment_count, ""))
def _report_transform_stats(self, stats):
lines = []
for processing_time in stats.total_transform_processing_times:
lines.append(self._line("Transform processing time", processing_time["id"], processing_time["mean"], processing_time["unit"]))
for index_time in stats.total_transform_index_times:
lines.append(self._line("Transform indexing time", index_time["id"], index_time["mean"], index_time["unit"]))
for search_time in stats.total_transform_search_times:
lines.append(self._line("Transform search time", search_time["id"], search_time["mean"], search_time["unit"]))
for throughput in stats.total_transform_throughput:
lines.append(self._line("Transform throughput", throughput["id"], throughput["mean"], throughput["unit"]))
return lines
def _report_ingest_pipeline_stats(self, stats):
return self._join(
self._line("Total Ingest Pipeline count", "", stats.ingest_pipeline_cluster_count, ""),
self._line("Total Ingest Pipeline time", "", stats.ingest_pipeline_cluster_time, "s", convert.ms_to_seconds),
self._line("Total Ingest Pipeline failed", "", stats.ingest_pipeline_cluster_failed, ""),
)
def _report_disk_usage_per_field(self, stats):
collated = collate_disk_usage_stats(stats)
lines = []
for index, _total, field in sorted(total_disk_usage_per_field(stats)):
for stat, value in collated[index][field].items():
lines.append(
self._line(f"{index} {field} {stat}", "", value, convert.bytes_to_human_unit(value), convert.bytes_to_human_value)
)
return lines
def _join(self, *args):
lines = []
for arg in args:
self._append_non_empty(lines, arg)
return lines
def _append_non_empty(self, lines, line):
if line and len(line) > 0:
lines.append(line)
def _line(self, k, task, v, unit, converter=lambda x: x, force=False):
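        """Returns a [metric, task, value, unit] row, or an empty list if the value is missing and is not forced into the report."""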
if v is not None or force or self.report_all_values:
u = unit if v is not None else None
return [k, task, converter(v), u]
else:
return []
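# Illustrative example of a summary row (task name and number are hypothetical): a median throughput of
# 95034.2 docs/s for the task "index-append" is reported as ["Median Throughput", "index-append", "95034.20", "docs/s"].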
class ComparisonReporter:
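    """
    Compares a baseline race with a contender race and reports the results as a table with "Metric", "Task",
    "Baseline", "Contender", "Diff", "Unit" and "Diff %" columns.
    """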
def __init__(self, config: types.Config):
self.report_file = config.opts("reporting", "output.path")
self.report_format = config.opts("reporting", "format")
self.numbers_align = config.opts("reporting", "numbers.align", mandatory=False, default_value="decimal")
self.cwd = config.opts("node", "rally.cwd")
self.show_processing_time = convert.to_bool(config.opts("reporting", "output.processingtime", mandatory=False, default_value=False))
self.plain = False
def report(self, r1, r2):
        # we don't validate that the two races are comparable because the user may intentionally benchmark two different tracks
baseline_stats = metrics.GlobalStats(r1.results)
contender_stats = metrics.GlobalStats(r2.results)
print_internal("")
print_internal("Comparing baseline")
print_internal(" Race ID: %s" % r1.race_id)
print_internal(" Race timestamp: %s" % r1.race_timestamp)
if r1.challenge_name:
print_internal(" Challenge: %s" % r1.challenge_name)
print_internal(" Car: %s" % r1.car_name)
if r1.user_tags:
r1_user_tags = ", ".join(["%s=%s" % (k, v) for k, v in sorted(r1.user_tags.items())])
print_internal(" User tags: %s" % r1_user_tags)
print_internal("")
print_internal("with contender")
print_internal(" Race ID: %s" % r2.race_id)
print_internal(" Race timestamp: %s" % r2.race_timestamp)
if r2.challenge_name:
print_internal(" Challenge: %s" % r2.challenge_name)
print_internal(" Car: %s" % r2.car_name)
if r2.user_tags:
r2_user_tags = ", ".join(["%s=%s" % (k, v) for k, v in sorted(r2.user_tags.items())])
print_internal(" User tags: %s" % r2_user_tags)
print_header(FINAL_SCORE)
metric_table_plain = self._metrics_table(baseline_stats, contender_stats, plain=True)
metric_table_rich = self._metrics_table(baseline_stats, contender_stats, plain=False)
        # Writes metric_table_rich to the console and metric_table_plain to the report file
self._write_report(metric_table_plain, metric_table_rich)
def _metrics_table(self, baseline_stats, contender_stats, plain):
self.plain = plain
metrics_table = []
metrics_table.extend(self._report_total_times(baseline_stats, contender_stats))
metrics_table.extend(self._report_ml_processing_times(baseline_stats, contender_stats))
metrics_table.extend(self._report_gc_metrics(baseline_stats, contender_stats))
metrics_table.extend(self._report_disk_usage(baseline_stats, contender_stats))
metrics_table.extend(self._report_segment_memory(baseline_stats, contender_stats))
metrics_table.extend(self._report_segment_counts(baseline_stats, contender_stats))
metrics_table.extend(self._report_transform_processing_times(baseline_stats, contender_stats))
metrics_table.extend(self._report_ingest_pipeline_counts(baseline_stats, contender_stats))
metrics_table.extend(self._report_ingest_pipeline_times(baseline_stats, contender_stats))
metrics_table.extend(self._report_ingest_pipeline_failed(baseline_stats, contender_stats))
        # Skip the disk usage comparison if disk usage totals are missing in either race
if baseline_stats.disk_usage_total and contender_stats.disk_usage_total:
metrics_table.extend(self._report_disk_usage_stats_per_field(baseline_stats, contender_stats))
for t in baseline_stats.tasks():
if t in contender_stats.tasks():
metrics_table.extend(self._report_throughput(baseline_stats, contender_stats, t))
metrics_table.extend(self._report_latency(baseline_stats, contender_stats, t))
metrics_table.extend(self._report_service_time(baseline_stats, contender_stats, t))
if self.show_processing_time:
metrics_table.extend(self._report_processing_time(baseline_stats, contender_stats, t))
metrics_table.extend(self._report_error_rate(baseline_stats, contender_stats, t))
return metrics_table
def _write_report(self, metrics_table, metrics_table_console):
write_single_report(
self.report_file,
self.report_format,
self.cwd,
self.numbers_align,
headers=["Metric", "Task", "Baseline", "Contender", "Diff", "Unit", "Diff %"],
data_plain=metrics_table,
data_rich=metrics_table_console,
)
def _report_throughput(self, baseline_stats, contender_stats, task):
b_min = baseline_stats.metrics(task)["throughput"]["min"]
b_mean = baseline_stats.metrics(task)["throughput"]["mean"]
b_median = baseline_stats.metrics(task)["throughput"]["median"]
b_max = baseline_stats.metrics(task)["throughput"]["max"]
b_unit = baseline_stats.metrics(task)["throughput"]["unit"]
c_min = contender_stats.metrics(task)["throughput"]["min"]
c_mean = contender_stats.metrics(task)["throughput"]["mean"]
c_median = contender_stats.metrics(task)["throughput"]["median"]
c_max = contender_stats.metrics(task)["throughput"]["max"]
return self._join(
self._line("Min Throughput", b_min, c_min, task, b_unit, treat_increase_as_improvement=True),
self._line("Mean Throughput", b_mean, c_mean, task, b_unit, treat_increase_as_improvement=True),
self._line("Median Throughput", b_median, c_median, task, b_unit, treat_increase_as_improvement=True),
self._line("Max Throughput", b_max, c_max, task, b_unit, treat_increase_as_improvement=True),
)
def _report_latency(self, baseline_stats, contender_stats, task):
baseline_latency = baseline_stats.metrics(task)["latency"]
contender_latency = contender_stats.metrics(task)["latency"]
return self._report_percentiles("latency", task, baseline_latency, contender_latency)
def _report_service_time(self, baseline_stats, contender_stats, task):
baseline_service_time = baseline_stats.metrics(task)["service_time"]
contender_service_time = contender_stats.metrics(task)["service_time"]
return self._report_percentiles("service time", task, baseline_service_time, contender_service_time)
def _report_processing_time(self, baseline_stats, contender_stats, task):
baseline_processing_time = baseline_stats.metrics(task)["processing_time"]
contender_processing_time = contender_stats.metrics(task)["processing_time"]
return self._report_percentiles("processing time", task, baseline_processing_time, contender_processing_time)
def _report_percentiles(self, name, task, baseline_values, contender_values):
lines = []
for percentile in metrics.percentiles_for_sample_size(sys.maxsize):
baseline_value = baseline_values.get(metrics.encode_float_key(percentile))
contender_value = contender_values.get(metrics.encode_float_key(percentile))
self._append_non_empty(
lines,
self._line(
"%sth percentile %s" % (percentile, name),
baseline_value,
contender_value,
task,
"ms",
treat_increase_as_improvement=False,
),
)
return lines
def _report_error_rate(self, baseline_stats, contender_stats, task):
baseline_error_rate = baseline_stats.metrics(task)["error_rate"]
contender_error_rate = contender_stats.metrics(task)["error_rate"]
return self._join(
self._line(
"error rate",
baseline_error_rate,
contender_error_rate,
task,
"%",
treat_increase_as_improvement=False,
formatter=convert.factor(100.0),
)
)
def _report_ml_processing_times(self, baseline_stats, contender_stats):
lines = []
for baseline in baseline_stats.ml_processing_time:
job_name = baseline["job"]
unit = baseline["unit"]
# O(n^2) but we assume here only a *very* limited number of jobs (usually just one)
for contender in contender_stats.ml_processing_time:
if contender["job"] == job_name:
lines.append(
self._line(
"Min ML processing time", baseline["min"], contender["min"], job_name, unit, treat_increase_as_improvement=False
)
)
lines.append(
self._line(
"Mean ML processing time",
baseline["mean"],
contender["mean"],
job_name,
unit,
treat_increase_as_improvement=False,
)
)
lines.append(
self._line(
"Median ML processing time",
baseline["median"],
contender["median"],
job_name,
unit,
treat_increase_as_improvement=False,
)
)
lines.append(
self._line(
"Max ML processing time", baseline["max"], contender["max"], job_name, unit, treat_increase_as_improvement=False
)
)
return lines
def _report_transform_processing_times(self, baseline_stats, contender_stats):
lines = []
if baseline_stats.total_transform_processing_times is None:
return lines
for baseline in baseline_stats.total_transform_processing_times:
transform_id = baseline["id"]
for contender in contender_stats.total_transform_processing_times:
if contender["id"] == transform_id:
lines.append(
self._line(
"Transform processing time",
baseline["mean"],
contender["mean"],
transform_id,
baseline["unit"],
treat_increase_as_improvement=False,
)
)
for baseline in baseline_stats.total_transform_index_times:
transform_id = baseline["id"]
for contender in contender_stats.total_transform_index_times:
if contender["id"] == transform_id:
lines.append(
self._line(
"Transform indexing time",
baseline["mean"],
contender["mean"],
transform_id,
baseline["unit"],
treat_increase_as_improvement=False,
)
)
for baseline in baseline_stats.total_transform_search_times:
transform_id = baseline["id"]
for contender in contender_stats.total_transform_search_times:
if contender["id"] == transform_id:
lines.append(
self._line(
"Transform search time",
baseline["mean"],
contender["mean"],
transform_id,
baseline["unit"],
treat_increase_as_improvement=False,
)
)
for baseline in baseline_stats.total_transform_throughput:
transform_id = baseline["id"]
for contender in contender_stats.total_transform_throughput:
if contender["id"] == transform_id:
lines.append(
self._line(
"Transform throughput",
baseline["mean"],
contender["mean"],
transform_id,
baseline["unit"],
treat_increase_as_improvement=True,
)
)
return lines
def _report_ingest_pipeline_counts(self, baseline_stats, contender_stats):
if baseline_stats.ingest_pipeline_cluster_count is None:
return []
return self._join(
self._line(
"Total Ingest Pipeline count",
baseline_stats.ingest_pipeline_cluster_count,
contender_stats.ingest_pipeline_cluster_count,
"",
"",
treat_increase_as_improvement=False,
)
)
def _report_ingest_pipeline_times(self, baseline_stats, contender_stats):
if baseline_stats.ingest_pipeline_cluster_time is None:
return []
return self._join(
self._line(
"Total Ingest Pipeline time",
baseline_stats.ingest_pipeline_cluster_time,
contender_stats.ingest_pipeline_cluster_time,
"",
"ms",
treat_increase_as_improvement=False,
)
)
def _report_ingest_pipeline_failed(self, baseline_stats, contender_stats):
if baseline_stats.ingest_pipeline_cluster_failed is None:
return []
return self._join(
self._line(
"Total Ingest Pipeline failed",
baseline_stats.ingest_pipeline_cluster_failed,
contender_stats.ingest_pipeline_cluster_failed,
"",
"",
treat_increase_as_improvement=False,
)
)
def _report_disk_usage_stats_per_field(self, baseline_stats, contender_stats):
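        """
        Compares disk usage per index, field and stat. Rows are ordered by index and total disk usage;
        stats that are zero in both races are omitted.
        """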
best = {}
for index, total, field in total_disk_usage_per_field(baseline_stats):
best.setdefault(index, {})[field] = total
collated_baseline = collate_disk_usage_stats(baseline_stats)
for index, total, field in total_disk_usage_per_field(contender_stats):
for_idx = best.setdefault(index, {})
prev = for_idx.get(field, 0)
if prev < total:
for_idx[field] = total
collated_contender = collate_disk_usage_stats(contender_stats)
totals = []
for index, for_idx in best.items():
for field, total in for_idx.items():
totals.append([index, total, field])
totals.sort()
lines = []
        for index, _total, field in totals:
            for stat in disk_usage_fields(baseline_stats):
                # skip indices that are missing in either race and stats that are zero in both
                if index not in collated_baseline or index not in collated_contender:
                    continue
                baseline_value = collated_baseline[index].get(field, {}).get(stat, 0)
                contender_value = collated_contender[index].get(field, {}).get(stat, 0)
                if baseline_value == 0 and contender_value == 0:
                    continue
                unit = convert.bytes_to_human_unit(min(baseline_value, contender_value))
                lines.append(
                    self._line(
                        f"{index} {field} {stat}",
                        baseline_value,
                        contender_value,
                        "",
                        unit,
                        treat_increase_as_improvement=False,
                        formatter=partial(convert.bytes_to_unit, unit),
                    )
                )
        return lines
def _report_total_times(self, baseline_stats, contender_stats):
lines = []
lines.extend(
self._report_total_time(
"indexing time",
baseline_stats.total_time,
contender_stats.total_time,
)
)
lines.extend(
self._report_total_time_per_shard(
"indexing time",
baseline_stats.total_time_per_shard,
contender_stats.total_time_per_shard,
)
)
lines.extend(
self._report_total_time(
"indexing throttle time",
baseline_stats.indexing_throttle_time,
contender_stats.indexing_throttle_time,
)
)
lines.extend(
self._report_total_time_per_shard(
"indexing throttle time",
baseline_stats.indexing_throttle_time_per_shard,
contender_stats.indexing_throttle_time_per_shard,
)
)
lines.extend(
self._report_total_time(
"merge time",
baseline_stats.merge_time,
contender_stats.merge_time,
)
)
lines.extend(
self._report_total_count(
"merge count",
baseline_stats.merge_count,
contender_stats.merge_count,
)
)
lines.extend(
self._report_total_time_per_shard(
"merge time",
baseline_stats.merge_time_per_shard,
contender_stats.merge_time_per_shard,
)
)
lines.extend(
self._report_total_time(
"merge throttle time",
baseline_stats.merge_throttle_time,
contender_stats.merge_throttle_time,
)
)
lines.extend(
self._report_total_time_per_shard(
"merge throttle time",
baseline_stats.merge_throttle_time_per_shard,
contender_stats.merge_throttle_time_per_shard,
)
)
lines.extend(
self._report_total_time(
"refresh time",
baseline_stats.refresh_time,
contender_stats.refresh_time,
)
)
lines.extend(
self._report_total_count(
"refresh count",
baseline_stats.refresh_count,
contender_stats.refresh_count,
)
)
lines.extend(
self._report_total_time_per_shard(
"refresh time",
baseline_stats.refresh_time_per_shard,
contender_stats.refresh_time_per_shard,
)
)
lines.extend(
self._report_total_time(
"flush time",
baseline_stats.flush_time,
contender_stats.flush_time,
)
)
lines.extend(
self._report_total_count(
"flush count",
baseline_stats.flush_count,
contender_stats.flush_count,
)
)
lines.extend(
self._report_total_time_per_shard(
"flush time",
baseline_stats.flush_time_per_shard,
contender_stats.flush_time_per_shard,
)
)
return lines
def _report_total_time(self, name, baseline_total, contender_total):
unit = "min"
return self._join(
self._line(
f"Cumulative {name} of primary shards",
baseline_total,
contender_total,
"",
unit,
treat_increase_as_improvement=False,
formatter=convert.ms_to_minutes,
),
)
def _report_total_time_per_shard(self, name, baseline_per_shard, contender_per_shard):
unit = "min"
return self._join(
self._line(
f"Min cumulative {name} across primary shard",
baseline_per_shard.get("min"),
contender_per_shard.get("min"),
"",
unit,
treat_increase_as_improvement=False,
formatter=convert.ms_to_minutes,
),
self._line(
f"Median cumulative {name} across primary shard",
baseline_per_shard.get("median"),
contender_per_shard.get("median"),
"",
unit,
treat_increase_as_improvement=False,
formatter=convert.ms_to_minutes,
),
self._line(
f"Max cumulative {name} across primary shard",
baseline_per_shard.get("max"),
contender_per_shard.get("max"),
"",
unit,
treat_increase_as_improvement=False,
formatter=convert.ms_to_minutes,
),
)
def _report_total_count(self, name, baseline_total, contender_total):
return self._join(
self._line(f"Cumulative {name} of primary shards", baseline_total, contender_total, "", "", treat_increase_as_improvement=False)
)
def _report_gc_metrics(self, baseline_stats, contender_stats):
line_method = self._line
def _time_metric(metric_prefix, description):
return line_method(
f"Total {description} GC time",
getattr(baseline_stats, f"{metric_prefix}_gc_time"),
getattr(contender_stats, f"{metric_prefix}_gc_time"),
"",
"s",
treat_increase_as_improvement=False,
formatter=convert.ms_to_seconds,
)
def _count_metric(metric_prefix, description):
return line_method(
f"Total {description} GC count",
getattr(baseline_stats, f"{metric_prefix}_gc_count"),
getattr(contender_stats, f"{metric_prefix}_gc_count"),
"",
"",
treat_increase_as_improvement=False,
)
return self._join(
_time_metric("young", "Young Gen"),
_count_metric("young", "Young Gen"),
_time_metric("old", "Old Gen"),
_count_metric("old", "Old Gen"),
_time_metric("zgc_cycles", "ZGC Cycles"),
_count_metric("zgc_cycles", "ZGC Cycles"),
_time_metric("zgc_pauses", "ZGC Pauses"),
_count_metric("zgc_pauses", "ZGC Pauses"),
)
def _report_disk_usage(self, baseline_stats, contender_stats):
return self._join(
self._line(
"Dataset size",
baseline_stats.dataset_size,
contender_stats.dataset_size,
"",
"GB",
treat_increase_as_improvement=False,
formatter=convert.bytes_to_gb,
),
self._line(
"Store size",
baseline_stats.store_size,
contender_stats.store_size,
"",
"GB",
treat_increase_as_improvement=False,
formatter=convert.bytes_to_gb,
),
self._line(
"Translog size",
baseline_stats.translog_size,
contender_stats.translog_size,
"",
"GB",
treat_increase_as_improvement=False,
formatter=convert.bytes_to_gb,
),
)
def _report_segment_memory(self, baseline_stats, contender_stats):
return self._join(
self._line(
"Heap used for segments",
baseline_stats.memory_segments,
contender_stats.memory_segments,
"",
"MB",
treat_increase_as_improvement=False,
formatter=convert.bytes_to_mb,
),
self._line(
"Heap used for doc values",
baseline_stats.memory_doc_values,
contender_stats.memory_doc_values,
"",
"MB",
treat_increase_as_improvement=False,
formatter=convert.bytes_to_mb,
),
self._line(
"Heap used for terms",
baseline_stats.memory_terms,
contender_stats.memory_terms,
"",
"MB",
treat_increase_as_improvement=False,
formatter=convert.bytes_to_mb,
),
self._line(
"Heap used for norms",
baseline_stats.memory_norms,
contender_stats.memory_norms,
"",
"MB",
treat_increase_as_improvement=False,
formatter=convert.bytes_to_mb,
),
self._line(
"Heap used for points",
baseline_stats.memory_points,
contender_stats.memory_points,
"",
"MB",
treat_increase_as_improvement=False,
formatter=convert.bytes_to_mb,
),
self._line(
"Heap used for stored fields",
baseline_stats.memory_stored_fields,
contender_stats.memory_stored_fields,
"",
"MB",
treat_increase_as_improvement=False,
formatter=convert.bytes_to_mb,
),
)
def _report_segment_counts(self, baseline_stats, contender_stats):
return self._join(
self._line(
"Segment count",
baseline_stats.segment_count,
contender_stats.segment_count,
"",
"",
treat_increase_as_improvement=False,
)
)
def _join(self, *args):
lines = []
for arg in args:
self._append_non_empty(lines, arg)
return lines
def _append_non_empty(self, lines, line):
if line and len(line) > 0:
lines.append(line)
def _line(self, metric, baseline, contender, task, unit, treat_increase_as_improvement, formatter=lambda x: x):
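        """Returns a [metric, task, baseline, contender, diff, unit, diff %] row, or an empty list if either value is missing."""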
if baseline is not None and contender is not None:
return [
metric,
str(task),
formatter(baseline),
formatter(contender),
self._diff(baseline, contender, treat_increase_as_improvement, formatter),
unit,
self._diff(baseline, contender, treat_increase_as_improvement, formatter, as_percentage=True),
]
else:
return []
def _diff(self, baseline, contender, treat_increase_as_improvement, formatter=lambda x: x, as_percentage=False):
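        """
        Formats the difference between contender and baseline, either as an absolute value or as a percentage
        of the baseline, and colors it according to whether an increase is considered an improvement
        (unless plain output is requested).
        """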
def identity(x):
return x
def _safe_divide(n, d):
return n / d if d else 0
if self.plain:
color_greater = identity
color_smaller = identity
color_neutral = identity
elif treat_increase_as_improvement:
color_greater = console.format.green
color_smaller = console.format.red
color_neutral = console.format.neutral
else:
color_greater = console.format.red
color_smaller = console.format.green
color_neutral = console.format.neutral
if as_percentage:
diff = _safe_divide(contender - baseline, baseline) * 100.0
precision = 2
suffix = "%"
else:
diff = formatter(contender - baseline)
precision = 5
suffix = ""
        # ensures that diffs that would be displayed as zero are also colored neutrally
threshold = 10**-precision
formatted = f"{diff:.{precision}f}{suffix}"
if diff >= threshold:
return color_greater(f"+{formatted}")
elif diff <= -threshold:
return color_smaller(formatted)
else:
return color_neutral(formatted)
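# Illustrative example of the diff columns (values are hypothetical): for a baseline of 100.0 and a
# contender of 110.0 with treat_increase_as_improvement=True, _diff yields "+10.00000" for "Diff" and
# "+10.00%" for "Diff %"; in rich (non-plain) mode both are colored green because the increase counts
# as an improvement.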