in src/slurm_plugin/resume.py
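# NOTE: this excerpt omits the module's import section and module-level
# loggers. The names used below (log, event_logger, and the imported helpers)
# are defined elsewhere in the file; the module paths sketched here are
# assumptions, not copied from the source:
#
#   from datetime import datetime, timezone
#   from common.schedulers.slurm_commands import get_nodes_info
#   from slurm_plugin.common import ScalingStrategy, is_clustermgtd_heartbeat_valid, print_with_count
#   from slurm_plugin.instance_manager import InstanceManager
#   from slurm_plugin.cluster_event_publisher import ClusterEventPublisher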
def _resume(arg_nodes, resume_config, slurm_resume):
    """Launch new EC2 instances for the Slurm nodes requested to resume."""
    # Verify clustermgtd is alive before launching anything: its heartbeat file
    # must have been updated within the configured timeout.
    current_time = datetime.now(tz=timezone.utc)
    if not is_clustermgtd_heartbeat_valid(
        current_time, resume_config.clustermgtd_timeout, resume_config.clustermgtd_heartbeat_file_path
    ):
        log.error(
            "No valid clustermgtd heartbeat detected, clustermgtd is down!\n"
            "Please check the clustermgtd log for errors.\n"
            "Not launching nodes %s",
            arg_nodes,
        )
        _handle_failed_nodes(arg_nodes)
        return
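    # Resolve the requested nodes through Slurm so we have both the plain name
    # list (handed to the instance manager) and each node's current state for
    # the log line below.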
log.info("Launching EC2 instances for the following Slurm nodes: %s", arg_nodes)
node_list = []
node_list_with_status = []
for node in get_nodes_info(arg_nodes):
node_list.append(node.name)
node_list_with_status.append((node.name, node.state_string))
log.info("Current state of Slurm nodes to resume: %s", node_list_with_status)
    instance_manager = InstanceManager(
        region=resume_config.region,
        cluster_name=resume_config.cluster_name,
        boto3_config=resume_config.boto3_config,
        table_name=resume_config.dynamodb_table,
        hosted_zone=resume_config.hosted_zone,
        dns_domain=resume_config.dns_domain,
        use_private_hostname=resume_config.use_private_hostname,
        head_node_private_ip=resume_config.head_node_private_ip,
        head_node_hostname=resume_config.head_node_hostname,
        fleet_config=resume_config.fleet_config,
        run_instances_overrides=resume_config.run_instances_overrides,
        create_fleet_overrides=resume_config.create_fleet_overrides,
        job_level_scaling=resume_config.job_level_scaling,
    )
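    # Launch instances and assign them to the requested nodes. The batch sizes
    # cap how many nodes are handled per EC2/Slurm call, so a single oversized
    # request cannot fail or throttle the whole resume.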
    instance_manager.add_instances(
        slurm_resume=slurm_resume,
        node_list=node_list,
        launch_batch_size=resume_config.launch_max_batch_size,
        assign_node_batch_size=resume_config.assign_node_max_batch_size,
        terminate_batch_size=resume_config.terminate_max_batch_size,
        update_node_address=resume_config.update_node_address,
        scaling_strategy=ScalingStrategy(resume_config.scaling_strategy),
    )
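    # instance_manager.failed_nodes maps an error code to the set of node names
    # that failed with it; flatten it to split the request into successes and
    # failures.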
    failed_nodes = set().union(*instance_manager.failed_nodes.values())
    success_nodes = [node for node in node_list if node not in failed_nodes]
    if success_nodes:
        log.info("Successfully launched nodes %s", print_with_count(success_nodes))
    if failed_nodes:
        log.error(
            "Failed to launch the following nodes, setting them to DOWN: %s",
            print_with_count(failed_nodes),
        )
        # Use a distinct loop variable so node_list from above is not shadowed.
        for error_code, failed_node_list in instance_manager.failed_nodes.items():
            _handle_failed_nodes(failed_node_list, reason=f"(Code:{error_code}) Failure when resuming nodes")
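    # Publish structured node-launch events, including the per-error-code
    # failure map, so launch outcomes are visible in the cluster's event logs.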
    event_publisher = ClusterEventPublisher.create_with_default_publisher(
        event_logger,
        resume_config.cluster_name,
        "HeadNode",
        "slurm-resume",
        resume_config.head_node_instance_id,
    )
    event_publisher.publish_node_launch_events(instance_manager.failed_nodes)
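
# ---------------------------------------------------------------------------
# is_clustermgtd_heartbeat_valid and _handle_failed_nodes are referenced above
# but not part of this excerpt. The sketches below are illustrative
# approximations of what they need to do, under stated assumptions; they are
# not the project's actual implementations.
# ---------------------------------------------------------------------------
import subprocess


# Minimal heartbeat-check sketch, assuming clustermgtd periodically writes an
# ISO-8601 timestamp to the heartbeat file (the real on-disk format may differ).
def sketch_is_clustermgtd_heartbeat_valid(current_time, clustermgtd_timeout, heartbeat_file_path):
    try:
        with open(heartbeat_file_path, "r", encoding="utf-8") as f:
            last_heartbeat = datetime.fromisoformat(f.read().strip())
    except (OSError, ValueError):
        # A missing or unparsable heartbeat file counts as an invalid heartbeat.
        return False
    if last_heartbeat.tzinfo is None:
        # Assume a naive timestamp is UTC so it can be compared with current_time.
        last_heartbeat = last_heartbeat.replace(tzinfo=timezone.utc)
    return (current_time - last_heartbeat).total_seconds() <= clustermgtd_timeout


# Minimal failure-handling sketch, assuming nodes are marked DOWN via scontrol
# (the project routes this through its own Slurm command helpers instead).
def sketch_handle_failed_nodes(node_list, reason="Failure when resuming nodes"):
    try:
        subprocess.run(
            ["scontrol", "update", f"nodename={','.join(node_list)}", "state=down", f"reason={reason}"],
            check=True,
        )
    except Exception:
        log.exception("Failed to set nodes %s to DOWN", node_list)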
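
# Hedged usage sketch: Slurm invokes the configured ResumeProgram with the
# nodes to power up as a hostlist expression (e.g. "queue1-dy-c5xlarge-[1-3]").
# Everything below is illustrative: load_resume_config() is a hypothetical
# stand-in for the module's real config loading, and slurm_resume is normally
# the parsed SLURM_RESUME_FILE JSON that Slurm (22.05+) exposes to the
# ResumeProgram.
def sketch_main():
    import argparse
    import json
    import os

    parser = argparse.ArgumentParser()
    parser.add_argument("nodes", help="hostlist expression of the nodes to resume")
    args = parser.parse_args()

    resume_config = load_resume_config()  # hypothetical helper, not in the real module

    slurm_resume = {}
    resume_file = os.environ.get("SLURM_RESUME_FILE")
    if resume_file:
        with open(resume_file, encoding="utf-8") as f:
            slurm_resume = json.load(f)

    _resume(args.nodes, resume_config, slurm_resume)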