def verify_ingestion()

in src/data_load/load.py


import json
import os
import time

# Note: config, logger, verify_ids, generate_workproduct_id,
# get_directory_name, and reference_data_id are module-level names
# defined elsewhere in src/data_load/load.py.

def verify_ingestion(dir_name, batch_size=1):
    """Recursively walk dir_name and verify that every record described by
    its manifest JSON files was ingested, querying in batches of batch_size."""

    success = []
    failed = []

    # Manifests carry the generic "osdu" partition prefix; build the patterns
    # used to rewrite ids to the configured data-partition-id.
    reference_pattern = "{}:reference-data".format(config.get("CONNECTION", "data-partition-id"))
    master_pattern = "{}:master-data".format(config.get("CONNECTION", "data-partition-id"))
    # Throttle: after this many verified records, sleep for a minute to
    # avoid flooding the search service.
    sleep_after_count = 1000
    queries_made = 0

    for root, _, files in os.walk(dir_name):
        logger.debug(f"Files list: {files}")
        cur_batch = 0
        record_ids = []
        for file in files:
            filepath = os.path.join(root, file)
            if filepath.endswith(".json"):
                with open(filepath) as json_file:
                    data_object = json.load(json_file)

            else:
                continue
            # Reset between files so a malformed manifest cannot silently
            # reuse the previous file's data.
            ingested_data = None
            if not data_object:
                logger.error(f"Error with file {filepath}. File is empty.")
                continue

            if "ReferenceData" in data_object and len(data_object["ReferenceData"]) > 0:
                ingested_data = data_object["ReferenceData"]

            elif "MasterData" in data_object and len(data_object["MasterData"]) > 0:
                ingested_data = data_object["MasterData"]

            elif "Data" in data_object:
                ingested_data = data_object["Data"]

            if isinstance(ingested_data, dict):
                if "id" not in ingested_data["WorkProduct"]:
                    # Derive the Work-Product id from the manifest, e.g.
                    # opendes:work-product--WorkProduct:load_document_69_D_CH_11_pdf.json
                    work_product_id = generate_workproduct_id(ingested_data["WorkProduct"]["data"]["Name"],
                                                              get_directory_name(filepath))
                    record_ids.append(work_product_id)
                else:
                    record_ids.append(ingested_data["WorkProduct"]["id"]
                                      .replace('osdu:reference-data', reference_pattern)
                                      .replace('osdu:master-data', master_pattern))
                cur_batch += 1

            elif isinstance(ingested_data, list):
                for ingested_datum in ingested_data:
                    if "id" in ingested_datum:
                        record_ids.append(reference_data_id(ingested_datum, reference_pattern, master_pattern))
                        cur_batch += 1

            # Batch is full: verify the collected ids with one search call.
            if cur_batch >= batch_size:
                logger.debug(f"Searching records with batch size {cur_batch}")
                s, f = verify_ids(record_ids)
                success += s
                failed += f
                queries_made += cur_batch
                if queries_made >= sleep_after_count:
                    time.sleep(60)
                    queries_made = 0

                cur_batch = 0
                record_ids = []
            else:
                logger.debug(
                    f"Current batch size after processing {filepath} is {cur_batch}. Reading more files...")

        # Flush any ids left over once all files in this directory are read.
        if cur_batch > 0:
            logger.debug(
                f"Searching remaining records with batch size {cur_batch}")
            s, f = verify_ids(record_ids)
            success += s
            failed += f
            queries_made += cur_batch
            if queries_made >= sleep_after_count:
                time.sleep(60)
                queries_made = 0

    return success, failed
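
A hedged sketch for orientation, inferred from the call sites above: `reference_data_id`
presumably performs the same partition rewriting that the dict branch does inline for
WorkProduct ids, and `verify_ingestion` would be driven roughly as shown. Neither
reflects the actual definitions in src/data_load/load.py; the directory name and batch
size below are illustrative.

def reference_data_id(ingested_datum, reference_pattern, master_pattern):
    # Assumed behavior: swap the generic "osdu" partition prefix for the
    # configured data-partition-id, mirroring the inline replaces above.
    return (ingested_datum['id']
            .replace('osdu:reference-data', reference_pattern)
            .replace('osdu:master-data', master_pattern))

if __name__ == "__main__":
    # Verify everything under ./manifests, 25 record ids per search query.
    success, failed = verify_ingestion("manifests", batch_size=25)
    print(f"verified: {len(success)}, missing: {len(failed)}")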