in spark_scripts/detect_scenes.py [0:0]
def main(batch_metadata_table_name, batch_id, input_bucket, output_bucket, output_dynamo_table, spark):
    # Look up which files belong to this batch from the metadata table
    batch_metadata = get_batch_file_metadata(
        table_name=batch_metadata_table_name,
        batch_id=batch_id
    )

    # Load the synchronized topic data for each file from S3 and union it into one DataFrame
    synchronized_data = load_data(
        spark,
        input_bucket,
        batch_metadata=batch_metadata,
        table_name="synchronized_topics"
    )

    # Run scene detection over the synchronized signals
    detected_scenes = detect_scenes(synchronized_data)

    # Save detected scenes to S3, partitioned by source bag file
    write_results_s3(
        detected_scenes,
        table_name="scene_detections",
        output_bucket=output_bucket,
        partition_cols=['bag_file']
    )

    # Summarize the detected scenes and persist the metadata to DynamoDB
    scene_metadata_df = scene_metadata(detected_scenes)
    write_results_dynamo(
        scene_metadata_df,
        output_dynamo_table
    )