def lambda_handler()

in source/consumer/lambda_handler.py [0:0]


def lambda_handler(event, context):
    print("Received event:", event)

    action = None
    asset_id = None
    payload = None

    for record in event['Records']:
        # Kinesis data is base64 encoded so decode here
        try:
            asset_id = record['kinesis']['partitionKey']
            payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        except Exception as e:
            print("Error decoding kinesis event", e)
        else:
            print("Decoded payload for asset:", asset_id)
            try:
                action = payload['Action']
            except KeyError as e:
                print("Missing action type from kinesis record:", e)
            else:
                print("Attempting the following action:", action)

        if action is None:
            print("Unable to determine action type")
        elif action == "INSERT":
            # The initial insert action will contain the filename and timestamp.
            # Persist the filename and timestamp to Elasticsearch so users can find
            # assets by searching those fields.
            try:
                # Get filename and timestamp from the payload of the stream message
                s3_key = payload['S3Key']
                filename = s3_key.split("/")[-1]
                created = payload['Created']
                extracted_items = []
                metadata = {"filename": filename, "created": created}
                extracted_items.append(metadata)
                # Save the filename and timestamp to Elasticsearch
                process_initialization(asset_id, metadata)
            except KeyError as e:
                print("Missing required keys in kinesis payload:", e)
        elif action == "MODIFY":
            try:
                operator = payload['Operator']
                s3_pointer = payload['Pointer']
                workflow = payload['Workflow']
            except KeyError as e:
                print("Missing required keys in kinesis payload:", e)
            else:
                # Read in json metadata from s3
                metadata = read_json_from_s3(s3_pointer)
                if metadata["Status"] == "Success":
                    print("Retrieved {operator} metadata from s3, inserting into Elasticsearch".format(operator=operator))
                    operator = operator.lower()
                    # webcaptions operators are processed the same, but they have a language extension
                    # in the operator name.  Strip that off now.  Any language is supported for search
                    if operator.startswith("webcaptions_"):
                        print("Got webcaptions operator {}".format(operator))
                        (operator, language_code) = operator.split("_")

                    # Route event to process method based on the operator type in the event.
                    # These names are the lowercase version of OPERATOR_NAME defined in /source/operators/operator-library.yaml
                    if operator in supported_operators:
                        if operator == "transcribevideo":
                            process_transcribe(asset_id, workflow, metadata["Results"], "video")
                        if operator == "transcribeaudio":
                            process_transcribe(asset_id, workflow, metadata["Results"], "audio")
                        if operator == "translate":
                            process_translate(asset_id, workflow, metadata["Results"])
                        if operator == "webcaptions":
                            process_webcaptions(asset_id, workflow, metadata["Results"], language_code)
                        if operator == "mediainfo":
                            process_mediainfo(asset_id, workflow, metadata["Results"])
                        if operator == "genericdatalookup":
                            process_generic_data(asset_id, workflow, metadata["Results"])
                        if operator == "labeldetection":
                            process_label_detection(asset_id, workflow, metadata["Results"])
                        if operator == "celebrityrecognition":
                            process_celebrity_detection(asset_id, workflow, metadata["Results"])
                        if operator == "contentmoderation":
                            process_content_moderation(asset_id, workflow, metadata["Results"])
                        if operator == "facedetection":
                            process_face_detection(asset_id, workflow, metadata["Results"])
                        if operator == "face_search":
                            process_face_search(asset_id, workflow, metadata["Results"])
                        if operator == "entities":
                            process_entities(asset_id, workflow, metadata["Results"])
                        if operator == "key_phrases":
                            process_keyphrases(asset_id, workflow, metadata["Results"])
                        if operator == "textdetection":
                            process_text_detection(asset_id, workflow, metadata["Results"])
                        if operator == "shotdetection":
                            process_shot_detection(asset_id, workflow, metadata["Results"])
                        if operator == "technicalcuedetection":
                            process_technical_cue_detection(asset_id, workflow, metadata["Results"])
                    else:
                        print("We do not store {operator} results".format(operator=operator))
                else:
                    print("Unable to read metadata from s3: {e}".format(e=metadata["Error"]))
        elif action == "REMOVE":
            try:
                operator = payload['Operator']
            except KeyError:
                print("Operator type not present in payload, this must be a request to delete the entire asset")
                es = connect_es(es_endpoint)
                delete_asset_all_indices(es, asset_id)
            else:
                print(payload)
                print('Not allowing deletion of specific metadata from ES as that is not exposed in the UI')