def run()

in bq-connector/docai_bq_connector/connector/DocAIBQConnector.py [0:0]


    def run(self):
        storage_manager = StorageManager(
            self.destination_project_id, self.destination_dataset_id
        )
        doc_ai_process = Processor(
            bucket_name=self.bucket_name,
            file_name=self.file_name,
            content_type=self.content_type,
            processor_project_id=self.processor_project_id,
            processor_location=self.processor_location,
            processor_id=self.processor_id,
            extraction_result_output_bucket=self.extraction_result_output_bucket,
            async_output_folder_gcs_uri=self.async_output_folder_gcs_uri,
            sync_timeout=self.doc_ai_sync_timeout,
            async_timeout=self.doc_ai_async_timeout,
            should_async_wait=self.should_async_wait,
            should_write_extraction_result=self.should_write_extraction_result,
            max_sync_page_count=self.max_sync_page_count,
        )

        document = doc_ai_process.process()

        # Check if it was invoked for a new document, or for an existing operation.
        if self.operation_id is None:
            # New document
            # Check if a HITL operation was initiated as part of the processing

            _hitl_op_id = None
            if isinstance(document, ProcessedDocument) and document is not None:
                _hitl_op_id = document.hitl_operation_id

            _current_doc_status = DocumentState.unknown
            if _hitl_op_id is None:
                _current_doc_status = DocumentState.document_processing_complete
            else:
                _current_doc_status = DocumentState.submitted_for_hitl

            _doc_unique_id = self._augment_metadata_mapping_info(
                file_name=self.file_name,
                hitl_operation_id=_hitl_op_id,
                doc_status=_current_doc_status,
            )

            # Insert tracking info in the doc_reference table
            bq_row = {
                "doc_id": _doc_unique_id,
                "file_name": self.file_name,
                "doc_status": str(_current_doc_status),
                "doc_type": self.metadata_mapper.get_value_for_metadata("doc_type"),
                "doc_event_id": self.metadata_mapper.get_value_for_metadata(
                    "doc_event_id"
                ),
                "doc_group_id": self.metadata_mapper.get_value_for_metadata(
                    "doc_group_id"
                ),
                "hitl_operation_id": _hitl_op_id,
                "created_at": self.metadata_mapper.get_value_for_metadata("created_at"),
                "updated_at": self.metadata_mapper.get_value_for_metadata("updated_at"),
                "destination_table_id": self.destination_table_id,
            }
            logging.debug("Will insert into doc_reference table:")
            logging.debug(bq_row)
            storage_manager.write_record("doc_reference", bq_row)
        else:
            # Existing document that was sent for HITL review
            # Retrieve info stored when the doc was first processed

            query = f"""
                SELECT
                doc_id, file_name, doc_status, doc_type, doc_event_id, doc_group_id, created_at, destination_table_id
                FROM `{self.destination_project_id}.{self.destination_dataset_id}.doc_reference`
                WHERE hitl_operation_id = @operation_id """
            query_params = [
                {"name": "operation_id", "type": "STRING", "value": self.operation_id}
            ]
            doc_reference_records = storage_manager.get_records(query, query_params)

            if len(doc_reference_records) == 0:
                raise InitialDocRecordNotFoundError(
                    f"Initial hitl reference record not found for hitl_operation_id: {self.operation_id}"
                )
            elif len(doc_reference_records) > 1:
                raise DocAlreadyProcessedError(
                    f"Duplicate hitl reference records found for hitl_operation_id: {self.operation_id}"
                )
            logging.debug("Will now work with the single result")
            doc_ref = doc_reference_records[0]
            _doc_id = doc_ref.get("doc_id")
            _doc_group_id = doc_ref.get("doc_group_id")
            _doc_type = doc_ref.get("doc_type")
            _orig_file_name = doc_ref.get("file_name")
            _doc_created_at = doc_ref.get("created_at")
            self.destination_table_id = doc_ref.get("destination_table_id")
            self._augment_metadata_mapping_info(
                file_name=_orig_file_name,
                hitl_operation_id=self.operation_id,
                doc_group_id=_doc_group_id,
                doc_type=_doc_type,
                doc_status=DocumentState.document_processing_complete,
                created_at=_doc_created_at,
            )
            # Update status in doc_reference table
            try:
                _status_update = {
                    "doc_status": str(DocumentState.document_processing_complete),
                    "updated_at": self.metadata_mapper.get_value_for_metadata(
                        "updated_at"
                    ),
                }
                logging.debug(
                    f"Will update doc_reference record for doc_id = {_doc_id} - "
                    f"New status = {str(DocumentState.document_processing_complete)}"
                )
                storage_manager.update_record(
                    table_id="doc_reference",
                    record_id_name="doc_id",
                    record_id_value=_doc_id,
                    cols_to_update=_status_update,
                )
            except Exception as e:
                # If the original document was processed fairly recently, the row in bq doc_reference table will still
                # be in BQ's streaming buffer and won't be updatable. Ignore this problem
                logging.info(
                    f"Could not update doc_reference table for doc_id = {_doc_id}. Probable cause: row still in BQ "
                    f"streaming buffer: {str(e)}"
                )

        # Process result, validate types, convert as necessary and store in destination BQ table.
        if not storage_manager.does_table_exist(self.destination_table_id):
            raise TableNotFoundError(
                f"Destination table {self.destination_table_id} not found "
                f"in '{self.destination_project_id}.{self.destination_dataset_id}'"
            )

        schema = storage_manager.get_table_schema(self.destination_table_id)
        mapper = BqDocumentMapper(
            document=document,
            bq_schema=schema,
            metadata_mapper=self.metadata_mapper,
            custom_fields=self.custom_fields,
            include_raw_entities=self.include_raw_entities,
            include_error_fields=self.include_error_fields,
            parsing_methodology=self.parsing_methodology,
        )

        if (
            self.continue_on_error is False
            and self.include_error_fields is False
            and len(mapper.errors) > 0
        ):
            logging.error(mapper.errors)
            exit(100)
        bq_row = mapper.to_bq_row()

        # 1: Attempt initial row insert
        insert_1_errors = storage_manager.write_record(
            self.destination_table_id, bq_row
        )
        self.log_bq_errors(1, insert_1_errors)
        exclude_fields: [str] = mapper.process_insert_errors(insert_1_errors)

        retry_success = False
        if self.continue_on_error is True:
            # 2: Attempt a second insert removing the offending fields
            if len(insert_1_errors) > 0 and len(exclude_fields) > 0:
                current_try = 0
                while current_try < self.retry_count:
                    current_try += 1
                    # BQ only reports a single column insert error per row
                    bq_row_attempt_2 = mapper.to_bq_row(exclude_fields=exclude_fields)
                    insert_2_errors = storage_manager.write_record(
                        self.destination_table_id, bq_row_attempt_2
                    )
                    self.log_bq_errors(current_try, insert_2_errors)
                    exclude_fields.extend(mapper.process_insert_errors(insert_2_errors))
                    if len(insert_2_errors) == 0:
                        retry_success = True
                        break

            # 3: If fails again, then fallback excluding the entities
            if len(insert_1_errors) > 0 and retry_success is False:
                bq_row_attempt_3 = mapper.to_bq_row(append_parsed_fields=False)
                if len(bq_row_attempt_3) > 0:
                    insert_3_errors = storage_manager.write_record(
                        self.destination_table_id, bq_row_attempt_3
                    )
                    self.log_bq_errors(self.retry_count + 1, insert_3_errors)
                else:
                    logging.warning("There are no fields to insert")

        return document