# Migrated from Databricks to run on dataproc
# pip install:
# python_moztelemetry
# git+https://github.com/FirefoxGraphics/telemetry.git#egg=pkg&subdirectory=analyses/bigquery_shim
# six==1.15.0
import argparse
import datetime
import json
import os
import sys
import time
from bigquery_shim import dashboard, snake_case
from moztelemetry import get_pings_properties
from pyspark import SparkContext
from pyspark.sql import SparkSession
from google.cloud import storage
def fmt_date(d):
return d.strftime("%Y%m%d")
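# For example (pure helper, cheap to verify at import time):
assert fmt_date(datetime.datetime(2017, 7, 19)) == "20170719"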
def repartition(pipeline):
return pipeline.repartition(MaxPartitions).cache()
storage_client = storage.Client()
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName("graphics-trends").getOrCreate()
MaxPartitions = sc.defaultParallelism * 4
start_time = datetime.datetime.now()
# Going forward we only care about sessions from Firefox 53+, since it
# is the first release that does not support Windows XP and Vista, which
# distort our statistics.
MinFirefoxVersion = "53"
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument("--default-time-window", type=int, default=14)
parser.add_argument("--release-fraction", type=float, default=0.003)
parser.add_argument(
"--output-bucket", default="moz-fx-data-static-websit-8565-analysis-output"
)
parser.add_argument("--output-prefix", default="gfx/telemetry-data/")
return parser.parse_args()
args = parse_args()
# The GCS bucket and prefix where the telemetry results are placed.
OUTPUT_BUCKET = args.output_bucket
OUTPUT_PREFIX = args.output_prefix
# Configuration for general data that spans all Firefox versions. As of
# 7-19-2017 a sample fraction of .003 for 2 weeks of submissions yielded
# 12mil pings, so we estimate there are about 4bn total. We try to target
# about 2-5mil in the general fraction below.
DefaultTimeWindow = args.default_time_window
ReleaseFraction = args.release_fraction
bucket = storage_client.get_bucket(OUTPUT_BUCKET)
blobs = bucket.list_blobs(prefix=OUTPUT_PREFIX)
existing_objects = [blob.name for blob in blobs]
print(f"Existing objects: {existing_objects}")
# List of keys for properties on session pings that we care about.
GfxKey = "environment/system/gfx"
MonitorsKey = "environment/system/gfx/monitors"
ArchKey = "environment/build/architecture"
FeaturesKey = "environment/system/gfx/features"
UserPrefsKey = "environment/settings/userPrefs"
DeviceResetReasonKey = "payload/histograms/DEVICE_RESET_REASON"
SANITY_TEST = "payload/histograms/GRAPHICS_SANITY_TEST"
STARTUP_TEST_KEY = "payload/histograms/GRAPHICS_DRIVER_STARTUP_TEST"
WebGLSuccessKey = "payload/histograms/CANVAS_WEBGL_SUCCESS"
WebGL2SuccessKey = "payload/histograms/CANVAS_WEBGL2_SUCCESS"
MediaDecoderKey = "payload/histograms/MEDIA_DECODER_BACKEND_USED"
WebGLAcclFailureKey = "payload/keyedHistograms/CANVAS_WEBGL_ACCL_FAILURE_ID"
WebGLFailureKey = "payload/keyedHistograms/CANVAS_WEBGL_FAILURE_ID"
# This is the filter list, so we only select the above properties.
PropertyList = [
GfxKey,
FeaturesKey,
UserPrefsKey,
MonitorsKey,
ArchKey,
DeviceResetReasonKey,
SANITY_TEST,
STARTUP_TEST_KEY,
WebGLSuccessKey,
WebGL2SuccessKey,
MediaDecoderKey,
WebGLAcclFailureKey,
WebGLFailureKey,
]
###########################################################
# Helper function block for fetching and filtering pings. #
###########################################################
def union_pipelines(a, b):
if a is None:
return b
return a + b
def fetch_raw_pings(**kwargs):
time_window = kwargs.pop("timeWindow", DefaultTimeWindow)
fraction = kwargs.pop("fraction", ReleaseFraction)
channel = kwargs.pop("channel", None)
# Since builds take a bit to disseminate, go back about 4 hours. This is a
# completely made up number.
    limit = datetime.timedelta(hours=4)
now = datetime.datetime.now()
start = now - datetime.timedelta(time_window) - limit
end = now - limit
# NOTE: ReleaseFraction is not used in the shim
pings = dashboard.fetch_results(
spark,
start,
end,
project_id="mozdata",
channel=channel,
min_firefox_version=MinFirefoxVersion,
)
metadata = [
{
"info": {
"channel": "*",
"fraction": fraction,
"day_range": (end - start).days,
}
}
]
info = {"metadata": metadata, "timestamp": now}
return pings, info
# Transform each ping to make it easier to work with in later stages.
def validate(p):
name = p.get("environment/system/os/name") or "w"
version = p.get("environment/system/os/version") or "0"
if name == "Linux":
p["OSVersion"] = None
p["OS"] = "Linux"
p["OSName"] = "Linux"
elif name == "Windows_NT":
spmaj = p.get("environment/system/os/servicePackMajor") or "0"
p["OSVersion"] = version + "." + str(spmaj)
p["OS"] = "Windows-" + version + "." + str(spmaj)
p["OSName"] = "Windows"
elif name == "Darwin":
p["OSVersion"] = version
p["OS"] = "Darwin-" + version
p["OSName"] = "Darwin"
else:
p["OSVersion"] = version
p["OS"] = "{0}-{1}".format(name, version)
p["OSName"] = name
# Telemetry data isn't guaranteed to be well-formed so unfortunately
# we have to do some validation on it. If we get to the end, we set
# p['valid'] to True, and this gets filtered over later. In addition
# we have a wrapper below to help fetch strings that may be null.
if not p.get("environment/build/version", None):
return p
p["FxVersion"] = p["environment/build/version"].split(".")[0]
# Verify that we have at least one adapter.
try:
adapter = p["environment/system/gfx/adapters"][0]
except (KeyError, IndexError, TypeError):
return p
if adapter is None or not hasattr(adapter, "__getitem__"):
return p
def t(obj, key):
return obj.get(key, None) or "Unknown"
    # We store the device ID as a vendor/device string, because a device ID
    # alone is not unique across vendors.
    #
    # We also merge 'Intel Open Source Technology Center' with the vendor ID
    # that should be reported, 0x8086, for simplicity.
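    # For example (hypothetical values): an adapter reporting vendorID "0x8086"
    # and deviceID "0x1912" yields deviceID "0x8086/0x1912" below.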
vendor_id = t(adapter, "vendorID")
if vendor_id == "Intel Open Source Technology Center":
p["vendorID"] = "0x8086"
else:
p["vendorID"] = vendor_id
p["deviceID"] = "{0}/{1}".format(p["vendorID"], t(adapter, "deviceID"))
p["driverVersion"] = "{0}/{1}".format(p["vendorID"], t(adapter, "driverVersion"))
p["deviceAndDriver"] = "{0}/{1}".format(p["deviceID"], t(adapter, "driverVersion"))
p["driverVendor"] = adapter.get("driverVendor", None)
p["valid"] = True
return p
def reduce_pings(pings):
return get_pings_properties(
pings,
[
"clientId",
"creationDate",
"environment/build/version",
"environment/build/buildId",
"environment/system/memoryMB",
"environment/system/isWow64",
"environment/system/cpu",
"environment/system/os/name",
"environment/system/os/version",
"environment/system/os/servicePackMajor",
"environment/system/gfx/adapters",
"payload/info/revision",
]
+ PropertyList,
)
def format_pings(pings):
pings = pings.map(dashboard.convert_bigquery_results)
pings = reduce_pings(pings)
pings = pings.map(snake_case.convert_snake_case_dict)
pings = pings.map(validate)
filtered_pings = pings.filter(lambda p: p.get("valid", False))
return filtered_pings.cache()
def fetch_and_format(**kwargs):
raw_pings, info = fetch_raw_pings(**kwargs)
return format_pings(raw_pings), info
##################################################################
# Helper function block for massaging pings into aggregate data. #
##################################################################
# Take each key in |b| and add it to |a|, accumulating its value into
# |a| if it already exists.
def combiner(a, b):
result = a
for key in b:
count_a = a.get(key, 0)
count_b = b[key]
result[key] = count_a + count_b
return result
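# A quick illustration of combiner() on plain dicts (hypothetical counts;
# pure Python, so this is cheap to verify at import time):
assert combiner({"0x8086": 2}, {"0x8086": 1, "0x10de": 5}) == {
    "0x8086": 3,
    "0x10de": 5,
}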
# Helper that counts pings by a property value (map => countByKey).
def map_x_to_count(data, source_key):
def extract(p):
return (p[source_key],)
return data.map(extract).countByKey()
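# For example (hypothetical counts), map_x_to_count(pings, "vendorID") returns
# something like {"0x8086": 1200, "0x10de": 800}. Note that extract() emits
# 1-tuples: PySpark's countByKey() counts elements by their first tuple field.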
# After reduceByKey(combiner), we get a mapping like:
# key => { variable => value }
#
# This function collapses 'variable' instances below a threshold into
# a catch-all identifier ('Other').
def coalesce_to_n_items(agg, max_items):
obj = []
for superkey, breakdown in agg:
if len(breakdown) <= max_items:
obj += [(superkey, breakdown)]
continue
items = sorted(breakdown.items(), key=lambda obj: obj[1], reverse=True)
new_breakdown = {k: v for k, v in items[0:max_items]}
total = 0
for k, v in items[max_items:]:
total += v
if total:
new_breakdown["Other"] = new_breakdown.get("Other", 0) + total
obj += [(superkey, new_breakdown)]
return obj
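# A small worked example of coalesce_to_n_items() (hypothetical breakdown;
# pure Python, so this is cheap to verify at import time):
assert coalesce_to_n_items([("10.0.0", {"a": 5, "b": 3, "c": 1, "d": 1})], 2) == [
    ("10.0.0", {"a": 5, "b": 3, "Other": 2})
]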
#############################
# Helper for writing files. #
#############################
def apply_ping_info(obj, **kwargs):
if "pings" not in kwargs:
return
pings, info = kwargs.pop("pings")
# To make the sample source information more transparent, we include
# the breakdown of Firefox channel numbers.
if "__share" not in info:
info["__share"] = map_x_to_count(pings, "FxVersion")
obj["sessions"] = {
"count": pings.count(),
"timestamp": time.mktime(info["timestamp"].timetuple()),
"shortdate": fmt_date(info["timestamp"]),
"metadata": info["metadata"],
"share": info["__share"],
}
def export(filename, obj, **kwargs):
full_filename = os.path.join(OUTPUT_PREFIX, f"{filename}.json")
print("Writing to {0}".format(full_filename))
    bucket = storage_client.get_bucket(OUTPUT_BUCKET)
    blob = bucket.blob(full_filename)
    # serialize snake case dicts via their underlying dict
blob.upload_from_string(
json.dumps(obj, cls=snake_case.SnakeCaseEncoder),
content_type="application/json",
)
def timed_export(filename, callback, **kwargs):
start = datetime.datetime.now()
obj = callback()
apply_ping_info(obj, **kwargs)
end = datetime.datetime.now()
elapsed = end - start
obj["phaseTime"] = elapsed.total_seconds()
export(filename, obj, **kwargs)
export_time = datetime.datetime.now() - end
print("Computed {0} in {1} seconds.".format(filename, elapsed.total_seconds()))
print("Exported {0} in {1} seconds.".format(filename, export_time.total_seconds()))
# Profiler for debugging.
class Prof(object):
def __init__(self, name):
self.name = name
def __enter__(self):
self.sout("Starting {0}... ".format(self.name))
self.start = datetime.datetime.now()
return None
def __exit__(self, type, value, traceback):
self.end = datetime.datetime.now()
self.sout(
"... {0}: {1}s".format(self.name, (self.end - self.start).total_seconds())
)
def sout(self, s):
sys.stdout.write(s)
sys.stdout.write("\n")
sys.stdout.flush()
def quiet_logs(sc):
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org").setLevel(logger.Level.ERROR)
logger.LogManager.getLogger("akka").setLevel(logger.Level.ERROR)
quiet_logs(sc)
# Get a general ping sample across all Firefox channels.
with Prof("General pings"):
general_pings, general_ping_info = fetch_and_format()
general_pings = general_pings.cache()
# Windows gets some preferential breakdown treatment.
windows_pings = general_pings.filter(lambda p: p["OSName"] == "Windows")
windows_pings = windows_pings.cache()
mac_pings = general_pings.filter(lambda p: p["OSName"] == "Darwin")
mac_pings = repartition(mac_pings)
linux_pings = general_pings.filter(lambda p: p["OSName"] == "Linux")
linux_pings = repartition(linux_pings)
# # ANALYSES ARE BELOW
# ## General statistics
# Results by operating system.
if "__share" not in general_ping_info:
general_ping_info["__share"] = map_x_to_count(general_pings, "FxVersion")
def get_general_statistics_for_subset(subset, windows_subset):
os_share = map_x_to_count(subset, "OSName")
# Results by Windows version.
windows_share = map_x_to_count(windows_subset, "OSVersion")
# Top-level stats.
vendor_share = map_x_to_count(subset, "vendorID")
return {"os": os_share, "windows": windows_share, "vendors": vendor_share}
def get_general_statistics():
obj = {}
obj["devices"] = map_x_to_count(general_pings, "deviceID")
obj["drivers"] = map_x_to_count(general_pings, "driverVersion")
by_fx = {}
with Prof("general stats for all"):
by_fx["all"] = get_general_statistics_for_subset(general_pings, windows_pings)
for key in general_ping_info["__share"]:
        # Bind the loop variable eagerly; Spark evaluates filter lambdas lazily.
        subset = general_pings.filter(lambda p, key=key: p["FxVersion"] == key)
windows = subset.filter(lambda p: p["OSName"] == "Windows")
subset = repartition(subset)
windows = repartition(windows)
with Prof("general stats for " + key):
by_fx[key] = get_general_statistics_for_subset(subset, windows)
obj["byFx"] = by_fx
return obj
timed_export(
filename="general-statistics",
callback=get_general_statistics,
pings=(general_pings, general_ping_info),
)
# ## Device/driver search database
def get_driver_statistics():
obj = {"deviceAndDriver": map_x_to_count(general_pings, "deviceAndDriver")}
return obj
timed_export(
filename="device-statistics",
callback=get_driver_statistics,
save_history=False, # No demand yet, and too much data.
pings=(general_pings, general_ping_info),
)
# ## TDR Statistics
#############################
# Perform the TDR analysis. #
#############################
def get_tdr_statistics():
num_tdr_reasons = 8
def ping_has_tdr_for(p, reason):
return p[DeviceResetReasonKey][reason] > 0
    # Specialized versions of map_x_to_count, for TDRs. We cast to int because
    # for some reason the values Spark returns do not serialize with JSON.
def map_reason_to_vendor(p, reason, dest_key):
return (int(reason), {p[dest_key]: int(p[DeviceResetReasonKey][reason])})
def map_vendor_to_reason(p, reason, dest_key):
return (p[dest_key], {int(reason): int(p[DeviceResetReasonKey][reason])})
# Filter out pings that do not have any TDR data. We expect this to be a huge reduction
# in the sample set, and the resulting partition count gets way off. We repartition
# immediately for performance.
tdr_subset = windows_pings.filter(
lambda p: p.get(DeviceResetReasonKey, None) is not None
)
tdr_subset = tdr_subset.repartition(MaxPartitions)
tdr_subset = tdr_subset.cache()
# Aggregate the device reset data.
tdr_results = tdr_subset.map(lambda p: p[DeviceResetReasonKey]).reduce(
lambda x, y: x + y
)
# For each TDR reason, get a list tuple of (reason, vendor => resetCount). Then
# we combine these into a single series.
reason_to_vendor_tuples = None
vendor_to_reason_tuples = None
    for reason in range(1, num_tdr_reasons):
        # Bind |reason| eagerly via a default argument; Spark lambdas are
        # evaluated lazily, so capturing the loop variable by closure is unsafe.
        subset = tdr_subset.filter(
            lambda p, reason=reason: ping_has_tdr_for(p, reason)
        )
        subset = subset.cache()
        tuples = subset.map(
            lambda p, reason=reason: map_reason_to_vendor(p, reason, "vendorID")
        )
        reason_to_vendor_tuples = union_pipelines(reason_to_vendor_tuples, tuples)
        tuples = subset.map(
            lambda p, reason=reason: map_vendor_to_reason(p, reason, "vendorID")
        )
        vendor_to_reason_tuples = union_pipelines(vendor_to_reason_tuples, tuples)
tdr_reason_to_vendor = reason_to_vendor_tuples.reduceByKey(combiner, MaxPartitions)
tdr_vendor_to_reason = vendor_to_reason_tuples.reduceByKey(combiner, MaxPartitions)
return {
"tdrPings": tdr_subset.count(),
"results": [int(value) for value in tdr_results],
"reasonToVendor": tdr_reason_to_vendor.collect(),
"vendorToReason": tdr_vendor_to_reason.collect(),
}
# Write TDR statistics.
timed_export(
filename="tdr-statistics",
callback=get_tdr_statistics,
pings=(windows_pings, general_ping_info),
)
# ## System Statistics
##########################
# Get system statistics. #
##########################
CpuKey = "environment/system/cpu"
MemoryKey = "environment/system/memoryMB"
def get_bucketed_memory(pings):
def get_bucket(p):
x = int(p / 1000)
if x < 1:
return "less_1gb"
if x <= 4:
return x
if x <= 8:
return "4_to_8"
if x <= 16:
return "8_to_16"
if x <= 32:
return "16_to_32"
return "more_32"
    # memoryMB may be missing or None, so normalize before comparing.
    memory_rdd = pings.map(lambda p: p.get(MemoryKey) or 0)
    memory_rdd = memory_rdd.filter(lambda p: p > 0)
memory_rdd = memory_rdd.map(lambda p: (get_bucket(p),))
memory_rdd = repartition(memory_rdd)
return memory_rdd.countByKey()
def get_cpu_features(pings):
cpuid_rdd = pings.map(lambda p: p.get(CpuKey, None))
cpuid_rdd = cpuid_rdd.filter(lambda p: p is not None)
cpuid_rdd = cpuid_rdd.map(lambda p: p.get("extensions", None))
# Unfortunately, Firefox 39 had a bug where CPU features could be reported even
# if they weren't present. To detect this we filter pings that have ARMv6 support
# on x86/64.
cpuid_rdd = cpuid_rdd.filter(lambda p: p is not None and "hasARMv6" not in p)
cpuid_rdd = repartition(cpuid_rdd)
# Count before we blow up the list.
with Prof("cpu count for x86"):
total = cpuid_rdd.count()
cpuid_rdd = cpuid_rdd.flatMap(lambda p: [(ex, 1) for ex in p])
with Prof("cpu features for x86"):
feature_map = cpuid_rdd.countByKey()
return {"total": total, "features": feature_map}
def get_system_statistics():
def get_logical_cores(p):
cpu = p.get(CpuKey, None)
if cpu is None:
return "unknown"
return cpu.get("count", "unknown")
with Prof("logical cores"):
logical_cores = general_pings.map(
lambda p: (get_logical_cores(p),)
).countByKey()
cpu_features = get_cpu_features(general_pings)
with Prof("memory buckets"):
memory = get_bucketed_memory(general_pings)
def get_os_bits(p):
arch = p.get(ArchKey, "unknown")
if arch == "x86-64":
return "64"
if arch == "x86":
wow64 = p.get("environment/system/isWow64", False)
if wow64:
return "32_on_64"
return "32"
return "unknown"
with Prof("OS bit count"):
os_bits = windows_pings.map(lambda p: (get_os_bits(p),)).countByKey()
return {
"logical_cores": logical_cores,
"x86": cpu_features,
"memory": memory,
"wow": os_bits,
}
timed_export(
filename="system-statistics",
callback=get_system_statistics,
pings=(general_pings, general_ping_info),
)
# ## Sanity Test Statistics
# Set up constants.
SANITY_TEST_PASSED = 0
SANITY_TEST_FAILED_RENDER = 1
SANITY_TEST_FAILED_VIDEO = 2
SANITY_TEST_CRASHED = 3
SANITY_TEST_TIMEDOUT = 4
SANITY_TEST_LAST_VALUE = 5
# We don't want to fold FAILED_RENDER and FAILED_VIDEO into the same
# result set, so we use this function to split them out.
def get_sanity_test_result(p):
if p.get(SANITY_TEST, None) is None:
return None
if p[SANITY_TEST][SANITY_TEST_PASSED] > 0:
return SANITY_TEST_PASSED
if p[SANITY_TEST][SANITY_TEST_CRASHED] > 0:
return SANITY_TEST_CRASHED
if p[SANITY_TEST][SANITY_TEST_FAILED_RENDER] > 0:
return SANITY_TEST_FAILED_RENDER
if p[SANITY_TEST][SANITY_TEST_FAILED_VIDEO] > 0:
return SANITY_TEST_FAILED_VIDEO
if p[SANITY_TEST][SANITY_TEST_TIMEDOUT] > 0:
return SANITY_TEST_TIMEDOUT
return None
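# For example (hypothetical histogram): a ping with nonzero counts in both the
# CRASHED and FAILED_RENDER buckets is classified as SANITY_TEST_CRASHED, since
# the checks above run in PASSED, CRASHED, FAILED_RENDER, FAILED_VIDEO,
# TIMEDOUT order.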
#########################
# Sanity test analysis. #
#########################
def get_sanity_tests_for_slice(sanity_test_pings):
data = sanity_test_pings.filter(lambda p: get_sanity_test_result(p) is not None)
# Aggregate the sanity test data.
with Prof("initial map"):
sanity_test_results = data.map(
lambda p: (get_sanity_test_result(p),)
).countByKey()
with Prof("share resolve"):
os_share = map_x_to_count(data, "OSVersion")
with Prof("ping_count"):
sanity_test_count = data.count()
ping_count = sanity_test_pings.count()
sanity_test_by_vendor = None
sanity_test_by_os = None
sanity_test_by_device = None
sanity_test_by_driver = None
    for value in range(SANITY_TEST_FAILED_RENDER, SANITY_TEST_LAST_VALUE):
        # Bind |value| eagerly; Spark lambdas are evaluated lazily, so
        # capturing the loop variable by closure is unsafe.
        subset = data.filter(
            lambda p, value=value: get_sanity_test_result(p) == value
        )
        tuples = subset.map(
            lambda p, value=value: (value, {p["vendorID"]: int(p[SANITY_TEST][value])})
        )
        sanity_test_by_vendor = union_pipelines(sanity_test_by_vendor, tuples)
        tuples = subset.map(
            lambda p, value=value: (value, {p["OS"]: int(p[SANITY_TEST][value])})
        )
        sanity_test_by_os = union_pipelines(sanity_test_by_os, tuples)
        tuples = subset.map(
            lambda p, value=value: (value, {p["deviceID"]: int(p[SANITY_TEST][value])})
        )
        sanity_test_by_device = union_pipelines(sanity_test_by_device, tuples)
        tuples = subset.map(
            lambda p, value=value: (value, {p["driverVersion"]: int(p[SANITY_TEST][value])})
        )
        sanity_test_by_driver = union_pipelines(sanity_test_by_driver, tuples)
sanity_test_by_vendor = repartition(sanity_test_by_vendor)
sanity_test_by_os = repartition(sanity_test_by_os)
sanity_test_by_device = repartition(sanity_test_by_device)
sanity_test_by_driver = repartition(sanity_test_by_driver)
with Prof("vendor resolve"):
sanity_test_by_vendor = sanity_test_by_vendor.reduceByKey(combiner)
with Prof("os resolve"):
sanity_test_by_os = sanity_test_by_os.reduceByKey(combiner)
with Prof("device resolve"):
sanity_test_by_device = sanity_test_by_device.reduceByKey(combiner)
with Prof("driver resolve"):
sanity_test_by_driver = sanity_test_by_driver.reduceByKey(combiner)
print(
"Partitions: {0},{1},{2},{3}".format(
sanity_test_by_vendor.getNumPartitions(),
sanity_test_by_os.getNumPartitions(),
sanity_test_by_device.getNumPartitions(),
sanity_test_by_driver.getNumPartitions(),
)
)
with Prof("vendor collect"):
by_vendor = sanity_test_by_vendor.collect()
with Prof("os collect"):
by_os = sanity_test_by_os.collect()
with Prof("device collect"):
by_device = sanity_test_by_device.collect()
with Prof("driver collect"):
by_driver = sanity_test_by_driver.collect()
return {
"sanityTestPings": sanity_test_count,
"totalPings": ping_count,
"results": sanity_test_results,
"byVendor": by_vendor,
"byOS": by_os,
"byDevice": coalesce_to_n_items(by_device, 10),
"byDriver": coalesce_to_n_items(by_driver, 10),
"windows": os_share,
}
def get_sanity_tests():
obj = {}
obj["windows"] = get_sanity_tests_for_slice(windows_pings)
return obj
# Write Sanity Test statistics.
timed_export(
filename="sanity-test-statistics",
callback=get_sanity_tests,
pings=(windows_pings, general_ping_info),
)
# ## Startup Crash Guard Statistics
STARTUP_OK = 0
STARTUP_ENV_CHANGED = 1
STARTUP_CRASHED = 2
STARTUP_ACCEL_DISABLED = 3
def get_startup_tests():
startup_test_pings = general_pings.filter(
lambda p: p.get(STARTUP_TEST_KEY, None) is not None
)
startup_test_pings = startup_test_pings.repartition(MaxPartitions)
startup_test_pings = startup_test_pings.cache()
startup_test_results = startup_test_pings.map(lambda p: p[STARTUP_TEST_KEY]).reduce(
lambda x, y: x + y
)
os_share = map_x_to_count(startup_test_pings, "OS")
return {
"startupTestPings": startup_test_pings.count(),
"results": [int(i) for i in startup_test_results],
"windows": os_share,
}
# Write startup test results.
timed_export(
filename="startup-test-statistics",
callback=get_startup_tests,
pings=(general_pings, general_ping_info),
)
# ## Monitor Statistics
def get_monitor_count(p):
monitors = p.get(MonitorsKey, None)
try:
return len(monitors)
except TypeError:
return 0
def get_monitor_res(p, i):
width = p[MonitorsKey][i].get("screenWidth", 0)
height = p[MonitorsKey][i].get("screenHeight", 0)
if width == 0 or height == 0:
return "Unknown"
return "{0}x{1}".format(width, height)
def get_monitor_statistics():
def get_monitor_rdds_for_index(data, i):
def get_refresh_rate(p):
refresh_rate = p[MonitorsKey][i].get("refreshRate", 0)
return refresh_rate if refresh_rate > 1 else "Unknown"
def get_resolution(p):
return get_monitor_res(p, i)
        # |i| is zero-based, so pings with exactly i + 1 monitors belong here.
        # (Binding through the parameter |i| is safe with Spark's lazy lambdas.)
        monitors_at_index = data.filter(lambda p: get_monitor_count(p) == i + 1)
monitors_at_index = repartition(monitors_at_index)
refresh_rates = monitors_at_index.map(lambda p: (get_refresh_rate(p),))
resolutions = monitors_at_index.map(lambda p: (get_resolution(p),))
return refresh_rates, resolutions
monitor_counts = windows_pings.map(lambda p: (get_monitor_count(p),)).countByKey()
monitor_counts.pop(0, None)
refresh_rates = None
resolutions = None
for monitor_count in monitor_counts:
rate_subset, res_subset = get_monitor_rdds_for_index(
windows_pings, monitor_count - 1
)
refresh_rates = union_pipelines(refresh_rates, rate_subset)
resolutions = union_pipelines(resolutions, res_subset)
monitor_refresh_rates = refresh_rates.countByKey()
monitor_resolutions = resolutions.countByKey()
return {
"counts": monitor_counts,
"refreshRates": monitor_refresh_rates,
"resolutions": monitor_resolutions,
}
timed_export(
filename="monitor-statistics",
callback=get_monitor_statistics,
pings=(windows_pings, general_ping_info),
)
# ## Mac OS X Statistics
def get_mac_statistics():
version_map = map_x_to_count(mac_pings, "OSVersion")
def get_scale(p):
monitors = p.get(MonitorsKey, None)
if not monitors:
return "unknown"
        try:
            return monitors[0]["scale"]
        except (KeyError, IndexError):
            return "unknown"
scale_map = mac_pings.map(lambda p: (get_scale(p),)).countByKey()
def get_arch(p):
arch = p.get(ArchKey, "unknown")
if arch == "x86-64":
return "64"
if arch == "x86":
return "32"
return "unknown"
arch_map = mac_pings.map(lambda p: (get_arch(p),)).countByKey()
return {"versions": version_map, "retina": scale_map, "arch": arch_map}
timed_export(
filename="mac-statistics",
callback=get_mac_statistics,
pings=(mac_pings, general_ping_info),
)
# ### Helpers for Compositor/Acceleration fields
# Build graphics feature statistics.
def get_compositor(p):
compositor = p[FeaturesKey].get("compositor", "none")
if compositor == "none":
user_prefs = p.get(UserPrefsKey, None)
if user_prefs is not None:
omtc = user_prefs.get("layers.offmainthreadcomposition.enabled", True)
if not omtc:
return "disabled"
elif compositor == "d3d11":
if advanced_layers_status(p) == "available":
return "advanced_layers"
return compositor
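# For example (hypothetical pings): a "d3d11" compositor whose advancedLayers
# feature reports status "available" is counted as "advanced_layers", and a
# "none" compositor with layers.offmainthreadcomposition.enabled set to False
# is counted as "disabled".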
def get_d3d11_status(p):
d3d11 = p[FeaturesKey].get("d3d11", None)
if not hasattr(d3d11, "__getitem__"):
return "unknown"
status = d3d11.get("status", "unknown")
if status != "available":
return status
if d3d11.get("warp", False):
return "warp"
return d3d11.get("version", "unknown")
def get_warp_status(p):
if "blacklisted" not in p[FeaturesKey]["d3d11"]:
return "unknown"
if p[FeaturesKey]["d3d11"]["blacklisted"]:
return "blacklist"
return "device failure"
def get_d2d_status(p):
d2d = p[FeaturesKey].get("d2d", None)
if not hasattr(d2d, "__getitem__"):
return ("unknown",)
status = d2d.get("status", "unknown")
if status != "available":
return (status,)
return (d2d.get("version", "unknown"),)
def has_working_d3d11(p):
d3d11 = p[FeaturesKey].get("d3d11", None)
if d3d11 is None:
return False
return d3d11.get("status") == "available"
def gpu_process_status(p):
gpu_proc = p[FeaturesKey].get("gpuProcess", None)
if gpu_proc is None or not gpu_proc.get("status", None):
return "none"
return gpu_proc.get("status")
def get_texture_sharing_status(p):
return (p[FeaturesKey]["d3d11"].get("textureSharing", "unknown"),)
def advanced_layers_status(p):
al = p[FeaturesKey].get("advancedLayers", None)
if al is None:
return "none"
return al.get("status", None)
# ## Windows Compositor and Blacklisting Statistics
# Get pings with graphics features. This landed in roughly the 7-19-2015 nightly.
def windows_feature_filter(p):
    return p["OSName"] == "Windows" and p.get(FeaturesKey) is not None
windows_features = windows_pings.filter(windows_feature_filter)
windows_features = windows_features.cache()
# We skip certain Windows versions in the detail lists since this phase is
# very expensive to compute.
important_windows_versions = ("6.1.0", "6.1.1", "6.2.0", "6.3.0", "10.0.0")
def get_windows_features():
windows_compositor_map = windows_features.map(
lambda p: (get_compositor(p),)
).countByKey()
d3d11_status_map = windows_features.map(
lambda p: (get_d3d11_status(p),)
).countByKey()
d2d_status_map = windows_features.map(get_d2d_status).countByKey()
def get_content_backends(rdd):
rdd = rdd.filter(lambda p: p[GfxKey].get("ContentBackend") is not None)
rdd = rdd.map(lambda p: (p[GfxKey]["ContentBackend"],))
return rdd.countByKey()
content_backends = get_content_backends(windows_features)
warp_pings = windows_features.filter(lambda p: get_d3d11_status(p) == "warp")
warp_pings = repartition(warp_pings)
warp_status_map = warp_pings.map(lambda p: (get_warp_status(p),)).countByKey()
texture_sharing_map = (
windows_features.filter(has_working_d3d11)
.map(get_texture_sharing_status)
.countByKey()
)
blacklisted_pings = windows_features.filter(
lambda p: get_d3d11_status(p) == "blacklisted"
)
blacklisted_pings = repartition(blacklisted_pings)
blacklisted_devices = map_x_to_count(blacklisted_pings, "deviceID")
blacklisted_drivers = map_x_to_count(blacklisted_pings, "driverVersion")
blacklisted_os = map_x_to_count(blacklisted_pings, "OSVersion")
blocked_pings = windows_features.filter(lambda p: get_d3d11_status(p) == "blocked")
blocked_pings = repartition(blocked_pings)
blocked_vendors = map_x_to_count(blocked_pings, "vendorID")
# Media decoder backends.
def get_media_decoders(rdd):
rdd = rdd.filter(lambda p: p.get(MediaDecoderKey, None) is not None)
if rdd.count() == 0:
# These three values correspond to WMF Software, DXVA D3D11, and DXVA D3D9
return [0, 0, 0]
decoders = rdd.map(lambda p: p.get(MediaDecoderKey)).reduce(lambda x, y: x + y)
return [int(i) for i in decoders]
media_decoders = get_media_decoders(windows_features)
def gpu_process_map(rdd):
return rdd.map(lambda p: (gpu_process_status(p),)).countByKey()
def advanced_layers_map(rdd):
return rdd.map(lambda p: (advanced_layers_status(p),)).countByKey()
# Now, build the same data except per version.
feature_pings_by_os = map_x_to_count(windows_features, "OSVersion")
windows_features_by_version = {}
for os_version in feature_pings_by_os:
if os_version not in important_windows_versions:
continue
subset = windows_features.filter(lambda p: p["OSVersion"] == os_version)
subset = repartition(subset)
results = {
"count": subset.count(),
"compositors": subset.map(lambda p: (get_compositor(p),)).countByKey(),
# Setting to empty list due to histogram deprecation.
# For more info see: https://bugzilla.mozilla.org/show_bug.cgi?id=1914369
"plugin_models": [],
"content_backends": get_content_backends(subset),
"media_decoders": get_media_decoders(subset),
"gpu_process": gpu_process_map(subset),
"advanced_layers": advanced_layers_map(subset),
}
try:
if int(os_version.split(".")[0]) >= 6:
results["d3d11"] = subset.map(
lambda p: (get_d3d11_status(p),)
).countByKey()
results["d2d"] = subset.map(get_d2d_status).countByKey()
warp_pings = subset.filter(lambda p: get_d3d11_status(p) == "warp")
results["warp"] = warp_pings.map(
lambda p: (get_warp_status(p),)
).countByKey()
except Exception:
pass
finally:
# Free resources.
warp_pings = None
subset = None
windows_features_by_version[os_version] = results
return {
"all": {
"compositors": windows_compositor_map,
"content_backends": content_backends,
"d3d11": d3d11_status_map,
"d2d": d2d_status_map,
"textureSharing": texture_sharing_map,
"warp": warp_status_map,
# Setting to empty list due to histogram deprecation.
# For more info see: https://bugzilla.mozilla.org/show_bug.cgi?id=1914369
"plugin_models": [],
"media_decoders": media_decoders,
"gpu_process": gpu_process_map(windows_features),
"advanced_layers": advanced_layers_map(windows_features),
},
"byVersion": windows_features_by_version,
"d3d11_blacklist": {
"devices": blacklisted_devices,
"drivers": blacklisted_drivers,
"os": blacklisted_os,
},
"d3d11_blocked": {"vendors": blocked_vendors},
}
timed_export(
filename="windows-features",
callback=get_windows_features,
pings=(windows_features, general_ping_info),
)
windows_features = None
# ## Linux
def get_linux_statistics():
pings = linux_pings.filter(lambda p: p["driverVendor"] is not None)
driver_vendor_map = map_x_to_count(pings, "driverVendor")
pings = linux_pings.filter(lambda p: p.get(FeaturesKey) is not None)
compositor_map = pings.map(lambda p: (get_compositor(p),)).countByKey()
return {"driverVendors": driver_vendor_map, "compositors": compositor_map}
timed_export(
filename="linux-statistics",
callback=get_linux_statistics,
pings=(linux_pings, general_ping_info),
)
# ## WebGL Statistics
# Note, this depends on running the
# "Helpers for Compositor/Acceleration fields" a few blocks above.
def get_gl_statistics():
webgl_status_rdd = general_pings.filter(
lambda p: p.get(WebGLFailureKey, None) is not None
)
webgl_status_rdd = webgl_status_rdd.map(lambda p: p[WebGLFailureKey])
webgl_status_map = webgl_status_rdd.reduce(combiner)
webgl_accl_status_rdd = general_pings.filter(
lambda p: p.get(WebGLAcclFailureKey, None) is not None
)
webgl_accl_status_rdd = webgl_accl_status_rdd.map(lambda p: p[WebGLAcclFailureKey])
webgl_accl_status_map = webgl_accl_status_rdd.reduce(combiner)
return {
"webgl": {
"acceleration_status": webgl_accl_status_map,
"status": webgl_status_map,
}
}
def web_gl_statistics_for_key(key):
histogram_pings = general_pings.filter(lambda p: p.get(key) is not None)
histogram_pings = repartition(histogram_pings)
    # Note - we're counting sessions where WebGL succeeded or failed,
    # rather than the raw number of times either succeeded or failed.
    # Also note that we don't double-count systems where both are true:
    # we only count a session's successes if it had no failures.
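    # For example (hypothetical histogram): a session reporting [2, 5] for this
    # key (2 failures, 5 successes) counts as a single failure session; only
    # [0, N] sessions with N > 0 count toward successes.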
failure_rdd = histogram_pings.filter(lambda p: p[key][0] > 0)
success_rdd = histogram_pings.filter(lambda p: p[key][0] == 0 and p[key][1] > 0)
failure_count = failure_rdd.count()
failure_by_os = map_x_to_count(failure_rdd, "OS")
failure_by_vendor = map_x_to_count(failure_rdd, "vendorID")
failure_by_device = map_x_to_count(failure_rdd, "deviceID")
failure_by_driver = map_x_to_count(failure_rdd, "driverVersion")
success_count = success_rdd.count()
success_by_os = map_x_to_count(success_rdd, "OS")
def get_compositor_any_os(p):
if p["OSName"] != "Windows":
# This data is not reliable yet - see bug 1247148.
return "unknown"
return get_compositor(p)
success_by_cc = success_rdd.map(lambda p: (get_compositor_any_os(p),)).countByKey()
return {
"successes": {
"count": success_count,
"os": success_by_os,
"compositors": success_by_cc,
},
"failures": {
"count": failure_count,
"os": failure_by_os,
"vendors": failure_by_vendor,
"devices": failure_by_device,
"drivers": failure_by_driver,
},
}
def get_web_gl_statistics():
return {
"webgl1": web_gl_statistics_for_key(WebGLSuccessKey),
"webgl2": web_gl_statistics_for_key(WebGL2SuccessKey),
"general": get_gl_statistics(),
}
timed_export(
filename="webgl-statistics",
callback=get_web_gl_statistics,
pings=(general_pings, general_ping_info),
)
end_time = datetime.datetime.now()
total_elapsed = (end_time - start_time).total_seconds()
print("Total time: {0}".format(total_elapsed))