in spark_scripts/synchronize_topics.py [0:0]
def load_and_union_data(spark, batch_metadata):
    # Collect the set of distinct topics across all bag files in this batch.
    distinct_topics = set()
    for item in batch_metadata:
        for t in item["topics"]:
            distinct_topics.add(t)

    # For each topic, load every matching file from each bag file and union
    # the results into a single DataFrame per topic.
    topic_dfs = {}
    for topic in distinct_topics:
        dfs = []
        for bag_file in batch_metadata:
            # Progress output for the current bag/topic pair.
            print(f"{bag_file['Name']}_{topic}")
            bag_dfs = [
                load_file_path(
                    spark, file_path=file, topic=topic, bag_file=bag_file["Name"]
                )
                for file in bag_file["files"]
                if topic in file
            ]
            dfs.extend(bag_dfs)
        topic_dfs[topic] = union_all(dfs)
    return topic_dfs
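
# `load_file_path` and `union_all` are helpers defined elsewhere in this
# script. A minimal sketch of `union_all`, assuming the inputs are PySpark
# DataFrames with matching schemas (an assumption, not the script's actual
# implementation), could look like this:
#
#     from functools import reduce
#     from pyspark.sql import DataFrame
#
#     def union_all(dfs):
#         # Fold the list of DataFrames into one, matching columns by name.
#         return reduce(DataFrame.unionByName, dfs)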