def parse_results()

in document-processing-workflows/src/functions/parse-results/main.py [0:0]


def parse_results(request):
    """Triggered by Workflow.
    Args:
        request (flask.Request): The request object.
        <https://flask.palletsprojects.com/en/1.1.x/api/#incoming-request-data>
    Returns:
        The response text, or any set of values that can be turned into a
        Response object using `make_response`
        <https://flask.palletsprojects.com/en/1.1.x/api/#flask.make_response>.
    """
    request_json = request.get_json(silent=True)
    response = request_json
    print(request_json)
    if not request_json:
        raise ValueError("Request JSON is empty")
    if "inputBucket" not in request_json:
        raise ValueError("inputBucket is not included in request_json")
    if "inputObject" not in request_json:
        raise ValueError("inputObject is not included in request_json")
    if "resultBucket" not in request_json:
        raise ValueError("resultBucket is not included in request_json")
    if "resultPrefix" not in request_json:
        raise ValueError("resultPrefix is not included in request_json")

    input_bucket_name = request_json.get("inputBucket")
    input_object_name = request_json.get("inputObject")
    result_bucket_name = request_json.get("resultBucket")
    result_prefix = request_json.get("resultPrefix")
    # define pageImageUrlPrefix and pageImageNames which may be used by a frontend to render the document pages from images stored on GCS
    response["pageImageUrlPrefix"] = result_prefix
    response["pageImageNames"] = []
    # retrieve all blobs in the result bucket for the given result prefix (e.g. belonging to the same batchProcess output)
    blobs = list_results(result_bucket_name, result_prefix)
    # only use blobs of content-type application/json which contain batchProcess output
    blobs = [blob for blob in blobs if blob.content_type == "application/json"]
    # if the result JSON is sharded merge it into a single result JSON
    if len(blobs) > 1:
        # merge sharded results
        document, page_image_names = merge_sharded_results(
            blobs,
            input_bucket_name,
            input_object_name,
            result_bucket_name,
            result_prefix,
        )
        # add pageImageNames from shards to response
        response["pageImageNames"] = response["pageImageNames"] + page_image_names
        # upload unsharded document
        storage_client = storage.Client()
        process_result_bucket = storage_client.get_bucket(result_bucket_name)
        document_blob = storage.Blob(
            name=str(Path(result_prefix, "unsharded.json")),
            bucket=process_result_bucket,
        )
        document_blob.upload_from_string(
            json.dumps(document), content_type="application/json"
        )
        # update resultObject to unsharded blob in response
        response["resultObject"] = document_blob.name
        # delete shard blobs
        for blob in blobs:
            blob.delete()
    else:  # document not sharded
        blob = blobs[0]
        # download blob content and parse into JSON
        content = blob.download_as_bytes()
        document = json.loads(content)
        # set resultObject to blob name in response
        response["resultObject"] = blob.name
        if (
            "pages" not in document
            or len(document["pages"]) == 0
            or "image" not in document["pages"][0]
        ):
            # if the pages property is missing (usually for CDS) then we need to render the images from PDF output and put them in the same location as the batchProcess output
            response["pageImageNames"] = response[
                "pageImageNames"
            ] + render_pdf_to_image(
                input_bucket_name=input_bucket_name,
                input_object_name=input_object_name,
                result_bucket_name=result_bucket_name,
                result_prefix=result_prefix,
            )
        else:
            # extract images from batchProcess output and put them into the same location as the batchProcess output for better performance in the HITL UI
            response["pageImageNames"] = response["pageImageNames"] + extract_image(
                document,
                input_bucket_name,
                input_object_name,
                result_bucket_name,
                result_prefix,
            )
            # remove images from pages
            for page in document["pages"]:
                if "image" in page:
                    del page["image"]
            # upload modified document
            blob.upload_from_string(
                json.dumps(document), content_type="application/json"
            )

    if "pages" in document:
        page_count = len(document["pages"])
    else:
        # splitter result may not have pages, only entities so we need to find the highest page number from the entities
        page_count = 1
        if "entities" in document:
            for entity in document["entities"]:
                if "pageAnchor" in entity and "pageRefs" in entity["pageAnchor"]:
                    for page_anchor in entity["pageAnchor"]["pageRefs"]:
                        if "page" in page_anchor:
                            # page_anchor starts at page 0
                            page_count = max(page_count, int(page_anchor["page"]) + 1)
    response["pageCount"] = page_count
    print(response)
    return jsonify(response)