in src/slurm_plugin/clustermgtd.py [0:0]
def _handle_unhealthy_static_nodes(self, unhealthy_static_nodes):
    """
    Replace static nodes that were found to be unhealthy.

    Steps, performed in this order:
      1. Best effort: report console output for a sample of the nodes' backing
         instances. Any exception is logged and swallowed.
      2. Best effort: reset the nodes to the "down" state so that jobs can be
         requeued immediately. Any exception is logged and swallowed.
      3. Terminate the instances currently backing the nodes (if any).
      4. Launch new instances for all of the nodes using ScalingStrategy.BEST_EFFORT.
      5. Add every node that is not listed in the instance manager's failed_nodes
         to self._static_nodes_in_replacement, log that set, and publish
         unhealthy-static-node events.

    Exceptions raised in steps 3-5 are not caught here and propagate to the caller.

    :param unhealthy_static_nodes: the static nodes to replace. Each item must expose
        ``name`` and ``instance``; nodes whose ``instance`` is falsy have no backing
        instance to terminate. It is iterated more than once, so presumably a list
        (not a one-shot iterator) -- confirm against callers.
    """
    # Step 1: console output. The start time is captured before the sampled
    # instances are fetched. Presumably wait_remaining_time only waits for whatever
    # part of compute_console_wait_time is still left since then (confirm).
    try:
        console_wait_started_at = datetime.now(tz=timezone.utc)
        bounded_wait = partial(
            wait_remaining_time,
            wait_start_time=console_wait_started_at,
            total_wait_time=self._config.compute_console_wait_time,
            wait_function=self._task_executor.wait_unless_shutdown,
        )
        sampled_instances = self._instance_manager.get_compute_node_instances(
            unhealthy_static_nodes,
            self._config.compute_console_logging_max_sample_size,
        )
        self._console_logger.report_console_output_from_nodes(
            compute_instances=sampled_instances,
            task_controller=self._task_executor,
            task_wait_function=bounded_wait,
        )
    except Exception as e:
        log.error("Encountered exception when retrieving console output from unhealthy static nodes: %s", e)

    static_node_names = [node.name for node in unhealthy_static_nodes]

    # Step 2: mark the nodes down before touching instances so their jobs can be
    # requeued right away; a failure here must not block the replacement itself.
    try:
        log.info("Setting unhealthy static nodes to DOWN")
        reset_nodes(
            static_node_names,
            state="down",
            reason="Static node maintenance: unhealthy node is being replaced",
        )
    except Exception as e:
        log.error("Encountered exception when setting unhealthy static nodes into down state: %s", e)

    # Step 3: terminate whatever instances are still backing the nodes.
    backing_instance_ids = []
    for node in unhealthy_static_nodes:
        if node.instance:
            backing_instance_ids.append(node.instance.id)
    if backing_instance_ids:
        log.info("Terminating instances backing unhealthy static nodes")
        self._instance_manager.delete_instances(
            backing_instance_ids,
            terminate_batch_size=self._config.terminate_max_batch_size,
        )

    # Step 4: launch replacements for every node, including nodes that had no
    # backing instance.
    log.info("Launching new instances for unhealthy static nodes")
    self._instance_manager.add_instances(
        node_list=static_node_names,
        launch_batch_size=self._config.launch_max_batch_size,
        assign_node_batch_size=self._config.assign_node_max_batch_size,
        update_node_address=self._config.update_node_address,
        scaling_strategy=ScalingStrategy.BEST_EFFORT,
    )

    # Step 5: only nodes absent from every failed_nodes group count as launched.
    # failed_nodes is read after add_instances so it reflects this launch attempt.
    failed_nodes_map = self._instance_manager.failed_nodes
    nodes_that_failed = {name for group in failed_nodes_map.values() for name in group}
    launched_nodes = {name for name in static_node_names if name not in nodes_that_failed}
    self._static_nodes_in_replacement |= launched_nodes
    log.info(
        "After node maintenance, following nodes are currently in replacement: %s",
        print_with_count(self._static_nodes_in_replacement),
    )
    self._event_publisher.publish_unhealthy_static_node_events(
        unhealthy_static_nodes,
        self._static_nodes_in_replacement,
        launched_nodes,
        failed_nodes_map,
    )