in bq-connector/docai_bq_connector/doc_ai_processing/Processor.py [0:0]
def _process_async(self) -> Union[DocumentOperation, ProcessedDocument]:
"""
Uses Document AI batch (asynchronous) processing to process the document. The limit is 100 pages.

The input document, processor, and output location are taken from instance attributes rather than arguments.

Returns:
DocumentOperation: when self.should_async_wait is False, the started long-running operation (not awaited).
ProcessedDocument: when self.should_async_wait is True, the parsed result after the operation completes.
"""
# The api_endpoint must target the regional endpoint when the processor location is not 'us'; _get_document_ai_options builds the appropriate client options.
opts = self._get_document_ai_options()
client = documentai.DocumentProcessorServiceClient(client_options=opts)
# Add a unique folder to the uri for this particular async operation
unique_folder = uuid.uuid4().hex
if self.async_output_folder_gcs_uri is None:
raise ValueError(
"--async_output_folder_gcs_uri must be set when a document is processed asynchronously"
)
destination_uri = f"{self.async_output_folder_gcs_uri}/{unique_folder}"
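# e.g. destination_uri -> "gs://my-output-bucket/docai-async/3f2c0d0e..." (bucket and folder names here are illustrative)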
gcs_documents = documentai.GcsDocuments(
documents=[
{"gcs_uri": self._get_input_uri(), "mime_type": self.content_type}
]
)
# 'mime_type' may be 'application/pdf', 'image/tiff', 'image/gif', or 'application/json'
input_config = documentai.BatchDocumentsInputConfig(gcs_documents=gcs_documents)
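# Note: BatchDocumentsInputConfig can alternatively take a gcs_prefix (GcsPrefix)
# to process every document under a folder; a single explicit document is used here.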
# Where to write results
output_config = documentai.DocumentOutputConfig(
gcs_output_config={"gcs_uri": destination_uri}
)
processor_uri = client.processor_path(
self.processor_project_id, self.processor_location, self.processor_id
)
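# processor_path builds the full resource name:
# projects/{project}/locations/{location}/processors/{processor_id}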
logging.debug(
f"name={processor_uri}, input_documents={input_config}, "
f"document_output_config={output_config}"
)
request = documentai.BatchProcessRequest(
name=processor_uri,
input_documents=input_config,
document_output_config=output_config,
skip_human_review=False, # TODO: Add supporting input arg.
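# With skip_human_review=False, documents can still be routed to
# human-in-the-loop (HITL) review if the processor has a review config enabled.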
)
operation = client.batch_process_documents(request)
logging.debug(f"DocAI Batch Process started. LRO = {operation.operation.name}")
if self.should_async_wait is False:
return DocumentOperation(operation.operation.name)
# Wait for the operation to finish
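# result(timeout=...) blocks until the LRO completes; it raises
# concurrent.futures.TimeoutError if the timeout elapses and re-raises the
# operation's error if batch processing failed.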
operation.result(timeout=self.async_timeout)
logging.debug("DocAI Batch Process finished")
if operation.metadata and operation.metadata.individual_process_statuses:
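# One IndividualProcessStatus is reported per input document; since a single
# document was submitted, the first entry carries its GCS output folder and
# the human review (HITL) operation name, if review was triggered.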
cur_process_status = operation.metadata.individual_process_statuses[0]
hitl_gcs_output = cur_process_status.output_gcs_destination
hitl_op_full_path = (
cur_process_status.human_review_status.human_review_operation
)
else:
# Fall back to the GCS destination string built for the request
hitl_gcs_output = destination_uri
hitl_op_full_path = None
# Results are written to GCS; use a regex to extract the bucket and object prefix
match = re.match(r"gs://([^/]+)/(.+)", hitl_gcs_output)
if match:
output_bucket = match.group(1)
prefix = match.group(2)
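# Illustrative split: "gs://my-output-bucket/docai-async/3f2c..." ->
# bucket "my-output-bucket", prefix "docai-async/3f2c..."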
else:
raise InvalidGcsUriError(
"The supplied async_output_folder_gcs_uri is not a properly structured GCS Path"
)
storage_client = storage.Client()
bucket = storage_client.get_bucket(output_bucket)
blob_list = list(bucket.list_blobs(prefix=prefix))
# A single submitted input should yield a single output document
for i, blob in enumerate(blob_list):
# If JSON file, download the contents of this blob as a bytes object.
if blob.content_type == "application/json":
blob_as_bytes = blob.download_as_bytes()
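# from_json rebuilds a documentai.Document from the batch output JSON;
# ignore_unknown_fields tolerates fields this client version does not model.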
document = documentai.Document.from_json(
blob_as_bytes, ignore_unknown_fields=True
)
logging.debug(f"Fetched file {i + 1}: {blob.name}")
else:
logging.info(f"Skipping non-supported file type {blob.name}")
# Delete the unique folder created for this operation
bucket.delete_blobs(blob_list)
hitl_op_id = None
if hitl_op_full_path:
logging.debug(f"Async processing returned hitl_op = {hitl_op_full_path}")
hitl_op_id = hitl_op_full_path.split("/").pop()
results_dict = documentai.Document.to_dict(document)
return ProcessedDocument(
document=document,
dictionary=results_dict,
hitl_operation_id=hitl_op_id,
)