in mozetl/taar/taar_dynamo.py [0:0]
from pyspark.sql import Window
from pyspark.sql.functions import desc, row_number


def extract_transform(spark, run_date, sample_rate=0):
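    """Extract and transform client records for `run_date` from main_summary.

    Reads the parquet data for the given date, optionally samples it,
    keeps only the most recent subsession per client, and returns an
    RDD of 4-tuples (see `list_transformer`).
    """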
    currentDateString = run_date.strftime("%Y%m%d")
    print("Processing %s" % currentDateString)

    # Get the data for the desired date out of parquet.
    template = "s3://telemetry-parquet/main_summary/v4/submission_date_s3=%s"
    datasetForDate = spark.read.parquet(template % currentDateString)

    # A zero or None sample_rate means "use the full dataset".
    if sample_rate:
        print("Sample rate set to %0.9f" % sample_rate)
        # sample(False, rate) draws the given fraction of rows without
        # replacement.
        datasetForDate = datasetForDate.sample(False, sample_rate)
    else:
        print("No sampling on dataset")
    print("Parquet data loaded")
    # Get the most recent (client_id, subsession_start_date) tuple
    # for each client, since main_summary may contain multiple rows
    # per client. We will use it to filter the full table down to
    # the columns we require.
    clientShortList = datasetForDate.select(
        "client_id",
        "subsession_start_date",
        row_number()
        .over(Window.partitionBy("client_id").orderBy(desc("subsession_start_date")))
        .alias("clientid_rank"),
    )
    print("clientShortList selected")
    clientShortList = clientShortList.where("clientid_rank == 1").drop("clientid_rank")
    print("clientShortList filtered to most recent subsession per client")
    select_fields = [
        "client_id",
        "subsession_start_date",
        "subsession_length",
        "city",
        "locale",
        "os",
        "places_bookmarks_count",
        "scalar_parent_browser_engagement_tab_open_event_count",
        "scalar_parent_browser_engagement_total_uri_count",
        "scalar_parent_browser_engagement_unique_domains_count",
        "active_addons",
        "disabled_addons_ids",
    ]
    dataSubset = datasetForDate.select(*select_fields)
    print("datasetForDate select fields completed")
    # Join the two tables: Spark's join defaults to an inner join, so
    # only the rows present in both dataframes make it through.
    clientsData = dataSubset.join(
        clientShortList, ["client_id", "subsession_start_date"]
    )
    print("clientsData joined on client_id and subsession_start_date")
    # Keep the identifying columns in their own dataframe; they are
    # paired back up with the JSON payload below.
    subset = clientsData.select("client_id", "subsession_start_date")
    print("clientsData select of client_id and subsession_start_date completed")

    # Convert the payload columns to JSON and get an RDD out of it:
    # toJSON() turns each row into a JSON string such as
    # '{"city": ..., "subsession_start_date": ..., ...}'.
    jsonDataRDD = clientsData.select(
        "city",
        "subsession_start_date",
        "subsession_length",
        "locale",
        "os",
        "places_bookmarks_count",
        "scalar_parent_browser_engagement_tab_open_event_count",
        "scalar_parent_browser_engagement_total_uri_count",
        "scalar_parent_browser_engagement_unique_domains_count",
        "active_addons",
        "disabled_addons_ids",
    ).toJSON()
    print("jsonDataRDD selected")
    rdd = subset.rdd.zip(jsonDataRDD)
    print("subset rdd has been zipped")

    # Filter out any records with invalid dates or client_id.
    filtered_rdd = rdd.filter(filterDateAndClientID)
    print("rdd filtered by date and client_id")

    # Transform the JSON elements into a 4-tuple as per the docstring.
    merged_filtered_rdd = filtered_rdd.map(list_transformer)
    print("rdd has been transformed into tuples")
    return merged_filtered_rdd
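
# A minimal usage sketch, kept commented out: it assumes a live
# SparkSession named `spark` and credentials that can read the
# s3://telemetry-parquet bucket (neither is set up in this excerpt).
#
#   from datetime import date
#   rdd = extract_transform(spark, date(2019, 1, 1), sample_rate=0.001)
#   print(rdd.take(5))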