in spark_scripts/synchronize_topics.py
from pyspark.sql import functions as func, types
from pyspark.sql.window import Window


def create_master_time_df(signals_df, topics):
    """
    Explode evenly spaced timestamps spanning each bag file's time range,
    from its first signal to its last.
    """
    time_interval_secs = 0.1

    # Keep only the first (rn == 1) and last (rn == max_rn) signal row per bag
    # file, then collect both timestamps into a sorted array so Times[0] is the
    # start and Times[1] is the end (collect_list alone does not guarantee order).
    w = Window.partitionBy("bag_file", "bag_file_prefix", "bag_file_bucket").orderBy(func.asc("Time"))
    first_and_last_signals = (
        signals_df.withColumn("rn", func.row_number().over(w))
        .withColumn("max_rn", func.max("rn").over(Window.partitionBy("bag_file", "bag_file_prefix", "bag_file_bucket")))
        .where((func.col("rn") == func.col("max_rn")) | (func.col("rn") == 1))
        .select("bag_file", "bag_file_prefix", "bag_file_bucket", "Time", "rn")
        .groupBy("bag_file", "bag_file_prefix", "bag_file_bucket")
        .agg(func.sort_array(func.collect_list("Time")).alias("Times"))
        .collect()
    )
    def explode_time_range(row):
        # `spark` is the module-level SparkSession used throughout this script.
        # spark.range expects integer bounds, so scale the start/end timestamps
        # by the sampling interval and truncate to ints before generating ids.
        df = spark.range(
            int(row.Times[0] / time_interval_secs),
            int(row.Times[1] / time_interval_secs),
        ).select(func.col("id"))
        return (
            df.withColumn("Time", func.expr(f"id * {time_interval_secs}"))
            .withColumn("bag_file", func.lit(row.bag_file))
            .withColumn("bag_file_prefix", func.lit(row.bag_file_prefix))
            .withColumn("bag_file_bucket", func.lit(row.bag_file_bucket))
            .drop("id")
        )
    # Build one timeline DataFrame per bag file and union them together
    # (union_all is a helper defined elsewhere in this module). Tag the rows
    # with a source column and add a null placeholder column per topic, to be
    # filled in when the topic signals are joined onto the master timeline.
    dfs = []
    for bag_file in first_and_last_signals:
        print(bag_file)
        dfs.append(explode_time_range(bag_file))
    master_time_df = union_all(dfs).withColumn(
        "source", func.lit("master_time_df").cast(types.StringType())
    )
    for t in topics:
        master_time_df = master_time_df.withColumn(
            t, func.lit(None).cast(types.StringType())
        )
    return master_time_df
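

# Minimal usage sketch (illustrative only, not part of the original script):
# it assumes `spark` names an active SparkSession at module level (as the
# function above expects) and that this module's `union_all` helper, which
# unions a list of DataFrames, is in scope. The bag file name, bucket, and
# topic names below are hypothetical.
if __name__ == "__main__":
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("synchronize_topics_example").getOrCreate()
    example_signals = spark.createDataFrame(
        [
            ("bag_a.bag", "logs/bag_a", "example-bucket", 0.0),
            ("bag_a.bag", "logs/bag_a", "example-bucket", 2.0),
        ],
        ["bag_file", "bag_file_prefix", "bag_file_bucket", "Time"],
    )
    master = create_master_time_df(example_signals, ["topic_speed", "topic_steering"])
    # Expect one row per 0.1 s step across bag_a.bag's time range, with null
    # placeholder columns for topic_speed and topic_steering.
    master.show()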