def clone_with_placement_group()

in src/hpc/autoscale/node/bucket.py [0:0]


    def clone_with_placement_group(self, pg_name: PlacementGroup) -> "NodeBucket":
        """Return a new, empty bucket like this one but assigned to ``pg_name``.

        The bucket definition is deep-copied and its ``placement_group`` is set
        to ``pg_name``. The regional, cluster, nodearray and family limit
        objects of this bucket are handed to the clone as-is (the same
        objects, not copies), while a brand-new placement-group limit with
        nothing consumed is created for it.

        Args:
            pg_name: Placement group to assign to the cloned definition.

        Returns:
            A ``NodeBucket`` with no nodes, ``artificial=False`` and
            ``valid=True``, carrying over this bucket's priority and last
            capacity failure. Its available/max node count is the smaller of
            ``max_placement_group_size`` and this bucket's ``available_count``.

        Raises:
            AssertionError: If this bucket is not valid (plain ``assert``, so
                it is skipped when Python runs with ``-O``).
            RuntimeError: If this bucket already has a placement group.
        """
        assert self.valid

        if self.placement_group:
            # Only buckets without a placement group may be cloned: with no
            # existing placement-group limit in play, capping the count at
            # min(max_placement_group_size, available_count) below is valid.
            raise RuntimeError(
                "clone_with_placement_group is only supported when invoked on a bucket that has no placement group."
            )

        pg_definition = deepcopy(self.__definition)
        pg_definition.placement_group = pg_name

        parent_limits = self.limits

        # Fresh limit for the new placement group: nothing consumed yet, and
        # capacity equal to the maximum placement group size (in nodes/cores).
        pg_limit_name = "PlacementGroup({})".format(pg_name)
        pg_shared_limit = _SharedLimit(
            pg_limit_name,
            consumed_core_count=0,
            max_core_count=self.max_placement_group_size * self.vcpu_count,
            consumed_count=0,
            max_count=self.max_placement_group_size,
        )

        # The clone can never offer more nodes than fit in one placement group.
        node_capacity = min(self.max_placement_group_size, self.available_count)
        core_capacity = node_capacity * self.vcpu_count

        # NOTE(review): max_core_count is passed as 0 while max_count is the
        # node capacity - kept exactly as in the original; confirm intent.
        pg_bucket_limits = BucketLimits(
            self.vcpu_count,
            regional_limits=parent_limits._regional_limits,
            cluster_limits=parent_limits._cluster_limits,
            nodearray_limits=parent_limits._nodearray_limits,
            family_limits=parent_limits._family_limits,
            placement_group_limits=pg_shared_limit,
            active_core_count=0,
            active_count=0,
            available_core_count=core_capacity,
            available_count=node_capacity,
            max_core_count=0,
            max_count=node_capacity,
        )

        pg_bucket = NodeBucket(
            pg_definition,
            pg_bucket_limits,
            self.max_placement_group_size,
            nodes=[],
            artificial=False,
            priority=self.priority,
            last_capacity_failure=self.__last_capacity_failure,
            valid=True,
        )
        return pg_bucket