in components/common/src/common/utils/kf_job_app.py [0:0]
def kube_create_job(job_specs,
                    namespace="default",
                    env_vars=None,
                    existing_job_model=None) -> BatchJobModel:
  """Create a Kubernetes batch job from a job spec and track it in Firestore.

  A BatchJobModel entry is created (or an existing one reused), its name and
  input_data are derived from the spec, per-type cloud log links are attached,
  and finally the kube job is submitted via the batch API.

  Args:
    job_specs (dict): job definition. Must contain "type", "input_data" and
      "container_image"; may contain "limits"/"requests" resource overrides.
    namespace (str): kubernetes namespace the job is created in.
    env_vars (dict|None): environment variables for the job container.
      Defaults to an empty dict. (Declared as None to avoid the shared
      mutable-default-argument pitfall the old `env_vars={}` had.)
    existing_job_model (BatchJobModel|None): if given, this Firestore model
      is updated instead of creating a new one.

  Returns:
    BatchJobModel: the saved model, with status set to "active".

  Raises:
    BatchJobError: wraps any underlying failure; the Firestore entry is
      deleted (best-effort) before re-raising.
  """
  env_vars = {} if env_vars is None else env_vars
  logging.info("kube_create_job: {}".format(job_specs))
  logging.info("kube_create_job: namespace {} env {}".format(
      namespace, env_vars))
  # Bound before the try so the except handler can tell whether a model
  # exists yet; previously an early failure raised NameError on job_model.
  job_model = None
  try:
    # check for pending/active duplicate job
    job_logs = {}
    logging.info("Type of request body")
    logging.info(job_specs["input_data"])
    logging.info(type(job_specs["input_data"]))
    # Create the job definition
    logging.info("Batch Job Creation Started")
    container_image = job_specs["container_image"]
    limits = job_specs.get("limits", DEFAULT_JOB_LIMITS)
    requests = job_specs.get("requests", DEFAULT_JOB_REQUESTS)
    if existing_job_model:
      job_model = existing_job_model
      name = job_model.id
    else:
      name = str(uuid.uuid4())  # job name
      # creating a job entry in firestore
      job_model = BatchJobModel()
      job_model.id = name
      job_model.type = job_specs["type"]
      job_model.status = "pending"
      job_model.uuid = name
      job_model.save()
    logging.info("Batch Job {}: Started with job type "
                 "{}".format(job_model.name, job_model.type))
    logging.info("Batch Job {}: Updated Batch Job Status "
                 "to pending in firestore".format(job_model.name))
    if job_specs["type"] in JOB_TYPES_WITH_PREDETERMINED_TITLES:
      job_model.input_data = job_specs[
          "input_data"]  # data required for running job
      # input_data may arrive as a JSON string; try to decode it so the
      # optional "title" key can be used as the display name.
      if isinstance(job_specs["input_data"], str):
        try:
          job_specs["input_data"] = json.loads(job_specs["input_data"])
        except JSONDecodeError as e:
          logging.info("Unable to convert job_specs['input_data'] to dict,\
            \nError: {}".format(e))
      if (isinstance(job_specs["input_data"], dict) and
          "title" in job_specs["input_data"].keys()):
        job_model.name = job_specs["input_data"]["title"]
      else:
        job_model.name = name
    else:
      # For other job types the title is made unique by suffixing the
      # model's creation timestamp (y-m-d-h-m-s).
      created_time = job_model.created_time
      job_name_suffix = (str(created_time.year) + "-" +
                         str(created_time.month) + "-" +
                         str(created_time.day) + "-" +
                         str(created_time.hour) + "-" +
                         str(created_time.minute) + "-" +
                         str(created_time.second))
      input_data = json.loads(job_specs["input_data"])
      input_data["title"] = input_data["title"] + "-" + job_name_suffix
      job_specs["input_data"] = json.dumps(input_data)
      job_model.name = input_data["title"]
      job_model.input_data = job_specs[
          "input_data"]  # data required for running job
      # Attach cloud-logging links keyed by job type (and activity, for
      # assessment-items jobs).
      if job_specs["type"] == "assessment-items":
        job_logs = {job_specs["type"]: get_cloud_link(job_specs["type"]),
                    input_data["activity"]: get_cloud_link(
                        input_data["activity"].replace("_", "-"))}
      elif job_specs["type"] in ["course-ingestion",
                                 "course-ingestion_topic-tree",
                                 "course-ingestion_learning-units"]:
        job_logs = {job_specs["type"]: get_cloud_link("course-ingestion")}
      elif job_specs["type"] in ["deep-knowledge-tracing"]:
        job_logs = {job_specs["type"]: get_cloud_link("deep-knowledge-tracing")}
      else:
        job_logs = {}
      job_model.job_logs = job_logs
    job_model.save(merge=True)
    logging.info("Batch Job {}: "
                 "model updated in firestore".format(job_model.name))
    logging.info("Batch Job {}: "
                 "creating kube job object".format(job_model.name))
    # Handle different container types
    command = None
    args = None
    if job_specs["type"] == JOB_TYPE_WEBSCRAPER:
      # Webscraper uses Go binary
      command = ["/app/webscraper"]
      args = []  # Add any required args for webscraper
    body = kube_create_job_object(
        name=name,
        container_image=container_image,
        namespace=namespace,
        env_vars=env_vars,
        limits=limits,
        requests=requests,
        command=command,
        args=args)
    logging.info("Batch Job {}: "
                 "kube job body created".format(job_model.name))
    # call kube batch API to create job
    kube_job = api_instance.create_namespaced_job(namespace, body, pretty=True)
    logging.info("Batch Job {} id {}: Created".format(
        kube_job, job_model.uuid))
    job_model.status = "active"
    job_model.save()
    return job_model
  except Exception as e:
    logging.error("Batch Job {}: Failed".format(job_specs))
    # traceback.print_exc() returns None (it prints to stderr), so the old
    # code always logged "None"; format_exc() captures the traceback text.
    logging.error(traceback.format_exc())
    # Best-effort cleanup of the Firestore entry. Guarded so that (a) an
    # early failure before job_model was bound doesn't raise NameError and
    # (b) a cleanup failure never masks the original exception.
    if job_model is not None:
      try:
        BatchJobModel.delete_by_id(job_model.id)
      except Exception:
        logging.error(
            "Failed to delete batch job entry {}".format(job_model.id))
    raise BatchJobError(str(e)) from e