in src/slurm_plugin/clustermgtd.py [0:0]
def manage_cluster(self) -> None:
    """Manage cluster by syncing scheduler states with EC2 states and performing node maintenance actions.

    One clustermgtd iteration:
      1. Refresh the loop timestamp and the compute fleet status.
      2. If cluster management is enabled and the fleet status is RUNNING,
         PROTECTED, or unknown (``None``): pull node info from Slurm and
         instance info from EC2, merge them, publish node events, clean up
         inactive partitions, run health checks (unless disabled), maintain
         nodes, and terminate orphaned EC2 instances.
      3. If the fleet status is STOPPED: keep nodes down so that manual
         partition re-activation by the user is continuously reverted.
      4. Write the heartbeat timestamp to file. Note the early ``return``
         statements on Slurm/EC2 info-retrieval failures skip this write —
         presumably so a stale heartbeat signals an unhealthy clustermgtd to
         its watchers; confirm against the heartbeat consumer.
    """
    # Initialization
    log.info("Managing cluster...")
    # Single timestamp for the whole iteration, always timezone-aware UTC.
    self._current_time = datetime.now(tz=timezone.utc)
    # fallback keeps the previously cached status if the fresh read fails —
    # confirm against ComputeFleetStatusManager.get_status.
    self._compute_fleet_status = self._compute_fleet_status_manager.get_status(fallback=self._compute_fleet_status)
    log.info("Current compute fleet status: %s", self._compute_fleet_status)
    if not self._config.disable_all_cluster_management:
        # None (status undetermined) is deliberately treated the same as a
        # running/protected fleet: management actions still run.
        if self._compute_fleet_status in {
            None,
            ComputeFleetStatus.RUNNING,
            ComputeFleetStatus.PROTECTED,
        }:
            # Get partition_nodelist_mapping between PC-managed Slurm partitions and PC-managed Slurm nodelists
            # Initialize PartitionNodelistMapping singleton
            self._partition_nodelist_mapping_instance = PartitionNodelistMapping.instance()
            # Get node states for nodes in inactive and active partitions
            # Initialize nodes
            try:
                log.info("Retrieving nodes info from the scheduler")
                nodes = self._get_node_info_with_retry()
                log.debug("Nodes: %s", nodes)
                partitions_name_map, compute_resource_nodes_map = self._parse_scheduler_nodes_data(nodes)
            except Exception as e:
                # Without scheduler data nothing below can run safely; bail
                # out of this iteration (heartbeat is NOT written).
                log.error(
                    "Unable to get partition/node info from slurm, no other action can be performed. Sleeping... "
                    "Exception: %s",
                    e,
                )
                return
            # Get all non-terminating instances in EC2
            try:
                cluster_instances = self._get_ec2_instances()
            except ClusterManager.EC2InstancesInfoUnavailable:
                # Same rationale as above: skip the rest of the iteration
                # (and the heartbeat write) when EC2 state is unavailable.
                log.error("Unable to get instances info from EC2, no other action can be performed. Sleeping...")
                return
            log.debug("Current cluster instances in EC2: %s", cluster_instances)
            partitions = list(partitions_name_map.values())
            # Merge EC2 instance data into the Slurm node objects BEFORE
            # publishing events, so events carry the backing-instance info.
            self._update_slurm_nodes_with_ec2_info(nodes, cluster_instances)
            self._event_publisher.publish_compute_node_events(nodes, cluster_instances)
            # Handle inactive partition and terminate backing instances
            self._clean_up_inactive_partition(partitions)
            # Perform health check actions
            if not self._config.disable_all_health_checks:
                self._perform_health_check_actions(partitions)
            # Maintain slurm nodes
            self._maintain_nodes(partitions_name_map, compute_resource_nodes_map)
            # Clean up orphaned instances
            self._terminate_orphaned_instances(cluster_instances)
        elif self._compute_fleet_status in {
            ComputeFleetStatus.STOPPED,
        }:
            # Since Slurm partition status might have been manually modified, when STOPPED we want to keep checking
            # partitions and EC2 instances to take into account changes that can be manually
            # applied by the user by re-activating Slurm partitions.
            # When partition are INACTIVE, always try to reset nodeaddr/nodehostname to avoid issue.
            self._maintain_nodes_down()
    # Write clustermgtd heartbeat to file
    self._write_timestamp_to_file()