in src/data_load/load.py [0:0]
def _build_arg_parser():
    """Return the argument parser for the data-load CLI.

    Every action is a sub-command; the chosen sub-command name is stored on
    the parsed namespace as ``subparser`` (``None`` when none was given).
    """
    # Arg Parser - https://docs.python.org/3/library/argparse.html
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers(dest='subparser')

    # Ingest command
    parser_ingest = subparsers.add_parser('ingest')
    parser_ingest.add_argument('-d', '--dir', dest='dir', help='Directory name', required=True)
    parser_ingest.add_argument('-b', '--batch', dest='batch', help='Batch size',
                               required=False, type=int, default=1)
    parser_ingest.add_argument('-w', "--work-products", dest="wpc", help="Is workproduct data?",
                               action='store_true', required=False, default=False)
    parser_ingest.add_argument('-f', '--file-location-map-file', dest='file_location_map',
                               help='Json file where file locations are stored', required=False)
    parser_ingest.add_argument('-r', '--standard-reference', dest='standard_reference',
                               help="Is standard reference data?", default=False, action='store_true')

    # Standard references command
    parser_references = subparsers.add_parser("references")
    parser_references.add_argument('-d', '--dir', dest='dir', help='Directory name', required=True)
    # The handler forwards a batch size; this option used to be missing, so
    # None was always passed. Default to 1 like the 'ingest' and 'verify' commands.
    parser_references.add_argument('-b', '--batch', dest='batch', help='Batch size',
                                   required=False, type=int, default=1)
    parser_references.add_argument('-s', '--ingestion-sequence-file', dest='ingestion_sequence',
                                   help='Json file that defines the ingestion sequence', required=True)

    # Status command
    parser_status = subparsers.add_parser('status')
    parser_status.add_argument('-w', '--wait', dest='wait', help='Should wait for ingestion to complete',
                               action='store_true', required=False, default=False)
    parser_status.add_argument("-i", "--ingestion-name", dest="ingestion_name", default="ingestion",
                               help="Key under which the status results are stored in the results file")

    # Reports command
    parser_reports = subparsers.add_parser('reports')
    parser_reports.add_argument('-f', '--file', dest="file", help="Reports File", required=True)

    # Verify command
    parser_verify = subparsers.add_parser('verify')
    parser_verify.add_argument('-d', '--dir', dest='dir', help='Directory name', required=True)
    parser_verify.add_argument('-b', '--batch', dest='batch', help='Batch size',
                               required=False, type=int, default=1)
    parser_verify.add_argument('-r', '--standard-reference', dest='standard_reference',
                               help="Is standard reference data?", default=False, action='store_true')
    parser_verify.add_argument('-s', '--ingestion-sequence-file', dest='ingestion_sequence',
                               help='Json file that defines the ingestion sequence', required=False)

    # Delete command
    parser_delete = subparsers.add_parser('delete')
    parser_delete.add_argument('-d', '--dir', dest='dir', help='Directory name', required=True)

    # Datasets command
    parser_datasets = subparsers.add_parser('datasets')
    parser_datasets.add_argument('-d', '--dir', dest='dir', help='Directory name', required=True)
    parser_datasets.add_argument("-f", "--output-file-name", dest="output",
                                 help="File to which the file info is saved to",
                                 default="datasets-location.json")
    return parser


def _reset_log_file():
    """Truncate LOG_FILENAME so a new ingestion run starts with an empty log."""
    with open(LOG_FILENAME, 'w'):
        pass


def _cmd_ingest(args):
    """Action: ingest — reset the log file and run execute_ingestion with the CLI options."""
    _reset_log_file()
    execute_ingestion(args.dir, args.batch, args.wpc, args.file_location_map, args.standard_reference)


def _cmd_references(args):
    """Action: references — reset the log file and run execute_sequence_ingestion."""
    _reset_log_file()
    execute_sequence_ingestion(args.dir, args.batch, ingestion_sequence=args.ingestion_sequence)


def _cmd_datasets(args):
    """Action: datasets — load files from args.dir and save the successes as JSON to args.output."""
    success, failed = load_files(args.dir)
    logger.debug(f"Files that are successfully uploaded: {len(success)}")
    # Create Result File
    with open(args.output, 'w') as f:
        json.dump(success, f, indent=4)
    logger.info(f"File location map is saved to {f.name}")
    logger.info(f"Files that could not be uploaded: {len(failed)}")
    logger.info(pformat(failed))


def _cmd_status(args):
    """Action: status — query status_check() and store the results in RESULTS_FILENAME.

    With ``--wait`` the status is re-polled every 60 seconds until no result
    reports ``"running"``. Results are stored under ``args.ingestion_name``,
    merged into any results already present in RESULTS_FILENAME.
    """
    results = status_check()
    while args.wait and any(result.get("status") == "running" for result in results):
        time.sleep(60)
        results = status_check()

    # Load existing manifest ingestion results (if any) so earlier runs are kept.
    results_map = {}
    if os.path.isfile(RESULTS_FILENAME):
        with open(RESULTS_FILENAME) as file:
            results_map = json.load(file)
    results_map[args.ingestion_name] = results
    with open(RESULTS_FILENAME, 'w') as file:
        json.dump(results_map, file, indent=4)


def _cmd_verify(args):
    """Action: verify — check ingested records (or standard references) and log the outcome."""
    if args.standard_reference:
        logger.info("Verifying standard references")
        success, failed = verify_references(args.dir, ingestion_sequence=args.ingestion_sequence)
    else:
        success, failed = verify_ingestion(args.dir, args.batch)
    logger.info(
        f"Number of Records that are ingested successfully: {len(success)}")
    logger.warning(
        f"Number of Records that could not be ingested successfully: {len(failed)}")
    if failed:
        logger.info("Record IDs that could not be ingested")
        logger.info(pformat(failed))


def _cmd_reports(args):
    """Action: reports — run compute_reports on the given file."""
    compute_reports(args.file)


def _cmd_delete(args):
    """Action: delete — delete the records listed under args.dir and log the outcome."""
    success, failed = delete_ingested_records(args.dir)
    logger.info(
        f"Number of Records that are deleted successfully: {len(success)}")
    # Only list failures when there are some (mirrors the 'verify' action).
    if failed:
        logger.info("Record IDs that could not be deleted")
        logger.info(pformat(failed))


def main(argv):
    """Parse the command line and dispatch to the selected sub-command handler.

    Sub-commands: ingest, references, datasets, status, verify, reports, delete.
    When no sub-command is given, the help text is printed instead of silently
    doing nothing.

    :param argv: currently unused — see the note below.
    """
    parser = _build_arg_parser()
    # NOTE(review): ``argv`` is ignored; argparse reads sys.argv[1:] itself.
    # Switching to parse_args(argv) depends on whether callers pass the program
    # name as argv[0] — confirm before changing.
    parsed_args = parser.parse_args()
    logger.info(f"args: {parsed_args}")

    handlers = {
        "ingest": _cmd_ingest,
        "references": _cmd_references,
        "datasets": _cmd_datasets,
        "status": _cmd_status,
        "verify": _cmd_verify,
        "reports": _cmd_reports,
        "delete": _cmd_delete,
    }
    handler = handlers.get(parsed_args.subparser)
    if handler is None:
        # argparse only accepts known sub-commands, so this means none was given.
        parser.print_help()
        return
    handler(parsed_args)