in jobs/update_orphaning_dashboard_etl.py [0:0]
def longitudinal_shim_transform(project, dataset, table):
    """Transform preaggregated dataset to match legacy Longitudinal schema and make it available as
    `longitudinal_shim` in SQL context.

    Steps:
      1. Export ``project.dataset.table`` from BigQuery to GCS as AVRO files
         under ``GCS_TABLE_DUMP_PATH``.
      2. Load that AVRO dump into a Spark DataFrame.
      3. Rewrite ``settings`` and the update-related histogram columns into the
         array/map layouts the legacy Longitudinal dataset exposed.
      4. Register the result as the temporary view ``longitudinal_shim``.

    :param project: GCP project that owns the source dataset.
    :param dataset: BigQuery dataset name.
    :param table: BigQuery table name.
    :return: None; the result is only reachable through the ``longitudinal_shim`` view.
    """
    # BigQuery Storage API can't handle the schema and queries (`out_of_date_details_sql`) we have here
    # As a workaround, we'll load AVRO-exported dump to Spark
    print("Exporting BigQuery table as AVRO...")
    # A wildcard URI lets BigQuery split the export into multiple files.
    destination_uri = GCS_TABLE_DUMP_PATH + "*"
    job_config = bigquery.ExtractJobConfig()
    job_config.destination_format = "AVRO"
    bq = bigquery.Client()
    table_ref = bq.dataset(dataset, project=project).table(table)
    extract_job = bq.extract_table(
        table_ref,
        destination_uri,
        location="US",
        job_config=job_config,
    )
    extract_job.result()  # Waits for job to complete.
    print("Exported {} to {}".format(table_ref, destination_uri))

    # NOTE(review): everything under GCS_TABLE_DUMP_PATH is loaded, so files left
    # behind by an earlier export (e.g. extra shards) would be read too — confirm
    # the path is cleared or unique per run.
    longitudinal_shim_raw_from_avro = spark.read.format("avro").load(GCS_TABLE_DUMP_PATH)

    # ---- generic helpers -------------------------------------------------

    def replace_column(df, col_name, new_col, tmp_suffix="_merged"):
        """Replace ``col_name`` with ``new_col``.

        The value is staged in ``col_name + tmp_suffix`` and then renamed, so the
        rewritten column ends up as the last column of the schema.
        """
        tmp_name = col_name + tmp_suffix
        return df.withColumn(tmp_name, new_col).drop(col_name).withColumnRenamed(tmp_name, col_name)

    def field_or_none(struct, field_name):
        """Return ``struct[field_name]``, or None when the struct itself is null."""
        return None if struct is None else struct[field_name]

    def count_from_histogram_json(histogram_string):
        """Return the value of bucket "0" of a JSON-encoded histogram (0 if absent or null).

        Uses ``.get('values', {})`` so a histogram without a ``values`` key counts
        as 0 instead of raising KeyError inside the UDF.
        """
        if not histogram_string:
            return 0
        return json.loads(histogram_string).get('values', {}).get('0', 0)

    # ---- settings ----------------------------------------------------------

    def first_fields(rows):
        # Keep only the first field of every element (presumably single-field
        # wrapper structs produced by the AVRO export — confirm against schema).
        return None if rows is None else [row[0] for row in rows]

    def flatten_settings_update(settings):
        if settings is None or settings["update"] is None:
            return None
        update = settings["update"]
        return {"update": {"channel": first_fields(update["channel"]),
                           "enabled": first_fields(update["enabled"])}}

    return_struct_type = StructType([
        StructField("update", StructType([
            StructField("channel", ArrayType(StringType())),
            StructField("enabled", ArrayType(BooleanType())),
        ])),
    ])
    udf_flatten_settings_update = udf(flatten_settings_update, return_struct_type)

    def fix_schema(raw_longitudinal_df):
        return replace_column(raw_longitudinal_df, "settings",
                              udf_flatten_settings_update("settings"), tmp_suffix="_fixed")

    longitudinal_shim_raw_from_avro_fixed = fix_schema(longitudinal_shim_raw_from_avro)

    # ---- keyed count histograms -------------------------------------------

    def merge_keyed_count_histograms(histograms, histogram_name):
        """Pivot per-element keyed count histograms into ``{key: [count per element]}``.

        Elements that lack a key get 0 at that position.
        """
        if histograms is None:
            return None
        res = {}
        n_hist = len(histograms)
        for i, histogram_struct in enumerate(histograms):
            histogram_array = field_or_none(histogram_struct, histogram_name)
            if histogram_array:
                for key, count_histogram_string in histogram_array:
                    if key not in res:
                        res[key] = [0] * n_hist
                    res[key][i] = count_from_histogram_json(count_histogram_string)
        return res

    merge_keyed_count_histograms_udf = F.udf(merge_keyed_count_histograms,
                                             MapType(StringType(), ArrayType(IntegerType())))

    def merge_keyed_count_histogram_col(df, col_name):
        return replace_column(df, col_name,
                              merge_keyed_count_histograms_udf(col_name, F.lit(col_name)))

    # ---- enumerated histograms --------------------------------------------

    def merge_enumerated_histograms(histograms, histogram_name, n_values):
        """Convert each JSON histogram into a dense list of ``n_values + 1`` bucket counts.

        Returns None when every element is null/empty.
        """
        if histograms is None:
            return None
        res = []
        all_null = True  # needed to maintain compatibility with Longitudinal
        for histogram_string_struct in histograms:
            compacted_histogram = [0] * (int(n_values) + 1)
            histogram_string = field_or_none(histogram_string_struct, histogram_name)
            if histogram_string:
                all_null = False
                values = json.loads(histogram_string).get('values', {})
                for key, value in values.items():
                    compacted_histogram[int(key)] = value
            res.append(compacted_histogram)
        return None if all_null else res

    merge_enumerated_histograms_udf = F.udf(merge_enumerated_histograms,
                                            ArrayType(ArrayType(IntegerType())))

    def merge_enumerated_histogram_col(df, col_name, n_values):
        return replace_column(df, col_name,
                              merge_enumerated_histograms_udf(col_name, F.lit(col_name), F.lit(n_values)))

    def merge_enumerated_histogram_columns(df, cols_n_values):
        for col_name, n_values in cols_n_values:
            df = merge_enumerated_histogram_col(df, col_name, n_values)
        return df

    # ---- count histograms -------------------------------------------------

    def merge_count_histograms(histograms, histogram_name):
        """Extract bucket "0" of each JSON count histogram.

        Null/empty elements become 0. Returns None when every element is null/empty.
        """
        if histograms is None:
            return None
        res = []
        all_null = True  # needed to maintain compatibility with Longitudinal
        for histogram_string_struct in histograms:
            histogram_string = field_or_none(histogram_string_struct, histogram_name)
            if histogram_string:
                all_null = False
            res.append(count_from_histogram_json(histogram_string))
        return None if all_null else res

    merge_count_histograms_udf = F.udf(merge_count_histograms, ArrayType(IntegerType()))

    def merge_count_histogram_columns(df, cols):
        for col_name in cols:
            df = replace_column(df, col_name, merge_count_histograms_udf(col_name, F.lit(col_name)))
        return df

    # ---- pipeline -----------------------------------------------------------

    enumerated_histogram_columns = (
        ("update_check_code_notify", 50),
        ("update_download_code_partial", 50),
        ("update_download_code_complete", 50),
        ("update_state_code_partial_stage", 20),
        ("update_state_code_complete_stage", 20),
        ("update_state_code_unknown_stage", 20),
        ("update_state_code_partial_startup", 20),
        ("update_state_code_complete_startup", 20),
        ("update_state_code_unknown_startup", 20),
        ("update_status_error_code_complete_startup", 100),
        ("update_status_error_code_partial_startup", 100),
        ("update_status_error_code_unknown_startup", 100),
        ("update_status_error_code_complete_stage", 100),
        ("update_status_error_code_partial_stage", 100),
        ("update_status_error_code_unknown_stage", 100),
    )
    count_histogram_columns = (
        "update_check_no_update_notify",
        "update_not_pref_update_auto_notify",
        "update_ping_count_notify",
        "update_unable_to_apply_notify",
    )

    with_keyed_count_histograms_merged = merge_keyed_count_histogram_col(
        longitudinal_shim_raw_from_avro_fixed, "update_check_extended_error_notify")
    with_enumerated_histograms_merged = merge_enumerated_histogram_columns(
        with_keyed_count_histograms_merged, enumerated_histogram_columns)
    with_count_histograms_merged = merge_count_histogram_columns(
        with_enumerated_histograms_merged, count_histogram_columns)
    longitudinal_shim_df = with_count_histograms_merged
    # registerTempTable is deprecated since Spark 2.0 and merely delegates to
    # createOrReplaceTempView, so this is the same operation without the warning.
    longitudinal_shim_df.createOrReplaceTempView("longitudinal_shim")