def _process_async()

in bq-connector/docai_bq_connector/doc_ai_processing/Processor.py [0:0]


    def _process_async(self) -> Union[DocumentOperation, ProcessedDocument]:
        # if self.should_async_wait is True:
        # return type of ProcessedDocument
        # else return type of DocumentOperation
        """
        This uses Doc AI to process the document asynchronously.  The limit is 100 pages.
        Args:
            gcs_blob:
            content_type:
            project_number:
            location:
            processor_id:
            gcs_output_uri:
            gcs_output_uri_prefix:
            timeout:
        Returns:

        """
        # You must set the api_endpoint if you use a location other than 'us', e.g.:
        opts = self._get_document_ai_options()
        client = documentai.DocumentProcessorServiceClient(client_options=opts)

        # Add a unique folder to the uri for this particular async operation
        unique_folder = uuid.uuid4().hex

        if self.async_output_folder_gcs_uri is None:
            raise Exception(
                "--async_output_folder_gcs_uri must be set when a document is processed asynchronously"
            )
        destination_uri = f"{self.async_output_folder_gcs_uri}/{unique_folder}"

        gcs_documents = documentai.GcsDocuments(
            documents=[
                {"gcs_uri": self._get_input_uri(), "mime_type": self.content_type}
            ]
        )

        # 'mime_type' can be 'application/pdf', 'image/tiff',
        # and 'image/gif', or 'application/json'
        input_config = documentai.BatchDocumentsInputConfig(gcs_documents=gcs_documents)

        # Where to write results
        output_config = documentai.DocumentOutputConfig(
            gcs_output_config={"gcs_uri": destination_uri}
        )

        processor_uri = client.processor_path(
            self.processor_project_id, self.processor_location, self.processor_id
        )

        logging.debug(
            f"name={processor_uri}, input_documents={input_config}, "
            f"document_output_config={output_config}"
        )
        request = documentai.types.document_processor_service.BatchProcessRequest(
            name=processor_uri,
            input_documents=input_config,
            document_output_config=output_config,
            skip_human_review=False,  # TODO: Add supporting input arg.
        )

        operation = client.batch_process_documents(request)
        logging.debug(f"DocAI Batch Process started. LRO = {operation.operation.name}")

        if self.should_async_wait is False:
            return DocumentOperation(operation.operation.name)

        # Wait for the operation to finish
        operation.result(timeout=self.async_timeout)
        logging.debug("DocAI Batch Process finished")

        if operation.metadata and operation.metadata.individual_process_statuses:
            cur_process_status = operation.metadata.individual_process_statuses[0]
            hitl_gcs_output = cur_process_status.output_gcs_destination
            hitl_op_full_path = (
                cur_process_status.human_review_status.human_review_operation
            )
        else:
            # Fallback to using the GCS path set in the request
            hitl_gcs_output = output_config
            hitl_op_full_path = None

        # Results are written to GCS. Use a regex to find
        # output files
        match = re.match(r"gs://([^/]+)/(.+)", hitl_gcs_output)
        if match:
            output_bucket = match.group(1)
            prefix = match.group(2)
        else:
            raise InvalidGcsUriError(
                "The supplied async_output_folder_gcs_uri is not a properly structured GCS Path"
            )

        storage_client = storage.Client()
        bucket = storage_client.get_bucket(output_bucket)
        blob_list = list(bucket.list_blobs(prefix=prefix))
        # should always be a single document here
        for i, blob in enumerate(blob_list):
            # If JSON file, download the contents of this blob as a bytes object.
            if blob.content_type == "application/json":
                blob_as_bytes = blob.download_as_bytes()

                document = documentai.Document.from_json(
                    blob_as_bytes, ignore_unknown_fields=True
                )
                logging.debug(f"Fetched file {i + 1}: {blob.name}")
            else:
                logging.info(f"Skipping non-supported file type {blob.name}")

        # Delete the unique folder created for this operation
        blobs = list(bucket.list_blobs(prefix=prefix))
        bucket.delete_blobs(blobs)
        hitl_op_id = None
        if hitl_op_full_path:
            logging.debug(f"Async processing returned hitl_op = {hitl_op_full_path}")
            hitl_op_id = hitl_op_full_path.split("/").pop()

        results_dict = documentai.types.Document.to_dict(document)
        return ProcessedDocument(
            document=document,
            dictionary=results_dict,
            hitl_operation_id=hitl_op_id,
        )