def denormalize_table()

in flatten_join_nested_file.py [0:0]


def denormalize_table(tables_to_join_map, start_join_level, number_nested_levels, dataframes_map, denormlized_dataframe_map):

    global join_count
    # set the next join level and start looping on all tables to be joined at the current level
    next_join_level = start_join_level+1

    # if the next level is not yet the last level in the nested hierarchy iterate at the next nested level
    if next_join_level < number_nested_levels:
        denormalize_table(tables_to_join_map, next_join_level,
                          number_nested_levels, dataframes_map, denormlized_dataframe_map)

    for left_table in tables_to_join_map[str(start_join_level)]:

        # start the join setting the first dataframe to the one for the left_table
        df_left = dataframes_map[left_table]

        # if the left_table has at least a child table start the loop to join it with all its nested tables
        if tables_to_join_map[str(start_join_level)][left_table]['has_child']:
            for right_table in tables_to_join_map[str(start_join_level)][left_table]['join_to_tbls']:

                # look up the foreign key and set it as join column for left table
                if right_table+"_sk" in tables_to_join_map[str(start_join_level)][left_table]['join_col']:
                    join_col = right_table+"_sk"

                # if the right table had been already denormalized use that dataframe otherwise use the original dataframe
                if right_table in denormlized_dataframe_map:
                    df_right = denormlized_dataframe_map[right_table]
                else:
                    df_right = dataframes_map[right_table]

                # check if there is a naming conflict and resolve it adding the tablename prefix
                left_cols = df_left.schema.names
                right_cols = df_right.schema.names

                for col in right_cols:
                    if col in left_cols and not col == join_col:
                        df_right = df_right.withColumnRenamed(
                            col, right_table+"_"+col)
                        right_cols.append(right_table+"_"+col)
                        right_cols.remove(col)

                # execute the join matching left and right join column names to avoid column duplication in the output dataframe
                df_left = df_left.join(df_right, join_col, how='left_outer')
                df_joined_count = df_left.count()
                join_count = join_count+1
                print('join number: ', join_count)
                print(left_table, ' left join ', right_table,
                      'row count is: ', df_joined_count)

        # add the denormalized data frame to the map and continue with the loop
        denormlized_dataframe_map[left_table] = df_left

    return denormlized_dataframe_map