in src/data_load/load.py [0:0]
def verify_ingestion(dir_name, batch_size=1):
    # Recursively traverse the root directory and its subdirectories, processing each JSON file found
    success = []
    failed = []
    reference_pattern = "{}:reference-data".format(config.get("CONNECTION", "data-partition-id"))
    master_pattern = "{}:master-data".format(config.get("CONNECTION", "data-partition-id"))
    sleep_after_count = 1000
    queries_made = 0
    for root, _, files in os.walk(dir_name):
        logger.debug(f"Files list: {files}")
        cur_batch = 0
        record_ids = []
        for file in files:
            filepath = os.path.join(root, file)
            if filepath.endswith(".json"):
                with open(filepath) as json_file:
                    data_object = json.load(json_file)
            else:
                continue
            ingested_data = None
            if not data_object:
                logger.error(f"Error with file {filepath}. File is empty.")
            elif "ReferenceData" in data_object and len(data_object["ReferenceData"]) > 0:
                ingested_data = data_object["ReferenceData"]
            elif "MasterData" in data_object and len(data_object["MasterData"]) > 0:
                ingested_data = data_object["MasterData"]
            elif "Data" in data_object:
                ingested_data = data_object["Data"]

            if isinstance(ingested_data, dict):
                if "id" not in ingested_data["WorkProduct"]:
                    # Derive the Work-Product id -> opendes:work-product--WorkProduct:load_document_69_D_CH_11_pdf.json
                    work_product_id = generate_workproduct_id(ingested_data["WorkProduct"]["data"]["Name"],
                                                              get_directory_name(filepath))
                    record_ids.append(work_product_id)
                else:
                    record_ids.append(ingested_data["WorkProduct"]["id"]
                                      .replace('osdu:reference-data', reference_pattern)
                                      .replace('osdu:master-data', master_pattern))
                cur_batch += 1
            elif isinstance(ingested_data, list):
                for ingested_datum in ingested_data:
                    if "id" in ingested_datum:
                        record_ids.append(reference_data_id(ingested_datum, reference_pattern, master_pattern))
                        cur_batch += 1
            if cur_batch >= batch_size:
                logger.debug(f"Searching records with batch size {cur_batch}")
                s, f = verify_ids(record_ids)
                success += s
                failed += f
                queries_made += cur_batch
                # Throttle search traffic: pause for a minute once sleep_after_count records have been queried
                if queries_made >= sleep_after_count:
                    time.sleep(60)
                    queries_made = 0
                cur_batch = 0
                record_ids = []
            else:
                logger.debug(
                    f"Current batch size after processing {filepath} is {cur_batch}. Reading more files..")
        if cur_batch > 0:
            logger.debug(
                f"Searching remaining records with batch size {cur_batch}")
            s, f = verify_ids(record_ids)
            success += s
            failed += f
            queries_made += cur_batch
            if queries_made >= sleep_after_count:
                time.sleep(60)
                queries_made = 0
    return success, failed
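
# A minimal usage sketch, not part of the original module: it assumes config and logger
# are already initialised elsewhere in load.py, and the directory path and batch size
# below are illustrative only.
if __name__ == "__main__":
    verified, missing = verify_ingestion("./open-test-data", batch_size=25)
    logger.info(f"Verified {len(verified)} record(s); {len(missing)} record(s) not found")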