in common/src/common/utils/kf_job_app.py [0:0]
def kube_create_job(job_specs, namespace="default", env_vars=None):
  """Create a Kubernetes batch job based on ``job_specs``.

  If an equivalent pending/active job already exists, returns that
  duplicate's response instead of creating a new one. Otherwise records a
  ``BatchJobModel`` entry in Firestore (status "pending"), derives the job's
  display name from the input data, builds the kube job body, and submits it
  via the batch API.

  Args:
      job_specs (dict): Job description. Required keys: "type",
          "input_data", "container_image". Optional: "limits", "requests".
      namespace (str): Kubernetes namespace to create the job in.
      env_vars (dict | None): Environment variables for the job container.
          Defaults to an empty dict. (Was a mutable default argument
          ``env_vars={}``, shared across calls — fixed with a None sentinel.)

  Returns:
      dict: ``{"job_name", "doc_id", "status", "job logs"}`` for the created
      job, or the duplicate job's response when one exists.

  Raises:
      BatchJobError: On any failure. The partially-created Firestore entry
          (if one was created) is deleted before re-raising.
  """
  if env_vars is None:
    env_vars = {}
  logging.info("kube_create_job: {}".format(job_specs))
  logging.info("kube_create_job: namespace {} env {}".format(
    namespace, env_vars))
  # Bound before the try so the except block can distinguish "Firestore
  # entry was created" from "failed earlier"; previously a failure before
  # job_model was assigned raised NameError in the cleanup path, masking
  # the original error.
  job_model = None
  try:
    # check for pending/active duplicate job
    job_logs = {}
    logging.info("Type of request body")
    logging.info(job_specs["input_data"])
    logging.info(type(job_specs["input_data"]))
    duplicate_job = find_duplicate_jobs(job_specs["type"],
                    job_specs["input_data"])
    if duplicate_job:
      return duplicate_job
    # Create the job definition
    logging.info("Batch Job Creation Started")
    container_image = job_specs["container_image"]
    limits = job_specs.get("limits", DEFAULT_JOB_LIMITS)
    requests = job_specs.get("requests", DEFAULT_JOB_REQUESTS)
    name = str(uuid.uuid4())  # job name doubles as the model id/uuid
    # creating a job entry in firestore
    job_model = BatchJobModel()
    job_model.id = name
    job_model.type = job_specs["type"]
    job_model.status = "pending"
    job_model.uuid = name
    job_model.save()
    # NOTE(review): job_model.name is logged here but only assigned further
    # below — this relies on BatchJobModel providing a default; verify.
    logging.info("Batch Job {}: Started with job type " \
      "{}".format(job_model.name,job_model.type))
    logging.info("Batch Job {}: Updated Batch Job Status " \
      "to pending in firestore".format(job_model.name))
    if job_specs["type"] in JOB_TYPES_WITH_PREDETERMINED_TITLES:
      job_model.input_data = job_specs[
        "input_data"]  # data required for running job
      # input_data may arrive as a JSON string; best-effort decode so the
      # optional "title" key can be used as the display name.
      if isinstance(job_specs["input_data"], str):
        try:
          job_specs["input_data"] = json.loads(job_specs["input_data"])
        except JSONDecodeError as e:
          logging.info("Unable to convert job_specs['input_data'] to dict,\
          \nError: {}".format(e))
      if isinstance(job_specs["input_data"], dict) and \
        "title" in job_specs["input_data"].keys():
        job_model.name = job_specs["input_data"]["title"]
      else:
        job_model.name = name
    else:
      # Non-predetermined titles get a creation-timestamp suffix so repeated
      # submissions with the same title remain distinguishable.
      created_time = job_model.created_time
      job_name_suffix = str(created_time.year)+"-"+\
        str(created_time.month)+"-"+str(
        created_time.day)+"-"+str(created_time.hour)+"-"+\
        str(created_time.minute)+"-"+str(created_time.second)
      input_data = json.loads(job_specs["input_data"])
      input_data["title"] = input_data["title"] + "-" +\
        job_name_suffix
      job_specs["input_data"] = json.dumps(input_data)
      job_model.name = input_data["title"]
      job_model.input_data = job_specs[
        "input_data"]  # data required for running job
      # Per-type links to the cloud logs for this job.
      if job_specs["type"] == "assessment-items":
        job_logs = {job_specs["type"]: get_cloud_link(job_specs["type"]),
              input_data["activity"]: get_cloud_link(
                input_data["activity"].replace("_", "-"))}
      elif job_specs["type"] in ["course-ingestion",
                    "course-ingestion_topic-tree",
                    "course-ingestion_learning-units"]:
        job_logs = {job_specs["type"]: get_cloud_link("course-ingestion")}
      elif job_specs["type"] in ["deep-knowledge-tracing"]:
        job_logs = {job_specs["type"]: get_cloud_link("deep-knowledge-tracing")}
      else:
        job_logs = {}
      job_model.job_logs = job_logs
    job_model.save(merge=True)
    logging.info("Batch Job {}: " \
      "model updated in firestore".format(job_model.name))
    logging.info("Batch Job {}: " \
      "creating kube job object".format(job_model.name))
    body = kube_create_job_object(
      name=name,
      container_image=container_image,
      namespace=namespace,
      env_vars=env_vars,
      limits=limits,
      requests=requests)
    logging.info("Batch Job {}: " \
      "kube job body created".format(job_model.name))
    # call kube batch API to create job
    job = api_instance.create_namespaced_job(namespace, body, pretty=True)
    logging.info("Batch Job {} id {}: Created".format(job, job_model.uuid))
    response = {
      "job_name": job_model.uuid,
      "doc_id": job_model.id,
      "status": "active",
      "job logs": job_logs
    }
    return response
  except Exception as e:
    logging.error("Batch Job {}: Failed".format(job_specs))
    # traceback.print_exc() returns None (it prints to stderr), so the old
    # code logged the literal string "None"; format_exc() captures the
    # traceback text for the log record itself.
    logging.error(traceback.format_exc())
    # Only clean up the Firestore entry if it was actually created.
    if job_model is not None:
      BatchJobModel.delete_by_id(job_model.id)
    raise BatchJobError(str(e)) from e