treeherder/etl/perf.py (260 lines of code) (raw):

import copy
import logging
from datetime import datetime
from hashlib import sha1

import simplejson as json
from django.conf import settings

from treeherder.log_parser.utils import validate_perf_data
from treeherder.model.models import Job, OptionCollection, Repository
from treeherder.perf.models import (
    MultiCommitDatum,
    PerformanceDatum,
    PerformanceDatumReplicate,
    PerformanceFramework,
    PerformanceSignature,
)
from treeherder.perf.tasks import generate_alerts

logger = logging.getLogger(__name__)


def _get_application_name(validated_perf_datum: dict):
    """Return ``perf_datum["application"]["name"]``, or "" when a key is absent."""
    try:
        return validated_perf_datum["application"]["name"]
    except KeyError:
        return ""


def _get_application_version(validated_perf_datum: dict):
    """Return ``perf_datum["application"]["version"]``, or "" when a key is absent."""
    try:
        return validated_perf_datum["application"]["version"]
    except KeyError:
        return ""


def _get_signature_hash(signature_properties):
    """
    Compute the SHA-1 hex digest identifying a set of signature properties.

    The digest covers every key and every value of ``signature_properties``.
    Non-string values are serialized as JSON with sorted keys; all tokens are
    then sorted and concatenated, so the dict's insertion order does not
    influence the result.
    """
    str_values = [
        value if isinstance(value, str) else json.dumps(value, sort_keys=True)
        for value in signature_properties.values()
    ]
    hashed_tokens = sorted([*signature_properties.keys(), *str_values])
    return sha1("".join(map(str, hashed_tokens)).encode("utf-8")).hexdigest()


def _order_and_concat(words: list) -> str:
    """Return ``words`` sorted and joined by single spaces."""
    return " ".join(sorted(words))


def _create_or_update_signature(repository, signature_hash, framework, application, defaults):
    """
    Fetch or create the PerformanceSignature matching the lookup fields.

    A newly created signature is returned as-is. An existing one is updated
    with ``defaults``, except that ``last_updated`` never moves backwards: the
    stored value is kept when it is more recent than the one in ``defaults``.

    ``defaults`` must contain a ``"last_updated"`` key. The caller's dict is
    not modified.
    """
    lookup = {
        "repository": repository,
        "signature_hash": signature_hash,
        "framework": framework,
        "application": application,
    }
    signature, created = PerformanceSignature.objects.get_or_create(**lookup, defaults=defaults)
    if created:
        return signature

    if signature.last_updated > defaults["last_updated"]:
        # work on a copy so the dict handed in by the caller stays untouched
        defaults = {**defaults, "last_updated": signature.last_updated}
    signature, _ = PerformanceSignature.objects.update_or_create(**lookup, defaults=defaults)
    return signature


def _deduce_push_timestamp(perf_datum: dict, job_push_time: datetime) -> tuple[datetime, bool]:
    """
    Pick the push timestamp to store for a perf datum.

    Returns a ``(timestamp, is_multi_commit)`` pair. When
    ``settings.PERFHERDER_ENABLE_MULTIDATA_INGESTION`` is off, or the datum has
    no truthy ``"pushTimestamp"``, the job's push time is used and the flag is
    False. Otherwise the datum's own ``"pushTimestamp"`` is used and the flag
    is True.
    """
    is_multi_commit = False
    if not settings.PERFHERDER_ENABLE_MULTIDATA_INGESTION:
        # the old way of ingestion
        return job_push_time, is_multi_commit

    multidata_timestamp = perf_datum.get("pushTimestamp", None)
    if multidata_timestamp:
        # NOTE(review): fromtimestamp() without a tz yields a naive local-time
        # datetime - confirm this matches how job push times are stored.
        multidata_timestamp = datetime.fromtimestamp(multidata_timestamp)
        is_multi_commit = True

    return (multidata_timestamp or job_push_time), is_multi_commit


def _suite_should_alert_based_on(
    signature: PerformanceSignature, job: Job, new_datum_ingested: bool
) -> bool:
    """
    Tell whether alert generation should be scheduled for a summary series.

    Requires a newly ingested datum, a repository with performance alerts
    enabled, a sheriffable job tier and a signature whose ``should_alert`` is
    not explicitly False (None counts as "alert").
    """
    return (
        signature.should_alert is not False
        and new_datum_ingested
        and job.repository.performance_alerts_enabled
        and job.tier_is_sheriffable
    )


def _test_should_alert_based_on(
    signature: PerformanceSignature, job: Job, new_datum_ingested: bool, suite: dict
) -> bool:
    """
    By default if there is no summary, we should schedule a
    generate alerts task for the subtest, since we have new data
    (this can be over-ridden by the optional "should alert"
    property)
    """
    return (
        (signature.should_alert or (signature.should_alert is None and suite.get("value") is None))
        and new_datum_ingested
        and job.repository.performance_alerts_enabled
        and job.tier_is_sheriffable
    )


def _test_should_gather_replicates_based_on(
    repository: Repository, suite_name: str, replicates: list | None
) -> bool:
    """
    Determine if we should gather/ingest replicates. Currently, it's
    only available on the try branch. Some tests also don't have replicates
    available as it's not a required field in our performance artifact
    schema. Replicates are also gathered for speedometer3 tests from
    mozilla-central.
    """
    if not replicates:
        # covers both a missing (None) and an empty replicates list
        return False
    if repository.name in ("try",):
        return True
    if repository.name in ("mozilla-central",):
        return suite_name == "speedometer3" or "applink-startup" in suite_name
    return False


def _load_perf_datum(job: Job, perf_datum: dict):
    """
    Ingest one performance datum blob produced by ``job``.

    The blob is validated first. Nothing is stored when its framework is
    unknown or disabled. Otherwise, for every suite:

    * if the suite has a summary ``"value"``, its signature is created or
      updated and a PerformanceDatum is stored for it;
    * for each subtest, its signature is created or updated (parented to the
      summary signature when there is one), a PerformanceDatum is stored and,
      where applicable, its replicates are stored too.

    Multi commit data is registered in MultiCommitDatum and alert generation
    tasks are queued according to the ``_*_should_alert_based_on`` helpers.
    """
    validate_perf_data(perf_datum)

    extra_properties = {}
    reference_data = {
        "option_collection_hash": job.signature.option_collection_hash,
        "machine_platform": job.signature.machine_platform,
    }

    option_collection = OptionCollection.objects.get(
        option_collection_hash=job.signature.option_collection_hash
    )

    framework_name = perf_datum["framework"]["name"]
    try:
        framework = PerformanceFramework.objects.get(name=framework_name)
    except PerformanceFramework.DoesNotExist:
        # job_resource_usage data is skipped silently; any other unknown
        # framework is skipped with a warning
        if framework_name != "job_resource_usage":
            logger.warning(
                f"Performance framework {framework_name} does not exist, skipping load of performance artifacts"
            )
        return
    if not framework.enabled:
        logger.info(f"Performance framework {framework_name} is not enabled, skipping")
        return

    application = _get_application_name(perf_datum)
    application_version = _get_application_version(perf_datum)

    for suite in perf_datum["suites"]:
        suite_extra_properties = copy.copy(extra_properties)
        ordered_tags = _order_and_concat(suite.get("tags", []))
        deduced_timestamp, is_multi_commit = _deduce_push_timestamp(perf_datum, job.push.time)
        suite_extra_options = ""

        if suite.get("extraOptions"):
            suite_extra_properties = {"test_options": sorted(suite["extraOptions"])}
            suite_extra_options = _order_and_concat(suite["extraOptions"])

        summary_signature_hash = None
        summary_signature = None

        # if we have a summary value, create or get its signature by all its subtest
        # properties.
        if suite.get("value") is not None:
            # summary series
            summary_properties = {"suite": suite["name"]}
            summary_properties.update(reference_data)
            summary_properties.update(suite_extra_properties)
            summary_signature_hash = _get_signature_hash(summary_properties)
            summary_signature = _create_or_update_signature(
                job.repository,
                summary_signature_hash,
                framework,
                application,
                {
                    "test": "",
                    "suite": suite["name"],
                    "suite_public_name": suite.get("publicName"),
                    "option_collection": option_collection,
                    "platform": job.machine_platform,
                    "tags": ordered_tags,
                    "extra_options": suite_extra_options,
                    "measurement_unit": suite.get("unit"),
                    "lower_is_better": suite.get("lowerIsBetter", True),
                    "has_subtests": True,
                    # these properties below can be either True, False, or null
                    # (None). Null indicates no preference has been set.
                    "should_alert": suite.get("shouldAlert"),
                    "alert_change_type": PerformanceSignature._get_alert_change_type(
                        suite.get("alertChangeType")
                    ),
                    "alert_threshold": suite.get("alertThreshold"),
                    "min_back_window": suite.get("minBackWindow"),
                    "max_back_window": suite.get("maxBackWindow"),
                    "fore_window": suite.get("foreWindow"),
                    "last_updated": job.push.time,
                },
            )

            suite_datum, datum_created = PerformanceDatum.objects.get_or_create(
                repository=job.repository,
                job=job,
                push=job.push,
                signature=summary_signature,
                push_timestamp=deduced_timestamp,
                defaults={"value": suite["value"], "application_version": application_version},
            )

            if suite_datum.should_mark_as_multi_commit(is_multi_commit, datum_created):
                # keep a register with all multi commit perf data
                MultiCommitDatum.objects.create(perf_datum=suite_datum)

            if _suite_should_alert_based_on(summary_signature, job, datum_created):
                generate_alerts.apply_async(
                    args=[summary_signature.id], queue="generate_perf_alerts"
                )

        for subtest in suite["subtests"]:
            subtest_properties = {"suite": suite["name"], "test": subtest["name"]}
            subtest_properties.update(reference_data)
            subtest_properties.update(suite_extra_properties)

            if summary_signature_hash is not None:
                # the parent is the signature stored above for the summary
                # series, so it is reused instead of being queried per subtest
                subtest_properties.update({"parent_signature": summary_signature_hash})
            subtest_signature_hash = _get_signature_hash(subtest_properties)

            signature = _create_or_update_signature(
                job.repository,
                subtest_signature_hash,
                framework,
                application,
                {
                    "test": subtest_properties["test"],
                    "suite": suite["name"],
                    "test_public_name": subtest.get("publicName"),
                    "suite_public_name": suite.get("publicName"),
                    "option_collection": option_collection,
                    "platform": job.machine_platform,
                    "tags": ordered_tags,
                    "extra_options": suite_extra_options,
                    "measurement_unit": subtest.get("unit"),
                    "lower_is_better": subtest.get("lowerIsBetter", True),
                    "has_subtests": False,
                    # these properties below can be either True, False, or
                    # null (None). Null indicates no preference has been
                    # set.
                    "should_alert": subtest.get("shouldAlert"),
                    "alert_change_type": PerformanceSignature._get_alert_change_type(
                        subtest.get("alertChangeType")
                    ),
                    "alert_threshold": subtest.get("alertThreshold"),
                    "min_back_window": subtest.get("minBackWindow"),
                    "max_back_window": subtest.get("maxBackWindow"),
                    "fore_window": subtest.get("foreWindow"),
                    "parent_signature": summary_signature,
                    "last_updated": job.push.time,
                },
            )

            # A subtest repeating an earlier name maps to the same signature, so
            # the lookup below finds the datum stored for the first occurrence
            # and these defaults are ignored; the first value still wins.
            subtest_datum, datum_created = PerformanceDatum.objects.get_or_create(
                repository=job.repository,
                job=job,
                push=job.push,
                signature=signature,
                push_timestamp=deduced_timestamp,
                defaults={
                    "value": subtest["value"],
                    "application_version": application_version,
                },
            )

            if _test_should_gather_replicates_based_on(
                job.repository, suite["name"], subtest.get("replicates", [])
            ):
                # NOTE(review): this also runs when the datum already existed
                # (datum_created is False), so ingesting the same artifact again
                # presumably inserts its replicates again - confirm intent.
                try:
                    # Add the replicates to the PerformanceDatumReplicate table, and
                    # catch and ignore any exceptions that are produced here so we don't
                    # impact the standard workflow
                    PerformanceDatumReplicate.objects.bulk_create(
                        [
                            PerformanceDatumReplicate(
                                value=replicate, performance_datum=subtest_datum
                            )
                            for replicate in subtest["replicates"]
                        ]
                    )
                except Exception as e:
                    logger.info(f"Failed to ingest replicates for datum {subtest_datum}: {e}")

            if subtest_datum.should_mark_as_multi_commit(is_multi_commit, datum_created):
                # keep a register with all multi commit perf data
                MultiCommitDatum.objects.create(perf_datum=subtest_datum)

            if _test_should_alert_based_on(signature, job, datum_created, suite):
                generate_alerts.apply_async(args=[signature.id], queue="generate_perf_alerts")


def store_performance_artifact(job, artifact):
    """
    Ingest the performance data carried by a job artifact.

    ``artifact["blob"]`` is a JSON document whose ``"performance_data"`` entry
    is either a single perf datum or a list of them; each one is loaded with
    ``_load_perf_datum``.
    """
    blob = json.loads(artifact["blob"])
    performance_data = blob["performance_data"]

    if isinstance(performance_data, list):
        for perfdatum in performance_data:
            _load_perf_datum(job, perfdatum)
    else:
        _load_perf_datum(job, performance_data)