mozetl/symbolication/top_signatures_correlations.py

# Migrated from Databricks to run on dataproc
# pip install:
# boto3==1.16.20
# scipy==1.5.4
"""Compute crash correlations for the top crash signatures on each release
channel and publish the results as gzipped JSON to GCS."""
import argparse
import hashlib
import os
import sys
from collections import defaultdict
from datetime import datetime

from pyspark import SparkContext
from pyspark.sql import SparkSession
from google.cloud import storage

sys.path += [os.path.abspath("."), os.path.abspath("crashcorrelations")]

# Fetch the crashcorrelations library and its stemming dependency at run time.
os.system("git clone https://github.com/marco-c/crashcorrelations.git")
os.system("pip download stemming==1.0.1")
os.system("tar xf stemming-1.0.1.tar.gz")

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName("modules-with-missing-symbols").getOrCreate()
gcs_client = storage.Client()

# Ship the stemming module to the Spark executors.
sc.addPyFile("stemming-1.0.1/stemming/porter2.py")

# Number of top signatures to look at
TOP_SIGNATURE_COUNT = 200
# Number of days to look at to figure out top signatures
TOP_SIGNATURE_PERIOD_DAYS = 5
# Number of days to look at for telemetry crash data
TELEMETRY_CRASHES_PERIOD_DAYS = 30
# Name of the GCS bucket where results are stored
RESULTS_GCS_BUCKET = "moz-fx-data-static-websit-8565-analysis-output"

# This import must come after the git clone above has run.
from crashcorrelations import (  # noqa: E402
    utils,
    download_data,
    crash_deviations,
    comments,
)


# Workaround for Airflow not supporting different schedules for tasks in a
# DAG: the job is scheduled every day and decides itself whether to run.
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--run-on-days",
        nargs="+",
        type=int,
        required=True,
        help="Only run the job on the given weekdays (0 is Sunday)",
    )
    parser.add_argument(
        "--date",
        type=datetime.fromisoformat,
        default=datetime.utcnow(),
        help="Run date, defaults to the current date",
    )
    return parser.parse_args()


def remove_results_gcs(job_name):
    bucket = gcs_client.bucket(RESULTS_GCS_BUCKET)
    for key in bucket.list_blobs(prefix=job_name + "/data/"):
        key.delete()


def upload_results_gcs(job_name, directory):
    bucket = gcs_client.bucket(RESULTS_GCS_BUCKET)
    for root, dirs, files in os.walk(directory):
        for name in files:
            full_path = os.path.join(root, name)
            blob = bucket.blob(
                "{}/data/{}".format(
                    job_name, full_path[len(directory) + 1 :]  # noqa: E203
                )
            )
            blob.content_encoding = "gzip"
            blob.upload_from_filename(full_path, content_type="application/json")


args = parse_args()

if args.date.isoweekday() % 7 not in args.run_on_days:
    print(
        f"Skipping because the weekday of run date {args.date}"
        f" is not in {args.run_on_days}"
    )
    sys.exit(0)

print(datetime.utcnow())

channels = ["release", "beta", "nightly", "esr"]

channel_to_versions = {}
for channel in channels:
    channel_to_versions[channel] = download_data.get_versions(channel)

signatures = {}
for channel in channels:
    signatures[channel] = download_data.get_top(
        TOP_SIGNATURE_COUNT,
        versions=channel_to_versions[channel],
        days=TOP_SIGNATURE_PERIOD_DAYS,
    )

utils.rmdir("top-signatures-correlations_output")
utils.mkdir("top-signatures-correlations_output")

totals = {"date": str(utils.utc_today())}

addon_related_signatures = defaultdict(list)

for channel in channels:
    print(channel)

    utils.mkdir("top-signatures-correlations_output/" + channel)

    # Compute correlations from the last 5 days of telemetry crashes.
    dataset = crash_deviations.get_telemetry_crashes(
        spark, versions=channel_to_versions[channel], days=5
    )
    results, total_reference, total_groups = crash_deviations.find_deviations(
        sc, dataset, signatures=signatures[channel]
    )

    totals[channel] = total_reference

    # Top words from crash comments are optional; fall back to no words.
    try:
        dataset = crash_deviations.get_telemetry_crashes(
            spark,
            versions=channel_to_versions[channel],
            days=TELEMETRY_CRASHES_PERIOD_DAYS,
        )
        top_words = comments.get_top_words(dataset, signatures[channel])
    except Exception:
        top_words = {}

    for signature in signatures[channel]:
        if signature not in results:
            continue

        # Keep single-attribute correlations on addon fields that are
        # over-represented in this signature's crashes compared to the
        # reference population.
        addons = [
            result
            for result in results[signature]
            if any(
                "Addon" in elem
                and float(result["count_group"]) / total_groups[signature]
                > float(result["count_reference"]) / total_reference
                for elem in result["item"].keys()
                if len(result["item"]) == 1
            )
        ]

        if len(addons) > 0:
            addon_related_signatures[channel].append(
                {
                    "signature": signature,
                    "addons": addons,
                    "total": total_groups[signature],
                }
            )

        res = {"total": total_groups[signature], "results": results[signature]}

        if signature in top_words:
            res["top_words"] = top_words[signature]

        utils.write_json(
            "top-signatures-correlations_output/"
            + channel
            + "/"
            + hashlib.sha1(signature.encode("utf-8")).hexdigest()
            + ".json.gz",
            res,
        )

utils.write_json("top-signatures-correlations_output/all.json.gz", totals)
utils.write_json(
    "top-signatures-correlations_output/addon_related_signatures.json.gz",
    addon_related_signatures,
)

print(datetime.utcnow())

# Will be uploaded under
# https://analysis-output.telemetry.mozilla.org/top-signatures-correlations/data/
remove_results_gcs("top-signatures-correlations")
upload_results_gcs("top-signatures-correlations", "top-signatures-correlations_output")
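
# Example invocation (a hypothetical sketch, not part of the job definition;
# the spark-submit entry point and the concrete argument values below are
# assumptions for illustration):
#
#   spark-submit top_signatures_correlations.py --run-on-days 1 4 --date 2021-11-15
#
# With --run-on-days 1 4 the job proceeds only when the run date falls on a
# Monday or Thursday (isoweekday() % 7 maps Sunday to 0), so Airflow can
# trigger the DAG daily while this task effectively runs twice a week.
#
# A minimal consumer sketch (hypothetical; the signature string is invented,
# and the public URL comes from the comment above). Per-signature results are
# published under <channel>/<sha1(signature)>.json.gz:
#
#   import hashlib
#
#   signature = "OOM | small"  # hypothetical signature
#   name = hashlib.sha1(signature.encode("utf-8")).hexdigest()
#   url = (
#       "https://analysis-output.telemetry.mozilla.org/"
#       "top-signatures-correlations/data/release/" + name + ".json.gz"
#   )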