def longitudinal_shim_transform()

in jobs/update_orphaning_dashboard_etl.py [0:0]


def longitudinal_shim_transform(project, dataset, table):
    """Transform preaggregated dataset to match legacy Longitudinal schema and make it available as
    `longitudinal_shim` in SQL context.

    Steps performed:

    1. Export the BigQuery table ``project.dataset.table`` to GCS as AVRO
       (under ``GCS_TABLE_DUMP_PATH``) and load that dump into Spark.
    2. Flatten ``settings.update.channel`` / ``settings.update.enabled`` so each
       array element is the first field of the original element.
    3. Replace the histogram columns (arrays of structs holding JSON-encoded
       histograms) with merged, numeric representations:
       keyed count histograms -> ``map<string, array<int>>``,
       enumerated histograms -> ``array<array<int>>``,
       count histograms -> ``array<int>``.
    4. Register the result as the temporary view ``longitudinal_shim``.

    Args:
        project: BigQuery project ID that owns the table.
        dataset: BigQuery dataset ID.
        table: BigQuery table ID.

    Returns:
        None. The result is exposed only through the ``longitudinal_shim``
        temporary view.
    """
    # BigQuery Storage API can't handle the schema and queries (`out_of_date_details_sql`) we have here
    # As a workaround, we'll load AVRO-exported dump to Spark
    print("Exporting BigQuery table as AVRO...")
    destination_uri = GCS_TABLE_DUMP_PATH + "*"

    job_config = bigquery.ExtractJobConfig()
    job_config.destination_format = "AVRO"

    bq = bigquery.Client()
    table_ref = bq.dataset(dataset, project=project).table(table)

    extract_job = bq.extract_table(
        table_ref,
        destination_uri,
        location="US",
        job_config=job_config,
    )
    extract_job.result()  # Waits for job to complete.

    print(
        "Exported {} to {}".format(table_ref, destination_uri)
    )

    raw_df = spark.read.format("avro").load(GCS_TABLE_DUMP_PATH)

    def replace_column(df, col_name, new_col, tmp_suffix="_merged"):
        """Swap column `col_name` of `df` for the expression `new_col`.

        The new value is built under a temporary name, the old column is
        dropped, and the temporary column is renamed back. Note that this
        leaves the rebuilt column at the end of the schema rather than at the
        position of the column it replaces.
        """
        tmp_name = col_name + tmp_suffix
        return (
            df.withColumn(tmp_name, new_col)
            .drop(col_name)
            .withColumnRenamed(tmp_name, col_name)
        )

    # --- settings -----------------------------------------------------------

    def flatten_settings_update(settings):
        """Keep only the first field of every `update.channel`/`update.enabled` entry."""
        update = settings["update"]
        return {
            "update": {
                "channel": [entry[0] for entry in update["channel"]],
                "enabled": [entry[0] for entry in update["enabled"]],
            }
        }

    settings_type = StructType([
        StructField("update", StructType([
            StructField("channel", ArrayType(StringType())),
            StructField("enabled", ArrayType(BooleanType())),
        ])),
    ])
    flatten_settings_update_udf = F.udf(flatten_settings_update, settings_type)

    shim_df = replace_column(
        raw_df, "settings", flatten_settings_update_udf("settings"), tmp_suffix="_fixed"
    )

    # --- histogram helpers (run inside Spark UDFs) ---------------------------

    def count_from_histogram(histogram_string):
        """Return the count stored in bucket '0' of a JSON count histogram (0 if absent)."""
        # Tolerate a missing "values" key instead of raising KeyError, in line
        # with how enumerated histograms are parsed below.
        return json.loads(histogram_string).get("values", {}).get("0", 0)

    def merge_keyed_count_histograms(histograms, histogram_name):
        """Build {key: [count per array element]} from an array of keyed count histograms.

        Elements in which a key does not occur contribute 0 for that key.
        """
        histograms = histograms or []  # a null column behaves like an empty array
        n_hist = len(histograms)
        merged = {}
        for i, histogram_struct in enumerate(histograms):
            keyed_histograms = histogram_struct[histogram_name]
            if not keyed_histograms:
                continue
            for key, count_histogram_string in keyed_histograms:
                counts = merged.setdefault(key, [0] * n_hist)
                counts[i] = count_from_histogram(count_histogram_string)
        return merged

    def merge_enumerated_histograms(histograms, histogram_name, n_values):
        """Expand each JSON histogram into a dense list of `n_values + 1` bucket counts.

        Returns None when no element carries a histogram.
        """
        n_buckets = int(n_values) + 1
        merged = []
        all_null = True  # needed to maintain compatibility with Longitudinal
        for histogram_struct in histograms or []:
            buckets = [0] * n_buckets
            histogram_string = histogram_struct[histogram_name]
            if histogram_string:
                all_null = False
                values = json.loads(histogram_string).get("values", {})
                # NOTE(review): a bucket key above n_values raises IndexError and
                # a negative key would index from the end - presumably the
                # exported data never contains such keys; confirm.
                for bucket, count in values.items():
                    buckets[int(bucket)] = count
            merged.append(buckets)
        return None if all_null else merged

    def merge_count_histograms(histograms, histogram_name):
        """Collapse each JSON count histogram to its bucket-'0' count (0 when missing).

        Returns None when no element carries a histogram.
        """
        merged = []
        all_null = True  # needed to maintain compatibility with Longitudinal
        for histogram_struct in histograms or []:
            histogram_string = histogram_struct[histogram_name]
            if histogram_string:
                all_null = False
                merged.append(count_from_histogram(histogram_string))
            else:
                merged.append(0)
        return None if all_null else merged

    merge_keyed_count_histograms_udf = F.udf(
        merge_keyed_count_histograms, MapType(StringType(), ArrayType(IntegerType()))
    )
    merge_enumerated_histograms_udf = F.udf(
        merge_enumerated_histograms, ArrayType(ArrayType(IntegerType()))
    )
    merge_count_histograms_udf = F.udf(merge_count_histograms, ArrayType(IntegerType()))

    # --- column lists ---------------------------------------------------------

    keyed_count_histogram_columns = ["update_check_extended_error_notify"]

    # (column name, n_values) - each merged histogram gets n_values + 1 buckets.
    enumerated_histogram_columns = [
        ("update_check_code_notify", 50),
        ("update_download_code_partial", 50),
        ("update_download_code_complete", 50),
        ("update_state_code_partial_stage", 20),
        ("update_state_code_complete_stage", 20),
        ("update_state_code_unknown_stage", 20),
        ("update_state_code_partial_startup", 20),
        ("update_state_code_complete_startup", 20),
        ("update_state_code_unknown_startup", 20),
        ("update_status_error_code_complete_startup", 100),
        ("update_status_error_code_partial_startup", 100),
        ("update_status_error_code_unknown_startup", 100),
        ("update_status_error_code_complete_stage", 100),
        ("update_status_error_code_partial_stage", 100),
        ("update_status_error_code_unknown_stage", 100),
    ]

    count_histogram_columns = [
        "update_check_no_update_notify",
        "update_not_pref_update_auto_notify",
        "update_ping_count_notify",
        "update_unable_to_apply_notify",
    ]

    # --- apply the merges (keyed, then enumerated, then count) ---------------
    # The histogram name is passed to each UDF as a literal because the struct
    # field inside every array element is looked up by the column's own name.

    for col_name in keyed_count_histogram_columns:
        shim_df = replace_column(
            shim_df,
            col_name,
            merge_keyed_count_histograms_udf(col_name, F.lit(col_name)),
        )

    for col_name, n_values in enumerated_histogram_columns:
        shim_df = replace_column(
            shim_df,
            col_name,
            merge_enumerated_histograms_udf(col_name, F.lit(col_name), F.lit(n_values)),
        )

    for col_name in count_histogram_columns:
        shim_df = replace_column(
            shim_df,
            col_name,
            merge_count_histograms_udf(col_name, F.lit(col_name)),
        )

    # createOrReplaceTempView is the non-deprecated equivalent of
    # registerTempTable (deprecated since Spark 2.0).
    shim_df.createOrReplaceTempView("longitudinal_shim")