def import_from_generic_csv()

in microservices/skill_service/src/routes/ingestion.py


def import_from_generic_csv(source: ALLOWED_SOURCES_FOR_GENERIC_CSV_INGESTION,
                            skill_uri: Optional[str] = Form(default=None),
                            competency_uri: Optional[str] = Form(default=None),
                            category_uri: Optional[str] = Form(default=None),
                            domain_uri: Optional[str] = Form(default=None),
                            sub_domain_uri: Optional[str] = Form(default=None),
                            skills: Optional[UploadFile] = File(None),
                            competencies: Optional[UploadFile] = File(None),
                            categories: Optional[UploadFile] = File(None),
                            sub_domains: Optional[UploadFile] = File(None),
                            domains: Optional[UploadFile] = File(None)):
  """Batch job to import data from CSV files uploaded by the user or from CSV
  files already stored in GCS. Either the GCS URI fields or the file uploads
  may be provided, but not both.

  Args:
    - source (query param): name of the registry/library through which the
      data is acquired
    - skill_uri: GCS URI of the skills data CSV file
    - competency_uri: GCS URI of the competencies data CSV file
    - category_uri: GCS URI of the categories data CSV file
    - domain_uri: GCS URI of the domains data CSV file
    - sub_domain_uri: GCS URI of the sub domains data CSV file
    - skills: CSV file containing skills data
    - competencies: CSV file containing competencies data
    - categories: CSV file containing categories data
    - sub_domains: CSV file containing sub domains data
    - domains: CSV file containing domains data

  Raises:
    - HTTPException: 422 Unprocessable Entity if a file other than CSV is
      passed
    - HTTPException: 500 Internal Server Error if the data is invalid or an
      unexpected error occurs

  Returns: (BatchJobModel)
    - job_name: name of the created batch job
    - status: status of the batch job
  """
  try:
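    # Exactly one ingestion mode is accepted per request: either GCS URIs or
    # direct CSV uploads, never both and never neither.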
    file_object_list = [skills, competencies, sub_domains, domains, categories]
    uri_object_list = [
        skill_uri, competency_uri, sub_domain_uri, domain_uri, category_uri
    ]

    file_check = any(i is not None for i in file_object_list)
    uri_check = any(i is not None for i in uri_object_list)

    if (not uri_check and not file_check) or (uri_check and file_check):
      raise ValidationError("Please either upload a CSV file or provide a "
                            "GCS CSV file URI but not both")

    input_data = {}
    input_data["source_name"] = source
    date = datetime.now().strftime("%Y_%m_%d-%I:%M:%S_%p")

    # GCS-URI mode: confirm each referenced file exists, then parse and
    # validate its contents before passing the URI on to the batch job.
    if uri_check:
      if category_uri:
        input_data["category_uri"] = category_uri
        if is_valid_path(category_uri):
          categories_json_array = parse_csv(category_uri)
          validate_categories_csv(categories_json_array)
        else:
          raise ResourceNotFoundException(
              "file does not exist at the specified path.")
      else:
        input_data["category_uri"] = None

      if competency_uri:
        input_data["competency_uri"] = competency_uri
        if is_valid_path(competency_uri):
          competencies_json_array = parse_csv(competency_uri)
          validate_competencies_csv(competencies_json_array)
        else:
          raise ResourceNotFoundException(
              "file does not exist at the specified path.")
      else:
        input_data["competency_uri"] = None

      if domain_uri:
        input_data["domain_uri"] = domain_uri
        if is_valid_path(domain_uri):
          domains_json_array = parse_csv(domain_uri)
          validate_domains_csv(domains_json_array)
        else:
          raise ResourceNotFoundException(
              "file does not exist at the specified path.")
      else:
        input_data["domain_uri"] = None

      if sub_domain_uri:
        input_data["sub_domain_uri"] = sub_domain_uri
        if is_valid_path(sub_domain_uri):
          sub_domains_json_array = parse_csv(sub_domain_uri)
          validate_sub_domains_csv(sub_domains_json_array)
        else:
          raise ResourceNotFoundException(
              "file does not exist at the specified path.")
      else:
        input_data["sub_domain_uri"] = None

      if skill_uri:
        input_data["skill_uri"] = skill_uri
        if is_valid_path(skill_uri):
          skills_json_array = parse_csv(skill_uri)
          validate_skills_csv(skills_json_array)
        else:
          raise ResourceNotFoundException(
              "file does not exist at the specified path.")
      else:
        input_data["skill_uri"] = None

    # Upload mode: validate each uploaded CSV, then stage it in the GCS bucket
    # so the batch job can read it through the resulting URI.
    if file_check:
      if skills:
        if not skills.filename.endswith(".csv"):
          raise InvalidFileType()

        # validate data in provided csv
        csv_reader = csv.DictReader(
            codecs.iterdecode(skills.file, "utf-8"), delimiter=",")
        validate_skills_csv(list(csv_reader))

        skill_file_name = "skills_" + str(date) + ".csv"
        skill_uri = upload_file_to_bucket(GCP_BUCKET,
                                          "skill-service/user-uploaded-csvs",
                                          skill_file_name, skills.file)
        input_data["skill_uri"] = skill_uri
      else:
        input_data["skill_uri"] = None

      if competencies:
        if not competencies.filename.endswith(".csv"):
          raise InvalidFileType()

        # validate data in provided csv
        csv_reader = csv.DictReader(
            codecs.iterdecode(competencies.file, "utf-8"), delimiter=",")
        validate_competencies_csv(list(csv_reader))

        competency_file_name = "competencies_" + str(date) + ".csv"
        competency_uri = upload_file_to_bucket(
            GCP_BUCKET, "skill-service/user-uploaded-csvs",
            competency_file_name, competencies.file)
        input_data["competency_uri"] = competency_uri
      else:
        input_data["competency_uri"] = None

      if categories:
        if not categories.filename.endswith(".csv"):
          raise InvalidFileType()

        # validate data in provided csv
        csv_reader = csv.DictReader(
            codecs.iterdecode(categories.file, "utf-8"), delimiter=",")
        validate_categories_csv(list(csv_reader))

        category_file_name = "categories_" + str(date) + ".csv"
        category_uri = upload_file_to_bucket(
            GCP_BUCKET, "skill-service/user-uploaded-csvs", category_file_name,
            categories.file)
        input_data["category_uri"] = category_uri
      else:
        input_data["category_uri"] = None

      if sub_domains:
        if not sub_domains.filename.endswith(".csv"):
          raise InvalidFileType()

        # validate data in provided csv
        csv_reader = csv.DictReader(
            codecs.iterdecode(sub_domains.file, "utf-8"), delimiter=",")
        validate_sub_domains_csv(list(csv_reader))

        sub_domain_file_name = "sub_domains_" + str(date) + ".csv"
        sub_domain_uri = upload_file_to_bucket(
            GCP_BUCKET, "skill-service/user-uploaded-csvs",
            sub_domain_file_name, sub_domains.file)
        input_data["sub_domain_uri"] = sub_domain_uri
      else:
        input_data["sub_domain_uri"] = None

      if domains:
        if not domains.filename.endswith(".csv"):
          raise InvalidFileType()

        # validate data in provided csv
        csv_reader = csv.DictReader(
            codecs.iterdecode(domains.file, "utf-8"), delimiter=",")
        validate_domains_csv(list(csv_reader))

        domain_file_name = "domains_" + str(date) + ".csv"
        domain_uri = upload_file_to_bucket(GCP_BUCKET,
                                           "skill-service/user-uploaded-csvs",
                                           domain_file_name, domains.file)
        input_data["domain_uri"] = domain_uri
      else:
        input_data["domain_uri"] = None

    env_vars = {"DATABASE_PREFIX": DATABASE_PREFIX}
    response = initiate_batch_job(input_data, GENERIC_CSV_INGESTION_JOB_TYPE,
                                  env_vars)
    Logger.info(response)
    return response
  except ResourceNotFoundException as e:
    raise ResourceNotFound(str(e)) from e
  except ValidationError as e:
    raise BadRequest(str(e)) from e
  except ConflictError as e:
    raise Conflict(str(e)) from e
  except InvalidFileType as e:
    Logger.error(e)
    raise BadRequest("Invalid file type. CSV file expected") from e
  except Exception as e:
    Logger.error(e)
    Logger.error(traceback.format_exc())
    raise InternalServerError(str(e)) from e
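

For reference, a minimal client sketch follows. The route decorator is not part
of this excerpt, so the base URL, route path ("/import/csv"), and source value
below are assumptions, not the service's actual configuration; only one of the
two input modes may be used per request.

import requests

BASE_URL = "http://localhost:8888/skill-service/api/v1"  # assumed mount point
SOURCE = "example_source"  # must be one of ALLOWED_SOURCES_FOR_GENERIC_CSV_INGESTION

# Mode 1: upload a local skills CSV directly as a multipart file.
with open("skills.csv", "rb") as csv_file:
  resp = requests.post(
      f"{BASE_URL}/import/csv",
      params={"source": SOURCE},
      files={"skills": ("skills.csv", csv_file, "text/csv")},
  )
print(resp.json())  # expected shape per the docstring: {"job_name": ..., "status": ...}

# Mode 2: reference a CSV already staged in GCS via a form field instead.
resp = requests.post(
    f"{BASE_URL}/import/csv",
    params={"source": SOURCE},
    data={"skill_uri": "gs://example-bucket/skills.csv"},  # hypothetical URI
)
print(resp.json())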