def consolidate_team_data_lambda_handler()

in broadcast-monitoring/src/consolidate_frame_results/app/main.py [0:0]


def consolidate_team_data_lambda_handler(event, context):
    """
    Merge the team results produced by earlier steps (text and logo detection)
    for every frame in the event and record them on the frame's DynamoDB row.

    The handler exits early, without touching DynamoDB, when none of the
    enabled flags in ``event['config']`` is a team check.

    :param event: example
    {
      "config": {
        "audio_check_enabled": true,
        "station_logo_check_enabled": false,
        "language_detect_check_enabled": true,
        "team_detect_check_enabled": false,
        "appsync_notify_enabled": true
      },
        ...
      "frames": [
        {
          "Stream_ID": "test_1",
          "DateTime": "2020-02-19T22:45:14.938250Z",
            ...
          "S3_Bucket": "aws-rnd-broadcast-maas-video-processing-dev",
          "S3_Key": "frames/test_video_single_pipeline/test_1/original/2020/02/19/22/45:14:938250.jpg"
        },
        {
          "Stream_ID": "test_1",
          "DateTime": "2020-02-19T22:45:17.941250Z",
            ...
          "S3_Bucket": "aws-rnd-broadcast-maas-video-processing-dev",
          "S3_Key": "frames/test_video_single_pipeline/test_1/original/2020/02/19/22/45:17:941250.jpg"
        }
      ]
    }
    :param context: lambda environment context (not used by this handler)
    :return: None.  This step will write its results to DynamoDB
    """
    # Keep only the config flags that are switched on AND are listed as team
    # check names.
    # NOTE(review): names/attributes are read from `calculate_team_confidence`
    # while results come from `consolidate_team_confidence` below - confirm
    # both names are intended.
    enabled_team_checks = {
        name
        for name, enabled in event['config'].items()
        if enabled and name in calculate_team_confidence.config_names
    }

    logger.info('Active team checks [%s]', enabled_team_checks)

    # nothing to consolidate when no team check is enabled
    if not enabled_team_checks:
        logger.info('No team configurations active. Exiting frame consolidation')
        return

    # comma separated attribute names used as the DDB projection for each read
    projection = ', '.join(calculate_team_confidence.ddb_attrs)

    for frame in event['frames']:
        # read up front so a frame without an S3 key fails before any DDB access
        s3_key = frame['S3_Key']
        row_key = {field: frame[field] for field in ('Stream_ID', 'DateTime')}

        # fetch the previously stored detection data for this frame
        # NOTE(review): the client is passed as `ddb_update_builder` here but
        # as `ddb_client` to DDBUpdateBuilder below - confirm the parameter
        # name expected by get_item_ddb.
        stored_item = get_item_ddb(
            table_name=DDB_FRAME_TABLE,
            Key=row_key,
            ProjectionExpression=projection,
            ddb_update_builder=dynamodb
        )
        team_inputs = convert_from_ddb(stored_item)

        # queue one attribute update per consolidated result on the frame row;
        # presumably the builder sends the update when the `with` block exits
        # - confirm against DDBUpdateBuilder.
        row_update = DDBUpdateBuilder(
            table_name=DDB_FRAME_TABLE,
            key=row_key,
            ddb_client=dynamodb
        )
        with row_update as ddb_update_builder:
            for attr_name, attr_value in consolidate_team_confidence(team_inputs):
                ddb_update_builder.update_attr(attr_name, attr_value, convert_to_ddb)

        logger.info('Team data consolidated for frame: %s', s3_key)