in mozetl/bhr_collection/bhr_collection.py [0:0]
def get_data(sc, sql_context, config, date, end_date=None):
    sql_context.sql(
        "set spark.sql.shuffle.partitions={}".format(sc.defaultParallelism)
    )

    if end_date is None:
        end_date = date

    # Pings for a given build can arrive days after the build date, so widen
    # the submission window by five days on each side of the target range.
    submission_start_str = (date - timedelta(days=5)).strftime("%Y-%m-%d")
    submission_end_str = (end_date + timedelta(days=5)).strftime("%Y-%m-%d")
    date_str = date.strftime("%Y%m%d")
    end_date_str = end_date.strftime("%Y%m%d")

    # Convert the configured sample fraction into a number of hash buckets,
    # clamped to [1, max_sample_slices].
    max_sample_slices = 10000
    sample_slices = int(
        max(min(max_sample_slices * config["sample_size"], max_sample_slices), 1)
    )
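    # Example: with config["sample_size"] = 0.01 this yields sample_slices =
    # 100, so the fingerprint filter below keeps roughly 100 / 10000 = 1%
    # of pings.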
sql = f"""
SELECT
environment,
application,
payload,
FROM
`moz-fx-data-shared-prod.telemetry_stable.bhr_v4`
WHERE
-- Use document_id to sample
ABS(MOD(FARM_FINGERPRINT(document_id), {max_sample_slices})) < {sample_slices}
AND submission_timestamp BETWEEN '{submission_start_str}' AND '{submission_end_str}'
"""
    pings_df = (
        sql_context.read.format("bigquery")
        .option("query", sql)
        .option("viewsEnabled", True)
        # Query results must be materialized to a temporary table before the
        # BigQuery connector can read them.
        .option("materializationProject", "moz-fx-data-shared-prod")
        .option("materializationDataset", "tmp")
        .load()
    )
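    # Note: the materialization dataset ("tmp" above) must already exist and
    # be writable by the job's credentials; the connector stores the query
    # results there as a short-lived table.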
if config["exclude_modules"]:
properties = [
"environment/system/os/name",
"environment/system/os/version",
"application/architecture",
"application/build_id",
"payload/hangs",
"payload/time_since_last_ping",
]
else:
properties = [
"environment/system/os/name",
"environment/system/os/version",
"application/architecture",
"application/build_id",
"payload/modules",
"payload/hangs",
"payload/time_since_last_ping",
]
print("%d results total" % pings_df.rdd.count())
mapped = pings_df.rdd.map(lambda p: get_ping_properties(p, properties)).filter(
lambda p: properties_are_not_none(p, properties)
)
try:
result = mapped.filter(
lambda p: p["application/build_id"][:8] >= date_str
and p["application/build_id"][:8] <= end_date_str
)
print("%d results after first filter" % result.count())
return result
except ValueError:
return None
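

# get_ping_properties and properties_are_not_none are defined elsewhere in
# this module. A minimal sketch of the expected behavior, assuming each ping
# is a nested dict keyed by the "/"-separated path segments (a Spark Row can
# be converted with row.asDict(recursive=True)). These are hypothetical
# reimplementations, not the actual mozetl definitions:
def get_ping_properties_sketch(ping, paths):
    """Flatten the requested "/"-delimited paths into a {path: value} dict."""
    result = {}
    for path in paths:
        value = ping
        for key in path.split("/"):
            # Walk one level deeper; missing or non-dict levels yield None.
            value = value.get(key) if isinstance(value, dict) else None
        result[path] = value
    return result


def properties_are_not_none_sketch(flattened, paths):
    """True when every requested property was present on the ping."""
    return all(flattened[path] is not None for path in paths)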