in spark_scripts/synchronize_topics.py [0:0]
from pyspark.sql import functions as func
from pyspark.sql import types


def synchronize_signals(signals_df, topics):
    # Build the master timeline that every topic is aligned to.
    master_time_df = create_master_time_df(signals_df, topics)

    # Pivot the long-format signals so each topic becomes its own column,
    # keeping the first payload per (bag file, timestamp, topic).
    topic_signals = (
        signals_df.select("bag_file", "bag_file_prefix", "bag_file_bucket", "Time", "topic", "payload")
        .groupby("bag_file", "bag_file_prefix", "bag_file_bucket", "Time")
        .pivot("topic")
        .agg(func.first("payload"))
        .withColumn("source", func.lit("signals_df").cast(types.StringType()))
    )

    # Interleave the master timeline with the pivoted signals, ordered by time,
    # so the forward fill below can carry topic values onto the master timestamps.
    unioned_signals = (
        master_time_df.select(*topic_signals.columns)
        .union(topic_signals)
        .orderBy(func.asc("Time"))
    )

    # Forward-fill each topic column, producing a "<topic>_clean" column.
    topic_cols_clean = ["bag_file", "bag_file_prefix", "bag_file_bucket", "Time"]
    for topic in topics:
        unioned_signals = fill_with_last_value(unioned_signals, topic)
        topic_cols_clean.append(f"{topic}_clean")

    # Keep only the master-timeline rows, now populated with the latest value
    # of every topic at each master timestamp.
    unioned_signals = unioned_signals.filter("source = 'master_time_df'").select(
        *topic_cols_clean
    )
    return unioned_signals
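

# The helper create_master_time_df is defined elsewhere in this file and is not
# shown in the excerpt above. A minimal sketch of one possible implementation,
# assuming the master timeline is the set of distinct timestamps per bag file and
# that topic payloads are strings -- both are assumptions, not the actual code:
def create_master_time_df(signals_df, topics):
    # One row per distinct (bag_file, bag_file_prefix, bag_file_bucket, Time),
    # with a null placeholder column for every topic so the schema lines up
    # with the pivoted topic_signals frame it is unioned with.
    base = signals_df.select(
        "bag_file", "bag_file_prefix", "bag_file_bucket", "Time"
    ).distinct()
    for topic in topics:
        base = base.withColumn(topic, func.lit(None).cast(types.StringType()))
    # The "source" tag is what synchronize_signals later filters on.
    return base.withColumn("source", func.lit("master_time_df").cast(types.StringType()))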
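

# fill_with_last_value is likewise defined elsewhere. A common way to express
# this kind of forward fill in PySpark is last(..., ignorenulls=True) over a
# window bounded at the current row; the sketch below assumes that approach
# and a per-bag-file partition, neither of which is confirmed by the excerpt.
from pyspark.sql import Window


def fill_with_last_value(df, topic):
    # Carry the most recent non-null payload for `topic` forward within each
    # bag file, ordered by time, into a new "<topic>_clean" column.
    window = (
        Window.partitionBy("bag_file")
        .orderBy("Time")
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
    )
    return df.withColumn(
        f"{topic}_clean",
        func.last(func.col(topic), ignorenulls=True).over(window),
    )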