def process_text_detection()

in source/consumer/lambda_handler.py [0:0]


def process_text_detection(asset, workflow, results):
    metadata = json.loads(results)
    es = connect_es(es_endpoint)
    extracted_items = []
    # We can tell if json results are paged by checking to see if the json results are an instance of the list type.
    if isinstance(metadata, list):
        # handle paged results
        for page in metadata:
            if len(page["TextDetections"]) > 0:
                for item in page["TextDetections"]:
                    try:
                        # Handle text detection schema for videos
                        if "TextDetection" in item:
                            text_detection = item["TextDetection"]
                            text_detection["Timestamp"] = item["Timestamp"]
                            text_detection["Operator"] = "textDetection"
                            text_detection["Workflow"] = workflow
                            # Flatten the bbox Label array
                            text_detection["BoundingBox"] = text_detection["Geometry"]["BoundingBox"]
                            del text_detection["Geometry"]
                            print(text_detection)
                            extracted_items.append(text_detection)
                        # Handle text detection schema for images
                        else:
                            text_detection = item
                            text_detection["Operator"] = "textDetection"
                            text_detection["Workflow"] = workflow
                            print(text_detection)
                            extracted_items.append(text_detection)
                    except KeyError as e:
                        print("KeyError: " + str(e))
                        print("Item: " + json.dumps(item))
    else:
        # these results are not paged
        if len(metadata["TextDetections"]) > 0:
                for item in metadata["TextDetections"]:
                    try:
                        # Handle text detection schema for videos
                        if "TextDetection" in item:
                            text_detection = item["TextDetection"]
                            text_detection["Timestamp"] = item["Timestamp"]
                            text_detection["Operator"] = "textDetection"
                            text_detection["Workflow"] = workflow
                            # Flatten the bbox Label array
                            text_detection["BoundingBox"] = text_detection["Geometry"]["BoundingBox"]
                            del text_detection["Geometry"]
                            print(text_detection)
                            extracted_items.append(text_detection)
                        # Handle text detection schema for images
                        else:
                            text_detection = item
                            text_detection["Operator"] = "textDetection"
                            text_detection["Workflow"] = workflow
                            print(text_detection)
                            extracted_items.append(text_detection)
                    except KeyError as e:
                        print("KeyError: " + str(e))
                        print("Item: " + json.dumps(item))
    bulk_index(es, asset, "textDetection", extracted_items)