# kube_create_job() — from common/src/common/utils/kf_job_app.py

def kube_create_job(job_specs, namespace="default", env_vars=None):
  """Create a kube batch job based on the job spec.

  Checks for a pending/active duplicate job first; otherwise records a
  BatchJobModel entry in firestore, builds the kube job body and submits
  it through the batch API.

  Args:
    job_specs (dict): must contain "type", "input_data" and
      "container_image"; "limits"/"requests" are optional overrides for
      the default job resources.
    namespace (str): kubernetes namespace to create the job in.
    env_vars (dict|None): environment variables for the job container;
      None means no extra variables.

  Returns:
    dict: the duplicate job's response if one exists, otherwise
      {"job_name", "doc_id", "status", "job logs"} for the new job.

  Raises:
    BatchJobError: wraps any failure; the firestore entry created for
      this job (if any) is deleted before raising.
  """
  if env_vars is None:  # avoid a shared mutable default argument
    env_vars = {}
  logging.info("kube_create_job: {}".format(job_specs))
  logging.info("kube_create_job: namespace {} env {}".format(
      namespace, env_vars))
  job_model = None  # set once saved, so cleanup in `except` is safe to guard
  try:
    # check for pending/active duplicate job
    job_logs = {}
    logging.info("Type of request body")
    logging.info(job_specs["input_data"])
    logging.info(type(job_specs["input_data"]))
    duplicate_job = find_duplicate_jobs(job_specs["type"],
                                        job_specs["input_data"])
    if duplicate_job:
      return duplicate_job

    # Create the job definition
    logging.info("Batch Job Creation Started")
    container_image = job_specs["container_image"]
    limits = job_specs.get("limits", DEFAULT_JOB_LIMITS)
    requests = job_specs.get("requests", DEFAULT_JOB_REQUESTS)
    name = str(uuid.uuid4())  # job name and firestore doc id

    # creating a job entry in firestore
    job_model = BatchJobModel()
    job_model.id = name
    job_model.type = job_specs["type"]
    job_model.status = "pending"
    job_model.uuid = name
    job_model.save()
    logging.info("Batch Job {}: Started with job type " \
        "{}".format(job_model.name, job_model.type))
    logging.info("Batch Job {}: Updated Batch Job Status " \
        "to pending in firestore".format(job_model.name))

    if job_specs["type"] in JOB_TYPES_WITH_PREDETERMINED_TITLES:
      job_model.input_data = job_specs[
        "input_data"]  # data required for running job
      if isinstance(job_specs["input_data"], str):
        try:
          job_specs["input_data"] = json.loads(job_specs["input_data"])
        except JSONDecodeError as e:
          # best-effort parse: a non-JSON string input keeps the uuid name
          logging.info("Unable to convert job_specs['input_data'] to dict,\
            \nError: {}".format(e))
      # keep the (possibly parsed) input bound so the job-log section
      # below never hits an unbound local
      input_data = job_specs["input_data"]
      # use the caller-supplied title when one exists, else the uuid
      if isinstance(input_data, dict) and "title" in input_data:
        job_model.name = input_data["title"]
      else:
        job_model.name = name
    else:
      # derive a unique title by appending the creation timestamp
      # (non-zero-padded, matching the historical naming scheme)
      created_time = job_model.created_time
      job_name_suffix = "-".join(str(part) for part in (
          created_time.year, created_time.month, created_time.day,
          created_time.hour, created_time.minute, created_time.second))
      input_data = json.loads(job_specs["input_data"])
      input_data["title"] = input_data["title"] + "-" + job_name_suffix
      job_specs["input_data"] = json.dumps(input_data)
      job_model.name = input_data["title"]
      job_model.input_data = job_specs[
          "input_data"]  # data required for running job

    # cloud-logging links surfaced to the user, keyed by job type
    if job_specs["type"] == "assessment-items":
      job_logs = {job_specs["type"]: get_cloud_link(job_specs["type"]),
                  input_data["activity"]: get_cloud_link(
                    input_data["activity"].replace("_", "-"))}
    elif job_specs["type"] in ["course-ingestion",
                               "course-ingestion_topic-tree",
                               "course-ingestion_learning-units"]:
      job_logs = {job_specs["type"]: get_cloud_link("course-ingestion")}
    elif job_specs["type"] in ["deep-knowledge-tracing"]:
      job_logs = {job_specs["type"]: get_cloud_link("deep-knowledge-tracing")}
    else:
      job_logs = {}

    job_model.job_logs = job_logs

    job_model.save(merge=True)

    logging.info("Batch Job {}:  " \
        "model updated in firestore".format(job_model.name))

    logging.info("Batch Job {}:  " \
        "creating kube job object".format(job_model.name))
    body = kube_create_job_object(
      name=name,
      container_image=container_image,
      namespace=namespace,
      env_vars=env_vars,
      limits=limits,
      requests=requests)

    logging.info("Batch Job {}:  " \
        "kube job body created".format(job_model.name))

    # call kube batch API to create job
    job = api_instance.create_namespaced_job(namespace, body, pretty=True)
    logging.info("Batch Job {} id {}: Created".format(job, job_model.uuid))

    response = {
      "job_name": job_model.uuid,
      "doc_id": job_model.id,
      "status": "active",
      "job logs": job_logs
    }
    return response

  except Exception as e:
    logging.error("Batch Job {}: Failed".format(job_specs))
    # format_exc() returns the traceback string; print_exc() returns None
    # and would make this line log the literal string "None"
    logging.error(traceback.format_exc())
    # only remove the firestore entry if we actually created one; otherwise
    # job_model is None and deleting would raise a second, masking error
    if job_model is not None:
      BatchJobModel.delete_by_id(job_model.id)
    raise BatchJobError(str(e)) from e