in source/consumer/lambda_handler.py [0:0]
def process_technical_cue_detection(asset, workflow, results):
    """Index technical-cue segments from a segment-detection job into Elasticsearch.

    :param asset: asset identifier passed through to ``bulk_index``
    :param workflow: workflow name stamped onto each extracted item
    :param results: JSON string of segment-detection output; either a single
        result object or a list of paged result objects
    """
    metadata = json.loads(results)
    es = connect_es(es_endpoint)
    extracted_items = []
    # Paged results arrive as a list of result objects; unpaged results are a
    # single dict. Normalize to a list so one loop handles both shapes
    # (previously the transformation logic was duplicated per branch).
    pages = metadata if isinstance(metadata, list) else [metadata]
    for page in pages:
        for item in page.get("Segments", []):
            try:
                item["Operator"] = "technical_cue_detection"
                item["Workflow"] = workflow
                # Flatten the nested TechnicalCueSegment fields to the top level.
                if "TechnicalCueSegment" in item:
                    item["Confidence"] = item["TechnicalCueSegment"]["Confidence"]
                    item["Type"] = item["TechnicalCueSegment"]["Type"]
                    del item["TechnicalCueSegment"]
                # Rename millisecond timestamp keys to the index's field names.
                item["StartTimestamp"] = item["StartTimestampMillis"]
                item["EndTimestamp"] = item["EndTimestampMillis"]
                del item["StartTimestampMillis"]
                del item["EndTimestampMillis"]
                extracted_items.append(item)
            except KeyError as e:
                # Best-effort: log the malformed item and keep processing the rest.
                print("KeyError: " + str(e))
                print("Item: " + json.dumps(item))
    bulk_index(es, asset, "technical_cues", extracted_items)