in source/consumer/lambda_handler.py [0:0]
def lambda_handler(event, context):
print("Received event:", event)
action = None
asset_id = None
payload = None
for record in event['Records']:
# Kinesis data is base64 encoded so decode here
try:
asset_id = record['kinesis']['partitionKey']
payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
except Exception as e:
print("Error decoding kinesis event", e)
else:
print("Decoded payload for asset:", asset_id)
try:
action = payload['Action']
except KeyError as e:
print("Missing action type from kinesis record:", e)
else:
print("Attempting the following action:", action)
if action is None:
print("Unable to determine action type")
elif action == "INSERT":
print("Not handling INSERT actions")
elif action == "MODIFY":
try:
operator = payload['Operator']
s3_pointer = payload['Pointer']
workflow = payload['Workflow']
except KeyError as e:
print("Missing required keys in kinesis payload:", e)
else:
# Read in json metadata from s3
metadata = read_json_from_s3(s3_pointer)
if metadata["Status"] == "Success":
print("Retrieved {operator} metadata from s3, inserting into Elasticsearch".format(operator=operator))
operator = operator.lower()
# Route event to process method based on the operator type in the event.
# These names are the lowercase version of OPERATOR_NAME defined in /source/operators/operator-library.yaml
if operator in supported_operators:
if operator == "transcribevideo":
process_transcribe(asset_id, workflow, metadata["Results"], "video")
if operator == "transcribeaudio":
process_transcribe(asset_id, workflow, metadata["Results"], "audio")
if operator == "translate":
process_translate(asset_id, workflow, metadata["Results"])
if operator == "mediainfo":
process_mediainfo(asset_id, workflow, metadata["Results"])
if operator == "genericdatalookup":
process_generic_data(asset_id, workflow, metadata["Results"])
if operator == "labeldetection":
process_label_detection(asset_id, workflow, metadata["Results"])
if operator == "celebrityrecognition":
process_celebrity_detection(asset_id, workflow, metadata["Results"])
if operator == "contentmoderation":
process_content_moderation(asset_id, workflow, metadata["Results"])
if operator == "facedetection":
process_face_detection(asset_id, workflow, metadata["Results"])
if operator == "facesearch":
process_face_search(asset_id, workflow, metadata["Results"])
if operator == "entities":
process_entities(asset_id, workflow, metadata["Results"])
if operator == "key_phrases":
process_keyphrases(asset_id, workflow, metadata["Results"])
if operator == "textdetection":
process_text_detection(asset_id, workflow, metadata["Results"])
if operator == "shotdetection":
process_shot_detection(asset_id, workflow, metadata["Results"])
if operator == "technicalcuedetection":
process_technical_cue_detection(asset_id, workflow, metadata["Results"])
else:
print("We do not store {operator} results".format(operator=operator))
else:
print("Unable to read metadata from s3: {e}".format(e=metadata["Error"]))
elif action == "REMOVE":
try:
operator = payload['Operator']
except KeyError:
print("Operator type not present in payload, this must be a request to delete the entire asset")
es = connect_es(es_endpoint)
delete_asset_all_indices(es, asset_id)
else:
print(payload)
print('Not allowing deletion of specific metadata from ES as that is not exposed in the UI')