cluster-trace-gpu-v2020/simulator/utils.py (191 lines of code) (raw):

import logging import csv import numpy as np from matplotlib import pyplot as plt ALLOC_POLICY_DICT = { 0: 'SJF', # 'short job first', SJF 1: 'SJU', # SJF with estimator using USER feature 2: 'SJG', # SJF with estimator using GROUP, USER feature 4: 'SJGG', # SJF with estimator using GROUP, USER, GPU feature 8: 'FIFO', # FIFO, the default } PREEMPT_POLICY_DICT = { 0: 'SDF', # 'smallest_duration_first' 1: 'SSF', # 'smallest_size_first 2: 'LGF', # 'large_gpu_first', # LGF, size:num_gpu } GPU_TYPE_INT_DICT = { "CPU": 0, "MISC": 1, "T4": 2, "P100": 3, "V100": 4 } def print_fn(log, level=1): LOG_LEVEL_DEBUG = 0 LOG_LEVEL_INFO = 1 LOG_LEVEL_WARNING = 2 LOG_LEVEL_ERROR = 3 if level == LOG_LEVEL_DEBUG: logging.debug(log) elif level == LOG_LEVEL_INFO: logging.info(log) elif level == LOG_LEVEL_WARNING: logging.warning(log) elif level == LOG_LEVEL_ERROR: logging.error(log) exit() def _repr_job_concise(job_dict): return "J %s([G %s,C %s]-D %s)" % (job_dict['job_id'], job_dict['num_gpu'], job_dict['num_cpu'], job_dict['duration']) def _repr_job_preempt(job_dict): return "J %s-[G %s,C %s]-O:%3s/D:%3s" % (job_dict['job_id'], job_dict['num_gpu'], job_dict['num_cpu'], job_dict['on_time'], job_dict['duration']) def _repr_job_done(job_dict): job_repr_concise = "J %s([G %s,C %s]-D %s-N %s)" % (job_dict['job_id'], job_dict['num_gpu'], job_dict['num_cpu'], job_dict['duration'], job_dict['node']) return "%25s: %4s ---> %4s" % (job_repr_concise, job_dict['jct'] - job_dict['duration'], job_dict['jct']) def _add_describe(describe_file): if describe_file is None: return None describe_dict = {} with open(describe_file, 'r') as fd: reader = csv.DictReader(fd, delimiter=',') for row in reader: for k, v in row.items(): if k=='count': row[k] = int(v) elif k in ['mean', 'std', 'min', '25%', '50%', '75%', 'max']: if v == '': v = 0 row[k] = float(v) describe_dict[row['user']] = row return describe_dict # dd['ae8ed1']['50%']==38.4 def _add_job(job_list, job_dict, describe_dict=None): # Add job (job_dict) into job_list for key, value in job_dict.items(): if value is not None and value.isdigit() and key != 'user': if type(value) == str: job_dict[key] = round(float(value)) else: # duration becomes an int job_dict[key] = round(value) elif key in ['wait_time','user_dur','user_gpu_dur','group_dur','group_gpu_dur']: try: job_dict[key] = float(value) except: pass keys = ['num_cpu', 'num_gpu', 'submit_time', 'num_inst'] for key in keys: if key not in job_dict or job_dict[key] == '': if key in ['num_cpu', 'num_gpu']: job_dict[key] = 0 else: # key in ['submit_time', 'num_inst'] job_dict[key] = 1 else: if key in ['num_cpu', 'num_gpu']: # in % job_dict[key] = round(100 * float(job_dict[key])) else: job_dict[key] = round(float(job_dict[key])) # Add entries to be used in scheduling job_dict['duration'] = int(float(job_dict['duration'])) if job_dict['duration'] <= 0: job_dict['duration'] = 1 # fix duration == 0 problem. job_dict['size'] = int((job_dict['num_gpu'] + job_dict['num_cpu']) * job_dict['duration']) # (gpu + cpu) x duration job_dict['on_time'] = 0 job_dict['wasted'] = 0 job_dict['jct'] = -1 job_dict['resource'] = [job_dict['num_gpu'], job_dict['num_cpu']] # list of resources job_dict['node'] = None # Add duration estimation if describe_dict is not None: jd_user = describe_dict.get(job_dict['user']) if jd_user is not None: job_dict['dur_avg'] = float(jd_user['mean']) # expectation job_dict['dur_std'] = float(jd_user['std']) # standard deviation job_dict['dur_med'] = float(jd_user['50%']) # median job_dict['dur_trim_mean'] = float(jd_user['trim_mean']) # discard 10% top and 10% tail when calc. mean # Remove original unused entries for drop_col in ['fuxi_job_name','fuxi_task_name','inst_id','running_cluster','model_name','iterations','interval','vc','jobid','status']: if drop_col in job_dict: job_dict.pop(drop_col) job_list.append(job_dict) def add_user_round_robin_id(job_list): # Add a new sorting metrics, user_rrid, to enforce scheduler picking jobs from multiple users # when all users' primary metrics are the same (e.g., 0). user_rrid_dict = {} # a new dict each time for job in job_list: user = job['user'] rrid = user_rrid_dict.get(user, None) if rrid is None: rrid = 0 user_rrid_dict[user] = 1 else: user_rrid_dict[user] += 1 job['user_rrid'] = rrid def large_job_pruning(job_list, gpu_limit, cpu_limit): if job_list is None: return [] for job in job_list: if 'num_gpu' in job and job['num_gpu'] > gpu_limit: gpu_was = job['num_gpu'] job['num_gpu'] = gpu_limit print_fn("{:s}: GPU {:d} ==> {:d}".format(_repr_job_concise(job), gpu_was, gpu_limit)) if 'num_cpu' in job and job['num_cpu'] > cpu_limit: cpu_was = job['num_cpu'] job['num_cpu'] = cpu_limit print_fn("{:s}: CPU {:d} ==> {:d}".format(_repr_job_concise(job), cpu_was, cpu_limit)) return job_list def plot_cluster_util(npyfile, to_date=False): cluster_util = np.load(npyfile) cluster_time, cluster_cpu, cluster_gpu = cluster_util[0], cluster_util[1], cluster_util[2] plt.clf() plt.plot(cluster_time, cluster_cpu / 10, label='10CPU') plt.plot(cluster_time, cluster_gpu, label='GPU') plt.legend() try: plt.savefig(str(npyfile).split('.npy')[0]+".png") except: plt.savefig("cluster_util") def plot_job_stats(npyfile, to_date=False): plt.figure(figsize=(16, 6), dpi=120) job_stats = np.load(npyfile) job_submit_time, job_duration, job_jct, job_gpu_type, job_num_inst, job_id = job_stats[0], job_stats[1], job_stats[2], job_stats[3], job_stats[4], job_stats[5] job_queue_delay = job_jct - job_duration plt.clf() plt.plot(job_submit_time, job_queue_delay, color='orange', label='queue_delay') plt.plot(job_submit_time, job_duration, color='black', alpha=0.3, label='duration') plt.legend() try: plt.savefig(str(npyfile).split('.npy')[0]+".png") except: plt.savefig("job_stats") def plot_multi_job_stats(npyfiles, to_date=False): plt.clf() plt.figure(figsize=(12, 6), dpi=120) for npyfile in npyfiles: job_stats = np.load(npyfile) job_submit_time, job_duration, job_jct, job_gpu_type, job_num_inst, job_id = job_stats[0], job_stats[1], job_stats[2], job_stats[3], job_stats[4], job_stats[5] job_queue_delay = job_jct - job_duration try: label=ALLOC_POLICY_DICT[int(str(npyfile).split('.log.a')[1].split('-p')[0])] except KeyError: label = str(npyfile).split('.log.')[1].split('-job_stats.npy')[0] plt.plot(job_submit_time, job_queue_delay, alpha=0.5, label=label+'-queue_delay') plt.plot(job_submit_time, job_duration, color='grey', alpha=0.3, label='job duration') plt.legend(loc='upper left') plt.title("Arrival jobs' duration and queueing delay") plt.xlabel("Submitted Time") plt.ylabel("Run/Wait Time") try: plt.savefig(str(npyfile).split('.log.')[0]+"-job_stats.png") except: plt.savefig("job_stats") def plot_multi_cluster_util(npyfiles, to_date=False): plt.clf() plt.figure(figsize=(12, 6), dpi=120) for npyfile in npyfiles: cluster_util = np.load(npyfile) cluster_time, cluster_cpu, cluster_gpu = cluster_util[0], cluster_util[1], cluster_util[2] try: label=ALLOC_POLICY_DICT[int(str(npyfile).split('.log.a')[1].split('-p')[0])] except KeyError: label = str(npyfile).split('.log.')[1].split('-cluster_util.npy')[0] plt.plot(cluster_time, cluster_gpu, alpha=0.5, label=label+'-GPU') plt.legend(loc='upper left') plt.title("Cluster Utilization") plt.xlabel("Time") plt.ylabel("Resource") try: plt.savefig(str(npyfile).split('.log.')[0]+"-cluster_util.png") except: plt.savefig("cluster_util")