def load_and_union_data()

in spark_scripts/synchronize_topics.py [0:0]


def load_and_union_data(spark, batch_metadata):
    # Collect the distinct set of topics across all bag files in the batch.
    distinct_topics = {t for item in batch_metadata for t in item["topics"]}

    topic_dfs = {}

    # Build one DataFrame per topic: load every matching file from every
    # bag file, then union the pieces into a single per-topic DataFrame.
    for topic in distinct_topics:
        dfs = []
        for bag_file in batch_metadata:
            print(f"{bag_file['Name']}_{topic}")
            # A file belongs to this topic if the topic string appears
            # anywhere in its path.
            bag_dfs = [
                load_file_path(
                    spark, file_path=file, topic=topic, bag_file=bag_file["Name"]
                )
                for file in bag_file["files"]
                if topic in file
            ]
            dfs.extend(bag_dfs)
        topic_dfs[topic] = union_all(dfs)

    return topic_dfs
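
The union_all helper is referenced above but not shown. A minimal sketch of one plausible implementation, assuming the per-file DataFrames share compatible schemas and the list is non-empty; the reduce-based fold and the choice of unionByName are assumptions, not the script's actual code:

from functools import reduce

from pyspark.sql import DataFrame


def union_all(dfs: list[DataFrame]) -> DataFrame:
    # Fold the list of DataFrames into one. unionByName matches columns by
    # name rather than position, which tolerates differing column order
    # across files. Assumes dfs is non-empty: reduce() raises on an empty
    # list, which here would mean a topic with no matching files.
    return reduce(lambda a, b: a.unionByName(b), dfs)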
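
For reference, a hypothetical batch_metadata entry consistent with the accesses in the function. Only the keys ("Name", "topics", "files") come from the code; the topic names and paths are illustrative. Note that files are matched to topics by simple substring containment (if topic in file), so each file path must embed the topic string:

# Hypothetical input shape; values are illustrative.
batch_metadata = [
    {
        "Name": "bag_0001",
        "topics": ["camera/image_raw", "imu/data"],
        "files": [
            "s3://bucket/bag_0001/camera/image_raw/part-0000.parquet",
            "s3://bucket/bag_0001/imu/data/part-0000.parquet",
        ],
    },
]

# Returns {"camera/image_raw": <DataFrame>, "imu/data": <DataFrame>}
topic_dfs = load_and_union_data(spark, batch_metadata)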