in document-processing-workflows/src/functions/parse-results/main.py [0:0]
import json
from pathlib import Path

from flask import jsonify
from google.cloud import storage

# list_results, merge_sharded_results, render_pdf_to_image and extract_image are
# assumed to be defined elsewhere in this file

def parse_results(request):
"""Triggered by Workflow.
Args:
request (flask.Request): The request object.
<https://flask.palletsprojects.com/en/1.1.x/api/#incoming-request-data>
Returns:
The response text, or any set of values that can be turned into a
Response object using `make_response`
<https://flask.palletsprojects.com/en/1.1.x/api/#flask.make_response>.
"""
    request_json = request.get_json(silent=True)
    print(request_json)
    if not request_json:
        raise ValueError("Request JSON is empty")
    # validate that all required fields are present
    for key in ("inputBucket", "inputObject", "resultBucket", "resultPrefix"):
        if key not in request_json:
            raise ValueError(f"{key} is not included in request_json")
    # the response echoes the request fields and is augmented below
    response = request_json
input_bucket_name = request_json.get("inputBucket")
input_object_name = request_json.get("inputObject")
result_bucket_name = request_json.get("resultBucket")
result_prefix = request_json.get("resultPrefix")
    # define pageImageUrlPrefix and pageImageNames, which a frontend may use to
    # render the document pages from images stored on GCS
response["pageImageUrlPrefix"] = result_prefix
response["pageImageNames"] = []
    # retrieve all blobs in the result bucket for the given result prefix
    # (e.g. belonging to the same batchProcess output)
    blobs = list_results(result_bucket_name, result_prefix)
    # only use blobs of content type application/json, which contain the batchProcess output
    blobs = [blob for blob in blobs if blob.content_type == "application/json"]
    if not blobs:
        raise ValueError(
            f"No JSON results found in gs://{result_bucket_name}/{result_prefix}"
        )
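    # list_results is not shown here; a minimal sketch, assuming it simply lists
    # the blobs under the given prefix:
    #
    #   def list_results(bucket_name, prefix):
    #       return list(storage.Client().list_blobs(bucket_name, prefix=prefix))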
    # if the result JSON is sharded, merge the shards into a single result JSON
    if len(blobs) > 1:
        document, page_image_names = merge_sharded_results(
blobs,
input_bucket_name,
input_object_name,
result_bucket_name,
result_prefix,
)
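        # merge_sharded_results is assumed to concatenate the shards' pages and
        # entities into one Document JSON and to extract each shard's page images
        # along the way, which is why no separate image extraction happens here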
        # add the pageImageNames from the shards to the response
        response["pageImageNames"] += page_image_names
        # upload the merged (unsharded) document as a single blob
storage_client = storage.Client()
process_result_bucket = storage_client.get_bucket(result_bucket_name)
document_blob = storage.Blob(
name=str(Path(result_prefix, "unsharded.json")),
bucket=process_result_bucket,
)
document_blob.upload_from_string(
json.dumps(document), content_type="application/json"
)
# update resultObject to unsharded blob in response
response["resultObject"] = document_blob.name
# delete shard blobs
for blob in blobs:
blob.delete()
else: # document not sharded
blob = blobs[0]
# download blob content and parse into JSON
content = blob.download_as_bytes()
document = json.loads(content)
# set resultObject to blob name in response
response["resultObject"] = blob.name
if (
"pages" not in document
or len(document["pages"]) == 0
or "image" not in document["pages"][0]
):
            # if the pages property or the page images are missing (usually for Custom
            # Document Splitter (CDS) output), render the images from the input PDF and
            # store them in the same location as the batchProcess output
response["pageImageNames"] = response[
"pageImageNames"
] + render_pdf_to_image(
input_bucket_name=input_bucket_name,
input_object_name=input_object_name,
result_bucket_name=result_bucket_name,
result_prefix=result_prefix,
)
else:
            # extract the images embedded in the batchProcess output and store them in
            # the same location for better performance in the human-in-the-loop (HITL) UI
            response["pageImageNames"] += extract_image(
document,
input_bucket_name,
input_object_name,
result_bucket_name,
result_prefix,
)
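            # extract_image is not shown here; a rough sketch, assuming it decodes the
            # base64-encoded page["image"]["content"] bytes of each page and uploads one
            # image blob per page next to the batchProcess output, e.g.:
            #
            #   storage.Blob(
            #       name=str(Path(result_prefix, f"page_{i}.png")), bucket=result_bucket
            #   ).upload_from_string(
            #       base64.b64decode(page["image"]["content"]),
            #       content_type=page["image"]["mimeType"],
            #   )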
# remove images from pages
for page in document["pages"]:
if "image" in page:
del page["image"]
# upload modified document
blob.upload_from_string(
json.dumps(document), content_type="application/json"
)
if "pages" in document:
page_count = len(document["pages"])
else:
        # a splitter result may contain only entities, not pages, so derive the page
        # count from the highest page number referenced by the entities
        page_count = 1
if "entities" in document:
for entity in document["entities"]:
if "pageAnchor" in entity and "pageRefs" in entity["pageAnchor"]:
                    for page_ref in entity["pageAnchor"]["pageRefs"]:
                        if "page" in page_ref:
                            # page numbering starts at 0, so index + 1 is the page count
                            page_count = max(page_count, int(page_ref["page"]) + 1)
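        # e.g. an entity with pageRefs [{}, {"page": "2"}] spans pages 0 and 2 and
        # yields a page_count of 3; the page field is typically omitted for page 0,
        # hence the default of 1 above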
response["pageCount"] = page_count
print(response)
return jsonify(response)