in spark_scripts/detect_scenes.py [0:0]
def summarize_person_scenes(df):
    """Summarize contiguous "person in scene" intervals per bag file.

    Each input row is mapped through ``people_in_scenes`` (defined elsewhere
    in this module) to produce a per-frame ``num_people_in_scene`` count.
    Window functions over each bag file, ordered by ``Time``, then detect the
    transitions where that count crosses zero: a scene *starts* when the
    count goes 0 -> positive and *ends* when it returns to 0.

    Args:
        df: Spark DataFrame of per-frame detection rows. Assumed to contain
            ``Time``, ``bag_file``, ``bag_file_prefix`` and
            ``bag_file_bucket`` columns plus whatever ``people_in_scenes``
            reads — TODO confirm against the upstream schema.

    Returns:
        Spark DataFrame with one row per detected scene:
        ``bag_file``, ``bag_file_prefix``, ``bag_file_bucket``,
        ``start_time``, ``end_time``, ``num_people_in_scene_start``,
        ``scene_id``, ``scene_length``, ``topics_analyzed``.
    """
    people_in_lane = df.rdd.map(
        lambda row: row.asDict()
    ).map(
        people_in_scenes
    ).toDF()

    # Frames of a single bag file, in time order.
    win = Window.partitionBy("bag_file", "bag_file_prefix", "bag_file_bucket").orderBy("Time")

    # Pair each frame's count with the previous frame's count; drop rows
    # where either is null (the first frame of every partition has a null
    # lag and is intentionally excluded).
    people_in_lane = people_in_lane.withColumn(
        "num_people_in_scene_lag1",
        func.lag(
            func.col("num_people_in_scene"),
            1
        ).over(win)
    ).filter("num_people_in_scene is not null and num_people_in_scene_lag1 is not null")

    # Native column expression instead of a Python UDF: avoids per-row
    # Python serialization and lets Catalyst optimize. Semantics match the
    # original UDF for the non-null inputs that survive the filter above;
    # rows matching neither branch yield null (implicit .otherwise(None)).
    scene_state = func.when(
        (func.col("num_people_in_scene") > 0) & (func.col("num_people_in_scene_lag1") == 0),
        "start"
    ).when(
        (func.col("num_people_in_scene") == 0) & (func.col("num_people_in_scene_lag1") > 0),
        "end"
    )

    # After filtering to transition rows only, lead(Time) on a "start" row
    # is the Time of the next transition in the same bag file — the
    # matching "end" row — which becomes this scene's end_time.
    summary = people_in_lane.withColumn(
        "scene_state",
        scene_state
    ).filter("scene_state is not null").withColumn(
        "end_time",
        func.lead(
            func.col("Time"),
            1
        ).over(win)
    ).filter("scene_state = 'start'") \
        .withColumnRenamed("Time", "start_time") \
        .withColumnRenamed("num_people_in_scene", "num_people_in_scene_start") \
        .select("bag_file", "bag_file_prefix", "bag_file_bucket", "start_time", "end_time", "num_people_in_scene_start") \
        .withColumn("scene_id", func.concat(func.col("bag_file"), func.lit("_PersonInLane_"), func.col("start_time"))) \
        .withColumn("scene_length", func.col("end_time") - func.col("start_time")) \
        .withColumn("topics_analyzed", func.lit(",".join(["rgb_right_detections_only_clean", "post_process_lane_points_rgb_front_right_clean"])))
    return summary