def form_parser_extraction()

in microservices/extraction_service/src/utils/extract_entities.py


def form_parser_extraction(parser_details: dict, gcs_doc_path: str,
                           doc_type: str, context: str, timeout: int):
  """
  This is form parser extraction main function. It will send
  request to parser and retrieve response and call
    default and derived entities functions

  Parameters
    ----------
    parser_details: It has parser info like parser id, name, location, and etc
    gcs_doc_path: Document gcs path
    doc_type: Document Type
    context: context name
    timeout: Max time given for extraction entities using async form parser API

  Returns: Form parser response - list of dicts having entity, value,
    confidence and manual_extraction information.
    -------
  """

  location = parser_details["location"]
  processor_id = parser_details["processor_id"]
  opts = {}

  # Location can be 'us' or 'eu'
  if location == "eu":
    opts = {"api_endpoint": "eu-documentai.googleapis.com"}

  client = documentai.DocumentProcessorServiceClient(client_options=opts)
  # Create a temp folder to store the parser output; the folder is deleted
  # once processing is done. The GCS bucket itself must be created via the
  # create-bucket function; only the folder (prefix) is created automatically.
  gcs_output_uri = f"gs://{DOCAI_OUTPUT_BUCKET_NAME}"
  letters = string.ascii_lowercase
  temp_folder = "".join(random.choice(letters) for i in range(10))
  gcs_output_uri_prefix = "temp_" + temp_folder
  # temp folder location
  destination_uri = f"{gcs_output_uri}/{gcs_output_uri_prefix}/"
  # delete temp folder
  # del_gcs_folder(gcs_output_uri.split("//")[1], gcs_output_uri_prefix)
  gcs_documents = documentai.GcsDocuments(documents=[{
      "gcs_uri": gcs_doc_path,
      "mime_type": "application/pdf"
  }])
  input_config = documentai.BatchDocumentsInputConfig(
      gcs_documents=gcs_documents)
  # Temp op folder location
  output_config = documentai.DocumentOutputConfig(
      gcs_output_config={"gcs_uri": destination_uri})

  Logger.info(f"input_config = {input_config}")
  Logger.info(f"output_config = {output_config}")
  Logger.info(f"processor_id = {processor_id}")

  # Parser API endpoint (full processor resource name), e.g.
  # name = f"projects/{project_id}/locations/{location}/processors/{processor_id}"
  Logger.info("Form parser extraction API called")

  # request for Doc AI
  request = documentai.types.document_processor_service.BatchProcessRequest(
      name=processor_id,
      input_documents=input_config,
      document_output_config=output_config,
  )
  operation = client.batch_process_documents(request)
  # Wait for the operation to finish
  operation.result(timeout=timeout)

  # Results are written to GCS. Use a regex to find
  # output files
  match = re.match(r"gs://([^/]+)/(.+)", destination_uri)
  output_bucket = match.group(1)
  prefix = match.group(2)

  storage_client = storage.Client()
  bucket = storage_client.get_bucket(output_bucket)
  blob_list = list(bucket.list_blobs(prefix=prefix))
  extracted_entity_list = []
  form_parser_text = ""
  # Save the form parser JSON locally; this step can be removed from the pipeline
  if not os.path.exists(temp_folder):
    os.mkdir(temp_folder)
  # browse through output jsons
  for i, blob in enumerate(blob_list):
    # If JSON file, download the contents of this blob as a bytes object.
    if ".json" in blob.name:
      blob_as_bytes = blob.download_as_bytes()
      # Save the parser response to the local folder; remove this on integration
      parser_json_fname = os.path.join(temp_folder, f"res_{i}.json")
      with open(parser_json_fname, "wb") as file_obj:
        file_obj.write(blob_as_bytes)

      document = documentai.types.Document.from_json(blob_as_bytes)
      # print(f"Fetched file {i + 1}")
      form_parser_text += document.text
      # Read the text recognition output from the processor
      for page in document.pages:
        for form_field in page.form_fields:
          field_name, field_name_confidence, field_coordinates = \
              extract_form_fields(form_field.field_name, document)
          field_value, field_value_confidence, value_coordinates = \
              extract_form_fields(form_field.field_value, document)
          # noise removal from keys and values
          field_name = clean_form_parser_keys(field_name)
          field_value = strip_value(field_value)
          temp_dict = {
              "key": field_name,
              "key_coordinates": field_coordinates,
              "value": field_value,
              "value_coordinates": value_coordinates,
              "key_confidence": round(field_name_confidence, 2),
              "value_confidence": round(field_value_confidence, 2),
              "page_no": int(page.page_number),
              "page_width": int(page.dimension.width),
              "page_height": int(page.dimension.height)
          }

          extracted_entity_list.append(temp_dict)

      print("Extraction completed")
    else:
      print(f"Skipping non-supported file type {blob.name}")

  # Get the mapping dict for the specific context, falling back to "all"
  docai_entity_mapping_by_context = DOCAI_ENTITY_MAPPING.get(context, {})
  mapping_dict = docai_entity_mapping_by_context.get(
      doc_type) or DOCAI_ENTITY_MAPPING["all"][doc_type]
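  # Illustrative structure of DOCAI_ENTITY_MAPPING (hypothetical keys; the
  # actual mapping is defined in the service configuration):
  # DOCAI_ENTITY_MAPPING = {
  #     "all": {"<doc_type>": {...mapping used by form_parser_entities_mapping...}},
  #     "<context>": {"<doc_type>": {...context-specific mapping...}},
  # }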

  print(f"context = {context}")
  print(f"doc_type = {doc_type}")
  print(f"mapping_dict = {mapping_dict}")

  # Extract desired entities from form parser
  try:
    form_parser_entities_list, flag = form_parser_entities_mapping(
        extracted_entity_list, mapping_dict, form_parser_text, temp_folder)

    # delete temp folder
    if os.path.exists(temp_folder):
      shutil.rmtree(temp_folder)
    del_gcs_folder(gcs_output_uri.split("//")[1], gcs_output_uri_prefix)
    Logger.info("Required entities created from Form parser response")
    return form_parser_entities_list, flag
  except Exception as e:
    Logger.error(e)
    # Clean up the temp folder before propagating the error
    if os.path.exists(temp_folder):
      shutil.rmtree(temp_folder)
    raise
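
# ---------------------------------------------------------------------------
# Illustrative usage sketch (hypothetical values; the parser details, bucket
# names, doc_type, and context below are placeholders, not actual config):
#
#   parser_details = {
#       "location": "us",
#       "processor_id": "projects/<project>/locations/us/processors/<id>",
#   }
#   entities, extraction_ok = form_parser_extraction(
#       parser_details=parser_details,
#       gcs_doc_path="gs://<input-bucket>/forms/sample.pdf",
#       doc_type="generic_form",
#       context="all",
#       timeout=300,
#   )
# ---------------------------------------------------------------------------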