# Source: spark_scripts/detect_scenes.py — summarize_person_scenes()


def summarize_person_scenes(df):
    """Collapse per-frame person detections into "person in lane" scenes.

    Each input row is converted to a dict and passed through
    ``people_in_scenes`` (a sibling helper in this module) to obtain a
    per-frame ``num_people_in_scene`` count.  A scene *starts* on the frame
    where the count transitions 0 -> >0 and *ends* on the frame where it
    transitions >0 -> 0; one summary row is emitted per started scene.

    Args:
        df: Spark DataFrame of per-frame records.  It must yield, after
            ``people_in_scenes``, the columns ``Time``, ``bag_file``,
            ``bag_file_prefix``, ``bag_file_bucket`` and
            ``num_people_in_scene``.  # NOTE(review): schema inferred from
            the column references below — confirm against callers.

    Returns:
        Spark DataFrame with one row per scene: the bag identifiers,
        ``start_time``/``end_time``, ``num_people_in_scene_start``,
        a synthetic ``scene_id``, ``scene_length`` and the fixed
        ``topics_analyzed`` string.  A scene still open at the end of a
        bag file gets a null ``end_time`` (``lead`` has no next row).
    """
    # Per-frame people counts; people_in_scenes maps a row-dict to a dict
    # that toDF() turns back into columns.
    people_in_lane = df.rdd.map(
        lambda row: row.asDict()
    ).map(
        people_in_scenes
    ).toDF()

    # Frames of one bag file, in time order.  (partitionBy/orderBy order on
    # the builder is irrelevant; the resulting WindowSpec is the same.)
    win = Window.partitionBy("bag_file", "bag_file_prefix", "bag_file_bucket").orderBy("Time")

    # Previous frame's count; drop frames where either count is unknown so
    # the transition test below is well-defined.
    people_in_lane = people_in_lane.withColumn(
        "num_people_in_scene_lag1",
        func.lag(
            func.col("num_people_in_scene"),
            1
        ).over(win)
    ).filter("num_people_in_scene is not null and num_people_in_scene_lag1 is not null ")

    # Transition detector as a native Column expression instead of the
    # original Python UDF: same start/end/null logic, but evaluated inside
    # the JVM with no per-row Python serialization.  No .otherwise() means
    # non-transition frames stay null, exactly like the UDF's None branch.
    scene_state = func.when(
        (func.col("num_people_in_scene") > 0) & (func.col("num_people_in_scene_lag1") == 0),
        func.lit("start")
    ).when(
        (func.col("num_people_in_scene") == 0) & (func.col("num_people_in_scene_lag1") > 0),
        func.lit("end")
    )

    # Keep only transition frames; for each "start" the next transition in
    # the same bag is its matching "end", so lead(Time) is the end time.
    summary = people_in_lane.withColumn(
        "scene_state",
        scene_state
    ).filter("scene_state is not null").withColumn(
        "end_time",
        func.lead(
            func.col("Time"),
            1
        ).over(win)
    ).filter("scene_state = 'start'") \
        .withColumnRenamed("Time", "start_time") \
        .withColumnRenamed("num_people_in_scene", "num_people_in_scene_start") \
        .select("bag_file", "bag_file_prefix", "bag_file_bucket", "start_time", "end_time", "num_people_in_scene_start") \
        .withColumn("scene_id", func.concat(func.col("bag_file"), func.lit("_PersonInLane_"), func.col("start_time"))) \
        .withColumn("scene_length", func.col("end_time") - func.col("start_time")) \
        .withColumn("topics_analyzed", func.lit(",".join(["rgb_right_detections_only_clean", "post_process_lane_points_rgb_front_right_clean"])))

    return summary