mozetl/shield/privacy_prefs.py (54 lines of code) (raw):

# coding: utf-8 # Started from mashing up: # https://gist.githubusercontent.com/ilanasegall/b3ce1aa0d3cc8c117a35b4a4fb9d4681/raw/c8a96e823cd56072e896e4c2d94c496306b59c8c/blok_df.py # with: # https://github.com/mozilla/python_mozetl/blob/689afa3d23229ca717422314c5a56abd83a85a0d/mozetl/testpilot/containers.py from datetime import date, timedelta from pyspark.sql.types import StringType from moztelemetry.dataset import Dataset from ..basic import convert_pings, DataFrameConfig SHIELD_ADDON_ID = "@shield-study-privacy" # Note that the JSON paths in this config are unusual for Shield pings. # See https://bugzil.la/1387236 for more details and discussion COMMON_COLUMN_CONFIGS = [ ("client_id", "clientId", None, StringType()), ("branch", "payload/branch", None, StringType()), ("study_state", "payload/study_state", None, StringType()), ("event", "payload/event", None, StringType()), ("originDomain", "payload/originDomain", None, StringType()), ("breakage", "payload/breakage", None, StringType()), ("notes", "payload/notes", None, StringType()), ] STUDY_STATE_DATAFRAME_COLUMN_CONFIGS = COMMON_COLUMN_CONFIGS + [ ("study", "payload/study_name", None, StringType()) ] STUDY_EVENT_DATAFRAME_COLUMN_CONFIGS = COMMON_COLUMN_CONFIGS + [ ("study", "payload/study", None, StringType()) ] def transform_state_pings(sqlContext, pings): return convert_pings( sqlContext, pings, DataFrameConfig(STUDY_STATE_DATAFRAME_COLUMN_CONFIGS, include_state_pings), ) def transform_event_pings(sqlContext, pings): return convert_pings( sqlContext, pings, DataFrameConfig(STUDY_EVENT_DATAFRAME_COLUMN_CONFIGS, include_event_pings), ) def include_event_pings(ping): return ping["payload/study"] == SHIELD_ADDON_ID def include_state_pings(ping): return ping["payload/study_name"] == SHIELD_ADDON_ID def etl_job(sc, sqlContext, submission_date=None, save=True): s3_path = "s3n://telemetry-parquet/harter/privacy_prefs_shield/v2" if submission_date is None: submission_date = (date.today() - timedelta(1)).strftime("%Y%m%d") pings = ( Dataset.from_source("telemetry") .where( docType="shield-study", submissionDate=submission_date, appName="Firefox" ) .records(sc) ) transformed_event_pings = transform_event_pings(sqlContext, pings) transformed_state_pings = transform_state_pings(sqlContext, pings) transformed_pings = transformed_event_pings.union(transformed_state_pings) if save: path = s3_path + "/submission_date={}".format(submission_date) transformed_pings.repartition(1).write.mode("overwrite").parquet(path) return transformed_pings