def kube_create_job()

in components/common/src/common/utils/kf_job_app.py [0:0]


def kube_create_job(job_specs,
                    namespace="default",
                    env_vars=None,
                    existing_job_model=None) -> BatchJobModel:
  """Create a kube job based on the job spec and track it in a BatchJobModel.

  Args:
    job_specs: dict describing the job. Keys read here: "input_data",
      "container_image", "type" and, optionally, "limits" and "requests"
      (falling back to DEFAULT_JOB_LIMITS / DEFAULT_JOB_REQUESTS).
      NOTE: job_specs["input_data"] may be rewritten in place (parsed to a
      dict for predetermined-title job types, or re-serialised with a
      timestamp-suffixed title otherwise).
    namespace: kube namespace the job is created in.
    env_vars: environment variables forwarded to kube_create_job_object.
      Defaults to a new empty dict on every call.
    existing_job_model: optional BatchJobModel to reuse; when omitted a new
      model with a fresh uuid and status "pending" is saved first.

  Returns:
    The BatchJobModel for the job, saved with status "active".

  Raises:
    BatchJobError: wraps any exception raised while creating the job. If a
      job model was already resolved by then, it is deleted first.
  """
  # A fresh dict per call; a literal `{}` default would be shared by all calls.
  if env_vars is None:
    env_vars = {}

  logging.info("kube_create_job: {}".format(job_specs))
  logging.info("kube_create_job: namespace {} env {}".format(
      namespace, env_vars))

  # Bound before the try block so the except handler can tell whether a
  # job model exists yet and needs to be cleaned up.
  job_model = None
  try:
    logging.info("Type of request body")
    logging.info(job_specs["input_data"])
    logging.info(type(job_specs["input_data"]))
    # Create the job definition
    logging.info("Batch Job Creation Started")
    container_image = job_specs["container_image"]
    limits = job_specs.get("limits", DEFAULT_JOB_LIMITS)
    requests = job_specs.get("requests", DEFAULT_JOB_REQUESTS)

    if existing_job_model:
      job_model = existing_job_model
      name = job_model.id
    else:
      name = str(uuid.uuid4())  # job name
      # creating a job entry in firestore
      job_model = BatchJobModel()
      job_model.id = name
      job_model.type = job_specs["type"]
      job_model.status = "pending"
      job_model.uuid = name
      job_model.save()

    logging.info("Batch Job {}: Started with job type "
                 "{}".format(job_model.name, job_model.type))
    logging.info("Batch Job {}: Updated Batch Job Status "
                 "to pending in firestore".format(job_model.name))

    if job_specs["type"] in JOB_TYPES_WITH_PREDETERMINED_TITLES:
      job_model.input_data = job_specs[
        "input_data"]  # data required for running job
      if isinstance(job_specs["input_data"], str):
        try:
          job_specs["input_data"] = json.loads(job_specs["input_data"])
        except JSONDecodeError as e:
          logging.info("Unable to convert job_specs['input_data'] to dict,\
            \nError: {}".format(e))
      # Keep a local handle so later branches can read the parsed input
      # regardless of which title branch ran.
      input_data = job_specs["input_data"]
      if isinstance(input_data, dict) and "title" in input_data:
        job_model.name = input_data["title"]
      else:
        job_model.name = name
    else:
      # Make the title unique by appending the model's creation timestamp
      # (components are not zero-padded).
      created_time = job_model.created_time
      job_name_suffix = "-".join(
          str(part) for part in (created_time.year, created_time.month,
                                 created_time.day, created_time.hour,
                                 created_time.minute, created_time.second))
      input_data = json.loads(job_specs["input_data"])
      input_data["title"] = input_data["title"] + "-" + job_name_suffix
      job_specs["input_data"] = json.dumps(input_data)
      job_model.name = input_data["title"]
      job_model.input_data = job_specs[
          "input_data"]  # data required for running job

    if job_specs["type"] == "assessment-items":
      if not isinstance(input_data, dict):
        raise ValueError(
            "assessment-items job requires input_data to be a JSON object "
            "containing an 'activity' key")
      activity = input_data["activity"]
      job_logs = {job_specs["type"]: get_cloud_link(job_specs["type"]),
                  activity: get_cloud_link(activity.replace("_", "-"))}
    elif job_specs["type"] in ["course-ingestion",
                               "course-ingestion_topic-tree",
                               "course-ingestion_learning-units"]:
      job_logs = {job_specs["type"]: get_cloud_link("course-ingestion")}
    elif job_specs["type"] in ["deep-knowledge-tracing"]:
      job_logs = {job_specs["type"]: get_cloud_link("deep-knowledge-tracing")}
    else:
      job_logs = {}

    job_model.job_logs = job_logs

    job_model.save(merge=True)

    logging.info("Batch Job {}:  "
                 "model updated in firestore".format(job_model.name))

    logging.info("Batch Job {}:  "
                 "creating kube job object".format(job_model.name))

    # Handle different container types
    command = None
    args = None
    if job_specs["type"] == JOB_TYPE_WEBSCRAPER:
      # Webscraper uses Go binary
      command = ["/app/webscraper"]
      args = []  # Add any required args for webscraper

    body = kube_create_job_object(
      name=name,
      container_image=container_image,
      namespace=namespace,
      env_vars=env_vars,
      limits=limits,
      requests=requests,
      command=command,
      args=args)

    logging.info("Batch Job {}:  "
                 "kube job body created".format(job_model.name))

    # call kube batch API to create job
    kube_job = api_instance.create_namespaced_job(namespace, body, pretty=True)
    logging.info("Batch Job {} id {}: Created".format(
      kube_job, job_model.uuid))

    job_model.status = "active"
    job_model.save()
    return job_model

  except Exception as e:
    logging.error("Batch Job {}: Failed".format(job_specs))
    # format_exc() returns the traceback text; print_exc() returns None.
    logging.error(traceback.format_exc())
    # Only clean up when a model was resolved; otherwise the handler itself
    # would fail on an unbound name and hide the original error.
    if job_model is not None:
      BatchJobModel.delete_by_id(job_model.id)
    raise BatchJobError(str(e)) from e