def symbolicate_hang_keys()

in mozetl/bhr_collection/bhr_collection.py [0:0]


def symbolicate_hang_keys(hangs, processed_modules):
    hangs_by_id = hangs.zipWithUniqueId().map(lambda x: (x[1], x[0]))
    hang_ids_by_frame = hangs_by_id.flatMap(get_frames_with_hang_id)

    # NOTE: this is the logic that we replaced with Dataframes
    # symbolication_maps_by_hang_id = (hang_ids_by_frame.leftOuterJoin(processed_modules)
    #                                  .map(get_symbolication_mapping_by_hang_id)
    #                                  .reduceByKey(shallow_merge))
    # return hangs_by_id.join(symbolication_maps_by_hang_id).map(symbolicate_hang_with_mapping)

    def get_hang_id_by_frame_row(hang_id_by_frame):
        frame, hang_id = hang_id_by_frame
        return Row(module_to_string(frame[0]), frame[1], hang_id)

    hibf_cols = ["module", "offset", "hang_id"]
    hibf_df = hang_ids_by_frame.map(get_hang_id_by_frame_row).toDF(hibf_cols)

    def get_processed_modules_row(processed_module):
        (module, offset), (symbol, module_name) = processed_module
        return Row(module_to_string(module), offset, symbol, module_name)

    pm_cols = ["module", "offset", "symbol", "module_name"]
    pm_df = processed_modules.map(get_processed_modules_row).toDF(pm_cols)

    smbhid_df = hibf_df.join(pm_df, on=["module", "offset"], how="left_outer")
    debug_print_rdd_count(smbhid_df.rdd)

    symbol_mapping_array = array("module", "offset", "symbol", "module_name")
    symbol_mappings_df = (
        smbhid_df.select("hang_id", symbol_mapping_array.alias("symbol_mapping"))
        .groupBy("hang_id")
        .agg(collect_list("symbol_mapping").alias("symbol_mappings"))
    )
    debug_print_rdd_count(symbol_mappings_df.rdd)

    def get_hang_by_id_row(hang_by_id):
        hang_id, hang = hang_by_id
        return Row(hang_id, json.dumps(hang, ensure_ascii=False))

    hbi_cols = ["hang_id", "hang_json"]
    hbi_df = hangs_by_id.map(get_hang_by_id_row).toDF(hbi_cols)

    result_df = hbi_df.join(symbol_mappings_df, on=["hang_id"])
    debug_print_rdd_count(result_df.rdd)

    def get_result_obj_from_row(row):
        # creates a tuple of (unsymbolicated, symbolicated) for each item in row.symbol_mappings
        mappings = tuple(
            ((string_to_module(mapping[0]), mapping[1]), (mapping[2], mapping[3]))
            for mapping in row.symbol_mappings
        )
        hang = json.loads(row.hang_json)
        return hang, mappings

    result = result_df.rdd.map(get_result_obj_from_row)
    debug_print_rdd_count(result)

    return result.map(symbolicate_hang_with_mapping)