in src/utils/JobRestAPIUtils.py [0:0]
def _relative_dir_error(path, kind):
    """Return the error message for an unacceptable relative directory.

    ``kind`` is the word used in the message ("job", "work" or "data").
    Returns None when ``path`` passes all checks.
    """
    if ".." in path:
        return "ERROR: '..' cannot be used in %s directory" % kind
    if "\\." in path:
        return "ERROR: invalided %s directory" % kind
    if path.startswith(("/", "\\")):
        return "ERROR: %s directory should not start with '/' or '\\' " % kind
    return None


def SubmitJob(jobParamsJsonStr):
    """Validate a job submission, fill in defaults and store it.

    The argument is handed to ``LoadJobParams`` to obtain the job parameter
    dict. The dict is then validated (job name, VC name, worker count for
    "PSDistJob", VC access, job/work/data directories), completed with
    defaults (userId, preemptionAllowed, jobId, resourcegpu, familyToken,
    isParent, cmd, paths) and added through a ``DataHandler``. When a
    non-blank "logDir" is supplied, a companion "tensorboard-<jobName>"
    visualization job is added first; if that fails the main job is not
    added.

    Args:
        jobParamsJsonStr: job parameters in the form ``LoadJobParams``
            accepts (presumably a JSON string -- confirm against that helper).

    Returns:
        dict: ``{"jobId": ...}`` when the job was added, otherwise
        ``{"error": <message>}``.
    """
    ret = {}
    jobParams = LoadJobParams(jobParamsJsonStr)

    if "jobName" not in jobParams or len(jobParams["jobName"].strip()) == 0:
        ret["error"] = "ERROR: Job name cannot be empty"
        return ret
    if "vcName" not in jobParams or len(jobParams["vcName"].strip()) == 0:
        ret["error"] = "ERROR: VC name cannot be empty"
        return ret

    if jobParams.get("jobtrainingtype") == "PSDistJob":
        num_workers = None
        try:
            num_workers = int(jobParams.get("numpsworker"))
        except (TypeError, ValueError, OverflowError):
            # Missing or non-numeric value; reported to the caller below.
            logger.exception("Parsing numpsworker in %s failed", jobParams)
        # A distributed job needs at least one worker; negatives are as
        # meaningless as zero.
        if num_workers is None or num_workers <= 0:
            ret["error"] = "ERROR: Invalid numpsworker value"
            return ret

    if "userId" not in jobParams or len(jobParams["userId"].strip()) == 0:
        jobParams["userId"] = GetUser(jobParams["userName"])["uid"]

    if "preemptionAllowed" not in jobParams:
        jobParams["preemptionAllowed"] = False
    else:
        jobParams["preemptionAllowed"] = ToBool(jobParams["preemptionAllowed"])

    # One fresh UUID serves as the default jobId, the default familyToken and
    # the tensorboard job's id further down.
    uniqId = str(uuid.uuid4())
    if "jobId" not in jobParams or jobParams["jobId"] == "":
        jobParams["jobId"] = uniqId

    if "resourcegpu" not in jobParams:
        jobParams["resourcegpu"] = 0
    if isinstance(jobParams["resourcegpu"], str):
        # A blank string means "no GPU"; anything else must parse as an int.
        if len(jobParams["resourcegpu"].strip()) == 0:
            jobParams["resourcegpu"] = 0
        else:
            jobParams["resourcegpu"] = int(jobParams["resourcegpu"])

    populate_job_resource(jobParams)

    # Treat an empty token like a blank one ("".isspace() is False, so the
    # strip-based test is needed to catch it).
    if ("familyToken" not in jobParams
            or len(jobParams["familyToken"].strip()) == 0):
        jobParams["familyToken"] = uniqId
    if "isParent" not in jobParams:
        jobParams["isParent"] = 1

    userName = getAlias(jobParams["userName"])

    if not AuthorizationManager.HasAccess(
            jobParams["userName"], ResourceType.VC,
            jobParams["vcName"].strip(), Permission.User):
        ret["error"] = "Access Denied!"
        return ret

    if "cmd" not in jobParams:
        jobParams["cmd"] = ""

    # jobPath: validate a caller-supplied directory and place it under the
    # user's alias, or derive <alias>/jobs/<yymmdd>/<jobId>.
    if "jobPath" in jobParams and len(jobParams["jobPath"].strip()) > 0:
        jobPath = jobParams["jobPath"]
        error = _relative_dir_error(jobPath, "job")
        if error is not None:
            ret["error"] = error
            return ret
        if not jobPath.startswith(userName):
            jobParams["jobPath"] = os.path.join(userName, jobPath)
    else:
        jobPath = (userName + "/" + "jobs/" + time.strftime("%y%m%d") + "/"
                   + jobParams["jobId"])
        jobParams["jobPath"] = jobPath

    # workPath: defaults to ".", always placed under the user's alias.
    if "workPath" not in jobParams or len(jobParams["workPath"].strip()) == 0:
        jobParams["workPath"] = "."
    error = _relative_dir_error(jobParams["workPath"], "work")
    if error is not None:
        ret["error"] = error
        return ret
    if not jobParams["workPath"].startswith(userName):
        jobParams["workPath"] = os.path.join(userName, jobParams["workPath"])

    # dataPath: defaults to "."; unlike the other two it is not prefixed with
    # the user's alias.
    if "dataPath" not in jobParams or len(jobParams["dataPath"].strip()) == 0:
        jobParams["dataPath"] = "."
    error = _relative_dir_error(jobParams["dataPath"], "data")
    if error is not None:
        ret["error"] = error
        return ret

    # Use forward slashes, then canonicalize each path as if rooted at "/"
    # and drop that leading slash again so the stored value stays relative.
    for key in ("dataPath", "workPath", "jobPath"):
        forward = jobParams[key].replace("\\", "/")
        jobParams[key] = os.path.realpath(os.path.join("/", forward))[1:]

    dataHandler = DataHandler()
    try:
        if "logDir" in jobParams and len(jobParams["logDir"].strip()) > 0:
            tensorboardParams = jobParams.copy()

            # For a distributed job, run tensorboard as a regular job and
            # point it at worker0's log directory unless the path already
            # goes through a worker0 directory.
            if tensorboardParams.get("jobtrainingtype") == "PSDistJob":
                tensorboardParams["jobtrainingtype"] = "RegularJob"
                match = re.match('(.*)(/.*)', tensorboardParams["logDir"])
                if match is not None:
                    prefix = match.group(1)
                    if re.match('.*/worker0', prefix) is None:
                        tensorboardParams["logDir"] = (
                            prefix + "/worker0" + match.group(2))

            # NOTE(review): when the caller supplies no jobId the main job
            # gets uniqId as well, so both jobs share one id -- confirm this
            # is intended.
            tensorboardParams["jobId"] = uniqId
            tensorboardParams["jobName"] = "tensorboard-" + jobParams["jobName"]
            # NOTE(review): this is the jobPath captured before the alias
            # prefix and normalization were applied -- confirm.
            tensorboardParams["jobPath"] = jobPath
            tensorboardParams["jobType"] = "visualization"
            tensorboardParams["cmd"] = ("tensorboard --logdir "
                                        + tensorboardParams["logDir"]
                                        + " --host 0.0.0.0")
            tensorboardParams["image"] = jobParams["image"]
            tensorboardParams["resourcegpu"] = 0
            tensorboardParams["interactivePort"] = "6006"

            if "error" not in ret:
                if not dataHandler.AddJob(tensorboardParams):
                    ret["error"] = "Cannot schedule tensorboard job."

        if "error" not in ret:
            if dataHandler.AddJob(jobParams):
                ret["jobId"] = jobParams["jobId"]
                if "jobPriority" in jobParams:
                    # Best effort: an unparsable priority falls back to the
                    # default instead of failing the submission.
                    priority = DEFAULT_JOB_PRIORITY
                    try:
                        priority = int(jobParams["jobPriority"])
                    except (TypeError, ValueError, OverflowError):
                        pass

                    permission = Permission.User
                    if AuthorizationManager.HasAccess(
                            jobParams["userName"], ResourceType.VC,
                            jobParams["vcName"].strip(), Permission.Admin):
                        permission = Permission.Admin

                    priority = adjust_job_priority(priority, permission)
                    job_priorities = {jobParams["jobId"]: priority}
                    dataHandler.update_job_priority(job_priorities)
            else:
                ret["error"] = "Cannot schedule job. Cannot add job into database."
    finally:
        # Release the handler even when building or adding a job raises.
        dataHandler.Close()
    return ret