def create_master_time_df()

in spark_scripts/synchronize_topics.py [0:0]


# Imports used below; `spark` (the active SparkSession) and `union_all`
# are assumed to be defined at module level in synchronize_topics.py.
from pyspark.sql import Window, types
from pyspark.sql import functions as func


def create_master_time_df(signals_df, topics):
    """
    Build a dense "master" timeline: for each bag file, generate one row
    per 0.1 s tick spanning that file's first-to-last signal time range,
    with a null placeholder column for every topic.
    """
    time_interval_secs = 0.1

    w = Window.partitionBy("bag_file", "bag_file_prefix","bag_file_bucket").orderBy(func.asc("Time"))

    first_and_last_signals = (
        signals_df.withColumn("rn", func.row_number().over(w))
        .withColumn("max_rn", func.max("rn").over(Window.partitionBy("bag_file", "bag_file_prefix","bag_file_bucket")))
        .where((func.col("rn") == func.col("max_rn")) | (func.col("rn") == 1))
        .select("bag_file", "bag_file_prefix","bag_file_bucket", "Time", "rn")
        .groupBy("bag_file", "bag_file_prefix","bag_file_bucket")
        .agg(func.collect_list("Time").alias("Times"))
        .collect()
    )

    def build_time_grid(row):
        # Enumerate integer ticks between the first and last timestamps,
        # expressed in units of the sampling interval. spark.range requires
        # integer bounds, so the scaled endpoints are truncated to int.
        df = spark.range(
            int(row.Times[0] / time_interval_secs),
            int(row.Times[1] / time_interval_secs),
        ).select(func.col("id"))

        # Convert ticks back to seconds and tag each row with its bag file keys.
        return (
            df.withColumn("Time", func.expr(f"id * {time_interval_secs}"))
            .withColumn("bag_file", func.lit(row.bag_file))
            .withColumn("bag_file_prefix", func.lit(row.bag_file_prefix))
            .withColumn("bag_file_bucket", func.lit(row.bag_file_bucket))
            .drop("id")
        )

    # Build one time-grid DataFrame per bag file, then union them all.
    dfs = []
    for row in first_and_last_signals:
        print(row)  # debug: one Row per bag file with its [first, last] Times
        dfs.append(build_time_grid(row))

    # union_all is a module-level helper that concatenates the per-bag-file
    # frames; tag every row so its origin is identifiable after later joins.
    master_time_df = union_all(dfs).withColumn(
        "source", func.lit("master_time_df").cast(types.StringType())
    )

    # Add an empty (null) column per topic; downstream joins fill these in.
    for t in topics:
        master_time_df = master_time_df.withColumn(
            t, func.lit(None).cast(types.StringType())
        )

    return master_time_df
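
`union_all` is not shown in this excerpt; below is a minimal sketch of such a helper, assuming it simply folds DataFrame.unionByName over the list. The name comes from the call site above, not from the module source, so the real implementation may differ.

from functools import reduce

from pyspark.sql import DataFrame


def union_all(dfs):
    """Union a non-empty list of DataFrames sharing one schema."""
    # unionByName matches columns by name rather than by position, which is
    # safe here because every per-bag-file frame is built the same way.
    return reduce(lambda a, b: a.unionByName(b), dfs)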
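
For reference, a hypothetical end-to-end call, assuming an active SparkSession and a toy signals_df with the columns the function reads (bag_file, bag_file_prefix, bag_file_bucket, Time); the bag names, prefixes, bucket, and topic names are made up for illustration.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("synchronize_topics_demo").getOrCreate()

# Toy input: one bag file whose signals span 0.0 s to 0.5 s.
signals_df = spark.createDataFrame(
    [
        ("bag_a", "prefix/a", "bucket-1", 0.0),
        ("bag_a", "prefix/a", "bucket-1", 0.23),
        ("bag_a", "prefix/a", "bucket-1", 0.5),
    ],
    ["bag_file", "bag_file_prefix", "bag_file_bucket", "Time"],
)

master = create_master_time_df(signals_df, topics=["/imu", "/gps"])
master.orderBy("Time").show()
# Expect one row per 0.1 s tick in [0.0, 0.5), with null /imu and /gps columns.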