in flatten_join_nested_file.py [0:0]
def denormalize_table(tables_to_join_map, start_join_level, number_nested_levels, dataframes_map, denormlized_dataframe_map):
    """Left-join every table of one nesting level with its child tables.

    The function first recurses into the next nesting level, so that child
    tables are already denormalized by the time they are joined into their
    parents. It then walks all tables listed for ``start_join_level`` and
    left-outer-joins each one with the tables named in its ``join_to_tbls``
    entry, using ``<right_table>_sk`` as the join column.

    Args:
        tables_to_join_map: Mapping keyed by the nesting level *as a string*.
            Each value maps a table name to a spec that is read through the
            keys ``'has_child'``, ``'join_to_tbls'`` and ``'join_col'``.
        start_join_level: Nesting level (int) to process in this call.
        number_nested_levels: Total number of nesting levels; recursion stops
            once ``start_join_level + 1`` reaches this value.
        dataframes_map: Mapping of table name to its original dataframe.
        denormlized_dataframe_map: Mapping of table name to its denormalized
            dataframe. It is updated in place and also returned.

    Returns:
        ``denormlized_dataframe_map`` (the same object that was passed in),
        with an entry for every table processed at this level and below.

    Raises:
        ValueError: If a table lists a child in ``join_to_tbls`` but
            ``<child>_sk`` is not present in its ``join_col`` entry.

    Side effects:
        Increments the module-level ``join_count`` and prints the running
        join number plus the row count of every join result. The row count
        is obtained with ``count()`` on the joined dataframe.
    """
    global join_count

    # Depth first: denormalize the deeper levels before joining them in here.
    next_join_level = start_join_level + 1
    if next_join_level < number_nested_levels:
        denormalize_table(tables_to_join_map, next_join_level,
                          number_nested_levels, dataframes_map, denormlized_dataframe_map)

    level_tables = tables_to_join_map[str(start_join_level)]
    for left_table in level_tables:
        join_spec = level_tables[left_table]
        # Start from the original dataframe of the left table.
        df_left = dataframes_map[left_table]

        if join_spec['has_child']:
            for right_table in join_spec['join_to_tbls']:
                # The foreign key towards the child is "<child>_sk". Fail
                # loudly when it is missing instead of hitting an unbound
                # variable or silently reusing the previous child's key.
                join_col = right_table + "_sk"
                if join_col not in join_spec['join_col']:
                    raise ValueError(
                        "no join column '" + join_col + "' declared for table '"
                        + str(left_table) + "' to join with '" + str(right_table) + "'")

                # Prefer the already denormalized child, else the original.
                if right_table in denormlized_dataframe_map:
                    df_right = denormlized_dataframe_map[right_table]
                else:
                    df_right = dataframes_map[right_table]

                # Resolve naming conflicts by prefixing the right-hand column
                # with its table name. Iterate over a snapshot: the column
                # list must not be modified while it is being walked,
                # otherwise the column following a renamed one is skipped.
                left_cols = set(df_left.schema.names)
                for col in list(df_right.schema.names):
                    if col in left_cols and col != join_col:
                        new_name = right_table + "_" + col
                        # Keep prefixing if the prefixed name clashes too.
                        while new_name in left_cols:
                            new_name = right_table + "_" + new_name
                        df_right = df_right.withColumnRenamed(col, new_name)

                # Joining on the column *name* keeps a single copy of the
                # join column in the output dataframe.
                df_left = df_left.join(df_right, join_col, how='left_outer')
                df_joined_count = df_left.count()
                join_count = join_count + 1
                print('join number: ', join_count)
                print(left_table, ' left join ', right_table,
                      'row count is: ', df_joined_count)

        # Record the (possibly unchanged) dataframe and move to the next table.
        denormlized_dataframe_map[left_table] = df_left

    return denormlized_dataframe_map