def resume_nodes()

in community/modules/scheduler/schedmd-slurm-gcp-v6-controller/modules/slurm_files/scripts/resume.py [0:0]


def resume_nodes(nodes: List[str], resume_data: Optional[ResumeData]):
    """resume nodes in nodelist"""
    lkp = lookup()
    # Prevent dormant nodes associated with a future reservation from being resumed
    nodes, dormant_fr_nodes = util.separate(lkp.is_dormant_fr_node, nodes)
    
    if dormant_fr_nodes:
        log.warning(f"Resume was unable to resume future reservation nodes={dormant_fr_nodes}")
        down_nodes_notify_jobs(dormant_fr_nodes, "Reservation is not active, nodes cannot be resumed", resume_data)

    # Flex nodes that already have an existing VM (managed by a MIG) cannot be resumed here; separate them out.
    nodes, flex_managed = util.separate(lambda n: lkp.is_flex_node(n) and (lkp.instance(n) is not None), nodes)
    if flex_managed:
        # TODO(FLEX): This is a weak assumption; the VM may not exist yet but still be managed by a MIG (e.g. still provisioning).
        # Inspect all present MIGs instead.
        # This is particularly CRITICAL because ActionOnFailure=DO_NOTHING means deleted VMs will not be repaired.
        log.warning(f"Resume was unable to resume nodes={flex_managed} already managed by MIGs")
        down_nodes_notify_jobs(flex_managed, "VM is managed by a MIG, cannot be resumed", resume_data)

    if not nodes:
        log.info("No nodes to resume")
        return

    # Sort by node prefix and group nodes into chunks that can be provisioned together.
    nodes = sorted(nodes, key=lkp.node_prefix)
    grouped_nodes = group_nodes_bulk(nodes, resume_data, lkp)

    if log.isEnabledFor(logging.DEBUG):
        grouped_nodelists = {
            group: to_hostlist(chunk.nodes) for group, chunk in grouped_nodes.items()
        }
        log.debug(
            "node bulk groups: \n{}".format(yaml.safe_dump(grouped_nodelists).rstrip())
        )

    tpu_chunks, flex_chunks = [], []
    bi_inserts = {}

    for group, chunk in grouped_nodes.items():
        model = chunk.nodes[0]

        if lkp.node_is_tpu(model):
            tpu_chunks.append(chunk.nodes)
        elif lkp.is_flex_node(model):
            flex_chunks.append(chunk)
        else:
            bi_inserts[group] = create_instances_request(
                chunk.nodes, chunk.placement_group, chunk.excl_job_id
            )

    # Flex chunks are provisioned through their managed instance groups rather than via bulkInsert.
    for chunk in flex_chunks:
        mig_flex.resume_flex_chunk(chunk.nodes, chunk.excl_job_id, lkp)

    # execute all bulkInsert requests concurrently with futures
    bulk_ops = dict(
        zip(bi_inserts.keys(), map_with_futures(ensure_execute, bi_inserts.values()))
    )
    log.debug(f"bulk_ops={yaml.safe_dump(bulk_ops)}")
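    # Partition the batch results: operations that started vs. requests that raised an exception.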
    started = {
        group: op for group, op in bulk_ops.items() if not isinstance(op, Exception)
    }
    failed = {
        group: err for group, err in bulk_ops.items() if isinstance(err, Exception)
    }
    if failed:
        failed_reqs = [str(e) for e in failed.items()]
        log.error("bulkInsert API failures: {}".format("; ".join(failed_reqs)))
        for ident, exc in failed.items():
            down_nodes_notify_jobs(grouped_nodes[ident].nodes, f"GCP Error: {exc._get_reason()}", resume_data) # type: ignore

    if log.isEnabledFor(logging.DEBUG):
        for group, op in started.items():
            group_nodes = grouped_nodelists[group]
            name = op["name"]
            gid = op["operationGroupId"]
            log.debug(
                f"new bulkInsert operation started: group={group} nodes={group_nodes} name={name} operationGroupId={gid}"
            )
    # wait for all bulkInserts to complete and log any errors
    bulk_operations = {group: wait_for_operation(op) for group, op in started.items()}

    # Start TPU nodes after the regular nodes so that the regular nodes are not slowed down by the slower TPU startup
    execute_with_futures(tpu.start_tpu, tpu_chunks)

    # Process the result of each completed bulkInsert operation for its group of nodes
    for group, op in bulk_operations.items():
        _handle_bulk_insert_op(op, grouped_nodes[group].nodes, resume_data)
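
For reference, resume_nodes leans on util.separate to split the node list by a predicate. The sketch below is a minimal, assumed implementation (the real helper lives in util.py and may differ); the return order, (items failing the predicate, items passing it), is inferred from the call sites above.

from typing import Callable, Iterable, List, Tuple, TypeVar

T = TypeVar("T")

def separate(pred: Callable[[T], bool], coll: Iterable[T]) -> Tuple[List[T], List[T]]:
    """Partition coll into (items where pred is False, items where pred is True).

    Hypothetical sketch of util.separate, matching how resume_nodes unpacks it:
        nodes, dormant_fr_nodes = util.separate(lkp.is_dormant_fr_node, nodes)
    """
    rejected: List[T] = []
    accepted: List[T] = []
    for item in coll:
        (accepted if pred(item) else rejected).append(item)
    return rejected, accepted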