def synchronize_signals()

in spark_scripts/synchronize_topics.py [0:0]


def synchronize_signals(signals_df, topics):
    """Project every topic's payload onto the master time rows.

    Steps, in order:

    1. Build the master time frame via ``create_master_time_df``.
    2. Pivot ``signals_df`` so that each distinct ``topic`` value becomes a
       column holding the first ``payload`` seen for a given
       (bag_file, bag_file_prefix, bag_file_bucket, Time) group, and tag
       those rows with ``source = 'signals_df'``.
    3. Union the master time rows (projected onto the pivoted column list)
       with the pivoted signal rows and sort by ``Time``.
    4. Run ``fill_with_last_value`` once per entry of ``topics``.
    5. Keep only rows whose ``source`` is ``'master_time_df'`` and return
       the key columns plus one ``<topic>_clean`` column per topic.

    Args:
        signals_df: DataFrame providing at least the columns ``bag_file``,
            ``bag_file_prefix``, ``bag_file_bucket``, ``Time``, ``topic``
            and ``payload``.
        topics: Iterable of topic names; each one is handed to
            ``fill_with_last_value`` and must end up with a matching
            ``<topic>_clean`` column in the result.

    Returns:
        DataFrame with the four key columns followed by the
        ``<topic>_clean`` columns, in the order of ``topics``.
    """
    key_cols = ["bag_file", "bag_file_prefix", "bag_file_bucket", "Time"]

    master_time_df = create_master_time_df(signals_df, topics)

    # Wide layout: one row per key tuple, one column per distinct topic.
    pivoted = (
        signals_df.select(*key_cols, "topic", "payload")
        .groupby(*key_cols)
        .pivot("topic")
        .agg(func.first("payload"))
    )
    source_tag = func.lit("signals_df").cast(types.StringType())
    topic_signals = pivoted.withColumn("source", source_tag)

    # DataFrame.union matches columns by position, so the master frame is
    # first projected onto exactly the pivoted frame's column order. This
    # requires master_time_df to expose every one of those columns,
    # including "source".
    master_rows = master_time_df.select(*topic_signals.columns)
    merged = master_rows.union(topic_signals).orderBy(func.asc("Time"))

    # NOTE(review): fill_with_last_value presumably adds a "<topic>_clean"
    # column carrying the most recent value forward - confirm in the helper.
    clean_cols = []
    for topic in topics:
        merged = fill_with_last_value(merged, topic)
        clean_cols.append(f"{topic}_clean")

    # Raw signal rows were only needed as fill sources; drop them so the
    # output contains the master time rows alone.
    master_only = merged.filter("source = 'master_time_df'")
    return master_only.select(*key_cols, *clean_cols)