ansible/roles/slurm/files/scripts/resume.py (568 lines of code) (raw):

#!/usr/bin/env python3

# Copyright (C) SchedMD LLC.
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List
import argparse
import collections
import json
import logging
import os
import sys
import yaml
from itertools import chain
from pathlib import Path

import util
from util import (
    chunked,
    dirs,
    ensure_execute,
    execute_with_futures,
    get_insert_operations,
    log_api_request,
    map_with_futures,
    run,
    separate,
    to_hostlist,
    to_hostlist_fast,
    trim_self_link,
    wait_for_operation,
)
from util import cfg, lkp, NSDict, TPU

# from util import cfg, lkp, NSDict
import slurm_gcp_plugins

filename = Path(__file__).name
LOGFILE = (Path(cfg.slurm_log_dir if cfg else ".") / filename).with_suffix(".log")

log = logging.getLogger(filename)

global_resume_data = None

PLACEMENT_MAX_CNT = 150
# Placement group needs to be the same for an entire bulk_insert hence
# if placement is used the actual BULK_INSERT_LIMIT will be
# max([1000, PLACEMENT_MAX_CNT])
BULK_INSERT_LIMIT = 5000


def instance_properties(nodeset, model, placement_group, labels=None):
    template = lkp.node_template(model)
    template_info = lkp.template_info(template)

    props = NSDict()

    slurm_metadata = {
        "slurm_cluster_name": cfg.slurm_cluster_name,
        "slurm_instance_role": "compute",
        "startup-script": (
            Path(cfg.slurm_scripts_dir or util.dirs.scripts) / "startup.sh"
        ).read_text(),
    }
    info_metadata = {
        item.get("key"): item.get("value") for item in template_info.metadata["items"]
    }

    props_metadata = {**info_metadata, **slurm_metadata}
    props.metadata = {
        "items": [NSDict({"key": k, "value": v}) for k, v in props_metadata.items()]
    }

    labels = {
        "slurm_cluster_name": cfg.slurm_cluster_name,
        "slurm_instance_role": "compute",
        **(labels or {}),
    }
    props.labels = {**template_info.labels, **labels}

    for disk in template_info.disks:
        # do not label local ssd
        if (
            "diskType" not in disk.initializeParams
            or disk.initializeParams.diskType == "local-ssd"
        ):
            continue
        disk.initializeParams.labels.update(labels)
    props.disks = template_info.disks

    if placement_group:
        props.scheduling = {
            "onHostMaintenance": "TERMINATE",
            "automaticRestart": False,
        }
        props.resourcePolicies = [
            placement_group,
        ]

    if nodeset.reservation_name:
        reservation_name = nodeset.reservation_name
        zones = list(nodeset.zone_policy_allow or [])
        assert len(zones) == 1, "Only single zone is supported if using a reservation"

        reservation = lkp.reservation(reservation_name, zones[0])

        props.reservationAffinity = {
            "consumeReservationType": "SPECIFIC_RESERVATION",
            "key": f"compute.{util.universe_domain()}/reservation-name",
            "values": [reservation_name],
        }

        policies = util.reservation_resource_policies(reservation)
        if policies:
            props.scheduling = {
                "onHostMaintenance": "TERMINATE",
                "automaticRestart": False,
            }
            props.resourcePolicies = policies
            log.info(
                f"reservation {reservation_name} is being used with policies {props.resourcePolicies}"
            )
        else:
            props.resourcePolicies = []
            log.info(
                f"reservation {reservation_name} is being used without any policies"
            )

    if nodeset.maintenance_interval:
        props.scheduling = props.scheduling or {}
        props.scheduling["maintenanceInterval"] = nodeset.maintenance_interval

    return props


def per_instance_properties(node):
    props = NSDict()
    # No properties beyond name are supported yet.

    return props


def create_instances_request(nodes, partition_name, placement_group, job_id=None):
    """Call regionInstances.bulkInsert to create instances"""
    assert len(nodes) > 0
    if placement_group:
        assert len(nodes) <= min(PLACEMENT_MAX_CNT, BULK_INSERT_LIMIT)
    else:
        assert len(nodes) <= BULK_INSERT_LIMIT

    # model here indicates any node that can be used to describe the rest
    model = next(iter(nodes))
    nodeset = lkp.node_nodeset(model)
    template = lkp.node_template(model)
    region = lkp.node_region(model)
    partition = cfg.partitions[partition_name]
    log.debug(f"create_instances_request: {model} placement: {placement_group}")

    body = NSDict()
    body.count = len(nodes)
    body.minCount = 1

    # source of instance properties
    body.sourceInstanceTemplate = template

    labels = (
        dict(slurm_job_id=job_id)
        if job_id is not None and partition.enable_job_exclusive
        else None
    )
    # overwrites properties across all instances
    body.instanceProperties = instance_properties(
        nodeset, model, placement_group, labels
    )

    # key is instance name, value overwrites properties
    body.perInstanceProperties = {k: per_instance_properties(k) for k in nodes}

    zones = {
        **{
            f"zones/{zone}": {"preference": "ALLOW"}
            for zone in nodeset.zone_policy_allow or []
        },
        **{
            f"zones/{zone}": {"preference": "DENY"}
            for zone in nodeset.zone_policy_deny or []
        },
    }
    body.locationPolicy.targetShape = cfg.zone_target_shape or "ANY_SINGLE_ZONE"
    if zones:
        body.locationPolicy.locations = zones

    if lkp.cfg.enable_slurm_gcp_plugins:
        slurm_gcp_plugins.pre_instance_bulk_insert(
            lkp=lkp,
            nodes=nodes,
            placement_group=placement_group,
            request_body=body,
        )

    request = util.compute.regionInstances().bulkInsert(
        project=cfg.project, region=region, body=body.to_dict()
    )

    if log.isEnabledFor(logging.DEBUG):
        log.debug(
            f"new request: endpoint={request.methodId} nodes={to_hostlist_fast(nodes)}"
        )
    log_api_request(request)
    return request


def group_nodes_bulk(nodes, resume_data=None):
    """group nodes by job_id, placement_group, node_group, and max bulkInsert size"""
    if resume_data is None:
        # all nodes will be considered jobless
        jobs = {}
    else:
        jobs = {job.job_id: job for job in resume_data.jobs}

    # expand all job nodelists
    for job in jobs.values():
        job.nodelist_alloc = job.nodes_alloc
        job.nodes_alloc = util.to_hostnames(job.nodelist_alloc)
        job.nodelist_resume = job.nodes_resume
        job.nodes_resume = util.to_hostnames(job.nodelist_resume)
        job.tpu = util.part_is_tpu(job.partition)
        if not job.tpu:
            # create placement groups if nodes for job need it
            job.placement_groups = create_placement_groups(
                node_list=job.nodes_alloc,
                job_id=job.job_id,
            )
            # placement group assignment is based on all allocated nodes, but we only
            # want to handle nodes in nodes_resume in this run.
            for pg, pg_nodes in job.placement_groups.items():
                job.placement_groups[pg] = list(
                    set(pg_nodes).intersection(job.nodes_resume)
                )
    # a bit of a hack, but nodes resumed using scontrol instead of through job scheduling do not have a job
    jobless_nodes = list(
        set(nodes).difference(
            chain.from_iterable(job.nodes_resume for job in jobs.values())
        )
    )
    jobless_nodes_tpu = []
    for jobless_node in jobless_nodes[:]:
        if lkp.node_is_tpu(jobless_node):
            jobless_nodes.remove(jobless_node)
            jobless_nodes_tpu.append(jobless_node)

    jobs["Normal_None"] = NSDict(
        job_id=None,
        nodes_resume=jobless_nodes,
        nodes_alloc=jobless_nodes,
        placement_groups=create_placement_groups(node_list=jobless_nodes),
        partition=None,
        tpu=False,
    )
    jobs["TPU_None"] = NSDict(
        job_id=None,
        nodes_resume=jobless_nodes_tpu,
        nodes_alloc=jobless_nodes_tpu,
        partition=None,
        tpu=True,
    )

    BulkChunk = collections.namedtuple(
        "BulkChunk",
        ["prefix", "job_id", "partition_name", "placement_group", "nodes", "i"],
    )
    BulkChunkTPU = collections.namedtuple(
        "BulkChunkTPU",
        ["prefix", "job_id", "partition_name", "nodes", "i"],
    )
    grouped_nodes = [
        BulkChunk(
            prefix,
            job_id if job_id != "Normal_None" else None,
            jobs[job_id].partition,
            placement_group,
            chunk_nodes,
            i,
        )
        for job_id, job in jobs.items()
        if not job.tpu
        for placement_group, pg_nodes in job.placement_groups.items()
        for prefix, nodes in util.groupby_unsorted(pg_nodes, lkp.node_prefix)
        for i, chunk_nodes in enumerate(chunked(nodes, n=BULK_INSERT_LIMIT))
    ]
    grouped_nodes_tpu = [
        BulkChunkTPU(
            prefix,
            job_id if job_id != "TPU_None" else None,
            jobs[job_id].partition,
            chunk_nodes,
            i,
        )
        for job_id, job in jobs.items()
        if job.tpu
        for prefix, nodes in util.groupby_unsorted(job.nodes_resume, lkp.node_prefix)
        for i, chunk_nodes in enumerate(lkp.chunk_tpu_nodes(list(nodes)))
    ]

    def group_name(chunk: BulkChunk):
        if chunk.placement_group is not None:
            return f"{chunk.prefix}:job{chunk.job_id}:{chunk.placement_group}:{chunk.i}"
        if chunk.job_id is not None:
            return f"{chunk.prefix}:job{chunk.job_id}:{chunk.i}"
        return f"{chunk.prefix}:{chunk.i}"

    def group_name_tpu(chunk: BulkChunkTPU):
        if chunk.job_id is not None:
            return f"{chunk.prefix}:job{chunk.job_id}:{chunk.i}"
        return f"{chunk.prefix}:{chunk.i}"

    grouped_nodes = {group_name(chunk): chunk for chunk in grouped_nodes}
    grouped_nodes_tpu = {group_name_tpu(chunk): chunk for chunk in grouped_nodes_tpu}
    return grouped_nodes, grouped_nodes_tpu


def start_tpu(data):
    tpu = data["tpu"]
    node = data["node"]
    if len(node) == 1:
        node = node[0]
        log.debug(
            f"Will create a TPU of type {tpu.node_type} tf_version {tpu.tf_version} in zone {tpu.zone} with name {node}"
        )
        tpunode = tpu.get_node(node)
        if tpunode is None:
            if not tpu.create_node(nodename=node):
                log.error(f"Error creating tpu node {node}")
        else:
            if tpu.preserve_tpu:
                if not tpu.start_node(nodename=node):
                    log.error(f"Error starting tpu node {node}")
            else:
                log.info(
                    f"Tpu node {node} is already created, but will not start it because nodeset does not have preserve_tpu option active."
                )
    else:
        log.debug(
            f"Will create a multi-vm TPU of type {tpu.node_type} tf_version {tpu.tf_version} in zone {tpu.zone} with name {node[0]}"
        )
        if not tpu.create_node(nodename=node):
            log.error(f"Error creating tpu node {node}")


def resume_nodes(nodes: List[str], resume_data=None):
    """resume nodes in nodelist"""
    if not nodes:
        log.info("No nodes to resume")
        return

    if resume_data is None and global_resume_data is not None:
        resume_data = global_resume_data.deepcopy()

    nodes = sorted(nodes, key=lkp.node_prefix)
    grouped_nodes, grouped_tpu_nodes = group_nodes_bulk(nodes, resume_data)

    if log.isEnabledFor(logging.DEBUG):
        # grouped_nodelists is used in later debug logs too
        grouped_nodelists = {
            group: to_hostlist(chunk.nodes) for group, chunk in grouped_nodes.items()
        }
        grouped_tpu_nodelists = {
            group: to_hostlist(chunk.nodes)
            for group, chunk in grouped_tpu_nodes.items()
        }
        log.debug(
            "node bulk groups: \n{}".format(yaml.safe_dump(grouped_nodelists).rstrip())
        )
        log.debug(
            "TPU node bulk groups: \n{}".format(
                yaml.safe_dump(grouped_tpu_nodelists).rstrip()
            )
        )

    tpu_start_data = []
    tpu_objs = {}
    for group, chunk in grouped_tpu_nodes.items():
        # do not create multiple tpu_objs if nodes with the same prefix are used
        if chunk.prefix not in tpu_objs.keys():
            model = chunk.nodes[0]
            tpu_objs[chunk.prefix] = TPU(lkp.node_nodeset(model))

        tpu_start_data.append({"tpu": tpu_objs[chunk.prefix], "node": chunk.nodes})

    # make all bulkInsert requests and execute with batch
    inserts = {
        group: create_instances_request(
            chunk.nodes, chunk.partition_name, chunk.placement_group, chunk.job_id
        )
        for group, chunk in grouped_nodes.items()
    }

    bulk_ops = dict(
        zip(inserts.keys(), map_with_futures(ensure_execute, inserts.values()))
    )
    log.debug(f"bulk_ops={yaml.safe_dump(bulk_ops)}")
    started = {
        group: op for group, op in bulk_ops.items() if not isinstance(op, Exception)
    }
    failed = {
        group: err for group, err in bulk_ops.items() if isinstance(err, Exception)
    }
    if failed:
        failed_reqs = [str(e) for e in failed.items()]
        log.error("bulkInsert API failures: {}".format("; ".join(failed_reqs)))
        for ident, exc in failed.items():
            down_nodes(grouped_nodes[ident].nodes, f"GCP Error: {exc._get_reason()}")

    if log.isEnabledFor(logging.DEBUG):
        for group, op in started.items():
            group_nodes = grouped_nodelists[group]
            name = op["name"]
            gid = op["operationGroupId"]
            log.debug(
                f"new bulkInsert operation started: group={group} nodes={group_nodes} name={name} operationGroupId={gid}"
            )
    # wait for all bulkInserts to complete and log any errors
    bulk_operations = {group: wait_for_operation(op) for group, op in started.items()}

    # Start TPU after regular nodes so that regular nodes are not affected by the slower TPU nodes
    log.debug(f"tpu_start_data={yaml.safe_dump(tpu_start_data)}")
    execute_with_futures(start_tpu, tpu_start_data)

    all_successful_inserts = []

    for group, bulk_op in bulk_operations.items():
        group_id = bulk_op["operationGroupId"]
        bulk_op_name = bulk_op["name"]
        if "error" in bulk_op:
            error = bulk_op["error"]["errors"][0]
            group_nodes = to_hostlist_fast(grouped_nodes[group].nodes)
            log.warning(
                f"bulkInsert operation errors: {error['code']} name={bulk_op_name} operationGroupId={group_id} nodes={group_nodes}"
            )
        successful_inserts, failed_inserts = separate(
            lambda op: "error" in op, get_insert_operations(group_id)
        )
        # Apparently multiple errors are possible... so join with +.
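        # Group the failed per-instance inserts by error code so each failure mode
        # is reported once and the affected nodes can be marked down below.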
        by_error_inserts = util.groupby_unsorted(
            failed_inserts,
            lambda op: "+".join(err["code"] for err in op["error"]["errors"]),
        )
        for code, failed_ops in by_error_inserts:
            failed_nodes = {trim_self_link(op["targetLink"]): op for op in failed_ops}
            hostlist = util.to_hostlist(failed_nodes)
            count = len(failed_nodes)
            log.error(
                f"{count} instances failed to start: {code} ({hostlist}) operationGroupId={group_id}"
            )
            failed_node, failed_op = next(iter(failed_nodes.items()))
            msg = "; ".join(
                f"{err['code']}: {err['message'] if 'message' in err else 'no message'}"
                for err in failed_op["error"]["errors"]
            )
            if code != "RESOURCE_ALREADY_EXISTS":
                down_nodes(hostlist, f"GCP Error: {msg}")
            log.error(
                f"errors from insert for node '{failed_node}' ({failed_op['name']}): {msg}"
            )

        ready_nodes = {trim_self_link(op["targetLink"]) for op in successful_inserts}
        if len(ready_nodes) > 0:
            ready_nodelist = to_hostlist_fast(ready_nodes)
            log.info(f"created {len(ready_nodes)} instances: nodes={ready_nodelist}")
            all_successful_inserts.extend(successful_inserts)


def update_job_comment(nodelist: str, comment: str):
    if global_resume_data is None:
        log.warning(
            "Cannot update and notify jobs with API failures as no valid resume file is present."
        )
        return

    nodes = util.to_hostnames(nodelist)
    job_list = (
        job
        for job in global_resume_data.jobs
        if any(map(lambda node: node in nodes, util.to_hostnames(job.nodelist_resume)))
    )
    for job in job_list:
        run(f"{lkp.scontrol} update jobid={job.job_id} admincomment='{comment}'")
        run(f"{lkp.scontrol} notify {job.job_id} '{comment}'")


def down_nodes(nodelist, reason):
    """set nodes down with reason"""
    if isinstance(nodelist, list):
        nodelist = util.to_hostlist(nodelist)
    update_job_comment(nodelist, reason)
    run(f"{lkp.scontrol} update nodename={nodelist} state=down reason='{reason}'")


def hold_job(job_id, reason):
    """hold job, set comment to reason"""
    run(f"{lkp.scontrol} hold jobid={job_id}")
    run(f"{lkp.scontrol} update jobid={job_id} comment='{reason}'")


def create_placement_request(pg_name, region):
    config = {
        "name": pg_name,
        "region": region,
        "groupPlacementPolicy": {
            "collocation": "COLLOCATED",
        },
    }
    if lkp.cfg.enable_slurm_gcp_plugins:
        slurm_gcp_plugins.pre_placement_group_insert(
            lkp=lkp, pg_name=pg_name, region=region, request_body=config
        )
    request = util.compute.resourcePolicies().insert(
        project=cfg.project, region=region, body=config
    )
    log_api_request(request)
    return request


def create_placement_groups(node_list: list, job_id=0):
    pgs = {}
    node_map = lkp.nodeset_map(node_list)
    for _, nodes in node_map.items():
        pgs.update(create_nodeset_placement_groups(nodes, job_id=job_id))
    return pgs


def create_nodeset_placement_groups(node_list: list, job_id=0):
    model = next(iter(node_list))
    nodeset = lkp.node_nodeset(model)
    if not nodeset.enable_placement:
        return {None: node_list}
    if not valid_placement_nodes(node_list):
        return {None: node_list}
    region = lkp.node_region(model)

    groups = {
        f"{cfg.slurm_cluster_name}-{nodeset.nodeset_name}-{job_id}-{i}": nodes
        for i, nodes in enumerate(chunked(node_list, n=PLACEMENT_MAX_CNT))
    }

    if log.isEnabledFor(logging.DEBUG):
        debug_groups = {
            group: to_hostlist_fast(nodes) for group, nodes in groups.items()
        }
        log.debug(
            f"creating {len(groups)} placement groups: \n{yaml.safe_dump(debug_groups).rstrip()}"
        )
    requests = {
        group: create_placement_request(group, region)
        for group, incl_nodes in groups.items()
    }
    ops = dict(
        zip(requests.keys(), map_with_futures(ensure_execute, requests.values()))
    )

    def classify_result(item):
        op = item[1]
        if not isinstance(op, Exception):
            return "submitted"
        if all(e.get("reason") == "alreadyExists" for e in op.error_details):
            return "redundant"
        return "failed"

    grouped_ops = dict(util.groupby_unsorted(list(ops.items()), classify_result))
    submitted, redundant, failed = (
        dict(grouped_ops.get(key, {})) for key in ("submitted", "redundant", "failed")
    )
    if redundant:
        log.warning(
            "placement policies already exist: {}".format(",".join(redundant.keys()))
        )
    if failed:
        reqs = [f"{e}" for _, e in failed.items()]
        log.fatal("failed to create placement policies: {}".format("; ".join(reqs)))
    operations = {group: wait_for_operation(op) for group, op in submitted.items()}
    for group, op in operations.items():
        if "error" in op:
            msg = "; ".join(
                f"{err['code']}: {err['message'] if 'message' in err else 'no message'}"
                for err in op["error"]["errors"]
            )
            log.error(
                f"placement group failed to create: '{group}' ({op['name']}): {msg}"
            )
    log.info(
        f"created {len(operations)} placement groups ({to_hostlist_fast(operations.keys())})"
    )
    return groups


def valid_placement_nodes(nodelist):
    invalid_types = frozenset(["e2", "t2d", "n1", "t2a", "m1", "m2", "m3"])
    for node in nodelist:
        mt = lkp.node_template_info(node).machineType
        if mt.split("-")[0] in invalid_types:
            log.warning(f"Unsupported machine type for placement policy: {mt}.")
            log.warning(
                f"Please do not use any of the following machine types with placement policy: ({','.join(invalid_types)})"
            )
            return False
    return True


def get_resume_file_data():
    SLURM_RESUME_FILE = os.getenv("SLURM_RESUME_FILE")
    if SLURM_RESUME_FILE is None:
        log.warning(
            "SLURM_RESUME_FILE was not in environment. Cannot get detailed job, node, partition allocation data."
        )
        return None
    resume_file = Path(SLURM_RESUME_FILE)
    resume_json = resume_file.read_text()
    if args.loglevel == logging.DEBUG:
        (dirs.scripts / "resume_data.json").write_text(resume_json)
    return NSDict(json.loads(resume_json))


def main(nodelist, force=False):
    """main called when run as script"""
    log.debug(f"ResumeProgram {nodelist}")
    # Filter out nodes not in config.yaml
    other_nodes, pm_nodes = separate(
        lkp.is_power_managed_node, util.to_hostnames(nodelist)
    )
    if other_nodes:
        log.debug(
            f"Ignoring non-power-managed nodes '{to_hostlist_fast(other_nodes)}' from '{nodelist}'"
        )

    pm_nodelist = util.to_hostlist_fast(pm_nodes)
    if pm_nodes:
        log.debug(f"Resuming nodes '{pm_nodelist}' from '{nodelist}'")
    else:
        log.debug("No nodes to resume")
        return

    log.info(f"resume {pm_nodelist}")
    resume_nodes(pm_nodes, global_resume_data)
    # TODO only run below if resume_nodes succeeds but
    # resume_nodes does not currently return any status.
    if lkp.cfg.enable_slurm_gcp_plugins:
        slurm_gcp_plugins.post_main_resume_nodes(
            nodelist=nodelist, global_resume_data=global_resume_data
        )


parser = argparse.ArgumentParser(
    description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument("nodelist", help="list of nodes to resume")
parser.add_argument(
    "--force",
    "-f",
    "--static",
    action="store_true",
    help="Force attempted creation of the nodelist, whether nodes are exclusive or not.",
)
parser.add_argument(
    "--debug",
    "-d",
    dest="loglevel",
    action="store_const",
    const=logging.DEBUG,
    default=logging.INFO,
    help="Enable debugging output",
)
parser.add_argument(
    "--trace-api",
    "-t",
    action="store_true",
    help="Enable detailed api request output",
)


if __name__ == "__main__":
    args = parser.parse_args()

    if cfg.enable_debug_logging:
        args.loglevel = logging.DEBUG
    if args.trace_api:
        cfg.extra_logging_flags = list(cfg.extra_logging_flags)
        cfg.extra_logging_flags.append("trace_api")
    util.chown_slurm(LOGFILE, mode=0o600)
    util.config_root_logger(filename, level=args.loglevel, logfile=LOGFILE)
    sys.excepthook = util.handle_exception

    global_resume_data = get_resume_file_data()
    main(args.nodelist, args.force)