# mozetl/graphics/graphics_telemetry_trends.py
# Migrated from Databricks to run on Dataproc.
#
# Required pip installs:
#   python_moztelemetry
#   git+https://github.com/FirefoxGraphics/telemetry.git#egg=pkg&subdirectory=analyses/bigquery_shim
#   boto3==1.16.20
#   six==1.15.0
import argparse
import datetime
import json
import os
import sys
import time

import requests
from bigquery_shim import trends
from google.cloud import storage
from moztelemetry import get_one_ping_per_client
from pyspark import SparkContext
from pyspark.sql import SparkSession


def fmt_date(d):
    return d.strftime("%Y%m%d")


def jstime(d):
    # Unix timestamp in seconds.
    return time.mktime(d.timetuple())


def repartition(pipeline):
    return pipeline.repartition(MaxPartitions).cache()


storage_client = storage.Client()
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName("graphics-trends").getOrCreate()

MaxPartitions = sc.defaultParallelism * 4

# Sample fraction of clients per week. Keep this small (e.g. 0.00001) for
# fast backfill testing. Note that get_raw_pings() currently ignores this
# value; the fraction is baked into the bigquery_shim query.
WeeklyFraction = 0.003

# Number of days of history Telemetry keeps.
MaxHistoryInDays = datetime.timedelta(days=210)

# Bucket we'll drop files into on GCS. If this is None, we won't attempt any
# GCS uploads, and the analysis will start from scratch.
GCS_BUCKET = "moz-fx-data-static-websit-8565-analysis-output"
GCS_PREFIX = "gfx/telemetry-data/"

GITHUB_REPO = "https://raw.githubusercontent.com/FirefoxGraphics/moz-gfx-telemetry"

# List of jobs allowed to have a first run (meaning no GCS content yet).
BrandNewJobs = []

# If True, backfill up to MaxHistoryInDays rather than to the last update.
ForceMaxBackfill = False

OUTPUT_PATH = "output"
os.makedirs(OUTPUT_PATH, exist_ok=True)

# Paths of the ping fields we read.
ArchKey = "environment/build/architecture"
FxVersionKey = "environment/build/version"
Wow64Key = "environment/system/isWow64"
CpuKey = "environment/system/cpu"
GfxAdaptersKey = "environment/system/gfx/adapters"
GfxFeaturesKey = "environment/system/gfx/features"
OSNameKey = "environment/system/os/name"
OSVersionKey = "environment/system/os/version"
OSServicePackMajorKey = "environment/system/os/servicePackMajor"

FirstValidDate = datetime.datetime.utcnow() - MaxHistoryInDays


# Log spam eats up disk space, so we disable it.
def quiet_logs(sc):
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)


quiet_logs(sc)


# This is the entry point for grabbing reduced, preformatted pings.
def fetch_and_format(start_date, end_date):
    pings = get_raw_pings(start_date, end_date)
    pings = get_one_ping_per_client(pings)
    pings = pings.map(validate)
    pings = pings.filter(lambda p: p.get("valid", False))
    return pings.cache()


def get_raw_pings(start_date, end_date):
    # WeeklyFraction is ignored here; the sample fraction is baked into the
    # bigquery_shim query.
    return trends.fetch_results(spark, start_date, end_date, project_id="mozdata")


# Transform each ping to make it easier to work with in later stages.
def validate(p):
try:
name = p.get(OSNameKey) or "w"
version = p.get(OSVersionKey) or "0"
if name == "Linux":
p["OSVersion"] = None
p["OS"] = "Linux"
p["OSName"] = "Linux"
elif name == "Windows_NT":
spmaj = p.get(OSServicePackMajorKey) or "0"
p["OSVersion"] = version + "." + str(spmaj)
p["OS"] = "Windows-" + version + "." + str(spmaj)
p["OSName"] = "Windows"
elif name == "Darwin":
p["OSVersion"] = version
p["OS"] = "Darwin-" + version
p["OSName"] = "Darwin"
else:
p["OSVersion"] = version
p["OS"] = "{0}-{1}".format(name, version)
p["OSName"] = name
    except Exception:
        # Leave "valid" unset so fetch_and_format() filters this ping out.
        return p
p["valid"] = True
return p
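
# For example, a ping with OSNameKey "Windows_NT", OSVersionKey "6.1", and
# OSServicePackMajorKey 1 comes out with OSVersion "6.1.1", OS "Windows-6.1.1",
# OSName "Windows", and valid True.

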
# Profiler for debugging. Use in a "with" statement.
class Prof(object):
    level = 0

    def __init__(self, name):
        self.name = name

    def __enter__(self):
        self.sout("Starting {0}... ".format(self.name))
        self.start = datetime.datetime.now()
        Prof.level += 1
        return None

    def __exit__(self, exc_type, exc_value, traceback):
        Prof.level -= 1
        self.end = datetime.datetime.now()
        self.sout(
            "... {0}: {1}s".format(self.name, (self.end - self.start).total_seconds())
        )

    def sout(self, s):
        sys.stdout.write(("##" * Prof.level) + " ")
        sys.stdout.write(s)
        sys.stdout.write("\n")
        sys.stdout.flush()
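
# Example (do_work() is hypothetical):
#
#   with Prof("expensive step"):
#       do_work()
#
# This prints "Starting expensive step... " on entry and the elapsed seconds
# on exit.

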
# Helpers.
def fix_vendor(vendor_id):
    # Mesa/Linux drivers report this string instead of Intel's PCI vendor ID.
    if vendor_id == "Intel Open Source Technology Center":
        return "0x8086"
    return vendor_id


def get_vendor(ping):
    try:
        adapter = ping[GfxAdaptersKey][0]
        return fix_vendor(adapter["vendorID"])
    except Exception:
        return "unknown"


def get_os_bits(ping):
    arch = ping.get(ArchKey, "unknown")
    if arch == "x86-64":
        return "64"
    elif arch == "x86":
        # 32-bit builds running on 64-bit Windows (WOW64).
        if ping.get(Wow64Key, False):
            return "32_on_64"
        return "32"
    return "unknown"


def get_gen(ping, vendor_block):
adapter = ping[GfxAdaptersKey][0]
device_id = adapter.get("deviceID", "unknown")
if device_id not in vendor_block:
return "unknown"
return vendor_block[device_id][0]
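
# (gfxdevices.json, fetched in DeviceGenTrend.prepare() below, maps
# vendor ID -> device ID -> [generation, ...]; get_gen() uses only the first
# element of that list.)

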
def get_d3d11(ping):
    try:
        d3d11 = ping[GfxFeaturesKey]["d3d11"]
        if d3d11["status"] != "available":
            return d3d11["status"]
        if d3d11.get("warp", False):
            return "warp"
        return d3d11["version"]
    except Exception:
        return "unknown"


def get_d2d(ping):
    try:
        status = ping[GfxFeaturesKey]["d2d"]["status"]
        if status != "available":
            return status
        return ping[GfxFeaturesKey]["d2d"]["version"]
    except Exception:
        return "unknown"


def get_version(ping):
    v = ping.get(FxVersionKey, None)
    if v is None or not isinstance(v, str):
        return "unknown"
    # Keep only the major version, e.g. "43.0.1" -> "43".
    return v.split(".")[0]


def get_compositor(ping):
    features = ping.get(GfxFeaturesKey, None)
    if features is None:
        return "none"
    return features.get("compositor", "none")


# A TrendBase encapsulates the data needed to visualize a trend.
# It has four methods:
#   prepare     (download cached results from GCS)
#   will_update (check whether an update is needed for a given week)
#   update      (add analysis data for a week of pings)
#   finish      (upload results back to GCS)
class TrendBase(object):
    def __init__(self, name):
        super(TrendBase, self).__init__()
        self.name = "{0}-v2.json".format(name)

    # Called before analysis starts.
    def prepare(self):
        print("Preparing {0}".format(self.name))
        return True

    # Called before querying pings for the week of the given date. Return
    # False to indicate that this trend should no longer receive updates.
    def will_update(self, date):
        raise NotImplementedError("Subclasses must return True or False")

    def update(self, pings, **kwargs):
        raise NotImplementedError

    def finish(self):
        pass
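
# A minimal sketch of a concrete trend (hypothetical, for illustration only):
#
#   class PingCountTrend(TrendBase):
#       def will_update(self, date):
#           return date >= FirstValidDate
#
#       def update(self, pings, **kwargs):
#           print(pings.count())
#           return True

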
# Given a list of trend objects, query week-long slices starting from the
# most recent Sunday and iterate backwards until no trend object requires an
# update.
def do_update(trends):
    root = TrendGroup("root", trends)
    root.prepare()
    # Start each analysis slice on a Sunday.
    latest = most_recent_sunday()
    assert latest.weekday() == 6
    end = latest
    while True:
        start = end - datetime.timedelta(7)
        if not root.will_update(start):
            break
        try:
            with Prof("fetch {0}".format(start)) as _:
                pings = fetch_and_format(start, end)
        except Exception:
            if not ForceMaxBackfill:
                raise
            # During a forced backfill, a failed fetch near the retention
            # boundary is expected; stop rather than reuse stale pings.
            break
        with Prof("compute {0}".format(start)) as _:
            if not root.update(pings, start_date=start, end_date=end):
                break
        end = start
    root.finish()


def most_recent_sunday():
    now = datetime.datetime.utcnow()
    this_morning = datetime.datetime(now.year, now.month, now.day)
    # datetime.weekday(): Monday is 0, Sunday is 6.
    if this_morning.weekday() == 6:
        return this_morning
    diff = datetime.timedelta(0 - this_morning.weekday() - 1)
    return this_morning + diff
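
# For example, if the current UTC day is Wednesday 2015-11-18 (weekday() == 2),
# diff is timedelta(-3) and the function returns Sunday 2015-11-15.

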
# A TrendGroup is a collection of TrendBase objects. It lets us
# group similar trends together. For example, if five trends all
# need to filter Windows pings, we can filter for Windows pings
# once and cache the result, rather than redo the filter each
# time.
#
# Trend groups keep an "active" list of trends that will probably
# need another update. If any trend stops requesting data, it is
# removed from the active list.
class TrendGroup(TrendBase):
    def __init__(self, name, trends):
        super(TrendGroup, self).__init__(name)
        self.trends = trends
        self.active = []

    def prepare(self):
        self.trends = [trend for trend in self.trends if trend.prepare()]
        self.active = self.trends[:]
        return len(self.trends) > 0

    def will_update(self, date):
        self.active = [trend for trend in self.active if trend.will_update(date)]
        return len(self.active) > 0

    def update(self, pings, **kwargs):
        pings = pings.cache()
        self.active = [trend for trend in self.active if trend.update(pings, **kwargs)]
        return len(self.active) > 0

    def finish(self):
        for trend in self.trends:
            trend.finish()
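
# Example: the __main__ block below wraps all Windows-only trends in a single
# WindowsGroup, so the OSName filter runs once per week of pings:
#
#   WindowsGroup([WinverTrend(), WinArchTrend(), ...])

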
# A Trend object takes a new set of pings for a week's worth of data,
# analyzes it, and adds the result to the trend set. Trend sets are
# cached in GCS as JSON.
#
# If the latest entry in the cache covers less than a full week of
# data, the entry is removed so that week can be re-queried.
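#
# Reconstructed from prepare()/update()/finish() below, a cached file looks
# roughly like this (timestamps are Unix seconds):
#
#   {
#     "created": <unix seconds>,
#     "trend": [
#       {"start": <unix seconds>, "end": <unix seconds>,
#        "total": <ping count>, "data": {<key>: <count>, ...}},
#       ...
#     ]
#   }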
class Trend(TrendBase):
    def __init__(self, filename):
        super(Trend, self).__init__(filename)
        self.local_path = os.path.join(OUTPUT_PATH, self.name)
        self.cache = None
        self.lastFullWeek = None
        self.newDataPoints = []

    def query(self, pings):
        raise NotImplementedError

    def will_update(self, date):
        if date < FirstValidDate:
            return False
        if self.lastFullWeek is not None and date <= self.lastFullWeek:
            return False
        return True

    def prepare(self):
        self.cache = self.fetch_json()
        if self.cache is None:
            self.cache = {"created": jstime(datetime.datetime.utcnow()), "trend": []}
        # Make sure trends are sorted in ascending order.
        self.cache["trend"] = self.cache["trend"] or []
        self.cache["trend"] = sorted(self.cache["trend"], key=lambda o: o["start"])
if len(self.cache["trend"]) and not ForceMaxBackfill:
last_data_point = self.cache["trend"][-1]
last_data_point_start = datetime.datetime.utcfromtimestamp(
last_data_point["start"]
)
last_data_point_end = datetime.datetime.utcfromtimestamp(
last_data_point["end"]
)
            print(
                "Last cached data point: {0} ({1} - {2})".format(
                    last_data_point, last_data_point_start, last_data_point_end
                )
            )
if last_data_point_end - last_data_point_start < datetime.timedelta(7):
# The last data point had less than a full week, so we stop at the
# previous week, and remove the incomplete datapoint.
self.lastFullWeek = last_data_point_start - datetime.timedelta(7)
self.cache["trend"].pop()
else:
# The last data point covered a full week, so that's our stopping
# point.
self.lastFullWeek = last_data_point_start
            print("Last full week for {0}: {1}".format(self.name, self.lastFullWeek))
        return True

    # Optional hook - transform pings before querying.
    def transform_pings(self, pings):
        return pings

    def update(self, pings, start_date, end_date, **kwargs):
with Prof("count {0}".format(self.name)):
pings = self.transform_pings(pings)
count = pings.count()
if count == 0:
print("WARNING: no pings in RDD")
return False
with Prof("query {0} (count: {1})".format(self.name, count)):
data = self.query(pings)
self.newDataPoints.append(
{
"start": jstime(start_date),
"end": jstime(end_date),
"total": count,
"data": data,
}
)
return True

    def finish(self):
        # If we're doing a maximum backfill, drop cached data points that
        # start at or after the earliest point we newly queried, since we
        # are about to replace them.
if ForceMaxBackfill and len(self.newDataPoints):
stop_at = self.newDataPoints[-1]["start"]
last_index = None
for index, entry in enumerate(self.cache["trend"]):
if entry["start"] >= stop_at:
last_index = index
break
if last_index is not None:
self.cache["trend"] = self.cache["trend"][:last_index]
        # Note: the backfill loop in do_update() walks in reverse, so dates
        # accumulate in descending order. The final list should be in
        # ascending order, so we reverse before appending.
self.cache["trend"] += self.newDataPoints[::-1]
text = json.dumps(self.cache)
print("Writing file {0}".format(self.local_path))
with open(self.local_path, "w") as fp:
fp.write(text)
if GCS_BUCKET is not None:
try:
bucket = storage_client.get_bucket(GCS_BUCKET)
blob = bucket.blob(os.path.join(GCS_PREFIX, self.name))
blob.upload_from_filename(self.local_path)
except Exception as e:
print("Failed gcs upload: {0}".format(e))

    def fetch_json(self):
        print("Reading file {0}".format(self.local_path))
        if GCS_BUCKET is not None:
            try:
                # Reuse the module-level GCS client.
                bucket = storage_client.get_bucket(GCS_BUCKET)
                blob = bucket.blob(os.path.join(GCS_PREFIX, self.name))
                blob.download_to_filename(self.local_path)
with open(self.local_path, "r") as fp:
return json.load(fp)
except Exception:
if self.name not in BrandNewJobs:
raise
return None
else:
try:
with open(self.local_path, "r") as fp:
return json.load(fp)
except Exception:
pass
return None
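

# Note on the 1-tuple pattern in the query() methods below: PySpark's
# RDD.countByKey() only inspects element[0] of each record (it is
# implemented as map(lambda x: x[0]).countByValue()), so mapping each ping
# to a 1-tuple is enough to produce a {key: count} dict.

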
class FirefoxTrend(Trend):
    def __init__(self):
        super(FirefoxTrend, self).__init__("trend-firefox")

    def query(self, pings):
        return pings.map(lambda p: (get_version(p),)).countByKey()


class WindowsGroup(TrendGroup):
    def __init__(self, trends):
        super(WindowsGroup, self).__init__("Windows", trends)

    def update(self, pings, **kwargs):
        pings = pings.filter(lambda p: p["OSName"] == "Windows")
        return super(WindowsGroup, self).update(pings, **kwargs)


class WinverTrend(Trend):
    def __init__(self):
        super(WinverTrend, self).__init__("trend-windows-versions")

    def query(self, pings):
        return pings.map(lambda p: (p["OSVersion"],)).countByKey()


class WinCompositorTrend(Trend):
    def __init__(self):
        super(WinCompositorTrend, self).__init__("trend-windows-compositors")

    def will_update(self, date):
        # This metric didn't ship until Firefox 43.
        if date < datetime.datetime(2015, 11, 15):
            return False
        return super(WinCompositorTrend, self).will_update(date)

    def query(self, pings):
        return pings.map(lambda p: (get_compositor(p),)).countByKey()


class WinArchTrend(Trend):
    def __init__(self):
        super(WinArchTrend, self).__init__("trend-windows-arch")

    def query(self, pings):
        return pings.map(lambda p: (get_os_bits(p),)).countByKey()


# This group restricts pings to Windows Vista and later by filtering out
# Windows XP (NT 5.1). It must be nested inside a group that restricts
# pings to Windows.
class WindowsVistaPlusGroup(TrendGroup):
    def __init__(self, trends):
        super(WindowsVistaPlusGroup, self).__init__("Windows Vista+", trends)

    def update(self, pings, **kwargs):
        pings = pings.filter(lambda p: not p["OSVersion"].startswith("5.1"))
        return super(WindowsVistaPlusGroup, self).update(pings, **kwargs)


class Direct2DTrend(Trend):
    def __init__(self):
        super(Direct2DTrend, self).__init__("trend-windows-d2d")

    def query(self, pings):
        return pings.map(lambda p: (get_d2d(p),)).countByKey()

    def will_update(self, date):
        # This metric didn't ship until Firefox 43.
        if date < datetime.datetime(2015, 11, 15):
            return False
        return super(Direct2DTrend, self).will_update(date)


class Direct3D11Trend(Trend):
    def __init__(self):
        super(Direct3D11Trend, self).__init__("trend-windows-d3d11")

    def query(self, pings):
        return pings.map(lambda p: (get_d3d11(p),)).countByKey()

    def will_update(self, date):
        # This metric didn't ship until Firefox 43.
        if date < datetime.datetime(2015, 11, 15):
            return False
        return super(Direct3D11Trend, self).will_update(date)


class WindowsVendorTrend(Trend):
    def __init__(self):
        super(WindowsVendorTrend, self).__init__("trend-windows-vendors")

    def query(self, pings):
        return pings.map(lambda p: ((get_vendor(p) or "unknown"),)).countByKey()


# Device generation trend - a little more complicated, since we download
# the generation database to produce a mapping.
class DeviceGenTrend(Trend):
    device_map = None

    def __init__(self, vendor, vendor_name):
        super(DeviceGenTrend, self).__init__(
            "trend-windows-device-gen-{0}".format(vendor_name)
        )
        self.vendorBlock = None
        self.vendorID = vendor

    def prepare(self):
        # Grab the vendor -> device -> gen map, shared across instances.
        if not DeviceGenTrend.device_map:
            resp = requests.get("{0}/master/www/gfxdevices.json".format(GITHUB_REPO))
            DeviceGenTrend.device_map = resp.json()
        self.vendorBlock = DeviceGenTrend.device_map[self.vendorID]
        return super(DeviceGenTrend, self).prepare()

    def transform_pings(self, pings):
        vendor_id = self.vendorID
        return pings.filter(lambda p: get_vendor(p) == vendor_id)

    def query(self, pings):
        # Copy to a local so the lambda doesn't capture self: the parent
        # class holds a storage-client reference, which isn't picklable.
        vendor_block = self.vendorBlock
        return pings.map(lambda p: (get_gen(p, vendor_block),)).countByKey()


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--force-max-backfill", action="store_true")
    parser.add_argument("--weekly-fraction", type=float, default=0.003)
    parser.add_argument(
        "--gcs-bucket", default="moz-fx-data-static-websit-8565-analysis-output"
    )
    parser.add_argument("--gcs-prefix", default="gfx/telemetry-data/")
    parser.add_argument("--max-history-in-days", type=int, default=210)
    parser.add_argument("--brand-new-jobs", action="append", default=[])
    return parser.parse_args()
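

# Example invocation (hypothetical; assumes the job is submitted to the
# Dataproc cluster with spark-submit):
#
#   spark-submit graphics_telemetry_trends.py \
#       --force-max-backfill --max-history-in-days 210
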

if __name__ == "__main__":
    args = parse_args()
    # Command-line arguments override the module-level defaults above.
    ForceMaxBackfill = args.force_max_backfill
    WeeklyFraction = args.weekly_fraction
    GCS_BUCKET = args.gcs_bucket
    GCS_PREFIX = args.gcs_prefix
    MaxHistoryInDays = datetime.timedelta(days=args.max_history_in_days)
    # FirstValidDate was computed from the default MaxHistoryInDays at import
    # time, so recompute it here.
    FirstValidDate = datetime.datetime.utcnow() - MaxHistoryInDays
    BrandNewJobs = args.brand_new_jobs
do_update(
[
FirefoxTrend(),
WindowsGroup(
[
WinverTrend(),
WinCompositorTrend(),
WinArchTrend(),
WindowsVendorTrend(),
WindowsVistaPlusGroup([Direct2DTrend(), Direct3D11Trend()]),
DeviceGenTrend("0x8086", "intel"),
DeviceGenTrend("0x10de", "nvidia"),
DeviceGenTrend("0x1002", "amd"),
]
),
]
)