in mozetl/bhr_collection/bhr_collection.py [0:0]
def symbolicate_hang_keys(hangs, processed_modules):
    """Attach per-frame symbol mappings to each hang and symbolicate it.

    The work is done with Spark DataFrames. According to the note left by
    the original author, this replaces an earlier RDD pipeline that
    left-outer-joined the frames with ``processed_modules``, merged the
    per-hang mappings with ``reduceByKey`` and joined them back onto the
    hangs.

    Args:
        hangs: distributed collection of hangs; it must support
            ``zipWithUniqueId`` (presumably a Spark RDD -- confirm with
            the caller).
        processed_modules: collection whose elements unpack as
            ``((module, offset), (symbol, module_name))``.

    Returns:
        The result of mapping ``symbolicate_hang_with_mapping`` over
        ``(hang, mappings)`` pairs, where ``mappings`` is a tuple of
        ``((module, offset), (symbol, module_name))`` entries.

    Note:
        Each hang is serialized to JSON for the DataFrame stage and parsed
        back afterwards, so the hang handed to
        ``symbolicate_hang_with_mapping`` is the JSON round-tripped value.
    """
    # Key every hang by a unique id: (hang_id, hang).
    hangs_by_id = hangs.zipWithUniqueId().map(lambda pair: (pair[1], pair[0]))
    hang_ids_by_frame = hangs_by_id.flatMap(get_frames_with_hang_id)

    def frame_entry_to_row(entry):
        frame, hang_id = entry
        module_key = module_to_string(frame[0])
        return Row(module_key, frame[1], hang_id)

    def processed_module_to_row(entry):
        key, value = entry
        module, offset = key
        symbol, module_name = value
        return Row(module_to_string(module), offset, symbol, module_name)

    def hang_entry_to_row(entry):
        hang_id, hang = entry
        # Hangs travel through the DataFrame as JSON text.
        hang_json = json.dumps(hang, ensure_ascii=False)
        return Row(hang_id, hang_json)

    def row_to_hang_and_mappings(row):
        # Rebuild one (unsymbolicated, symbolicated) pair per collected
        # entry of row.symbol_mappings.
        def to_pair(mapping):
            module_key, offset, symbol, module_name = mapping
            return (string_to_module(module_key), offset), (symbol, module_name)

        mappings = tuple(map(to_pair, row.symbol_mappings))
        return json.loads(row.hang_json), mappings

    frames_df = hang_ids_by_frame.map(frame_entry_to_row).toDF(
        ["module", "offset", "hang_id"]
    )
    modules_df = processed_modules.map(processed_module_to_row).toDF(
        ["module", "offset", "symbol", "module_name"]
    )

    # Left outer join: frames without a processed module are kept.
    frames_with_symbols_df = frames_df.join(
        modules_df, on=["module", "offset"], how="left_outer"
    )
    debug_print_rdd_count(frames_with_symbols_df.rdd)

    # Pack the four columns into one array column, then gather all of a
    # hang's arrays into a single list.
    mapping_column = array("module", "offset", "symbol", "module_name").alias(
        "symbol_mapping"
    )
    per_frame_df = frames_with_symbols_df.select("hang_id", mapping_column)
    collected_column = collect_list("symbol_mapping").alias("symbol_mappings")
    symbol_mappings_df = per_frame_df.groupBy("hang_id").agg(collected_column)
    debug_print_rdd_count(symbol_mappings_df.rdd)

    hangs_df = hangs_by_id.map(hang_entry_to_row).toDF(["hang_id", "hang_json"])

    # Join with the default join type, so only hang ids present on both
    # sides survive.
    joined_df = hangs_df.join(symbol_mappings_df, on=["hang_id"])
    debug_print_rdd_count(joined_df.rdd)

    hangs_with_mappings = joined_df.rdd.map(row_to_hang_and_mappings)
    debug_print_rdd_count(hangs_with_mappings)

    return hangs_with_mappings.map(symbolicate_hang_with_mapping)