decisionai_plugin/common/util/job_record.py (59 lines of code) (raw):
import datetime
import json
from dateutil import parser
class JobRecord(object):
    """Bookkeeping record for a single job.

    A record stores the job's identifiers, its mode, the parameters it was
    submitted with, a status string and a creation timestamp (kept as a
    string).  Iterating a record yields ``(key, value)`` pairs, so
    ``dict(record)`` produces a plain dictionary of all fields.
    """

    # Status values a record can carry.
    STATUS_RUNNING = "Running"
    STATUS_INQUEUE = "InQueue"
    STATUS_SUCCESS = "Success"
    STATUS_FAILED = "Failed"
    STATUS_UNKNOWN = "Unknown"

    # Mode values declared for jobs.
    MODE_TRAINING = "Training"
    MODE_INFERENCE = "Inference"

    # Maximum age of a job, in seconds (7200 * 2 = 14400 s, i.e. 4 hours).
    TTL_IN_SECONDS = 7200 * 2

    # Once a record is in one of these states, change_status() is a no-op.
    _TERMINAL_STATUSES = (STATUS_SUCCESS, STATUS_FAILED)

    def __init__(self,
                 job_id: str,
                 mode: str,
                 algorithm_name: str,
                 model_id: str,
                 subscription: str,
                 params: dict,
                 status: str = None,
                 create_time: str = None
                 ):
        """Create a job record.

        Args:
            job_id: Identifier of the job.
            mode: Job mode (see ``MODE_TRAINING`` / ``MODE_INFERENCE``).
            algorithm_name: Name of the algorithm the job belongs to.
            model_id: Identifier of the model the job works on.
            subscription: Subscription the job is associated with.
            params: Job parameters; a falsy value is replaced by ``{}``.
            status: Initial status; a falsy value becomes ``STATUS_UNKNOWN``.
            create_time: Creation timestamp as a string; a falsy value is
                replaced by ``str(datetime.datetime.now())`` (naive local
                time).
        """
        self.job_id = job_id
        self.mode = mode
        self.algorithm_name = algorithm_name
        self.model_id = model_id
        self.subscription = subscription
        self.params = params or {}
        self.status = status or JobRecord.STATUS_UNKNOWN
        self.create_time = create_time or str(datetime.datetime.now())

    def __iter__(self):
        """Yield ``(field_name, value)`` pairs so ``dict(self)`` works."""
        obj = {
            "job_id": self.job_id,
            "mode": self.mode,
            "algorithm_name": self.algorithm_name,
            "model_id": self.model_id,
            "subscription": self.subscription,
            "params": self.params,
            "status": self.status,
            "create_time": self.create_time
        }
        yield from obj.items()

    def exceeded_ttl(self):
        """Return True if the job is older than ``TTL_IN_SECONDS``.

        When the TTL has been exceeded the record is also moved to
        ``STATUS_FAILED`` via ``change_status`` (which leaves records that
        are already in a terminal state untouched).

        Returns:
            bool: True when the TTL has elapsed, False otherwise.

        Raises:
            Whatever ``dateutil.parser.parse`` raises when ``create_time``
            cannot be parsed.
        """
        created = parser.parse(self.create_time)
        if created.utcoffset() is None:
            # Naive timestamp (what __init__ writes by default): compare it
            # against naive local time, exactly as before.
            created = created.replace(tzinfo=None)
            now = datetime.datetime.now()
        else:
            # Offset-aware timestamp: take "now" in the same zone instead of
            # discarding the offset, which would skew the age by the
            # difference between that offset and the host's local zone.
            now = datetime.datetime.now(created.tzinfo)

        deadline = created + datetime.timedelta(seconds=JobRecord.TTL_IN_SECONDS)
        if now > deadline:
            self.change_status(JobRecord.STATUS_FAILED)
            return True
        return False

    def change_status(self, new_status):
        """Set the status unless the record is already Success or Failed.

        Args:
            new_status: Status value to store.
        """
        if self.status in JobRecord._TERMINAL_STATUSES:
            # Terminal states are final; ignore later transitions.
            return
        self.status = new_status
if __name__ == "__main__":
    def create_jobs():
        """Build 100 sample training job records and print each as a dict."""
        for i in range(100):
            job_id = "job_%02d" % i
            # Use the declared mode constant rather than an ad-hoc string.
            job = JobRecord(job_id, JobRecord.MODE_TRAINING, 'algo1', 'model1', 'subscription1', params={})
            print(dict(job))

    # Actually run the demo when the module is executed as a script.
    create_jobs()