in cookbooks/aws-parallelcluster-slurm/files/default/head_node_slurm/slurm/pcluster_fleet_config_generator.py [0:0]
def generate_fleet_config_file(output_file: str, input_file: str):
"""
Generate configuration file used by Fleet Manager in node daemon package.
Generate fleet-config.json
{
"my-queue": {
"fleet-compute-resource": {
"Api": "create-fleet",
"CapacityType": "on-demand|spot|capacity-block",
"AllocationStrategy": "lowest-price|capacity-optimized",
"Instances": [
{ "InstanceType": "p4d.24xlarge" }
],
"MaxPrice": "",
"Networking": {
"SubnetIds": ["subnet-123456"]
},
"CapacityReservationId": "id"
}
"single-compute-resource": {
"Api": "run-instances",
"CapacityType": "on-demand|spot|capacity-block",
"AllocationStrategy": "lowest-price|capacity-optimized",
"Instances": [
{ "InstanceType": ... }
],
"CapacityReservationId": "id"
}
}
}
"""
cluster_config = _load_cluster_config(input_file)
queue_name, compute_resource_name = None, None
try:
fleet_config = {}
for queue_config in cluster_config["Scheduling"]["SlurmQueues"]:
queue_name = queue_config["Name"]
# Retrieve capacity info from the queue_name, if there
queue_capacity_type = CAPACITY_TYPE_MAP.get(queue_config.get("CapacityType", "ONDEMAND"))
queue_allocation_strategy = queue_config.get("AllocationStrategy")
queue_capacity_reservation_target = queue_config.get("CapacityReservationTarget", {})
queue_capacity_reservation = (
queue_capacity_reservation_target.get("CapacityReservationId")
if queue_capacity_reservation_target
else None
)
fleet_config[queue_name] = {}
for compute_resource_config in queue_config["ComputeResources"]:
compute_resource_name, config_for_fleet = _generate_compute_resource_fleet_config(
compute_resource_config=compute_resource_config,
queue_name=queue_name,
queue_allocation_strategy=queue_allocation_strategy,
queue_capacity_reservation=queue_capacity_reservation,
queue_capacity_type=queue_capacity_type,
queue_subnets=queue_config["Networking"]["SubnetIds"],
)
fleet_config[queue_name][compute_resource_name] = config_for_fleet
except (KeyError, AttributeError) as e:
if isinstance(e, KeyError):
message = f"Unable to find key {e} in the configuration file."
else:
message = f"Error parsing configuration file. {e}. {traceback.format_exc()}."
message += f" Queue: {queue_name}" if queue_name else ""
log.error(message)
raise CriticalError(message)
log.info("Generating %s", output_file)
with open(output_file, "w", encoding="utf-8") as output:
output.write(json.dumps(fleet_config, indent=4))
log.info("Finished.")