# def lambda_handler()
#
# in source/consumer/lambda_handler.py [0:0]


def lambda_handler(event, context):
    """Consume a batch of Kinesis records and apply each one to Elasticsearch.

    For every record the partition key is used as the asset id, and the
    record data is base64-decoded and parsed as a JSON payload whose
    ``Action`` key selects the handling:

    * ``INSERT`` -- logged and ignored.
    * ``MODIFY`` -- requires ``Operator``, ``Pointer`` and ``Workflow``. The
      metadata is read with ``read_json_from_s3(Pointer)`` and, when its
      ``Status`` is ``"Success"``, its ``Results`` are handed to the
      ``process_*`` function matching the lowercased operator name.
    * ``REMOVE`` -- without an ``Operator`` key the asset is deleted from all
      indices; with one, the request is logged and ignored.

    Records that cannot be decoded, or that carry no ``Action``, are logged
    and skipped without affecting the other records in the batch.

    Args:
        event: Lambda event; ``event['Records']`` is iterated.
        context: Lambda context object (unused).

    Returns:
        None.
    """
    print("Received event:", event)

    for record in event['Records']:
        # Reset per-record state. If these lived outside the loop, a record
        # that fails to decode would silently re-run the previous record's
        # action against the previous record's payload.
        action = None
        asset_id = None
        payload = None

        # Kinesis data is base64 encoded so decode here
        try:
            asset_id = record['kinesis']['partitionKey']
            payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        except Exception as e:
            print("Error decoding kinesis event", e)
        else:
            print("Decoded payload for asset:", asset_id)
            try:
                action = payload['Action']
            except (KeyError, TypeError) as e:
                # TypeError covers payloads that are valid JSON but not an
                # object (e.g. a list or null), which cannot be keyed by name.
                print("Missing action type from kinesis record:", e)
            else:
                print("Attempting the following action:", action)

        if action is None:
            print("Unable to determine action type")
        elif action == "INSERT":
            print("Not handling INSERT actions")
        elif action == "MODIFY":
            try:
                operator = payload['Operator']
                s3_pointer = payload['Pointer']
                workflow = payload['Workflow']
            except KeyError as e:
                print("Missing required keys in kinesis payload:", e)
            else:
                # Read in json metadata from s3
                metadata = read_json_from_s3(s3_pointer)
                if metadata["Status"] == "Success":
                    print("Retrieved {operator} metadata from s3, inserting into Elasticsearch".format(operator=operator))
                    operator = operator.lower()
                    # Route event to process method based on the operator type in the event.
                    # These names are the lowercase version of OPERATOR_NAME defined in /source/operators/operator-library.yaml
                    if operator in supported_operators:
                        results = metadata["Results"]
                        # The operator names are mutually exclusive, so a
                        # single elif chain stops at the first match.
                        if operator == "transcribevideo":
                            process_transcribe(asset_id, workflow, results, "video")
                        elif operator == "transcribeaudio":
                            process_transcribe(asset_id, workflow, results, "audio")
                        elif operator == "translate":
                            process_translate(asset_id, workflow, results)
                        elif operator == "mediainfo":
                            process_mediainfo(asset_id, workflow, results)
                        elif operator == "genericdatalookup":
                            process_generic_data(asset_id, workflow, results)
                        elif operator == "labeldetection":
                            process_label_detection(asset_id, workflow, results)
                        elif operator == "celebrityrecognition":
                            process_celebrity_detection(asset_id, workflow, results)
                        elif operator == "contentmoderation":
                            process_content_moderation(asset_id, workflow, results)
                        elif operator == "facedetection":
                            process_face_detection(asset_id, workflow, results)
                        elif operator == "facesearch":
                            process_face_search(asset_id, workflow, results)
                        elif operator == "entities":
                            process_entities(asset_id, workflow, results)
                        elif operator == "key_phrases":
                            process_keyphrases(asset_id, workflow, results)
                        elif operator == "textdetection":
                            process_text_detection(asset_id, workflow, results)
                        elif operator == "shotdetection":
                            process_shot_detection(asset_id, workflow, results)
                        elif operator == "technicalcuedetection":
                            process_technical_cue_detection(asset_id, workflow, results)
                        else:
                            # Listed as supported but no handler is wired up
                            # here; say so instead of dropping it silently.
                            print("No handler implemented for {operator} results".format(operator=operator))
                    else:
                        print("We do not store {operator} results".format(operator=operator))
                else:
                    print("Unable to read metadata from s3: {e}".format(e=metadata["Error"]))
        elif action == "REMOVE":
            if 'Operator' in payload:
                print(payload)
                print('Not allowing deletion of specific metadata from ES as that is not exposed in the UI')
            else:
                print("Operator type not present in payload, this must be a request to delete the entire asset")
                es = connect_es(es_endpoint)
                delete_asset_all_indices(es, asset_id)