# microservices/skill_service/src/routes/ingestion.py
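# Imports assumed by this excerpt. The stdlib/FastAPI imports below are
# certain; the service-local helpers (parse_csv, is_valid_path, the
# validate_*_csv functions, upload_file_to_bucket, initiate_batch_job,
# Logger, the custom exception classes, and constants such as GCP_BUCKET,
# DATABASE_PREFIX, GENERIC_CSV_INGESTION_JOB_TYPE and
# ALLOWED_SOURCES_FOR_GENERIC_CSV_INGESTION) come from modules not shown
# here, so their import paths are omitted rather than guessed.
import codecs
import csv
import traceback
from datetime import datetime
from typing import Optional

from fastapi import File, Form, UploadFile

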
def import_from_generic_csv(
    source: ALLOWED_SOURCES_FOR_GENERIC_CSV_INGESTION,
    skill_uri: Optional[str] = Form(default=None),
    competency_uri: Optional[str] = Form(default=None),
    category_uri: Optional[str] = Form(default=None),
    domain_uri: Optional[str] = Form(default=None),
    sub_domain_uri: Optional[str] = Form(default=None),
    skills: Optional[UploadFile] = File(None),
    competencies: Optional[UploadFile] = File(None),
    categories: Optional[UploadFile] = File(None),
    sub_domains: Optional[UploadFile] = File(None),
    domains: Optional[UploadFile] = File(None)):
"""Batch job to import data from csv uploaded by user
Args:
- source(query param): name of registry/library through which data is acquired
- skill_uri : GCS uri for the skills data csv file
- competency_uri : GCS uri for the competencies data csv file
- category_uri : GCS uri for the categories data csv file
- domain_uri : GCS uri for the domains data csv file
- sub_domain__uri : GCS uri for the sub domains data csv file
- skills: csv file containing skills data
- competencies: csv file containing competencies data
- categories: csv file containing categories data
- sub_domains: csv file containing sub domains data
- domains: csv file containing domains data
Raises:
- HTTPException: 422 Unprocessable entity if file other than csv is passed
- HTTPException: 500 Internal Server Error if data not correct or if
something fails
Returns: (BatchJobModel)
- job_name: name of the batch job created
- status: status of batch job
"""
  try:
    file_object_list = [skills, competencies, sub_domains, domains, categories]
    uri_object_list = [
        skill_uri, competency_uri, sub_domain_uri, domain_uri, category_uri
    ]
    file_check = any(i is not None for i in file_object_list)
    uri_check = any(i is not None for i in uri_object_list)
    if (not uri_check and not file_check) or (uri_check and file_check):
      raise ValidationError("Please either upload a CSV file or provide a "
                            "GCS CSV file URI but not both")
    input_data = {}
    input_data["source_name"] = source
    date = datetime.now().strftime("%Y_%m_%d-%I:%M:%S_%p")
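    # URI mode: record each provided GCS URI, failing fast if the path does
    # not exist or the CSV contents do not validate.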
    if uri_check:
      if category_uri:
        input_data["category_uri"] = category_uri
        if is_valid_path(category_uri):
          categories_json_array = parse_csv(category_uri)
          validate_categories_csv(categories_json_array)
        else:
          raise ResourceNotFoundException(
              "file does not exist at the specified path.")
      else:
        input_data["category_uri"] = None
      if competency_uri:
        input_data["competency_uri"] = competency_uri
        if is_valid_path(competency_uri):
          competencies_json_array = parse_csv(competency_uri)
          validate_competencies_csv(competencies_json_array)
        else:
          raise ResourceNotFoundException(
              "file does not exist at the specified path.")
      else:
        input_data["competency_uri"] = None
      if domain_uri:
        input_data["domain_uri"] = domain_uri
        if is_valid_path(domain_uri):
          domains_json_array = parse_csv(domain_uri)
          validate_domains_csv(domains_json_array)
        else:
          raise ResourceNotFoundException(
              "file does not exist at the specified path.")
      else:
        input_data["domain_uri"] = None
      if sub_domain_uri:
        input_data["sub_domain_uri"] = sub_domain_uri
        if is_valid_path(sub_domain_uri):
          sub_domains_json_array = parse_csv(sub_domain_uri)
          validate_sub_domains_csv(sub_domains_json_array)
        else:
          raise ResourceNotFoundException(
              "file does not exist at the specified path.")
      else:
        input_data["sub_domain_uri"] = None
      if skill_uri:
        input_data["skill_uri"] = skill_uri
        if is_valid_path(skill_uri):
          skills_json_array = parse_csv(skill_uri)
          validate_skills_csv(skills_json_array)
        else:
          raise ResourceNotFoundException(
              "file does not exist at the specified path.")
      else:
        input_data["skill_uri"] = None
    if file_check:
      if skills:
        if not skills.filename.endswith(".csv"):
          raise InvalidFileType()
        # validate data in provided csv
        csv_reader = csv.DictReader(
            codecs.iterdecode(skills.file, "utf-8"), delimiter=",")
        validate_skills_csv(list(csv_reader))
        # rewind the stream: validation consumed it, and uploading without
        # seeking back would store an empty file
        skills.file.seek(0)
        skill_file_name = "skills_" + str(date) + ".csv"
        skill_uri = upload_file_to_bucket(GCP_BUCKET,
                                          "skill-service/user-uploaded-csvs",
                                          skill_file_name, skills.file)
        input_data["skill_uri"] = skill_uri
      else:
        input_data["skill_uri"] = None
      if competencies:
        if not competencies.filename.endswith(".csv"):
          raise InvalidFileType()
        # validate data in provided csv
        csv_reader = csv.DictReader(
            codecs.iterdecode(competencies.file, "utf-8"), delimiter=",")
        validate_competencies_csv(list(csv_reader))
        competencies.file.seek(0)
        competency_file_name = "competencies_" + str(date) + ".csv"
        competency_uri = upload_file_to_bucket(
            GCP_BUCKET, "skill-service/user-uploaded-csvs",
            competency_file_name, competencies.file)
        input_data["competency_uri"] = competency_uri
      else:
        input_data["competency_uri"] = None
      if categories:
        if not categories.filename.endswith(".csv"):
          raise InvalidFileType()
        # validate data in provided csv
        csv_reader = csv.DictReader(
            codecs.iterdecode(categories.file, "utf-8"), delimiter=",")
        validate_categories_csv(list(csv_reader))
        categories.file.seek(0)
        category_file_name = "categories_" + str(date) + ".csv"
        category_uri = upload_file_to_bucket(
            GCP_BUCKET, "skill-service/user-uploaded-csvs", category_file_name,
            categories.file)
        input_data["category_uri"] = category_uri
      else:
        input_data["category_uri"] = None
      if sub_domains:
        if not sub_domains.filename.endswith(".csv"):
          raise InvalidFileType()
        # validate data in provided csv
        csv_reader = csv.DictReader(
            codecs.iterdecode(sub_domains.file, "utf-8"), delimiter=",")
        validate_sub_domains_csv(list(csv_reader))
        sub_domains.file.seek(0)
        sub_domain_file_name = "sub_domains_" + str(date) + ".csv"
        sub_domain_uri = upload_file_to_bucket(
            GCP_BUCKET, "skill-service/user-uploaded-csvs",
            sub_domain_file_name, sub_domains.file)
        input_data["sub_domain_uri"] = sub_domain_uri
      else:
        input_data["sub_domain_uri"] = None
      if domains:
        if not domains.filename.endswith(".csv"):
          raise InvalidFileType()
        # validate data in provided csv
        csv_reader = csv.DictReader(
            codecs.iterdecode(domains.file, "utf-8"), delimiter=",")
        validate_domains_csv(list(csv_reader))
        domains.file.seek(0)
        domain_file_name = "domains_" + str(date) + ".csv"
        domain_uri = upload_file_to_bucket(GCP_BUCKET,
                                           "skill-service/user-uploaded-csvs",
                                           domain_file_name, domains.file)
        input_data["domain_uri"] = domain_uri
      else:
        input_data["domain_uri"] = None
    env_vars = {"DATABASE_PREFIX": DATABASE_PREFIX}
    response = initiate_batch_job(input_data, GENERIC_CSV_INGESTION_JOB_TYPE,
                                  env_vars)
    Logger.info(response)
    return response
  except ResourceNotFoundException as e:
    raise ResourceNotFound(str(e)) from e
  except ValidationError as e:
    raise BadRequest(str(e)) from e
  except ConflictError as e:
    raise Conflict(str(e)) from e
  except InvalidFileType as e:
    Logger.error(e)
    raise BadRequest("Invalid file type. CSV file expected") from e
  except Exception as e:
    Logger.error(e)
    # format_exc() returns the traceback as a string; print_exc() writes to
    # stderr and returns None, so it would log nothing useful here
    Logger.error(traceback.format_exc())
    raise InternalServerError(str(e)) from e
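
# Example client call: a minimal sketch assuming a local server and a route
# such as POST /import/csv with "generic" as an allowed source value. The
# real path, host, and source names are not shown in this excerpt, so all
# three are hypothetical.
#
#   import requests
#
#   with open("skills.csv", "rb") as f:
#     resp = requests.post(
#         "http://localhost:8888/import/csv",  # hypothetical URL
#         params={"source": "generic"},        # hypothetical source name
#         files={"skills": ("skills.csv", f, "text/csv")})
#   print(resp.json())  # -> {"job_name": ..., "status": ...}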