cluster-trace-gpu-v2020/simulator/cluster.py (210 lines of code) (raw):
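"""Cluster abstraction for the cluster-trace-gpu-v2020 simulator: wraps a set of
Node objects, feeds pending jobs in from a trace, and models capacity that varies
over time because part of each node is reserved for a high-priority service."""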
from collections import OrderedDict
from node import Node
from utils import print_fn, _repr_job_preempt, _repr_job_done, large_job_pruning
from job_history import JobHistory
class Cluster:
def __init__(self, node_list=None, num_nodes=None, num_gpus=20,
num_cpus=20, pattern=1, period=124, job_list=None,
random_seed=0, num_spare_node=None,
export_cluster_util=False):
        if node_list is not None:
            pass  # use the caller-provided node_list as-is
elif num_nodes is not None:
node_list = [Node(id=i) for i in range(num_nodes)]
else:
node_list = [Node(id=0, num_gpus=num_gpus, num_cpus=num_cpus)]
temp_node_dict = dict()
self.num_gpus, self.num_cpus = 0, 0
for node in node_list:
self.num_gpus += node.num_gpus
self.num_cpus += node.num_cpus
temp_node_dict[node.id] = node
self.node_dict = OrderedDict(sorted(temp_node_dict.items(),
key=lambda t: t[1].id))
self.cur_time = 0
self.svc = {'num_gpu': 0, 'num_cpu': 0} # high-priority service
self.svc_former_ratio = 0
        # All jobs received over all time (after large-job pruning against the cluster capacity).
        self.job_full_list = large_job_pruning(job_list, self.num_gpus, self.num_cpus)
        self.job_full_list.sort(key=lambda j: -j['submit_time'])  # descending submit_time: the earliest job sits at the tail for pop()
self.job_list = []
        self.retrieve_job_from_full_list()  # move jobs that are already due from self.job_full_list into self.job_list
self.job_history = JobHistory()
# Capacity changing pattern & period
self.pattern = pattern
self.period = period
# Spare specific node
self.num_spare_node = num_spare_node
self.spare_node_id = []
if num_spare_node is not None:
for i in range(num_spare_node):
spare_node_index = random_seed % len(node_list)
spare_node_id = node_list[spare_node_index].id
while spare_node_id in self.spare_node_id:
random_seed += 29741 # a random prime number
spare_node_index = random_seed % len(node_list)
spare_node_id = node_list[spare_node_index].id
self.spare_node_id.append(spare_node_id) # indicate which node to spare
random_seed += 29741 # a random prime number
self.export_cluster_util = export_cluster_util
self.cluster_time = []
self.cluster_cpu = []
self.cluster_gpu = []
self.idle_cluster_counter = 0
def retrieve_job_from_full_list(self):
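        """Move every job whose submit_time has arrived from self.job_full_list
        (sorted by descending submit_time, so the earliest job is at the tail)
        into self.job_list, stopping at the first job still in the future."""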
while len(self.job_full_list) > 0:
job = self.job_full_list[-1]
if job['submit_time'] <= self.cur_time:
job = self.job_full_list.pop()
self.job_list.append(job)
else:
return 0
def sorted_node_list(self):
node_list = list(self.node_dict.values())
node_list.sort(key=lambda n: n.id)
return node_list
def tic_job(self, delta=1):
        # Unlike tic_svc(), which receives the simulator's cur_time as its own cur_time,
        # this method advances its own clock and returns a "cur_time" value to the simulator:
        #   on success: return cur_time >= 0
        #   otherwise:  return cur_time < 0  ==> exit_flag = 1
self.cur_time += delta
if self.export_cluster_util and self.cur_time % 10000 == 0:
self.record_cluster_util()
self.retrieve_job_from_full_list() # update self.job_list
job_runn_list = self.job_runn_list
if len(job_runn_list) > 0:
for job in job_runn_list:
job['on_time'] += delta
job['progress'] = job['on_time'] * job['num_gpu']
# Job done logic
if job['on_time'] >= job['duration']:
                    over_tic_time = job['on_time'] - job['duration']  # non-zero only if delta > 1
job['on_time'] -= over_tic_time
job['progress'] -= over_tic_time * job['num_gpu']
job['done'] = 1
host_node_id = job['node']
host_node = self.node_dict.get(host_node_id)
suc = host_node.release_job(job=job)
assert suc
job['jct'] = self.cur_time - over_tic_time - job['submit_time'] # deduct submit_time
self.job_history.add_done_job(job)
print_fn("%sDONE: %s || %s" % (self.log_prefix, _repr_job_done(job), job))
return self.cur_time # exit_flag = 0, still going
        # From here on, len(job_runn_list) == 0.
        elif len(self.job_list) > 0:  # idle cluster with jobs pending
self.idle_cluster_counter += 1
print_fn("%sIDLE cluster until jobs: %s" % (self.log_prefix, [_repr_job_preempt(e) for e in self.job_list]))
if self.idle_cluster_counter % 10000 == 0:
print_fn('{} idle cluster: {}'.format(self.idle_cluster_counter, [_repr_job_preempt(e) for e in self.job_list]), level=2)
return self.cur_time # exit_flag = 0, still going
elif len(self.job_full_list) > 0: # i.e., empty cluster waiting for jobs to come
            wake_time = self.job_full_list[-1]['submit_time'] - delta  # so the next tic (+delta) lands exactly on the earliest submit_time
            assert self.cur_time <= wake_time  # if equal, the fast-forward is a no-op
self.cur_time = wake_time
return self.cur_time # exit_flag = 0, still going
else: # no running job, no pending job, no coming job => exit.
return -1 # exit
def tic_svc(self, cur_time):
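        """Synchronize with the simulator clock and, whenever the capacity ratio
        for cur_time changes, re-reserve resources for the high-priority service
        on every node except the designated spare nodes."""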
self.cur_time = cur_time
cap_ratio = self.get_cap_ratio(cur_time)
svc_ratio = 1 - cap_ratio
if self.svc_former_ratio != svc_ratio:
self.svc_former_ratio = svc_ratio
print_fn("%sService WAS:%s" % (self.log_prefix, str([n.__repr__() for n in self.node_list])))
for node in self.node_list:
if node.id in self.spare_node_id: # spare from service allocation
continue
node.set_svc_res_by_ratio(ratio=svc_ratio)
print_fn("%sService NOW:%s" % (self.log_prefix, str([n.__repr__() for n in self.node_list])))
def replace_svc(self):
# Migrating services or jobs for vacancies.
raise NotImplementedError("Cluster replace service")
def display_capacity_pattern(self, max_time=200):
for cur_time in range(max_time):
cur_gpus, cur_cpus = self.get_capacity(cur_time)
four_gpus, four_cpus = int(cur_gpus / 4), int(cur_cpus / 4)
left_gpus, left_cpus = int(cur_gpus % 4), int(cur_cpus % 4)
print("[%3s] G%3d |%s%s\n C%3d |%s%s" % (cur_time, cur_gpus, "####|" * four_gpus, "#" * left_gpus, cur_cpus, "xxxx|" * four_cpus, "x" * left_cpus ))
def display_capacity_pattern_csv(self, max_time=200):
print("time,GPUs,CPUs")
for cur_time in range(max_time):
cur_gpus, cur_cpus = self.get_capacity(cur_time)
print("%d,%d,%d" % (cur_time, cur_gpus, cur_cpus))
def get_capacity(self, time, num_spare_node=None):
"""
Only for display_capacity_pattern()
:param time: cluster.cur_time, cluster.num_spare_node
:return: [cur_gpus, cur_cpus]
"""
num_spare_node = self.num_spare_node if num_spare_node is None else num_spare_node
ratio = self.get_cap_ratio(time)
if num_spare_node is None:
return [int(ratio * self.num_gpus), int(ratio * self.num_cpus)]
else:
if not self.spare_node_id:
spare_node_id = list(range(num_spare_node))
else:
spare_node_id = self.spare_node_id
g, c = 0, 0
for node in self.node_list:
if node.id in spare_node_id:
g += node.num_gpus
c += node.num_cpus
else:
g += node.num_gpus - int((1 - ratio) * node.num_gpus)
c += node.num_cpus - int((1 - ratio) * node.num_cpus)
assert g >= 0 and c >= 0
return [g, c]
def get_cap_ratio(self, time, pattern=None, period=None):
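        """Return the fraction of cluster capacity left for jobs at `time`,
        looked up from a piecewise-constant pattern that repeats every `period`
        time units; defaults to 1 (full capacity) when time % period falls
        outside the ranges defined for the chosen pattern."""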
pattern = self.pattern if pattern is None else pattern
period = self.period if period is None else period
pattern_ratio_dict = {
0: {1:(0,1000)}, # always maximum capacity
1: {1:(0, 62), 0.6:(62, 124)},
2: {0.6:[(0, 10), (62, 124)], 1:(10, 62)},
3: {1:(0, 20), 0.9:(20, 40), 0.8:(40, 60), 0.7:(60, 80), 0.6:(80, 100), 0.5:(100, 124)},
4: {0.5:(0, 20), 0.6:(20, 40), 0.7:(40, 60), 0.8:(60, 80), 0.9:(80, 100)},
5: {1:[(0, 10), (110, 124)], 0.9:[(10, 20),(100, 110)], 0.8:[(20, 30),(90, 100)], 0.7:[(30, 40),(80, 90)], 0.6:(40, 50), 0.5:(50, 70), 0.4:(70, 80)},
6: {1:[(0, 20), (50, 60), (110, 124)], 0.6:(20, 50), 0.4:(60, 110)},
7: {1:[(0, 20), (50, 60), (110, 124)], 0.9:(20, 50), 0.8:(60, 110)}
} # { pattern1: {ratio1: [ (lower_bound1, upper_bound1), (lb2, ub2), ... ], ratio2: [...]}, pattern2: {...} }
t_mod_p = time % period
ratio_dict = pattern_ratio_dict.get(pattern, {})
for key, val in ratio_dict.items():
            if isinstance(val, tuple):
                val = [val]  # normalize a single (lb, ub) tuple into a list of bounds
for bound in val:
if bound[0] <= t_mod_p < bound[1]:
return key
return 1
def record_cluster_util(self):
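        """Append a sample of the current time and of job GPU/CPU usage to the
        in-memory utilization traces kept for export."""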
self.cluster_time.append(self.cur_time)
self.cluster_cpu.append(self.job_cpus)
self.cluster_gpu.append(self.job_gpus)
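    # --- Aggregated resource views over all nodes ---------------------------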
@property
def node_list(self):
return list(self.node_dict.values())
@property
def cur_rsrc(self):
return [self.cur_gpus, self.cur_cpus]
@property
def cur_gpus(self):
return self.num_gpus - self.svc_gpus
@property
def cur_cpus(self):
return self.num_cpus - self.svc_cpus
@property
def job_runn_list(self):
job_runn_list = []
for node in self.node_list:
job_runn_list.extend(node.job_runn_list)
return job_runn_list
@property
def svc_gpus(self):
return sum([n.svc_gpus for n in self.node_list])
@property
def svc_cpus(self):
return sum([n.svc_cpus for n in self.node_list])
@property
def idl_gpus(self):
return sum([n.idl_gpus for n in self.node_list])
@property
def idl_cpus(self):
return sum([n.idl_cpus for n in self.node_list])
@property
def job_gpus(self):
return sum([n.job_gpus for n in self.node_list])
@property
def job_cpus(self):
return sum([n.job_cpus for n in self.node_list])
@property
def log_prefix(self):
        if self.export_cluster_util is True:  # side effect: sample utilization each time a log prefix is built
self.record_cluster_util()
return "[%6s],[GPU,CPU]:[%7s,%8s]/[%7s,%8s]." % (self.cur_time, self.job_gpus, self.job_cpus, self.cur_gpus, self.cur_cpus)