in broadcast-monitoring/src/consolidate_frame_results/app/main.py [0:0]
def consolidate_fragment_lambda_handler(event, context):
    """
    Consolidate the per-frame check results of a single stream fragment.

    Runs after the Map step of the frame processing pipeline. The checks whose
    config flags are all enabled are selected, the attributes those checks need
    are read from the frame table for every frame listed in the event, and each
    check result is recorded against the fragment row (keyed by Start_DateTime
    and Stream_ID) of the fragment table through a DDBUpdateBuilder.

    :param event: gathered output of the Map step, for example
        {
          "config": {
            "audio_check_enabled": true,
            "station_logo_check_enabled": false,
            "language_detect_check_enabled": true,
            "team_detect_check_enabled": false,
            "appsync_notify_enabled": true
          },
          "parsed": {
            "isMasterManifest": false,
            "streamId": "test_1",
            "lastSegment": {
              "s3Key": "live/test_video_single_pipeline/test_1_00043.ts",
              "versionId": "_ey0Mw8QDjqVgpCqUuE_v8tYlUVqd2Mo",
              "durationSec": 5.875,
              "startDateTime": "2020-02-22T22:15:59.375000Z",
              "startTimeRelative": 254.3
            },
            "expectedProgram": {
              "Team_Info": "AVL V NOR",
              "Station_Logo": "Prime Video",
              "Stream_ID": "test_1",
              "Event_Title": "EPL AVL V NOR",
              "Event_ID": "EPL-PROG3",
              "Event_Type": "Sports",
              "End_Time": 300,
              "Start_Time": 180,
              "languageCode": "en-en",
              "Segment_Start_Time_In_Loop": 254.3
            }
          },
          "frames": [
            {
              "Stream_ID": "test_1",
              "DateTime": "2020-02-19T22:45:14.938250Z",
              ...
              "S3_Bucket": "aws-rnd-broadcast-maas-video-processing-dev",
              "S3_Key": "frames/test_video_single_pipeline/test_1/original/2020/02/19/22/45:14:938250.jpg"
            },
            {
              "Stream_ID": "test_1",
              "DateTime": "2020-02-19T22:45:17.941250Z",
              ...
              "S3_Bucket": "aws-rnd-broadcast-maas-video-processing-dev",
              "S3_Key": "frames/test_video_single_pipeline/test_1/original/2020/02/19/22/45:17:941250.jpg"
            }
          ]
        }
    :param context: lambda environment context (not used by this handler)
    :return: None. Results are written to DynamoDB; when no frame check is
        enabled the handler returns early without reading or writing anything.
    """
    logger.info("DDB Frame Table: %s | DDB Fragment Table: %s", DDB_FRAME_TABLE, DDB_FRAGMENT_TABLE)

    settings = event['config']
    fragment_stream = event['parsed']['streamId']
    fragment_start = event['parsed']['lastSegment']['startDateTime']

    # names of every config flag that is switched on
    enabled_flags = {name for name, enabled in settings.items() if enabled}

    # a check is selected only when all the flags it depends on are enabled
    selected_checks = []
    for candidate in (station_logo_check, team_text_check, sports_check):
        if set(candidate.config_names) <= enabled_flags:
            selected_checks.append(candidate)

    if not selected_checks:
        logger.info('No active configurations to process. Exiting frame consolidation')
        return

    # de-duplicated attribute names the selected checks need from the frame table
    wanted_attrs = set()
    for selected in selected_checks:
        for attr_name in selected.ddb_attrs:
            wanted_attrs.add(attr_name)
    projection = ', '.join(wanted_attrs)

    # one lookup per frame, in the order the frames appear in the event
    frame_items = [
        get_item_ddb(
            Key={'Stream_ID': frame['Stream_ID'], 'DateTime': frame['DateTime']},
            table_name=DDB_FRAME_TABLE,
            ProjectionExpression=projection,
            ddb_client=dynamodb
        )
        for frame in event['frames']
    ]

    fragment_key = {'Start_DateTime': fragment_start, 'Stream_ID': fragment_stream}
    with DDBUpdateBuilder(
            key=fragment_key,
            table_name=DDB_FRAGMENT_TABLE,
            ddb_client=dynamodb
    ) as fragment_update:
        # each check contributes named results that become attributes of the fragment row
        for attr_name, attr_value in check_processing_helper(selected_checks, frame_items):
            fragment_update.update_attr(attr_name, attr_value)

    logger.info('%d frame checks completed', len(selected_checks))