in broadcast-monitoring/src/consolidate_frame_results/app/main.py [0:0]
def consolidate_team_data_lambda_handler(event, context):
    """
    Consolidate the stored team detection data (text and logo detection)
    of every frame in the event and write the results back to the frame table.

    The handler exits early, without touching DynamoDB, when none of the
    enabled flags in ``event['config']`` is listed in
    ``calculate_team_confidence.config_names``.

    :param event: step input carrying the check configuration and the frames
        to process, for example
        {
            "config": {
                "audio_check_enabled": true,
                "station_logo_check_enabled": false,
                "language_detect_check_enabled": true,
                "team_detect_check_enabled": false,
                "appsync_notify_enabled": true
            },
            ...
            "frames": [
                {
                    "Stream_ID": "test_1",
                    "DateTime": "2020-02-19T22:45:14.938250Z",
                    ...
                    "S3_Bucket": "aws-rnd-broadcast-maas-video-processing-dev",
                    "S3_Key": "frames/test_video_single_pipeline/test_1/original/2020/02/19/22/45:14:938250.jpg"
                },
                {
                    "Stream_ID": "test_1",
                    "DateTime": "2020-02-19T22:45:17.941250Z",
                    ...
                    "S3_Bucket": "aws-rnd-broadcast-maas-video-processing-dev",
                    "S3_Key": "frames/test_video_single_pipeline/test_1/original/2020/02/19/22/45:17:941250.jpg"
                }
            ]
        }
    :param context: lambda environment context (not used by this handler)
    :return: None. Results are written to the DynamoDB frame table
    """
    # names of every configuration flag that is switched on in the event
    enabled_flags = {name for name, is_on in event['config'].items() if is_on}
    # keep only the flags that belong to the team confidence calculation
    team_checks_active = {
        name for name in enabled_flags
        if name in calculate_team_confidence.config_names
    }
    logger.info('Active team checks [%s]', team_checks_active)

    # nothing to consolidate unless at least one team check is enabled
    if not team_checks_active:
        logger.info('No team configurations active. Exiting frame consolidation')
        return

    # projection expression: the attributes the team calculation declares it needs
    projection = ', '.join(calculate_team_confidence.ddb_attrs)

    for frame in event['frames']:
        s3_key = frame['S3_Key']
        # key identifying the frame's row in the frame table
        frame_key = {
            'Stream_ID': frame['Stream_ID'],
            'DateTime': frame['DateTime'],
        }

        # fetch the data stored for this frame by the earlier steps
        # NOTE(review): the client is passed as `ddb_update_builder=` here but
        # as `ddb_client=` to DDBUpdateBuilder below -- confirm this keyword
        # matches the signature of get_item_ddb.
        stored_item = get_item_ddb(
            Key=frame_key,
            table_name=DDB_FRAME_TABLE,
            ProjectionExpression=projection,
            ddb_update_builder=dynamodb,
        )
        frame_values = convert_from_ddb(stored_item)

        # collect the consolidated results into a single update of the frame row
        row_update = DDBUpdateBuilder(
            key=frame_key,
            table_name=DDB_FRAME_TABLE,
            ddb_client=dynamodb,
        )
        with row_update as ddb_update_builder:
            # NOTE(review): results come from consolidate_team_confidence while
            # the config names / attributes above come from
            # calculate_team_confidence -- confirm both names are intended.
            for result_name, result_data in consolidate_team_confidence(frame_values):
                ddb_update_builder.update_attr(result_name, result_data, convert_to_ddb)

        logger.info('Team data consolidated for frame: %s', s3_key)