def batch_ingest()

in document_ai_warehouse/document_ai_warehouse_batch_ingestion/main.py
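Entry point for the batch ingestion CLI: it reads the parsed command-line arguments, fills in defaults from environment variables, validates required configuration, mirrors the GCS directory structure inside Document AI Warehouse, runs each file through a Document AI processor, and logs a summary of what was created.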


def batch_ingest(args: argparse.Namespace) -> None:
    dir_uri = args.dir_uri
    folder_name = args.root_name
    schema_id = args.schema_id
    schema_name = args.schema_name
    overwrite = args.overwrite
    options = args.options
    flatten = args.flatten
    processor_id = args.processor_id

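    # Fall back to the PROCESSOR_ID environment variable when -p is not supplied.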
    if not processor_id:
        processor_id = PROCESSOR_ID
    Logger.info(
        f"Batch load into DocumentAI WH using \n root_name={folder_name}, processor_id={processor_id},"
        f"dir_uri={dir_uri}, overwrite={overwrite}, options={options}, flatten={flatten} \n"
        f"DOCAI_WH_PROJECT_NUMBER={DOCAI_WH_PROJECT_NUMBER}, "
        f"DOCAI_PROJECT_NUMBER={DOCAI_PROJECT_NUMBER}, "
        f"GCS_OUTPUT_BUCKET={GCS_OUTPUT_BUCKET}, "
        f"CALLER_USER={CALLER_USER}"
    )

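    # Fail fast if any required configuration is missing.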
    assert processor_id, (
        "processor_id is not set as PROCESSOR_ID env variable and "
        "is not provided as an input parameter (-p)"
    )
    assert GCS_OUTPUT_BUCKET, "GCS_OUTPUT_BUCKET not set"
    assert DOCAI_PROJECT_NUMBER, "DOCAI_PROJECT_NUMBER not set"
    assert DOCAI_WH_PROJECT_NUMBER, "DOCAI_WH_PROJECT_NUMBER not set"

    initial_start_time = time.time()

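    # Walk dir_uri, mirror its folder structure in the warehouse, and collect the files to parse.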
    (
        created_folders,
        files_to_parse,
        processed_files,
        processed_dirs,
        error_files,
    ) = prepare_file_structure(dir_uri, folder_name, overwrite, flatten)

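    # Parse each collected file with the Document AI processor and create the
    # corresponding warehouse documents, creating document schemas as needed.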
    created_schemas, document_id_list = proces_documents(
        files_to_parse, schema_id, schema_name, processor_id, options
    )

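    # Summarize the run: elapsed time, schema creation, and per-category counts.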
    time_elapsed = round(time.time() - initial_start_time)
    document_schema_str = ""
    if created_schemas:
        document_schema_str = (
            f"  - created document schema with id {','.join(created_schemas)}"
        )
    Logger.info(
        f"Job Completed in {str(round(time_elapsed / 60))} minute(s): \n"
        f"{document_schema_str}  \n"
        f"  - processed gcs files={len(processed_files)} \n"
        f"  - created dw documents={len(document_id_list)} \n"
        f"  - processed gcs directories={len(processed_dirs)} \n"
        f"  - created dw directories={len(created_folders)} \n"
    )
    if error_files:
        Logger.info(
            f"The following files could not be handled (document page count exceeding the limit of 200 pages?): "
            f"{','.join(error_files)}"
        )