in source/consumer/lambda_handler.py [0:0]
def lambda_handler(event, context):
    """Index the records of a Kinesis stream event into Elasticsearch.

    Each record in ``event['Records']`` is decoded independently and its
    ``Action`` field selects the handler:

    * ``INSERT`` - persist the asset's filename and creation timestamp.
    * ``MODIFY`` - read an operator's result metadata from S3 and route it to
      the matching ``process_*`` function.
    * ``REMOVE`` - delete the entire asset when no ``Operator`` is given;
      deletion of a single operator's metadata is refused.

    A record that cannot be decoded, or that has no ``Action``, is logged and
    skipped. Nothing is carried over from one record to the next, so a bad
    record is never processed with the previous record's action or payload.

    :param event: Lambda event containing a ``Records`` list of Kinesis records
    :param context: Lambda context object (unused)
    """
    print("Received event:", event)
    for record in event['Records']:
        # Decode per record so a failure cannot reuse an earlier record's state.
        asset_id, payload, action = _decode_kinesis_record(record)
        if action is None:
            print("Unable to determine action type")
        elif action == "INSERT":
            _handle_insert_action(asset_id, payload)
        elif action == "MODIFY":
            _handle_modify_action(asset_id, payload)
        elif action == "REMOVE":
            _handle_remove_action(asset_id, payload)


def _decode_kinesis_record(record):
    """Return ``(asset_id, payload, action)`` for one Kinesis record.

    The record's partition key is used as the asset id and its data is
    base64-decoded and parsed as JSON. Values that could not be obtained are
    ``None``; a ``None`` action means the record should be skipped.
    """
    try:
        asset_id = record['kinesis']['partitionKey']
        # Kinesis data is base64 encoded so decode here
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
    except Exception as e:  # best effort: one malformed record must not abort the batch
        print("Error decoding kinesis event", e)
        return None, None, None
    print("Decoded payload for asset:", asset_id)
    try:
        action = payload['Action']
    except (KeyError, TypeError) as e:
        # TypeError covers payloads that decode to a non-object JSON value.
        print("Missing action type from kinesis record:", e)
        return asset_id, payload, None
    print("Attempting the following action:", action)
    return asset_id, payload, action


def _handle_insert_action(asset_id, payload):
    """Persist the filename and timestamp from an INSERT record.

    The initial insert action will contain the filename and timestamp.
    Persist them to Elasticsearch so users can find assets by searching
    those fields.
    """
    try:
        # The filename is the last "/"-separated component of the S3 key.
        filename = payload['S3Key'].split("/")[-1]
        metadata = {"filename": filename, "created": payload['Created']}
        # Save the filename and timestamp to Elasticsearch
        process_initialization(asset_id, metadata)
    except KeyError as e:
        print("Missing required keys in kinesis payload:", e)


def _handle_modify_action(asset_id, payload):
    """Read an operator's results from S3 and route them to its processor."""
    try:
        operator = payload['Operator']
        s3_pointer = payload['Pointer']
        workflow = payload['Workflow']
    except KeyError as e:
        print("Missing required keys in kinesis payload:", e)
        return

    # Read in json metadata from s3
    metadata = read_json_from_s3(s3_pointer)
    if metadata["Status"] != "Success":
        print("Unable to read metadata from s3: {e}".format(e=metadata["Error"]))
        return
    print("Retrieved {operator} metadata from s3, inserting into Elasticsearch".format(operator=operator))

    operator = operator.lower()
    language_code = None
    # webcaptions operators are processed the same, but they have a language extension
    # in the operator name. Strip that off now. Any language is supported for search.
    # partition() splits on the first "_" only, so a language suffix that itself
    # contains "_" is kept whole instead of raising ValueError.
    if operator.startswith("webcaptions_"):
        print("Got webcaptions operator {}".format(operator))
        operator, _, language_code = operator.partition("_")

    # Route event to process method based on the operator type in the event.
    # These names are the lowercase version of OPERATOR_NAME defined in /source/operators/operator-library.yaml
    if operator not in supported_operators:
        print("We do not store {operator} results".format(operator=operator))
        return
    if operator == "webcaptions" and language_code is None:
        # Previously a NameError that aborted the whole batch.
        print("Missing language code for webcaptions operator, skipping")
        return

    # Lambdas keep lookups lazy: only the selected process_* function (and
    # metadata["Results"]) is resolved, exactly as with an if-chain.
    handlers = {
        "transcribevideo": lambda results: process_transcribe(asset_id, workflow, results, "video"),
        "transcribeaudio": lambda results: process_transcribe(asset_id, workflow, results, "audio"),
        "translate": lambda results: process_translate(asset_id, workflow, results),
        "webcaptions": lambda results: process_webcaptions(asset_id, workflow, results, language_code),
        "mediainfo": lambda results: process_mediainfo(asset_id, workflow, results),
        "genericdatalookup": lambda results: process_generic_data(asset_id, workflow, results),
        "labeldetection": lambda results: process_label_detection(asset_id, workflow, results),
        "celebrityrecognition": lambda results: process_celebrity_detection(asset_id, workflow, results),
        "contentmoderation": lambda results: process_content_moderation(asset_id, workflow, results),
        "facedetection": lambda results: process_face_detection(asset_id, workflow, results),
        "face_search": lambda results: process_face_search(asset_id, workflow, results),
        "entities": lambda results: process_entities(asset_id, workflow, results),
        "key_phrases": lambda results: process_keyphrases(asset_id, workflow, results),
        "textdetection": lambda results: process_text_detection(asset_id, workflow, results),
        "shotdetection": lambda results: process_shot_detection(asset_id, workflow, results),
        "technicalcuedetection": lambda results: process_technical_cue_detection(asset_id, workflow, results),
    }
    handler = handlers.get(operator)
    if handler is not None:
        handler(metadata["Results"])


def _handle_remove_action(asset_id, payload):
    """Handle a REMOVE record.

    Without an ``Operator`` key the whole asset is deleted via
    ``delete_asset_all_indices``; deleting one operator's metadata is refused.
    """
    # payload is a dict here: payload['Action'] already succeeded on it.
    if 'Operator' not in payload:
        print("Operator type not present in payload, this must be a request to delete the entire asset")
        es = connect_es(es_endpoint)
        delete_asset_all_indices(es, asset_id)
    else:
        print(payload)
        print('Not allowing deletion of specific metadata from ES as that is not exposed in the UI')