in src/hpc/autoscale/job/driver.py [0:0]
def add_default_placement_groups(config: Dict, node_mgr: NodeManager) -> None:
    """Register auto-generated placement groups on ``node_mgr``.

    Nothing is done when any entry under ``config["nodearrays"]`` has a
    truthy ``"placement_groups"`` value.

    Otherwise the buckets reported by ``node_mgr.get_buckets()`` are grouped
    by ``(nodearray, vm_size)``. For each group, the first bucket that has no
    placement group is used as the template; the group is skipped when every
    bucket already has a placement group or when the template bucket does not
    support colocation.

    For each remaining group, placement groups named ``"<vm_size>_pg<i>"``
    (``i`` = 0, 1, 2, ...) are added via ``node_mgr.add_placement_group``.
    Names that already appear for that nodearray are skipped. The number of
    groups added is the smaller of:

    * the nodearray's ``"generated_placement_group_buffer"`` setting
      (default 2), and
    * its ``"max_placement_groups"`` setting (default 10000) minus the number
      of placement groups ``node_mgr.get_placement_groups(bucket)`` reports.

    Args:
        config: Autoscale configuration; only the ``"nodearrays"`` mapping
            is read.
        node_mgr: Node manager that supplies the buckets and receives the
            new placement groups.
    """
    nas = config.get("nodearrays", {})

    # Explicit placement groups on any nodearray disable auto-generation.
    if any(child.get("placement_groups") for child in nas.values()):
        return

    by_pg = partition(
        node_mgr.get_buckets(), lambda b: (b.nodearray, b.placement_group)
    )
    by_na_vm = partition(node_mgr.get_buckets(), lambda b: (b.nodearray, b.vm_size))

    for (nodearray, vm_size), buckets in by_na_vm.items():
        non_pg_buckets = [b for b in buckets if not b.placement_group]
        if not non_pg_buckets:
            # hardcoded PlacementGroupId
            logging.debug(
                "Nodearray %s defines PlacementGroupId, so no additional "
                "placement groups will be created automatically.",
                nodearray,
            )
            continue

        bucket = non_pg_buckets[0]
        if not bucket.supports_colocation:
            continue

        na_config = nas.get(nodearray, {})
        buf_size = int(na_config.get("generated_placement_group_buffer", 2))
        max_placement_groups = int(na_config.get("max_placement_groups", 10000))

        # Never generate more groups than the nodearray's cap leaves room for.
        # A zero or negative result means the loop below adds nothing.
        existing_pgs = len(node_mgr.get_placement_groups(bucket))
        buf_remaining = min(max_placement_groups - existing_pgs, buf_size)

        pgi = 0
        while buf_remaining > 0:
            pg_name = ht.PlacementGroup("{}_pg{}".format(vm_size, pgi))
            # Only names not already used for this nodearray count towards
            # the buffer; taken names are skipped by advancing the index.
            if (nodearray, pg_name) not in by_pg:
                logging.fine("Adding placement group %s", pg_name)
                node_mgr.add_placement_group(pg_name, bucket)
                buf_remaining -= 1
            pgi += 1