# mozetl/symbolication/top_signatures_correlations.py
# Migrated from Databricks to run on dataproc
# pip install:
# boto3==1.16.20
# scipy==1.5.4
import argparse
import hashlib
import os
import sys
from collections import defaultdict
from datetime import datetime
from pyspark import SparkContext
from pyspark.sql import SparkSession
from google.cloud import storage
sys.path += [os.path.abspath("."), os.path.abspath("crashcorrelations")]
os.system("git clone https://github.com/marco-c/crashcorrelations.git")
os.system("pip download stemming==1.0.1")
os.system("tar xf stemming-1.0.1.tar.gz")
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName("modules-with-missing-symbols").getOrCreate()
gcs_client = storage.Client()
sc.addPyFile("stemming-1.0.1/stemming/porter2.py")
# Number of top signatures to look at
TOP_SIGNATURE_COUNT = 200
# Number of days to look at to figure out top signatures
TOP_SIGNATURE_PERIOD_DAYS = 5
# Number of days to look at for telemetry crash data
TELEMETRY_CRASHES_PERIOD_DAYS = 30
# Name of the GCS bucket where results are stored
RESULTS_GCS_BUCKET = "moz-fx-data-static-websit-8565-analysis-output"
from crashcorrelations import ( # noqa E402
utils,
download_data,
crash_deviations,
comments,
)
# Workaround: Airflow cannot assign different schedules to tasks within one
# DAG, so the job itself decides whether to run on a given weekday.
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
"--run-on-days",
nargs="+",
type=int,
required=True,
help="Only run job on given days (0 is sunday)",
)
parser.add_argument(
"--date",
type=datetime.fromisoformat,
default=datetime.utcnow(),
help="Run date, defaults to current dat",
)
return parser.parse_args()
def remove_results_gcs(job_name):
bucket = gcs_client.bucket(RESULTS_GCS_BUCKET)
for key in bucket.list_blobs(prefix=job_name + "/data/"):
key.delete()
def upload_results_gcs(job_name, directory):
bucket = gcs_client.bucket(RESULTS_GCS_BUCKET)
for root, dirs, files in os.walk(directory):
for name in files:
full_path = os.path.join(root, name)
blob = bucket.blob(
"{}/data/{}".format(
job_name, full_path[len(directory) + 1 :] # noqa E203
)
)
blob.content_encoding = "gzip"
blob.upload_from_filename(full_path, content_type="application/json")
args = parse_args()
if args.date.isoweekday() % 7 not in args.run_on_days:
print(
f"Skipping because run date day of week"
f" {args.date} is not in {args.run_on_days}"
)
sys.exit(0)
print(datetime.utcnow())
channels = ["release", "beta", "nightly", "esr"]
channel_to_versions = {}
for channel in channels:
channel_to_versions[channel] = download_data.get_versions(channel)
signatures = {}
for channel in channels:
signatures[channel] = download_data.get_top(
TOP_SIGNATURE_COUNT,
versions=channel_to_versions[channel],
days=TOP_SIGNATURE_PERIOD_DAYS,
)
utils.rmdir("top-signatures-correlations_output")
utils.mkdir("top-signatures-correlations_output")
totals = {"date": str(utils.utc_today())}
addon_related_signatures = defaultdict(list)
for channel in channels:
print(channel)
utils.mkdir("top-signatures-correlations_output/" + channel)
dataset = crash_deviations.get_telemetry_crashes(
spark, versions=channel_to_versions[channel], days=5
)
results, total_reference, total_groups = crash_deviations.find_deviations(
sc, dataset, signatures=signatures[channel]
)
totals[channel] = total_reference
try:
dataset = crash_deviations.get_telemetry_crashes(
spark,
versions=channel_to_versions[channel],
days=TELEMETRY_CRASHES_PERIOD_DAYS,
)
top_words = comments.get_top_words(dataset, signatures[channel])
except Exception:
top_words = {}
for signature in signatures[channel]:
if signature not in results:
continue
addons = [
result
for result in results[signature]
if any(
"Addon" in elem
and float(result["count_group"]) / total_groups[signature]
> float(result["count_reference"]) / total_reference
for elem in result["item"].keys()
if len(result["item"]) == 1
)
]
if len(addons) > 0:
addon_related_signatures[channel].append(
{
"signature": signature,
"addons": addons,
"total": total_groups[signature],
}
)
res = {"total": total_groups[signature], "results": results[signature]}
if signature in top_words:
res["top_words"] = top_words[signature]
utils.write_json(
"top-signatures-correlations_output/"
+ channel
+ "/"
+ hashlib.sha1(signature.encode("utf-8")).hexdigest()
+ ".json.gz",
res,
)
utils.write_json("top-signatures-correlations_output/all.json.gz", totals)
utils.write_json(
"top-signatures-correlations_output/addon_related_signatures.json.gz",
addon_related_signatures,
)
print(datetime.utcnow())
# Will be uploaded under
# https://analysis-output.telemetry.mozilla.org/top-signatures-correlations/data/
remove_results_gcs("top-signatures-correlations")
upload_results_gcs("top-signatures-correlations", "top-signatures-correlations_output")