def add_default_placement_groups()

in src/hpc/autoscale/job/driver.py [0:0]


def add_default_placement_groups(config: Dict, node_mgr: NodeManager) -> None:
    """Register a buffer of generated placement groups with ``node_mgr``.

    Nothing is generated when any entry under ``config["nodearrays"]`` has a
    truthy ``"placement_groups"`` value.

    Otherwise, for every (nodearray, vm_size) combination among the node
    manager's buckets that has at least one bucket without a placement group,
    and whose first such bucket reports ``supports_colocation``, placement
    groups named ``"<vm_size>_pg<N>"`` are added via
    ``node_mgr.add_placement_group``.

    Per-nodearray settings read from ``config["nodearrays"][<nodearray>]``:

    * ``generated_placement_group_buffer`` (default 2): how many new
      placement groups to add.
    * ``max_placement_groups`` (default 10000): upper bound; the number added
      is limited to this value minus the count returned by
      ``node_mgr.get_placement_groups(bucket)``.

    Args:
        config: Autoscale configuration; only the ``"nodearrays"`` mapping is
            read here.
        node_mgr: Node manager that supplies the buckets and receives the new
            placement groups.
    """
    nas = config.get("nodearrays", {})
    # Any nodearray with explicitly configured placement groups turns off
    # automatic generation for the whole configuration.
    if any(child.get("placement_groups") for child in nas.values()):
        return

    all_buckets = node_mgr.get_buckets()
    # NOTE(review): partition presumably groups the buckets into a mapping
    # keyed by the lambda's result - confirm against its definition.
    by_pg = partition(all_buckets, lambda b: (b.nodearray, b.placement_group))
    by_na_vm = partition(all_buckets, lambda b: (b.nodearray, b.vm_size))

    for (nodearray, vm_size), buckets in by_na_vm.items():
        non_pg_buckets = [b for b in buckets if not b.placement_group]
        if not non_pg_buckets:
            # hardcoded PlacementGroupId
            logging.debug(
                "Nodearray %s defines PlacementGroupId, so no additional "
                + "placement groups will be created automatically.",
                nodearray,
            )
            continue

        bucket = non_pg_buckets[0]
        if not bucket.supports_colocation:
            continue

        na_config = nas.get(nodearray, {})
        buf_size = int(na_config.get("generated_placement_group_buffer", 2))
        max_placement_groups = int(na_config.get("max_placement_groups", 10000))

        existing_pgs = len(node_mgr.get_placement_groups(bucket))
        # Never exceed the configured maximum. When the existing count is
        # already at or above it, the result is <= 0 and nothing is added.
        buf_remaining = min(max_placement_groups - existing_pgs, buf_size)

        pgi = 0
        while buf_remaining > 0:
            pg_name = ht.PlacementGroup("{}_pg{}".format(vm_size, pgi))
            # Names already present for this nodearray are skipped and do
            # not count against the buffer; move on to the next index.
            if (nodearray, pg_name) not in by_pg:
                logging.fine("Adding placement group %s", pg_name)
                node_mgr.add_placement_group(pg_name, bucket)
                buf_remaining -= 1
            pgi += 1