cli/slurm_check.py (85 lines of code) (raw):

# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """A Slurm implementation of the healthscan check interface.""" import subprocess import click import check class SlurmCheck(check.Check): """A check to provide the current healthscan result status of a cluster.""" def __init__( self, name: str, description: str, machine_type: str, check_flag: str, partition: str, nodes: list[str], supported_machine_types: frozenset[str], dry_run: bool = False, ): """Initializes a check to run on a Slurm cluster. Args: name: The name of the check. description: The description of the check. machine_type: The machine type of the cluster to run the check on. check_flag: The flag to pass to the cluster-validation.sh script. partition: The partition to run the check on. nodes: The nodes to run the check on. supported_machine_types: The machine types supported by the check. dry_run: Whether to run the check in dry run mode. """ super().__init__( name=name, description=description, machine_type=machine_type, supported_machine_types=supported_machine_types, dry_run=dry_run, ) self.check_flag = check_flag self.partition = partition self.nodes = _expand_slurm_nodes(nodes) def _get_slurm_run_command(self) -> list[str]: """Builds the command to run the slurm check.""" relative_path = 'deploy/slurm' command = [ f'{relative_path}/cluster-validation.sh', '--nodelist', ','.join(self.nodes), f'--{self.check_flag}', '--partition', self.partition, '--nodes', f'{len(self.nodes)}', f'--relative-exec-path={relative_path}', '--results-dir=results', ] return command def set_up(self) -> None: """Slurm set_up is not yet supported.""" # No setup is needed for the status check. def clean_up(self) -> None: """Slurm clean_up is not yet supported.""" # No cleanup is needed for the status check. def run( self, timeout_sec: int | None = None, startup_sec: int | None = None, ) -> str | None: """Run the status check. Args: timeout_sec: The timeout in seconds for the check. startup_sec: The time in seconds to wait for the health runner to start. Returns: The status of the cluster as a string. """ click.echo(f'Performing {self.name} check...') command = self._get_slurm_run_command() if self.dry_run: click.echo( click.style( f'Running {self.name} check in dry run mode...', fg='red', bold=True, ) ) dry_run_command = ' '.join(command) click.echo(f'Skipping running command: {dry_run_command}') return None result = subprocess.run( command, text=True, check=False, capture_output=True ).stdout click.echo(result) return result def _expand_slurm_nodes(nodes: list[str]) -> list[str]: """Expands a list of slurm nodes into a list of nodes.""" nodelist = [] for node in nodes: nodelist.extend(_expand_slurm_node_pattern(node)) return nodelist def _expand_slurm_node_pattern(node_pattern: str) -> list[str]: """Expands a slurm node pattern into a list of nodes.""" slurm_nodelist_expansion_cmd = [ 'scontrol', 'show', 'hostname', node_pattern, ] output = subprocess.run( slurm_nodelist_expansion_cmd, text=True, check=True, capture_output=True, ).stdout.strip() nodes = output.split('\n') return nodes