document_ai_warehouse/document_ai_warehouse_batch_ingestion/main.py
import argparse
import json
import os
import time
from typing import Any, Dict, List, Optional, Set, Tuple
from common.utils import helper
from common.utils import storage_utils
from common.utils.docai_warehouse_helper import get_key_value_pairs
from common.utils.docai_warehouse_helper import get_metadata_properties
from common.utils.document_ai_utils import DocumentaiUtils
from common.utils.document_warehouse_utils import DocumentWarehouseUtils
from common.utils.helper import is_date
from common.utils.logging_handler import Logger
from config import API_LOCATION
from config import CALLER_USER
from config import DOCAI_PROJECT_NUMBER
from config import DOCAI_WH_PROJECT_NUMBER
from config import FOLDER_SCHEMA_PATH
from config import GCS_OUTPUT_BUCKET
from config import PROCESSOR_ID
from google.api_core.exceptions import NotFound
from google.cloud import contentwarehouse_v1
from google.cloud import storage
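# Shared clients for Document AI Warehouse, Document AI, and Cloud Storage used by all commands.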
dw_utils = DocumentWarehouseUtils(
project_number=DOCAI_WH_PROJECT_NUMBER, api_location=API_LOCATION
)
docai_utils = DocumentaiUtils(
project_number=DOCAI_PROJECT_NUMBER, api_location=API_LOCATION
)
storage_client = storage.Client()
def get_schema(args: argparse.Namespace):
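    """Run a sample document through the Document AI processor and generate a mapping schema JSON file."""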
file_uri = args.file_path
schema_name = args.schema_name
processor_id = args.processor_id
if not processor_id:
processor_id = PROCESSOR_ID
Logger.info(
f"Get document schema with: \nuri={file_uri}, processor_id={processor_id}, schema_name={schema_name} \n"
f"GCS_OUTPUT_BUCKET={GCS_OUTPUT_BUCKET}, "
f"CALLER_USER={CALLER_USER}"
)
assert processor_id, (
"processor_id is not set as PROCESSOR_ID env variable and "
"is not provided as an input parameter (-p)"
)
assert GCS_OUTPUT_BUCKET, "GCS_OUTPUT_BUCKET not set"
assert DOCAI_PROJECT_NUMBER, "DOCAI_PROJECT_NUMBER not set"
docai_output_list = docai_utils.batch_extraction(
processor_id, [file_uri], GCS_OUTPUT_BUCKET
)
processor = docai_utils.get_processor(processor_id)
for f in docai_output_list:
document_ai_output = docai_output_list[f]
keys = get_key_value_pairs(document_ai_output)
if not schema_name:
schema_name = processor.display_name
schema_path = create_mapping_schema(schema_name, keys)
print(f"Generated {schema_path} with document schema for {file_uri}")
def upload_schema(args: argparse.Namespace):
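    """Upload a document schema JSON file into Document AI Warehouse."""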
schema_path = args.file_path
overwrite = args.overwrite
if not schema_path:
Logger.error("Path to the schema file was not provided")
return
Logger.info(
f"Upload document schema with: \nschema_path={schema_path}, overwrite={overwrite}"
)
create_document_schema(schema_path, overwrite)
def delete_schema(args: argparse.Namespace) -> None:
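    """Delete document schemas by schema id (-ss) and/or by display name (-sns)."""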
schema_ids = args.schema_ids
schema_names = args.schema_names
if len(schema_ids) > 0:
for schema_id in schema_ids:
delete_schema_by_id(schema_id)
if len(schema_names) > 0:
for schema_name in schema_names:
delete_schema_by_name(schema_name)
def batch_ingest(args: argparse.Namespace) -> None:
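    """Batch ingest PDF documents from a GCS directory into Document AI Warehouse, mirroring its folder structure unless --flatten is set."""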
dir_uri = args.dir_uri
folder_name = args.root_name
schema_id = args.schema_id
schema_name = args.schema_name
overwrite = args.overwrite
options = args.options
flatten = args.flatten
processor_id = args.processor_id
if not processor_id:
processor_id = PROCESSOR_ID
Logger.info(
f"Batch load into DocumentAI WH using \n root_name={folder_name}, processor_id={processor_id},"
f"dir_uri={dir_uri}, overwrite={overwrite}, options={options}, flatten={flatten} \n"
f"DOCAI_WH_PROJECT_NUMBER={DOCAI_WH_PROJECT_NUMBER}, "
f"DOCAI_PROJECT_NUMBER={DOCAI_PROJECT_NUMBER}, "
f"GCS_OUTPUT_BUCKET={GCS_OUTPUT_BUCKET}, "
f"CALLER_USER={CALLER_USER}"
)
assert processor_id, (
"processor_id is not set as PROCESSOR_ID env variable and "
"is not provided as an input parameter (-p)"
)
assert GCS_OUTPUT_BUCKET, "GCS_OUTPUT_BUCKET not set"
assert DOCAI_PROJECT_NUMBER, "DOCAI_PROJECT_NUMBER not set"
assert DOCAI_WH_PROJECT_NUMBER, "DOCAI_WH_PROJECT_NUMBER not set"
initial_start_time = time.time()
(
created_folders,
files_to_parse,
processed_files,
processed_dirs,
error_files,
) = prepare_file_structure(dir_uri, folder_name, overwrite, flatten)
    created_schemas, document_id_list = process_documents(
files_to_parse, schema_id, schema_name, processor_id, options
)
process_time = time.time() - initial_start_time
time_elapsed = round(process_time)
document_schema_str = ""
if len(created_schemas) > 0:
document_schema_str = (
f" - created document schema with id {','.join(list(created_schemas))}"
)
Logger.info(
f"Job Completed in {str(round(time_elapsed / 60))} minute(s): \n"
f"{document_schema_str} \n"
f" - processed gcs files={len(processed_files)} \n"
f" - created dw documents={len(document_id_list)} \n"
f" - processed gcs directories={len(processed_dirs)} \n"
f" - created dw directories={len(created_folders)} \n"
)
if len(error_files) != 0:
Logger.info(
f"Following files could not be handled (Document page number exceeding limit of 200 pages? "
f"{','.join(error_files)}"
)
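# Maps each CLI sub-command to its handler function.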
FUNCTION_MAP = {
"batch_ingest": batch_ingest,
"get_schema": get_schema,
"upload_schema": upload_schema,
"delete_schema": delete_schema,
}
def main():
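    """Parse command line arguments and dispatch to the selected command handler."""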
parser = get_args()
args = parser.parse_args()
func = FUNCTION_MAP[args.command]
func(args)
def get_args():
# Read command line arguments
args_parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
description="""
Script with multiple commands options to batch_ingest documents, generate schema,
upload schema or delete schema using Document AI Warehouse.
""",
epilog="""
Examples:
Batch ingestion of files inside GCS directory:
> python main.py batch_ingest -d=gs://my-folder -p PROCESSOR_ID [-n=UM_Guidelines] [-sn=schema_name] [--overwrite]
Generate document schema based on the Document AI output:
> python main.py get_schema -f=gs://my-folder/my-form.pdf -p PROCESSOR_ID [-sn=schema_name]
Upload document schema into Document AI WH:
> python main.py upload_schema -f=gs://my-folder/schema_name.json [-o]
Delete document schema from Document AI WH:
    > python main.py delete_schema -ss=schema_id1 -ss=schema_id2 -sns=schema_name1 -sns=schema_name2
""",
)
args_parser.add_argument("command", choices=FUNCTION_MAP.keys())
args_parser.add_argument(
"-d",
dest="dir_uri",
help="Path to gs directory uri, containing data with PDF documents to be loaded. "
"All original structure of sub-folders will be preserved.",
)
args_parser.add_argument(
"-s", dest="schema_id", help="Optional existing schema_id."
)
args_parser.add_argument("-p", dest="processor_id", help="Processor_ID.")
args_parser.add_argument(
"-sn",
dest="schema_name",
help="Optional name of the schema to be created (should not exist).",
)
args_parser.add_argument(
"-o",
"--overwrite",
dest="overwrite",
help="Overwrite files/schema if already exist.",
action="store_true",
default=False,
)
args_parser.add_argument(
"-f",
dest="file_path",
help="Path to file.",
)
args_parser.add_argument(
"--flatten",
dest="flatten",
help="Flatten the directory structure.",
action="store_true",
default=False,
)
args_parser.add_argument(
"--options",
dest="options",
help=" When set (by default), will automatically fill in document properties using schema options.",
action="store_true",
default=True,
)
args_parser.add_argument(
"-n",
dest="root_name",
help="Name of the root folder inside DW for batch ingestion."
" When skipped, will use the same name of the folder being loaded from.",
)
args_parser.add_argument(
"-sns",
dest="schema_names",
action="append",
default=[],
help="Schema display_name to be deleted.",
)
args_parser.add_argument(
"-ss",
dest="schema_ids",
action="append",
default=[],
help="Schema_ids to be deleted.",
)
return args_parser
def process_documents(
files_to_parse: Dict[str, Any],
schema_id: str,
schema_name: str,
processor_id: str,
options: bool,
) -> Tuple[Set[str], List[str]]:
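    """Extract entities with Document AI and create the corresponding DW documents.

    Returns a tuple of (ids of newly created document schemas, ids of created DW documents).
    """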
created_schemas: Set[str] = set()
document_id_list: List[str] = []
if len(files_to_parse) == 0:
return created_schemas, document_id_list
docai_output_list = docai_utils.batch_extraction(
processor_id, list(files_to_parse.keys()), GCS_OUTPUT_BUCKET
)
processor = docai_utils.get_processor(processor_id)
document_schemas = get_document_schemas()
document_schema_id = None
if not schema_name:
schema_name = processor.display_name
for f_uri in docai_output_list:
document_ai_output = docai_output_list[f_uri]
if f_uri in files_to_parse:
keys = get_key_value_pairs(document_ai_output)
create_new_schema = False
if schema_id:
document_schema_id = schema_id
else:
if schema_name in document_schemas:
document_schema_id = document_schemas[schema_name]
schema = dw_utils.get_document_schema(document_schema_id)
if (
schema
and len(keys) != 0
and len(schema.property_definitions) == 0
and options
):
create_new_schema = True
else:
create_new_schema = True
if create_new_schema:
schema_path = create_mapping_schema(schema_name, keys, options)
new_schema_id = create_document_schema(schema_path, True)
if document_schema_id != new_schema_id:
created_schemas.add(new_schema_id)
document_schemas[schema_name] = new_schema_id
document_schema_id = new_schema_id
(parent_id, reference_id) = files_to_parse[f_uri]
schema = dw_utils.get_document_schema(document_schema_id)
metadata_properties = get_metadata_properties(keys, schema)
if document_schema_id:
try:
document_id = upload_document_gcs(
f_uri,
document_schema_id,
parent_id,
reference_id,
document_ai_output,
metadata_properties,
)
if document_id:
document_id_list.append(document_id)
except Exception as ex:
Logger.error(f"Failed to upload {f_uri} - {ex}")
return created_schemas, document_id_list
def prepare_file_structure(
dir_uri: str,
folder_name: str,
overwrite: bool,
flatten: bool,
):
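    """Walk the GCS directory and mirror its folder structure in Document AI Warehouse.

    Returns the created folder reference ids, the files still to be parsed (mapped to
    their parent folder id and reference id), the processed files and directories, and
    the files that raised errors.
    """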
created_folders = []
files_to_parse = {}
processed_files = []
processed_dirs = set()
error_files = []
folder_schema_id = create_folder_schema(FOLDER_SCHEMA_PATH)
bucket_name, prefix = helper.split_uri_2_bucket_prefix(dir_uri)
if not prefix.endswith(".pdf") and prefix != "":
prefix = prefix + "/"
blobs = list(storage_client.list_blobs(bucket_name, prefix=prefix))
if folder_name is None:
folder_name = bucket_name
for blob in blobs:
filename = blob.name
Logger.info(f"Handling {filename}")
try:
if filename.endswith(".pdf"):
if flatten:
dirs = [filename.replace("/", "__")]
else:
dirs = filename.split("/")
if " " in dirs[:-1]:
Logger.warning(
f"Skipping {filename} since name contains space, currently this is not supported."
)
parent_id = create_folder(folder_schema_id, folder_name, folder_name)
parent = dw_utils.get_document(parent_id, CALLER_USER)
for d in dirs:
reference_id = f"{parent.reference_id}__{d}".strip()
if not d.endswith(".pdf"):
processed_dirs.add(d)
if reference_id not in created_folders:
create_folder(folder_schema_id, d, reference_id)
created_folders.append(reference_id)
parent = dw_utils.get_document(
f"referenceId/{reference_id}", CALLER_USER
)
parent_id = parent.name.split("/")[-1]
else:
if document_exists(reference_id):
if overwrite:
delete_document(reference_id)
else:
Logger.info(
f"Skipping gs://{bucket_name}/{filename} since it already exists..."
)
continue
files_to_parse[f"gs://{bucket_name}/{filename}"] = (
parent_id,
reference_id,
)
processed_files.append(filename)
except Exception as ex:
Logger.error(f"Exception {ex} while handling {filename}")
error_files.append(filename)
return created_folders, files_to_parse, processed_files, processed_dirs, error_files
def get_type(value: Any) -> str:
    if isinstance(value, bool) or str(value) == "":
return "text_type_options" # bool Not Supported
if is_date(value):
return "date_time_type_options"
if is_valid_int(value):
return "integer_type_options"
if is_valid_float(value):
return "float_type_options"
return "text_type_options"
def is_valid_float(string: str) -> bool:
try:
float(string)
return True
except ValueError:
return False
def is_valid_bool(string: str) -> bool:
return string.lower() in ["true", "false"]
def is_valid_int(string: str) -> bool:
return string.isdigit()
def create_mapping_schema(display_name: str, names, options: bool = True) -> str:
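    """Write a document schema JSON file (optionally with property definitions derived from the extracted key/value pairs) and return its path."""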
properties: List[Dict[str, Any]] = []
mapping_dic = {
"display_name": display_name,
"property_definitions": [],
"document_is_folder": False,
"description": "Auto-generated using batch upload",
}
if options:
for name, value in names:
definition = {
"name": name,
"display_name": name,
"is_repeatable": False,
"is_filterable": True,
"is_searchable": True,
"is_metadata": True,
"is_required": False,
}
v_type = get_type(value)
if v_type:
definition[v_type] = {}
properties.append(definition)
mapping_dic["property_definitions"] = properties
file_path = os.path.join(
os.path.dirname(__file__), "schema_files", f"{display_name}.json"
)
with open(file_path, "w") as f:
json.dump(mapping_dic, f, indent=2)
return file_path
def document_exists(reference_id: str) -> bool:
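    """Return True if a DW document with the given reference_id already exists."""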
reference_path = f"referenceId/{reference_id}"
try:
dw_utils.get_document(reference_path, CALLER_USER)
return True
except NotFound:
return False
def delete_document(reference_id: str) -> None:
Logger.info(f"delete_document reference_id={reference_id}")
reference_path = f"referenceId/{reference_id}"
dw_utils.delete_document(document_id=reference_path, caller_user_id=CALLER_USER)
def upload_document_gcs(
file_uri: str,
document_schema_id: str,
folder_id: str,
reference_id: str,
document_ai_output,
metadata_properties: List[contentwarehouse_v1.Property],
) -> Optional[str]:
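    """Create a DW document from a GCS file and link it to its parent folder.

    Returns the created document id, or None if creation failed.
    """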
create_document_response = dw_utils.create_document(
display_name=os.path.basename(file_uri),
mime_type="application/pdf",
document_schema_id=document_schema_id,
raw_document_path=file_uri,
docai_document=document_ai_output,
caller_user_id=CALLER_USER,
reference_id=reference_id,
metadata_properties=metadata_properties,
)
Logger.debug(
f"create_document_response={create_document_response}"
) # Verify that the properties have been set correctly
if create_document_response:
document_id = create_document_response.document.name.split("/")[-1]
dw_utils.link_document_to_folder(
document_id=document_id,
folder_document_id=folder_id,
caller_user_id=CALLER_USER,
)
Logger.info(
f"Created document {file_uri} with reference_id={reference_id} inside folder_id={folder_id} "
f"and using schema_id={document_schema_id}"
)
return document_id
return None
def create_folder_schema(schema_path: str) -> str:
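    """Create the folder schema defined in schema_path, reusing an existing one with the same display name, and return its id."""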
folder_schema = storage_utils.read_file(schema_path, mode="r")
display_name = json.loads(folder_schema).get("display_name")
for ds in dw_utils.list_document_schemas():
if ds.display_name == display_name and ds.document_is_folder:
return ds.name.split("/")[-1]
create_schema_response = dw_utils.create_document_schema(folder_schema)
folder_schema_id = create_schema_response.name.split("/")[-1]
Logger.info(f"folder_schema_id={folder_schema_id}")
response = dw_utils.get_document_schema(schema_id=folder_schema_id)
Logger.debug(f"response={response}")
return folder_schema_id
def create_folder(
folder_schema_id: str, display_name: str, reference_id: str
) -> Optional[str]:
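    """Return the id of the DW folder with the given reference_id, creating it if it does not exist."""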
reference_path = f"referenceId/{reference_id}"
try:
document = dw_utils.get_document(reference_path, CALLER_USER)
folder_id = document.name.split("/")[-1]
return folder_id
except NotFound:
Logger.info(
f" -------> Creating sub-folder [{display_name}] with reference_id=[{reference_id}]"
)
create_folder_response = dw_utils.create_document(
display_name=display_name,
document_schema_id=folder_schema_id,
caller_user_id=CALLER_USER,
reference_id=reference_id,
)
if create_folder_response is not None:
folder_id = create_folder_response.document.name.split("/")[-1]
return folder_id
return None
def get_document_schemas() -> Dict[str, Any]:
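    """Return a mapping of existing document schema display names to their schema ids."""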
schemas = {}
for ds in dw_utils.list_document_schemas():
if ds.display_name not in schemas:
schemas[ds.display_name] = ds.name.split("/")[-1]
return schemas
def create_document_schema(schema_path: str, overwrite_schema: bool = False) -> str:
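    """Create a document schema from a JSON file and return its id; an existing schema with the same display name is reused or overwritten depending on overwrite_schema."""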
document_schema = storage_utils.read_file(schema_path, mode="r")
display_name = json.loads(document_schema).get("display_name")
for ds in dw_utils.list_document_schemas():
if ds.display_name == display_name and not ds.document_is_folder:
document_schema_id = ds.name.split("/")[-1]
if overwrite_schema:
try:
Logger.info(
f"Removing {ds.display_name} with document_schema_id={document_schema_id}"
)
dw_utils.delete_document_schema(document_schema_id)
except Exception as ex:
Logger.warning(f"Could not replace schema due to error {ex}")
return document_schema_id
else:
Logger.info(
f"create_document_schema - Document schema with display_name = {display_name} already "
f"exists with schema_id = {document_schema_id}"
)
return document_schema_id
create_schema_response = dw_utils.create_document_schema(document_schema)
document_schema_id = create_schema_response.name.split("/")[-1]
Logger.info(
f"create_document_schema - Created document schema with display_name = {display_name} "
f"and schema_id = {document_schema_id}"
)
return document_schema_id
def delete_schema_by_id(schema_id: str) -> None:
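    """Delete a document schema by its schema id."""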
try:
Logger.info(f"Removing schema with schema_id={schema_id}")
dw_utils.delete_document_schema(schema_id)
except Exception as ex:
Logger.warning(f"Could not replace schema due to error {ex}")
def delete_schema_by_name(display_name: str) -> None:
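    """Delete every non-folder document schema whose display name matches display_name."""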
Logger.info(f"Deleting schema with display_name={display_name}")
for ds in dw_utils.list_document_schemas():
if ds.display_name == display_name and not ds.document_is_folder:
document_schema_id = ds.name.split("/")[-1]
try:
Logger.info(
f"Removing {ds.display_name} with document_schema_id={document_schema_id}"
)
dw_utils.delete_document_schema(document_schema_id)
except Exception as ex:
Logger.warning(f"Could not delete schema due to error {ex}")
else:
Logger.info(
f"Schema with display_name={display_name} and schema_id={document_schema_id} "
f"has been successfully deleted "
)
if __name__ == "__main__":
main()