# cluster-trace-gpu-v2020/simulator/simulator.py

# Date: 03.28
# Note: 2D: CPU + GPU
import random
import csv
import copy

import numpy as np

from utils import print_fn, _add_job, _repr_job_done, _add_describe, GPU_TYPE_INT_DICT
from cluster import Cluster
from node import Node
from scheduler import Scheduler

"""
Jobs OrderedDict keys:
job_id,duration,resource=[num_gpu, num_cpu]
duration,size,on_time,wasted,jct,
# submit_time,model_name,iterations,interval
"""


class Simulator:
    """Discrete-time simulator that replays a GPU-cluster job trace.

    Jobs are read from a CSV trace file, submitted to a ``Scheduler``
    operating on a ``Cluster`` of ``Node`` objects, and simulated time is
    advanced in ``delta``-second ticks until every job completes or
    ``max_time`` is exceeded.
    """

    def __init__(self, csv_file, alloc_policy=0, preempt_policy=0,
                 sort_node_policy=0, oracle=False, random_seed=42,
                 max_time=int(1e10), num_gpus=None, num_cpus=None,
                 num_nodes=4, pattern=1, delta=1, num_spare_node=1,
                 hetero=False, describe_file=None, export_job_stats=False,
                 export_cluster_util=False, log_file=None, arrival_rate=None,
                 arrival_interval=60, arrival_shuffle=False,
                 num_jobs_limit=None, gpu_type_matching=0, verbose=0):
        """Configure the simulation and pre-load the job trace.

        Args:
            csv_file: path to the job trace in CSV format.
            alloc_policy / preempt_policy / sort_node_policy: integer policy
                selectors forwarded to the ``Scheduler``.
            oracle: whether the scheduler knows resource fluctuation ahead
                of time.
            random_seed: seed for the ``random`` module (note: ``np.random``,
                used for arrival shuffling, is NOT seeded here).
            max_time: simulated-time ceiling; exceeding it raises
                ``TimeoutError`` in ``tic``.
            num_gpus / num_cpus: capacity for the single-big-node setup
                (used only when ``num_nodes == 1``).
            num_nodes: number of homogeneous nodes (ignored if ``hetero``).
            pattern: resource-varying pattern id, forwarded to ``Cluster``.
            delta: time granularity in seconds (minimum 1).
            num_spare_node: spare nodes reserved in the ``Cluster``.
            hetero: build the fixed heterogeneous node inventory instead of
                homogeneous nodes.
            describe_file: per-user job distribution CSV (see
                ``_add_describe``).
            export_job_stats / export_cluster_util: dump ``.npy`` files next
                to ``log_file`` after each run.
            log_file: ``pathlib.Path``-like log path (``.name`` / ``.parent``
                are used when exporting).
            arrival_rate / arrival_interval / arrival_shuffle: synthetic
                arrival-time rewriting (see ``set_job_list_arrival_time``).
            num_jobs_limit: cap on jobs sampled per run; ``None`` means all.
            gpu_type_matching: GPU-type matching mode for the ``Scheduler``.
            verbose: verbosity level forwarded to the ``Scheduler``.
        """
        self.cluster = None
        self.scheduler = None
        self.cur_time = None
        self.num_jobs = None
        self.job_list = []
        self.job_runn_list = []
        self.exit_flag = None
        self.max_time = max_time
        self.num_gpus = num_gpus
        self.num_cpus = num_cpus
        self.num_nodes = num_nodes
        self.csv_file = csv_file
        self.oracle = oracle  # know fluctuation?
        self.alloc_policy = alloc_policy
        self.preempt_policy = preempt_policy
        self.sort_node_policy = sort_node_policy
        # describe_file: each user's job distribution in CSV format.
        self.describe_dict = _add_describe(describe_file)
        self.arrival_rate = arrival_rate  # job arrival rate
        self.arrival_interval = arrival_interval
        self.arrival_shuffle = arrival_shuffle
        self.num_jobs_limit = num_jobs_limit
        # Read 10x the sampled number to avoid parsing the entire trace when
        # only a small sample is requested.
        # BUGFIX: the original passed `num_jobs_limit * 10` unconditionally,
        # which raises TypeError when num_jobs_limit is None (its default).
        read_limit = None if num_jobs_limit is None else num_jobs_limit * 10
        self.job_origin_list = self.add_job(self.csv_file, self.describe_dict,
                                            limit=read_limit)
        self.pattern = pattern  # which resource varying pattern?
        self.delta = delta  # time granularity, minimum: 1 second.
        self.num_spare_node = num_spare_node
        self.hetero = hetero
        self.gpu_type_matching = gpu_type_matching
        self.export_job_stats = export_job_stats
        self.export_cluster_util = export_cluster_util
        self.log_file = log_file  # just pass the path
        self.verbose = verbose
        random.seed(random_seed)

    @staticmethod
    def add_job(csv_file, describe_dict, limit=None):
        """Parse up to ``limit + 1`` rows of the trace into a job list.

        Args:
            csv_file: path to the comma-delimited trace file with a header.
            describe_dict: per-user distribution info passed to ``_add_job``.
            limit: stop after this many rows; ``None`` reads the whole file.
                Used to avoid reading too many jobs when the sampled number
                is far below the total number of jobs in the trace file.

        Returns:
            list of job dicts built by ``_add_job``.
        """
        job_list = []
        with open(csv_file, 'r') as fd:
            reader = csv.DictReader(fd, delimiter=',')
            for i, row in enumerate(reader):
                _add_job(job_list, row, describe_dict)
                if limit is not None and i >= limit:
                    break
        return job_list

    @staticmethod
    def set_job_list_arrival_time(job_list, arrival_rate=None, interval=60,
                                  shuffle_order=False):
        """Overwrite each job's ``submit_time`` with a synthetic arrival.

        Args:
            job_list: jobs to execute in this run (mutated in place).
            arrival_rate: number of jobs arriving per time interval;
                ``None`` or a negative value leaves submit times untouched.
            interval: length of each arrival interval (default: 60).
            shuffle_order: if True, randomly shuffle the job order
                (via ``np.random``) instead of sorting by original
                submit time then job id.

        Returns:
            0 when the original submit times are respected; ``None``
            otherwise (callers only rely on the in-place mutation).
        """
        if arrival_rate is None or arrival_rate < 0:
            return 0  # respect the original submit time
        if shuffle_order is True:
            np.random.shuffle(job_list)
        else:
            # Jobs lacking a submit_time sort last (inf), ties break on id.
            job_list.sort(key=lambda e: (e.get('submit_time', float('inf')),
                                         e['job_id']))
        arrival_counter = 0
        for job in job_list:
            # Batch `arrival_rate` jobs into each `interval`-second slot.
            arrival_time = (arrival_counter // arrival_rate) * interval
            job['submit_time'] = arrival_time
            arrival_counter += 1

    def init_node_list(self):
        """Return ``num_nodes`` default (homogeneous) nodes."""
        return [Node(id=i) for i in range(self.num_nodes)]

    def init_node_list_hetero(self):
        """Return the fixed heterogeneous inventory, low-end machines first.

        16 CPU-only, 56 MISC (8 GPU), 100 T4 (2 GPU), 160 P100 (2 GPU),
        48 V100 (8 GPU) nodes, with sequential ids.
        """
        node_list = []
        node_id = 0
        for _ in range(16):  # low-ended machines first
            node_list.append(Node(node_id, 0, 96, gpu_type='CPU'))
            node_id += 1
        for _ in range(56):
            node_list.append(Node(node_id, 8, 96, gpu_type='MISC'))
            node_id += 1
        for _ in range(100):
            node_list.append(Node(node_id, 2, 96, gpu_type='T4'))
            node_id += 1
        for _ in range(160):
            node_list.append(Node(node_id, 2, 64, gpu_type='P100'))
            node_id += 1
        for _ in range(48):
            node_list.append(Node(node_id, 8, 96, gpu_type='V100'))
            node_id += 1
        return node_list

    def init_go(self, num_jobs=None):
        """(Re)initialize one simulation run: jobs, cluster, and scheduler.

        Args:
            num_jobs: number of jobs to sample for this run; falls back to
                ``self.num_jobs_limit`` when ``None``.
        """
        self.cur_time = 0
        # Deep copy so repeated runs never mutate the pristine origin list.
        self.job_list = copy.deepcopy(self.job_origin_list)
        num_jobs = num_jobs if num_jobs is not None else self.num_jobs_limit
        if (num_jobs is not None) and num_jobs <= len(self.job_list):
            random.shuffle(self.job_list)
            self.job_list = self.job_list[:num_jobs]

        self.set_job_list_arrival_time(self.job_list, self.arrival_rate,
                                       self.arrival_interval,
                                       self.arrival_shuffle)

        print_fn("----------------------------- RANDOM: %d"
                 % random.randint(1000, 9999))
        print_fn("%d Job loaded" % len(self.job_list))

        # Init Cluster resources
        if self.hetero:
            node_list = self.init_node_list_hetero()
        elif self.num_nodes == 1 and self.num_gpus is not None:
            # i.e., one big node formulation
            node_list = [Node(id=1, num_gpus=self.num_gpus,
                              num_cpus=self.num_cpus)]
        else:
            node_list = self.init_node_list()
        self.cluster = Cluster(node_list=node_list, job_list=self.job_list,
                               random_seed=random.randint(1000, 9999),
                               num_spare_node=self.num_spare_node,
                               pattern=self.pattern,
                               export_cluster_util=self.export_cluster_util)
        self.scheduler = Scheduler(cluster=self.cluster,
                                   alloc_policy=self.alloc_policy,
                                   preempt_policy=self.preempt_policy,
                                   sort_node_policy=self.sort_node_policy,
                                   verbose=self.verbose,
                                   gpu_type_matching=self.gpu_type_matching)
        self.num_jobs = len(self.job_list)
        self.exit_flag = 0
        print_fn("Spared nodes: %s" % self.cluster.spare_node_id)

    def exp_summary(self, id=None):
        """Log a per-run summary and optionally export ``.npy`` statistics.

        Args:
            id: run identifier embedded in exported file names (note: the
                parameter name shadows the builtin but is kept for
                backward compatibility with existing callers).

        Returns:
            (num_jobs_done, jct_summary, wait_time_summary) tuple of totals
            taken from the cluster's job history.
        """
        job_history = self.cluster.job_history
        num_jobs_done = job_history.num_jobs_done
        jct_summary = job_history.jct_summary
        wait_time_summary = job_history.wait_time_summary
        job_done_list = job_history.job_done_list
        wasted_summary = job_history.wasted_summary
        assert num_jobs_done == len(job_done_list)
        print_fn("Wasted progress in summary: %s" % wasted_summary)
        if num_jobs_done == self.num_jobs:
            print_fn("All Done (makespan) at %s" % self.cur_time)
        else:
            print_fn("%d of %d jobs Done (makespan) at %s"
                     % (num_jobs_done, self.num_jobs, self.cur_time))
        # NOTE(review): divides by num_jobs_done — assumes at least one job
        # finished by the time this is called.
        print_fn("%d jobs' average JCT: %.4f, average wait time: %.4f"
                 % (num_jobs_done, jct_summary / num_jobs_done,
                    wait_time_summary / num_jobs_done))

        # Print job done breakdown
        job_done_list.sort(key=lambda e: e['job_id'])
        if self.export_job_stats is True:
            # Rows: submit_time, duration, jct, gpu_type(int), num_inst, id.
            job_stats = np.zeros(shape=(6, len(job_done_list)), dtype=int)
            for i, job in enumerate(job_done_list):
                print_fn("%s || %s" % (_repr_job_done(job), job))
                job_stats[0][i] = job['submit_time']
                job_stats[1][i] = job['duration']
                job_stats[2][i] = job['jct']
                job_stats[3][i] = GPU_TYPE_INT_DICT.get(
                    job.get('gpu_type', 'N/A'), -1)
                job_stats[4][i] = job.get('num_inst', 1)
                job_stats[5][i] = job['job_id']
            job_stats_name = "%s.a%s-p%s-i%s-job_stats.npy" % (
                self.log_file.name, self.alloc_policy, self.preempt_policy, id)
            job_stats_file = self.log_file.parent / job_stats_name
            np.save(job_stats_file, job_stats)
        else:
            for job in job_done_list:
                print_fn("%s || %s" % (_repr_job_done(job), job))
        print_fn("")

        if self.export_cluster_util is True:
            cluster_util = np.asarray([
                self.cluster.cluster_time,
                self.cluster.cluster_cpu,
                self.cluster.cluster_gpu
            ])
            cluster_util_name = "%s.a%s-p%s-i%s-cluster_util.npy" % (
                self.log_file.name, self.alloc_policy, self.preempt_policy, id)
            cluster_util_file = self.log_file.parent / cluster_util_name
            np.save(cluster_util_file, cluster_util)

        return num_jobs_done, jct_summary, wait_time_summary

    def simulator_go(self, repeat=1, num_jobs=None):
        """Run the simulation ``repeat`` times and collect per-run results.

        Args:
            repeat: number of independent runs.
            num_jobs: per-run job sample size forwarded to ``init_go``.

        Returns:
            list of (num_jobs_done, avg_jct, avg_wait_time, makespan)
            tuples, one per run.
        """
        result = []
        for repeat_id in range(repeat):
            self.init_go(num_jobs=num_jobs)
            while not self.exit_flag:
                self.tic(self.delta)
            num_jobs_done, jct_summary, wait_time_summary = \
                self.exp_summary(repeat_id)
            result.append((num_jobs_done,
                           jct_summary / num_jobs_done,
                           wait_time_summary / num_jobs_done,
                           self.cur_time))
        return result

    def tic(self, delta=1):
        """Advance simulated time by one scheduling step.

        Runs cluster service tick, preemption, then allocation; ``tic_job``
        either returns the new global time (>= 0) or a negative value
        meaning all jobs are done, which sets ``exit_flag``.

        Raises:
            TimeoutError: when ``cur_time`` has reached ``max_time`` with
                jobs still pending.
        """
        if self.cur_time < self.max_time:
            self.cluster.tic_svc(self.cur_time)

            # Preempt job
            self.scheduler.preempt_job(self.cluster)

            # Allocate job
            self.scheduler.alloc_job(self.cluster)

            # Jobs tic and global cur_time += delta
            tic_return_value = self.cluster.tic_job(delta)

            if tic_return_value >= 0:
                self.cur_time = tic_return_value
            else:
                self.cur_time = self.cur_time + delta
                self.exit_flag = 1
        else:
            print_fn("TIMEOUT {} with jobs {}".format(
                self.cur_time, self.cluster.job_list))
            self.exit_flag = 1
            raise TimeoutError("TIMEOUT {} with jobs {}".format(
                self.cur_time, self.cluster.job_list))