def group_nodes_bulk()

in ansible/roles/slurm/files/scripts/resume.py [0:0]


def group_nodes_bulk(nodes, resume_data=None):
    """Group nodes by job_id, placement_group, node prefix, and max bulkInsert size.

    Args:
        nodes: iterable of node hostnames to resume in this run.
        resume_data: optional object exposing ``jobs``; each job is read for
            ``job_id``, ``partition``, ``nodes_alloc`` and ``nodes_resume``.
            When None, every node is treated as jobless.

    Returns:
        A ``(grouped_nodes, grouped_nodes_tpu)`` tuple. Each element is a dict
        mapping a generated group name to a ``BulkChunk`` (non-TPU) or
        ``BulkChunkTPU`` (TPU) namedtuple.

    Note:
        Job objects from ``resume_data`` are modified in place: the original
        ``nodes_alloc`` / ``nodes_resume`` values are kept as
        ``nodelist_alloc`` / ``nodelist_resume``, the ``nodes_*`` attributes
        are replaced by ``util.to_hostnames(...)`` of them, and ``tpu`` (plus
        ``placement_groups`` for non-TPU jobs) is set.

        Node order is preserved from the inputs (duplicates dropped, first
        occurrence wins), so the chunks produced further down are
        reproducible from run to run instead of depending on set iteration
        order.
    """
    if resume_data is None:
        # all nodes will be considered jobless
        jobs = {}
    else:
        jobs = {job.job_id: job for job in resume_data.jobs}

    # expand all job nodelists
    for job in jobs.values():
        job.nodelist_alloc = job.nodes_alloc
        job.nodes_alloc = util.to_hostnames(job.nodelist_alloc)
        job.nodelist_resume = job.nodes_resume
        job.nodes_resume = util.to_hostnames(job.nodelist_resume)
        job.tpu = util.part_is_tpu(job.partition)
        if job.tpu:
            continue
        # create placement groups if nodes for job need it
        job.placement_groups = create_placement_groups(
            node_list=job.nodes_alloc,
            job_id=job.job_id,
        )
        # placement group assignment is based on all allocated nodes, but we only want to
        # handle nodes in nodes_resume in this run.
        resuming = set(job.nodes_resume)
        # Only values of existing keys are reassigned, so iterating items() is safe.
        for pg, pg_nodes in job.placement_groups.items():
            # dict.fromkeys de-duplicates while keeping the original order.
            job.placement_groups[pg] = [
                node for node in dict.fromkeys(pg_nodes) if node in resuming
            ]

    # a bit of a hack, but nodes resumed using scontrol instead of through job scheduling do not have a job
    claimed_by_job = set(
        chain.from_iterable(job.nodes_resume for job in jobs.values())
    )
    jobless_nodes = []
    jobless_nodes_tpu = []
    # Single order-preserving pass: drop nodes owned by a job, then split the
    # remainder into TPU and non-TPU nodes.
    for node in dict.fromkeys(nodes):
        if node in claimed_by_job:
            continue
        if lkp.node_is_tpu(node):
            jobless_nodes_tpu.append(node)
        else:
            jobless_nodes.append(node)

    # Synthetic entries for jobless nodes; the string keys are mapped back to
    # job_id=None when the chunks are built below.
    jobs["Normal_None"] = NSDict(
        job_id=None,
        nodes_resume=jobless_nodes,
        nodes_alloc=jobless_nodes,
        placement_groups=create_placement_groups(node_list=jobless_nodes),
        partition=None,
        tpu=False,
    )
    jobs["TPU_None"] = NSDict(
        job_id=None,
        nodes_resume=jobless_nodes_tpu,
        nodes_alloc=jobless_nodes_tpu,
        partition=None,
        tpu=True,
    )

    BulkChunk = collections.namedtuple(
        "BulkChunk",
        ["prefix", "job_id", "partition_name", "placement_group", "nodes", "i"],
    )
    BulkChunkTPU = collections.namedtuple(
        "BulkChunkTPU",
        ["prefix", "job_id", "partition_name", "nodes", "i"],
    )
    grouped_nodes = [
        BulkChunk(
            prefix,
            job_key if job_key != "Normal_None" else None,
            job.partition,
            placement_group,
            chunk_nodes,
            i,
        )
        for job_key, job in jobs.items()
        if not job.tpu
        for placement_group, pg_nodes in job.placement_groups.items()
        for prefix, prefix_nodes in util.groupby_unsorted(pg_nodes, lkp.node_prefix)
        for i, chunk_nodes in enumerate(chunked(prefix_nodes, n=BULK_INSERT_LIMIT))
    ]
    grouped_nodes_tpu = [
        BulkChunkTPU(
            prefix,
            job_key if job_key != "TPU_None" else None,
            job.partition,
            chunk_nodes,
            i,
        )
        for job_key, job in jobs.items()
        if job.tpu
        for prefix, prefix_nodes in util.groupby_unsorted(
            job.nodes_resume, lkp.node_prefix
        )
        for i, chunk_nodes in enumerate(lkp.chunk_tpu_nodes(list(prefix_nodes)))
    ]

    def group_name(chunk: BulkChunk):
        """Build the dict key for a non-TPU chunk from its identifying fields."""
        if chunk.placement_group is not None:
            return f"{chunk.prefix}:job{chunk.job_id}:{chunk.placement_group}:{chunk.i}"
        if chunk.job_id is not None:
            return f"{chunk.prefix}:job{chunk.job_id}:{chunk.i}"
        return f"{chunk.prefix}:{chunk.i}"

    def group_name_tpu(chunk: BulkChunkTPU):
        """Build the dict key for a TPU chunk (no placement group component)."""
        if chunk.job_id is not None:
            return f"{chunk.prefix}:job{chunk.job_id}:{chunk.i}"
        return f"{chunk.prefix}:{chunk.i}"

    grouped_nodes = {group_name(chunk): chunk for chunk in grouped_nodes}
    grouped_nodes_tpu = {group_name_tpu(chunk): chunk for chunk in grouped_nodes_tpu}
    return grouped_nodes, grouped_nodes_tpu