def consolidate_fragment_lambda_handler()

in broadcast-monitoring/src/consolidate_frame_results/app/main.py [0:0]


def consolidate_fragment_lambda_handler(event, context):
    """
    Consolidates the per-frame results of one segment and stores them on the segment's row.

    The handler receives the output gathered from the previous Map step of the frame
    processing pipeline.  It works out which frame checks are enabled by the event config,
    reads the attributes those checks need for every frame from the frame table, runs the
    checks over the collected frame data and records each result on the fragment table row
    keyed by the segment start time and stream id.

    :param event: example
    {
      "config": {
        "audio_check_enabled": true,
        "station_logo_check_enabled": false,
        "language_detect_check_enabled": true,
        "team_detect_check_enabled": false,
        "appsync_notify_enabled": true
      },
      "parsed": {
        "isMasterManifest": false,
        "streamId": "test_1",
        "lastSegment": {
          "s3Key": "live/test_video_single_pipeline/test_1_00043.ts",
          "versionId": "_ey0Mw8QDjqVgpCqUuE_v8tYlUVqd2Mo",
          "durationSec": 5.875,
          "startDateTime": "2020-02-22T22:15:59.375000Z",
          "startTimeRelative": 254.3
        },
        "expectedProgram": {
          "Team_Info": "AVL V NOR",
          "Station_Logo": "Prime Video",
          "Stream_ID": "test_1",
          "Event_Title": "EPL AVL V NOR",
          "Event_ID": "EPL-PROG3",
          "Event_Type": "Sports",
          "End_Time": 300,
          "Start_Time": 180,
          "languageCode": "en-en",
          "Segment_Start_Time_In_Loop": 254.3
        }
      },
      "frames": [
        {
          "Stream_ID": "test_1",
          "DateTime": "2020-02-19T22:45:14.938250Z",
            ...
          "S3_Bucket": "aws-rnd-broadcast-maas-video-processing-dev",
          "S3_Key": "frames/test_video_single_pipeline/test_1/original/2020/02/19/22/45:14:938250.jpg"
        },
        {
          "Stream_ID": "test_1",
          "DateTime": "2020-02-19T22:45:17.941250Z",
            ...
          "S3_Bucket": "aws-rnd-broadcast-maas-video-processing-dev",
          "S3_Key": "frames/test_video_single_pipeline/test_1/original/2020/02/19/22/45:17:941250.jpg"
        }
      ]
    }
    :param context: lambda environment context (not read by this handler)
    :return: None.  The outcome of this step is written to DynamoDB instead of being returned
    """
    logger.info("DDB Frame Table: %s | DDB Fragment Table: %s", DDB_FRAME_TABLE, DDB_FRAGMENT_TABLE)

    # pull out the pieces of the event this step needs
    config = event['config']
    parsed = event['parsed']
    stream_id = parsed['streamId']
    segment_start_dt = parsed['lastSegment']['startDateTime']

    # names of every config flag that is switched on (truthy value)
    enabled_flags = {name for name, value in config.items() if value}

    # a check is active only when all of the config flags it names are enabled
    candidate_checks = [station_logo_check, team_text_check, sports_check]
    active_checks = [
        candidate
        for candidate in candidate_checks
        if set(candidate.config_names) <= enabled_flags
    ]

    # nothing to consolidate when no frame check is enabled
    if not active_checks:
        logger.info('No active configurations to process.  Exiting frame consolidation')
        return

    # de-duplicated attribute names required by the active checks, joined into
    # the projection expression used for every frame lookup
    needed_attrs = {attr for check in active_checks for attr in check.ddb_attrs}
    data_attributes = ', '.join(needed_attrs)

    # fetch the projected attributes of each frame, in the order the frames were given
    frame_data = [
        get_item_ddb(
            Key={'Stream_ID': frame['Stream_ID'], 'DateTime': frame['DateTime']},
            table_name=DDB_FRAME_TABLE,
            ProjectionExpression=data_attributes,
            ddb_client=dynamodb
        )
        for frame in event['frames']
    ]

    # register the result of every check as an attribute update on the segment row
    segment_key = {'Start_DateTime': segment_start_dt, 'Stream_ID': stream_id}
    with DDBUpdateBuilder(
            key=segment_key,
            table_name=DDB_FRAGMENT_TABLE,
            ddb_client=dynamodb
    ) as ddb_update_builder:
        for result_name, result_data in check_processing_helper(active_checks, frame_data):
            ddb_update_builder.update_attr(result_name, result_data)

    logger.info('%d frame checks completed', len(active_checks))