lib/telemetry.py (469 lines of code) (raw):

import os import sys import numpy as np import pandas as pd from google.cloud import bigquery from django.template import Template, Context from django.template.loader import get_template # Remove any histograms that have empty datasets in # either a branch, or branch segment. def invalidDataSet(df, histogram, branches, segments): if df.empty: print(f"Empty dataset found, removing: {histogram}.") return True for branch in branches: branch_name = branch['name'] branch_df = df[df["branch"]==branch_name] if branch_df.empty: print(f"Empty dataset found for branch={branch_name}, removing: {histogram}.") return True for segment in segments: if segment=="All": continue branch_segment_df = branch_df[branch_df["segment"]==segment] if branch_segment_df.empty: print(f"Empty dataset found for segment={segment}, removing: {histogram}.") return True return False def segments_are_all_OS(segments): os_segments = set(["Windows", "All", "Linux", "Mac", "Android"]) for segment in segments: if segment not in os_segments: return False return True class TelemetryClient: def __init__(self, dataDir, config, skipCache): self.client = bigquery.Client() self.config = config self.dataDir = dataDir self.skipCache = skipCache self.queries = [] def collectResultsFromQuery_OS_segments(self, results, branch, segment, event_metrics, histograms): for histogram in self.config['histograms']: df = histograms[histogram] if segment == "All": subset = df[df["branch"] == branch][['bucket', 'counts']].groupby(['bucket']).sum() buckets = list(subset.index) counts = list(subset['counts']) else: subset = df[(df["segment"] == segment) & (df["branch"] == branch)] buckets = list(subset['bucket']) counts = list(subset['counts']) # Some clients report bucket sizes that are not real, and these buckets # end up having 1-5 samples in them. Filter these out entirely. if self.config['histograms'][histogram]['kind'] == 'numerical': remove=[] for i in range(1,len(counts)-1): if (counts[i-1] > 1000 and counts[i] < counts[i-1]/100) or \ (counts[i+1] > 1000 and counts[i] < counts[i+1]/100): remove.append(i) for i in sorted(remove, reverse=True): del buckets[i] del counts[i] # Add labels to the buckets for categorical histograms. if self.config['histograms'][histogram]['kind'] == 'categorical': labels = self.config['histograms'][histogram]['labels'] # Remove overflow bucket if it exists if len(labels)==(len(buckets)-1) and counts[-1]==0: del buckets[-1] del counts[-1] # Add missing buckets so they line up in each branch. if len(labels) > len(buckets): for i in range(len(buckets)): print(buckets[i], counts[i]) new_counts = [] for i,b in enumerate(labels): j = buckets.index(b) if b in buckets else None if j: new_counts.append(counts[j]) else: new_counts.append(0) counts = new_counts # Remap bucket values to the appropriate label names. buckets = labels # If there is a max, then overflow larger buckets into the max. if 'max' in self.config['histograms'][histogram]: maxBucket = self.config['histograms'][histogram]['max'] remove=[] maxBucketCount=0 for i,x in enumerate(buckets): if x >= maxBucket: remove.append(i) maxBucketCount = maxBucketCount + counts[i] for i in sorted(remove, reverse=True): del buckets[i] del counts[i] buckets.append(maxBucket) counts.append(maxBucketCount) assert len(buckets) == len(counts) results[branch][segment]['histograms'][histogram] = {} results[branch][segment]['histograms'][histogram]['bins'] = buckets results[branch][segment]['histograms'][histogram]['counts'] = counts print(f" segment={segment} len(histogram: {histogram}) = ", len(buckets)) for metric in self.config['pageload_event_metrics']: df = event_metrics[metric] if segment == "All": subset = df[df["branch"] == branch][['bucket', 'counts']].groupby(['bucket']).sum() buckets = list(subset.index) counts = list(subset['counts']) else: subset = df[(df["segment"] == segment) & (df["branch"] == branch)] buckets = list(subset['bucket']) counts = list(subset['counts']) assert len(buckets) == len(counts) results[branch][segment]['pageload_event_metrics'][metric] = {} results[branch][segment]['pageload_event_metrics'][metric]['bins'] = buckets results[branch][segment]['pageload_event_metrics'][metric]['counts'] = counts print(f" segment={segment} len(pageload event: {metric}) = ", len(buckets)) def getResults(self): if self.config['is_experiment'] is True: return self.getResultsForExperiment() else: return self.getResultsForNonExperiment() def getResultsForNonExperiment(self): # Get data for each pageload event metric. event_metrics = {} for metric in self.config['pageload_event_metrics']: event_metrics[metric] = self.getPageloadEventDataNonExperiment(metric) print(event_metrics[metric]) #Get data for each histogram in this segment. histograms = {} remove = [] for histogram in self.config['histograms']: df = self.getHistogramDataNonExperiment(self.config, histogram) print(df) # Remove histograms that are empty. if invalidDataSet(df, histogram, self.config['branches'], self.config['segments']): remove.append(histogram) continue histograms[histogram] = df for hist in remove: if hist in self.config['histograms']: del self.config['histograms'][hist] # Combine histogram and pageload event results. results = {} for i in range(len(self.config['branches'])): branch_name = self.config['branches'][i]['name'] results[branch_name] = {} for segment in self.config['segments']: print (f"Aggregating results for segment={segment} and branch={branch_name}") results[branch_name][segment] = {"histograms": {}, "pageload_event_metrics": {}} # Special case when segments is OS only. self.collectResultsFromQuery_OS_segments(results, branch_name, segment, event_metrics, histograms) results['queries'] = self.queries return results def getResultsForExperiment(self): # Get data for each pageload event metric. event_metrics = {} for metric in self.config['pageload_event_metrics']: event_metrics[metric] = self.getPageloadEventData(metric) print(event_metrics[metric]) #Get data for each histogram in this segment. histograms = {} remove = [] for histogram in self.config['histograms']: df = self.getHistogramData(self.config, histogram) # Mark histograms that have invalid data sets. if invalidDataSet(df, histogram, self.config['branches'], self.config['segments']): remove.append(histogram) continue histograms[histogram] = df # Remove invalid histogram data. for hist in remove: if hist in self.config['histograms']: print(f"Empty dataset found, removing: {histogram}.") del self.config['histograms'][hist] # Combine histogram and pageload event results. results = {} for branch in self.config['branches']: branch_name = branch['name'] results[branch_name] = {} for segment in self.config['segments']: print (f"Aggregating results for segment={segment} and branch={branch_name}") results[branch_name][segment] = {"histograms": {}, "pageload_event_metrics": {}} # Special case when segments is OS only. self.collectResultsFromQuery_OS_segments(results, branch_name, segment, event_metrics, histograms) results['queries'] = self.queries return results def generatePageloadEventQuery_OS_segments_non_experiment(self, metric): t = get_template("other/glean/pageload_events_os_segments.sql") minVal = self.config['pageload_event_metrics'][metric]['min'] maxVal = self.config['pageload_event_metrics'][metric]['max'] branches = self.config["branches"] for i in range(len(branches)): branches[i]["last"] = False if "version" in self.config["branches"][i]: version = self.config["branches"][i]["version"] branches[i]["ver_condition"] = f"AND SPLIT(client_info.app_display_version, '.')[offset(0)] = \"{version}\"" if "architecture" in self.config["branches"][i]: arch = self.config["branches"][i]["architecture"] branches[i]["arch_condition"] = f"AND client_info.architecture = \"{arch}\"" if "glean_conditions" in self.config["branches"][i]: branches[i]["glean_conditions"] = self.config["branches"][i]["glean_conditions"] branches[-1]["last"] = True print(branches) context = { "minVal": minVal, "maxVal": maxVal, "metric": metric, "branches": branches } query = t.render(context) # Remove empty lines before returning query = "".join([s for s in query.strip().splitlines(True) if s.strip()]) self.queries.append({ "name": f"Pageload event: {metric}", "query": query }) return query def generatePageloadEventQuery_OS_segments(self, metric): t = get_template("experiment/glean/pageload_events_os_segments.sql") print(self.config['pageload_event_metrics'][metric]) metricMin = self.config['pageload_event_metrics'][metric]['min'] metricMax = self.config['pageload_event_metrics'][metric]['max'] isp_blacklist = [] if 'isp_blacklist' in self.config: with open(self.config['isp_blacklist'], 'r') as file: isp_blacklist = [line.strip() for line in file] context = { "include_non_enrolled_branch": self.config['include_non_enrolled_branch'], "minVal": metricMin, "maxVal": metricMax, "slug": self.config['slug'], "channel": self.config['channel'], "startDate": self.config['startDate'], "endDate": self.config['endDate'], "metric": metric, "blacklist": isp_blacklist } query = t.render(context) # Remove empty lines before returning query = "".join([s for s in query.strip().splitlines(True) if s.strip()]) self.queries.append({ "name": f"Pageload event: {metric}", "query": query }) return query # Not currently used, and not well supported. def generatePageloadEventQuery_Generic(self): t = get_template("archived/events_generic.sql") segmentInfo = [] for segment in self.config['segments']: segmentInfo.append({ "name": segment, "conditions": self.config['segments'][segment] }) maxBucket = 0 minBucket = 30000 for metric in self.config['pageload_event_metrics']: metricMin = self.config['pageload_event_metrics'][metric]['min'] metricMax = self.config['pageload_event_metrics'][metric]['max'] if metricMax > maxBucket: maxBucket = metricMax if metricMin < minBucket: minBucket = metricMin context = { "minBucket": minBucket, "maxBucket": maxBucket, "is_experiment": self.config['is_experiment'], "slug": self.config['slug'], "channel": self.config['channel'], "startDate": self.config['startDate'], "endDate": self.config['endDate'], "metrics": self.config['pageload_event_metrics'], "segments": segmentInfo } query = t.render(context) # Remove empty lines before returning query = "".join([s for s in query.strip().splitlines(True) if s.strip()]) self.queries.append({ "name": f"Pageload event: {metric}", "query": query }) return query # Use *_os_segments queries if the segments is OS only which is much faster than generic query. def generateHistogramQuery_OS_segments_legacy(self, histogram): t = get_template("experiment/legacy/histogram_os_segments.sql") isp_blacklist = [] if 'isp_blacklist' in self.config: with open(self.config['isp_blacklist'], 'r') as file: isp_blacklist = [line.strip() for line in file] context = { "include_non_enrolled_branch": self.config['include_non_enrolled_branch'], "slug": self.config['slug'], "channel": self.config['channel'], "startDate": self.config['startDate'], "endDate": self.config['endDate'], "histogram": histogram, "available_on_desktop": self.config['histograms'][histogram]['available_on_desktop'], "available_on_android": self.config['histograms'][histogram]['available_on_android'], "blacklist": isp_blacklist } query = t.render(context) # Remove empty lines before returning query = "".join([s for s in query.strip().splitlines(True) if s.strip()]) self.queries.append({ "name": f"Histogram: {histogram}", "query": query }) return query def generateHistogramQuery_OS_segments_glean(self, histogram): t = get_template("experiment/glean/histogram_os_segments.sql") context = { "include_non_enrolled_branch": self.config['include_non_enrolled_branch'], "slug": self.config['slug'], "channel": self.config['channel'], "startDate": self.config['startDate'], "endDate": self.config['endDate'], "histogram": histogram, "available_on_desktop": self.config['histograms'][histogram]['available_on_desktop'], "available_on_android": self.config['histograms'][histogram]['available_on_android'], } query = t.render(context) # Remove empty lines before returning query = "".join([s for s in query.strip().splitlines(True) if s.strip()]) self.queries.append({ "name": f"Histogram: {histogram}", "query": query }) return query def generateHistogramQuery_OS_segments_non_experiment_legacy(self, histogram): t = get_template("other/legacy/histogram_os_segments.sql") branches = self.config["branches"] for i in range(len(branches)): branches[i]["last"] = False if "version" in self.config["branches"][i]: version = self.config["branches"][i]["version"] branches[i]["ver_condition"] = f"AND SPLIT(application.display_version, '.')[offset(0)] = \"{version}\"" if "architecture" in self.config["branches"][i]: arch = self.config["branches"][i]["architecture"] branches[i]["arch_condition"] = f"AND application.architecture = \"{arch}\"" if "legacy_conditions" in self.config["branches"][i]: branches[i]["legacy_conditions"] = self.config["branches"][i]["legacy_conditions"] branches[-1]["last"] = True context = { "histogram": histogram, "available_on_desktop": self.config['histograms'][histogram]['available_on_desktop'], "available_on_android": self.config['histograms'][histogram]['available_on_android'], "branches": branches, "channel": self.config["branches"][0]["channel"], } query = t.render(context) # Remove empty lines before returning query = "".join([s for s in query.strip().splitlines(True) if s.strip()]) self.queries.append({ "name": f"Histogram: {histogram}", "query": query }) return query def generateHistogramQuery_OS_segments_non_experiment_glean(self, histogram): t = get_template("other/glean/histogram_os_segments.sql") branches = self.config["branches"] for i in range(len(branches)): branches[i]["last"] = False if "version" in self.config["branches"][i]: version = self.config["branches"][i]["version"] branches[i]["ver_condition"] = f"AND SPLIT(client_info.app_display_version, '.')[offset(0)] = \"{version}\"" if "architecture" in self.config["branches"][i]: arch = self.config["branches"][i]["architecture"] branches[i]["arch_condition"] = f"AND client_info.architecture = \"{arch}\"" if "glean_conditions" in self.config["branches"][i]: branches[i]["glean_conditions"] = self.config["branches"][i]["glean_conditions"] branches[-1]["last"] = True context = { "histogram": histogram, "available_on_desktop": self.config['histograms'][histogram]['available_on_desktop'], "available_on_android": self.config['histograms'][histogram]['available_on_android'], "branches": branches } query = t.render(context) # Remove empty lines before returning query = "".join([s for s in query.strip().splitlines(True) if s.strip()]) self.queries.append({ "name": f"Histogram: {histogram}", "query": query }) return query # Not currently used, and not well supported. def generateHistogramQuery_Generic(self, histogram): t = get_template("archived/histogram_generic.sql") segmentInfo = [] for segment in self.config['segments']: segmentInfo.append({ "name": segment, "conditions": self.config['segments'][segment] }) context = { "is_experiment": self.config['is_experiment'], "slug": self.config['slug'], "channel": self.config['channel'], "startDate": self.config['startDate'], "endDate": self.config['endDate'], "histogram": histogram, "available_available_on_desktop": self.config['histograms'][histogram]['available_on_desktop'], "available_on_android": self.config['histograms'][histogram]['available_on_android'], "segments": segmentInfo } query = t.render(context) # Remove empty lines before returning query = "".join([s for s in query.strip().splitlines(True) if s.strip()]) self.queries.append({ "name": f"Histogram: {histogram}", "query": query }) return query def checkForExistingData(self, filename): if self.skipCache: df = None else: try: df = pd.read_pickle(filename) print(f"Found local data in {filename}") except: df = None return df def getHistogramDataNonExperiment(self, config, histogram): slug = config['slug'] hist_name = histogram.split('.')[-1] filename=os.path.join(self.dataDir, f"{slug}-{hist_name}.pkl") df = self.checkForExistingData(filename) if df is not None: return df if segments_are_all_OS(self.config['segments']): if config["histograms"][histogram]["glean"]: query = self.generateHistogramQuery_OS_segments_non_experiment_glean(histogram) else: query = self.generateHistogramQuery_OS_segments_non_experiment_legacy(histogram) else: print("No current support for generic non-experiment queries.") sys.exit(1) print("Running query:\n" + query) job = self.client.query(query) df = job.to_dataframe() print(f"Writing '{slug}' histogram results for {histogram} to disk.") df.to_pickle(filename) return df def getHistogramData(self, config, histogram): slug = config['slug'] hist_name = histogram.split('.')[-1] filename=os.path.join(self.dataDir, f"{slug}-{hist_name}.pkl") df = self.checkForExistingData(filename) if df is not None: return df if segments_are_all_OS(self.config['segments']): if config["histograms"][histogram]["glean"]: query = self.generateHistogramQuery_OS_segments_glean(histogram) else: query = self.generateHistogramQuery_OS_segments_legacy(histogram) else: # Generic segments are not well supported right now. print("No current support for generic non-experiment queries.") sys.exit(1) print("Running query:\n" + query) job = self.client.query(query) df = job.to_dataframe() print(f"Writing '{slug}' histogram results for {histogram} to disk.") df.to_pickle(filename) return df def getPageloadEventDataNonExperiment(self, metric): slug = self.config['slug'] filename=os.path.join(self.dataDir, f"{slug}-pageload-events-{metric}.pkl") df = self.checkForExistingData(filename) if df is not None: return df if segments_are_all_OS(self.config['segments']): query = self.generatePageloadEventQuery_OS_segments_non_experiment(metric) else: print("Generic non-experiment query currently not supported.") sys.exit(1) print("Running query:\n" + query) job = self.client.query(query) df = job.to_dataframe() print(f"Writing '{slug}' pageload event results to disk.") df.to_pickle(filename) return df def getPageloadEventData(self, metric): slug = self.config['slug'] filename=os.path.join(self.dataDir, f"{slug}-pageload-events-{metric}.pkl") df = self.checkForExistingData(filename) if df is not None: return df if segments_are_all_OS(self.config['segments']): query = self.generatePageloadEventQuery_OS_segments(metric) else: #query = self.generatePageloadEventQuery_Generic() print("No current support for generic pageload event queries.") sys.exit(1) print("Running query:\n" + query) job = self.client.query(query) df = job.to_dataframe() print(f"Writing '{slug}' pageload event results to disk.") df.to_pickle(filename) return df