# cluster-trace-gpu-v2020/simulator/job_history.py (100 lines of code) (raw):
from utils import add_user_round_robin_id
import numpy as np
class JobHistory:
    """Record finished jobs ("job history") and per-user duration statistics.

    Attributes:
        job_done_list: concise dict records of finished jobs (see
            ``job_done_list_add_job`` for the fields that are kept).
        user_job_stats: maps user -> dict with keys
            ``num_job`` (number of finished jobs),
            ``dur_avg`` (running mean of durations),
            ``dur_mva`` (mean of the last ``window`` durations),
            ``dur_har`` (running mean of ``min(1, 1 / duration)``; despite
            the key name this is a mean of capped inverse durations, not the
            harmonic mean itself).
        window: number of most recent durations used for ``dur_mva``.
        job_window_list_dict: maps user -> list of that user's most recent
            durations.
        temp_user_stats: maps user -> ``on_time`` of a running job, used by
            ``get_job_metrics`` for users that have no finished job yet.
    """

    # Fields copied from a finished job into its job_done_list record.
    _DONE_JOB_KEYS = frozenset((
        'job_id', 'submit_time', 'duration', 'jct', 'wasted',
        'num_inst', 'num_cpu', 'num_gpu', 'node', 'user', 'gpu_type',
    ))

    # Metric names accepted by predict().
    _METRICS = ('dur_avg', 'dur_har', 'dur_mva')

    def __init__(self, job_done_list=None, window=10):
        """Create a history, optionally seeded with already finished jobs.

        Args:
            job_done_list: optional iterable of job dicts. It is shallow-copied
                and every job is fed into the per-user statistics; each job
                must provide 'user' and 'duration'.
            window: number of most recent durations kept per user for the
                'dur_mva' statistic.
        """
        self.job_done_list = [] if job_done_list is None else list(job_done_list)
        self.user_job_stats = {}
        self.window = window
        self.job_window_list_dict = {}
        self.temp_user_stats = {}
        for job in self.job_done_list:
            self.stats_add_done_job(job)  # Only stats recorded
            # No jct summary here

    def alloc_job_sort(self, job_list, job_runn_list=None, metrics='dur_avg'):
        """Sort ``job_list`` in place by the owning user's duration metric.

        Ties are broken by each job's 'user_rrid' entry, which is presumably
        set by ``add_user_round_robin_id`` (defined in ``utils``; confirm
        there).

        Args:
            job_list: list of job dicts to sort; each needs a 'user' key.
            job_runn_list: optional iterable of running job dicts with 'user'
                and 'on_time' keys.
            metrics: key of ``user_job_stats[user]`` used as the sort value.
        """
        add_user_round_robin_id(job_list)
        # Early estimation: a running job's on_time stands in for the duration
        # of users without any finished job. If a user has several running
        # jobs, the last one in job_runn_list wins.
        # NOTE(review): temp_user_stats is never cleared in this class, so
        # estimates from earlier calls persist -- confirm this is intended.
        if job_runn_list is not None:
            for job in job_runn_list:
                if job['user'] not in self.user_job_stats:
                    self.temp_user_stats[job['user']] = job['on_time']
        job_list.sort(key=lambda j: (self.get_job_metrics(j, metrics), j['user_rrid']))

    def get_job_metrics(self, job, metrics):
        """Return the sort value for ``job`` based on its user's record.

        Lookup order: finished-job statistics, then the running-job estimate
        (an ``on_time`` value, whatever ``metrics`` was requested), then 0.
        """
        user = job['user']
        if user in self.user_job_stats:
            return self.user_job_stats[user][metrics]
        if user in self.temp_user_stats:
            return self.temp_user_stats[user]
        return 0  # no history, no running job ==> duration = 0 ==> highest priority

    @staticmethod
    def _capped_inverse(dur):
        """Return ``min(1, 1 / dur)``, treating a zero duration as 1."""
        if dur == 0:
            # 1 / dur would raise ZeroDivisionError; the cap makes 1 the
            # natural value here.
            return 1
        return min(1, 1 / dur)

    def stats_add_done_job(self, job):
        """Fold one finished job into its user's statistics.

        Args:
            job: dict with at least 'user' and 'duration'.
        """
        user = job['user']
        dur = job['duration']
        inv = self._capped_inverse(dur)

        if user not in self.user_job_stats:
            assert user not in self.job_window_list_dict
            self.job_window_list_dict[user] = [dur]
            self.user_job_stats[user] = {
                'num_job': 1,
                'dur_avg': dur,
                'dur_mva': dur,
                'dur_har': inv,
            }
            return

        # Sliding window of recent durations: drop the oldest once full.
        # A list is kept (rather than a deque) so the attribute's type is
        # unchanged for outside readers; the window is small.
        job_win_list = self.job_window_list_dict[user]
        if len(job_win_list) >= self.window:
            job_win_list.pop(0)
        job_win_list.append(dur)

        stats = self.user_job_stats[user]
        num_job = stats['num_job'] + 1
        # Incremental mean update: new = old + (x - old) / n.
        self.user_job_stats[user] = {
            'num_job': num_job,
            'dur_avg': stats['dur_avg'] + (dur - stats['dur_avg']) / num_job,
            'dur_mva': sum(job_win_list) / len(job_win_list),
            'dur_har': stats['dur_har'] + (inv - stats['dur_har']) / num_job,
        }

    def add_done_job(self, job):
        """Record a finished job in both the job list and the statistics."""
        self.job_done_list_add_job(job)
        self.stats_add_done_job(job)
        # No jct summary here

    def job_done_list_add_job(self, job):
        """Append a more concise repr of ``job`` to ``job_done_list``.

        Only the keys in ``_DONE_JOB_KEYS`` that are present in ``job`` are
        copied, in the order they appear in ``job``.
        """
        self.job_done_list.append(
            {k: v for k, v in job.items() if k in self._DONE_JOB_KEYS}
        )

    @property
    def num_jobs_done(self):
        """Total number of finished jobs counted in ``user_job_stats``."""
        return sum(stats['num_job'] for stats in self.user_job_stats.values())

    # Not recommended using job_done_list
    @property
    def jct_summary(self):
        """Sum of 'jct' over all records in ``job_done_list``."""
        return sum(job['jct'] for job in self.job_done_list)

    @property
    def wait_time_summary(self):
        """Sum of ``jct - duration`` over all records in ``job_done_list``."""
        return sum(job['jct'] - job['duration'] for job in self.job_done_list)

    @property
    def wasted_summary(self):
        """Sum of 'wasted' over all records in ``job_done_list``."""
        return sum(job['wasted'] for job in self.job_done_list)

    def predict(self, user, metrics=None):
        """Return the recorded statistic ``metrics`` for ``user``.

        Args:
            user: user key as used in ``user_job_stats``.
            metrics: one of 'dur_avg', 'dur_har', 'dur_mva'; None means
                'dur_avg'.

        Returns:
            The stored value, or 0 when the user has no finished job
            (optimistic estimate).

        Raises:
            TypeError: if the stored value is NaN.
        """
        metrics = 'dur_avg' if metrics is None else metrics
        assert metrics in self._METRICS, 'unknown metrics: %r' % (metrics,)
        if user not in self.user_job_stats:
            return 0  # No record => optimistic est. as 0.
        res = self.user_job_stats[user][metrics]
        if np.isnan(res):
            raise TypeError('%s of user %r is NaN' % (metrics, user))
        return res