azure-slurm/slurmcc/cost.py (231 lines of code) (raw):

import os import re import shutil import sys import json import csv from tabulate import tabulate from datetime import datetime import logging import subprocess from collections import namedtuple from hpc.autoscale.cost.azurecost import azurecost from .util import run log = logging.getLogger('cost') class Statistics: def __init__(self): self.jobs = 0 self.running_jobs = 0 self.processed = 0 self.unprocessed = 0 self.cost_per_sku = {} self.admincomment_err = 0 def display(self): table = [] table.append(['Total Jobs', self.jobs]) table.append(['Total Processed Jobs', self.processed]) table.append(['Total processed running jobs', self.running_jobs]) table.append(['Unprocessed Jobs', self.unprocessed]) table.append(['Jobs with admincomment errors', self.admincomment_err]) print(tabulate(table, headers=['SUMMARY',''], tablefmt="simple")) class CostSlurm: def __init__(self, start:str, end: str, cluster: str, cache_root: str, fmt: str=None) -> None: self.start = start self. end = end self.cluster = cluster self.sacct = shutil.which("sacct") if not self.sacct: raise RuntimeError("Could not find valid sacct binary") self.squeue = shutil.which("squeue") if not self.squeue: raise RuntimeError("Could not find valid squeue binary") self.sacctmgr = shutil.which("sacctmgr") if not self.sacctmgr: raise RuntimeError("Could not find valid sacctmgr binary") self.stats = Statistics() self.cache = f"{cache_root}/slurm" try: os.makedirs(self.cache, 0o777, exist_ok=True) except OSError as e: log.error("Unable to create cache directory {self.cache}") log.error(e.strerror) raise default_output_fmt = "jobid,user,account,cluster,partition,ncpus,nnodes,submit,start,end,elapsedraw,state" default_input_fmt = default_output_fmt + ",admincomment" self.options = ["--allusers", "--duplicates", "--parsable2", "--allocations", "--noheader"] avail_format = self.get_sacct_fields() if fmt: req_fmt = fmt.split(",") in_fmt = default_input_fmt.split(",") for f in req_fmt: if f not in avail_format: raise ValueError(f"{f} is not a valid sacct format option") if f not in in_fmt: in_fmt.append(f) self.output_format = ",".join(req_fmt) self.input_format = ",".join(in_fmt) else: self.output_format = default_output_fmt self.input_format = default_input_fmt self.in_fmt_t = namedtuple('in_fmt_t', self.input_format) self.slurm_fmt_t = namedtuple('slurm_fmt_t', self.output_format) self.c_fmt_t = namedtuple('c_fmt_t', ['cost']) def get_sacct_fields(self): options = [] cmd = [self.sacct, "-e"] out = run(cmd) for line in out.stdout.splitlines(): for opt in line.split(): options.append(opt.lower()) return options def _construct_command(self) -> list: args = [self.sacct] for opt in self.options: args.append(opt) args.append("-M") args.append(self.cluster) args.append(f"--start={self.start}") args.append(f"--end={self.end}") args.append("-o") args.append(self.input_format) return args def use_cache(self, filename) -> bool: return False def get_queue_rec_file(self) -> str: return os.path.join(self.cache, f"queue.out") def get_job_rec_file(self) -> str: return os.path.join(self.cache, f"sacct-{self.start}-{self.end}.out") def get_queue_records(self) -> str: _queue_rec_file = self.get_queue_rec_file() if self.use_cache(_queue_rec_file): return _queue_rec_file cmd = [self.squeue, "--json"] with open(_queue_rec_file, 'w') as fp: output = run(cmd, stdout=fp) if output.returncode: log.error("could not read slurm queue") return _queue_rec_file def process_queue(self) -> dict: running_jobs = {} queue_rec = self.get_queue_records() with open(queue_rec, 'r') as fp: data = json.load(fp) for job in data['jobs']: if job['job_state'] != 'RUNNING' and job['job_state'] != 'CONFIGURING': continue job_id = job['job_id'] if job['admin_comment']: running_jobs[job_id] = job['admin_comment'] return running_jobs def fetch_job_records(self) -> str: _job_rec_file = self.get_job_rec_file() if self.use_cache(_job_rec_file): return _job_rec_file cmd = self._construct_command() with open(_job_rec_file, 'w') as fp: output = run(cmd, stdout=fp) if output.returncode: log.error("Could not fetch slurm records") return _job_rec_file def parse_admincomment(self, comment: str): return json.loads(comment) def get_output_format(self, azcost: azurecost): az_fmt = azcost.get_job_format() return namedtuple('out_fmt_t', list(self.slurm_fmt_t._fields + az_fmt._fields + self.c_fmt_t._fields)) def process_jobs(self, azcost: azurecost, jobsfp, out_fmt_t): _job_rec_file = self.fetch_job_records() running = self.process_queue() fp = open(_job_rec_file, newline='') reader = csv.reader(fp, delimiter='|') writer = csv.writer(jobsfp, delimiter=',') for row in map(self.in_fmt_t._make, reader): self.stats.jobs += 1 if row.state == 'RUNNING' and int(row.jobid) in running: admincomment = running[int(row.jobid)] self.stats.running_jobs += 1 else: admincomment = row.admincomment try: comment_d = self.parse_admincomment(admincomment)[0] sku_name = comment_d['vm_size'] cpupernode = comment_d['pcpu_count'] region = comment_d['location'] spot = comment_d['spot'] except (json.JSONDecodeError,IndexError) as e: log.debug(f"Cannot parse admincomment job={row.jobid} cluster={row.cluster} admincomment={admincomment}") self.stats.admincomment_err += 1 self.stats.unprocessed += 1 continue except KeyError as e: log.debug(f"Key: {e.args[0]} not found in admincomment, job={row.jobid}, cluster={row.cluster}") self.stats.admincomment_err +=1 self.stats.unprocessed += 1 continue charge_factor = float(row.ncpus) / float(cpupernode) az_fmt = azcost.get_job(sku_name, region, spot) charged_cost = ((az_fmt.rate/3600) * float(row.elapsedraw)) * charge_factor c_fmt = self.c_fmt_t(cost=charged_cost) if (region,sku_name) not in self.stats.cost_per_sku: self.stats.cost_per_sku[(region,sku_name)] = 0 self.stats.cost_per_sku[(region,sku_name)] += charged_cost out_row = [] for f in out_fmt_t._fields: if f in self.in_fmt_t._fields: out_row.append(row._asdict()[f]) elif f in az_fmt._fields: out_row.append(az_fmt._asdict()[f]) elif f in self.c_fmt_t._fields: out_row.append(c_fmt._asdict()[f]) else: log.error(f"encountered an unexpected field {f}") writer.writerow(out_row) self.stats.processed += 1 fp.close() def _escape(s: str) -> str: return re.sub("[^a-zA-Z0-9-]", "-", s).lower() class CostDriver: def __init__(self, azcost: azurecost, config: dict): self.config = config self.azcost = azcost self.cluster = config.get('cluster_name') if not self.cluster: raise ValueError("cluster_name not present in config") self.cluster = _escape(self.cluster) def run(self, start: datetime, end: datetime, out: str, fmt: str): log.debug(f"start: {start}") log.debug(f"end: {end}") sacct_start = start.isoformat() sacct_end = end.isoformat() cost_config = self.config.get('cost', {}) if not cost_config or not cost_config.get('cache_root'): log.debug("Using /tmp as cost cache dir") cache_root = "/tmp" else: cache_root = cost_config.get('cache_root') cost_slurm = CostSlurm(start=sacct_start, end=sacct_end, cluster=self.cluster, cache_root=cache_root,fmt=fmt) try: os.makedirs(out, exist_ok=True) except OSError as e: log.error(f"Cannot create output directory {out}") raise jobs_csv = os.path.join(out, "jobs.csv") part_csv = os.path.join(out, "partition.csv") part_hourly = os.path.join(out, "partition_hourly.csv") fmt = self.azcost.get_job_format() out_fmt_t = cost_slurm.get_output_format(self.azcost) with open(jobs_csv, 'w') as fp: writer = csv.writer(fp, delimiter=',') writer.writerow(list(out_fmt_t._fields)) cost_slurm.process_jobs(azcost=self.azcost, jobsfp=fp, out_fmt_t=out_fmt_t) fmt = self.azcost.get_nodearray_format() with open(part_csv, 'w') as fp: writer = csv.writer(fp, delimiter=',') writer.writerow(list(fmt._fields)) self.azcost.get_nodearray(fp, start=sacct_start, end=sacct_end) fmt = self.azcost.get_nodearray_hourly_format() with open(part_hourly, 'w') as fp: writer = csv.writer(fp, delimiter=',') writer.writerow(list(fmt._fields)) self.azcost.get_nodearray_hourly(fp, start=sacct_start, end=sacct_end) cost_slurm.stats.display()