cli/configcheck.py

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""CLI for running configcheck on a cluster.

This is part of the larger cluster_diag CLI.
To get the full helpstring, run `cluster_diag configcheck --help`.
"""

import asyncio
import subprocess

import click
from kubernetes import client
from kubernetes import config
import pandas as pd

import common
import config as configcheck_config
import config_differ
import dependencies
import dependency_version_parser
import golden_config_parser
import node_config_fetcher

_SUPPORTED_MACHINE_TYPES = list(common.SUPPORTED_MACHINE_TYPES)
_SUPPORTED_MACHINE_TYPES.remove('a4-highgpu-8g')

_A3_ULTRAGPU_8G_DEPENDENCY_PARSERS = dependencies.DEPENDENCY_PARSERS
_A3_MEGAGPU_8G_DEPENDENCY_PARSERS = dependencies.DEPENDENCY_PARSERS
_A3_HIGHGPU_8G_DEPENDENCY_PARSERS = dependencies.DEPENDENCY_PARSERS

_FORMAT_MARKDOWN = 'markdown'
_FORMAT_JSON = 'json'


def _get_k8s_nodes(machine_type: str) -> list[str]:
  """Returns the names of all cluster nodes of the given machine type."""
  config.load_kube_config()
  v1 = client.CoreV1Api()
  return [
      node.metadata.name
      for node in v1.list_node(
          label_selector=f'beta.kubernetes.io/instance-type={machine_type}'
      ).items
  ]


def _get_config_matrix(
    data: list[configcheck_config.NodeConfig],
) -> pd.DataFrame:
  """Returns a DataFrame representing the config data."""
  columns = ['Node Name']
  for attribute_name in sorted(data[0].dependencies.keys()):
    columns.append(attribute_name)
  data_rows = [node.to_csv() for node in data]
  return pd.DataFrame(
      data=data_rows,
      columns=columns,
  )


def _get_diff_matrix(data: list[configcheck_config.NodeDiff]) -> pd.DataFrame:
  """Returns a DataFrame representing the config diff data."""
  columns = ['Node Name']
  for attribute_name in sorted(data[0].dependency_diffs.keys()):
    columns.append(attribute_name)
  data_rows = [node.to_csv() for node in data]
  return pd.DataFrame(
      data=data_rows,
      columns=columns,
  )


def _get_workload_containers_on_node(node_name: str) -> dict[str, str]:
  """Returns containers actively running on a node which request GPUs."""
  config.load_kube_config()
  v1 = client.CoreV1Api()
  pods = v1.list_pod_for_all_namespaces(
      field_selector=f'spec.nodeName={node_name}'
  ).items
  pod_to_container_names = dict()
  for pod in pods:
    pod_name = pod.metadata.name
    for container_status in pod.status.container_statuses:
      if container_status.state.running:
        # Get the container spec to access the resource requests.
        for container_spec in pod.spec.containers:
          if (
              container_spec.name == container_status.name
              and container_spec.resources.requests
              and int(
                  container_spec.resources.requests.get('nvidia.com/gpu', 0)
              )
              > 0
          ):
            pod_to_container_names[pod_name] = container_spec.name
            break
  return pod_to_container_names

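# For reference, the pod listing above is roughly equivalent to
#   kubectl get pods --all-namespaces --field-selector spec.nodeName=<node>
# with the loop then keeping only running containers whose specs request
# `nvidia.com/gpu` resources.
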
def _get_workload_container_on_node(node_name: str) -> tuple[str, str] | None:
  """Returns the workload container actively running on a node which requests GPUs."""
  pod_to_container_names = _get_workload_containers_on_node(node_name)
  if not pod_to_container_names:
    return None
  elif len(pod_to_container_names) > 1:
    error_msg = (
        f'Multiple workload containers found on node {node_name}:'
        f' {pod_to_container_names.values()}. Please limit to running a single'
        ' workload container per node.'
    )
    # click.Abort does not print its message, so echo it before aborting.
    click.echo(click.style(error_msg, fg='red', bold=True))
    raise click.Abort(error_msg)
  pod_name, workload_container = pod_to_container_names.popitem()
  return (pod_name, workload_container)


def _get_zone_from_k8s_topology_label(node_name: str) -> str:
  """Returns the zone of the node from the K8s topology label."""
  config.load_kube_config()
  v1 = client.CoreV1Api()
  node = v1.read_node(node_name)
  return node.metadata.labels['topology.kubernetes.io/zone']


def _fetch_node_configs(
    project: str,
    node_list: list[str],
    static_dependency_parsers: list[
        dependency_version_parser.DependencyVersionParser
    ],
    zone: str | None,
    workload_container: str | None,
    sudo: bool = False,
    verbose: bool = False,
) -> list[configcheck_config.NodeConfig]:
  """Fetches the current configuration settings for each node in the list.

  Args:
    project: The project of the workload to be checked.
    node_list: The list of nodes to fetch configs for.
    static_dependency_parsers: The dependency parsers to use for the static
      dependencies.
    zone: The zone of the workload to be checked.
    workload_container: The name of the workload container to fetch NCCL
      configs from. If not specified, NCCL configs will not be fetched.
    sudo: Whether to run remote commands with sudo. Defaults to False.
    verbose: Whether to enable verbose logging. Defaults to False.

  Returns:
    A list of NodeConfig objects, one for each node.
  """
  with click.progressbar(
      label='Fetching node configs',
      length=len(node_list),
  ) as progress_bar:
    node_data = []
    for node in node_list:
      pod_name = None
      if workload_container is None:
        pod_name_to_container_name = _get_workload_container_on_node(node)
        if pod_name_to_container_name:
          pod_name = pod_name_to_container_name[0]
          workload_container = pod_name_to_container_name[1]
      if zone is None:
        zone = _get_zone_from_k8s_topology_label(node)
      dynamic_dependency_parsers = dependencies.get_dynamic_dependency_parsers(
          node, zone, pod_name=pod_name, workload_container=workload_container
      )
      fetcher = node_config_fetcher.NodeConfigFetcher(
          name=node,
          zone=zone,
          project=project,
          dependency_parsers=static_dependency_parsers
          + dynamic_dependency_parsers,
          sudo=sudo,
          verbose=verbose,
      )
      node_data.append(fetcher.fetch_config())
      progress_bar.update(1)
    return node_data


def _get_gcloud_config_value(key: str) -> str:
  """Returns the value of a gcloud config key."""
  # check=False so that a failed lookup surfaces the friendly error message
  # below instead of an unhandled CalledProcessError.
  result = subprocess.run(
      ['gcloud', 'config', 'get-value', key],
      check=False,
      capture_output=True,
      text=True,
  )
  value = result.stdout.strip()
  if result.returncode != 0 or value in ('', '(unset)'):
    error_msg = (
        f'Failed to get gcloud config value for {key}. Please run `gcloud'
        f' config set {key} <value>` and try again.'
    )
    click.echo(
        click.style(
            error_msg,
            fg='red',
            bold=True,
        )
    )
    raise click.Abort(error_msg)
  click.echo(
      click.style(
          f'{key} not set. Inferring {key} from `gcloud config get'
          f' {key}`: {value}',
          fg='yellow',
          bold=True,
      )
  )
  return value

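# _fetch_node_configs_async below fans the per-node fetches out with
# asyncio.create_task and collects them with asyncio.gather. A minimal sketch
# of the pattern, where `fetch` stands in for any coroutine (hypothetical
# name, not part of this module):
#
#   tasks = [asyncio.create_task(fetch(node)) for node in node_list]
#   results = await asyncio.gather(*tasks)
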
async def _fetch_node_configs_async(
    project: str,
    node_list: list[str],
    static_dependency_parsers: list[
        dependency_version_parser.DependencyVersionParser
    ],
    zone: str | None,
    workload_container: str | None,
    sudo: bool = False,
    verbose: bool = False,
) -> list[configcheck_config.NodeConfig]:
  """Asynchronously fetches the current configuration settings for each node.

  Args:
    project: The project of the workload to be checked.
    node_list: The list of nodes to fetch configs for.
    static_dependency_parsers: The dependency parsers to use for the static
      dependencies.
    zone: The zone of the workload to be checked.
    workload_container: The name of the workload container to fetch NCCL
      configs from. If not specified, NCCL configs will not be fetched.
    sudo: Whether to run remote commands with sudo. Defaults to False.
    verbose: Whether to enable verbose logging. Defaults to False.

  Returns:
    A list of NodeConfig objects, one for each node.
  """
  tasks = []
  with click.progressbar(
      label='[Async] Fetching node configs',
      length=len(node_list),
  ) as progress_bar:
    for node in node_list:
      pod_name = None
      if workload_container is None:
        pod_name_to_container_name = _get_workload_container_on_node(node)
        if pod_name_to_container_name:
          pod_name = pod_name_to_container_name[0]
          workload_container = pod_name_to_container_name[1]
      if zone is None:
        # As in the sync path, fall back to the node's topology label when no
        # zone was passed in.
        zone = _get_zone_from_k8s_topology_label(node)
      dynamic_dependency_parsers = dependencies.get_dynamic_dependency_parsers(
          node, zone, pod_name=pod_name, workload_container=workload_container
      )
      fetcher = node_config_fetcher.NodeConfigFetcher(
          name=node,
          zone=zone,
          project=project,
          dependency_parsers=static_dependency_parsers
          + dynamic_dependency_parsers,
          sudo=sudo,
          verbose=verbose,
      )
      task = asyncio.create_task(fetcher.fetch_config_async())
      task.add_done_callback(lambda _: progress_bar.update(1))
      tasks.append(task)
    configs = list(await asyncio.gather(*tasks))
  # Reset the terminal to the original state.
  # subprocess.run(['reset'], check=True)
  return configs

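# The decorator stack below defines `cli` as a Click command named
# `configcheck`. A minimal sketch of how a parent Click group might mount it
# (`cluster_diag_group` is a hypothetical name; the real wiring lives in the
# cluster_diag entry point):
#
#   import configcheck
#   cluster_diag_group.add_command(configcheck.cli)
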
@click.command(name='configcheck')
@click.argument(
    'machine_type',
    type=click.Choice(_SUPPORTED_MACHINE_TYPES, case_sensitive=False),
)
@click.option(
    '-n',
    '--nodes',
    default='',
    help=(
        'A comma-separated list of nodes to run checks on. Defaults to running'
        ' on all nodes.'
    ),
)
@click.option(
    '--skip_diff',
    '--nodiff',
    default=False,
    is_flag=True,
    help=(
        'If true, only print the node configs without diffing against the'
        ' golden config.'
    ),
)
@click.option(
    '--run_async',
    '--async',
    default=False,
    is_flag=True,
    help=(
        '[Experimental] If true, run the configcheck in async mode. This will'
        ' reset your terminal as part of the process.'
    ),
)
@click.option(
    '--project',
    default=None,
    help=(
        'The project of the workload to be checked. If not specified, the'
        ' project will be inferred from `gcloud config get project`.'
    ),
)
@click.option(
    '--zone',
    default=None,
    help=(
        'The zone of the workload to be checked. If not specified, the zone'
        ' will be inferred per node from the `topology.kubernetes.io/zone`'
        ' label.'
    ),
)
@click.option(
    '--workload_container',
    default=None,
    help=(
        'The name of the workload container to fetch workload configs from.'
        ' If not specified, the workload container will be inferred from the'
        ' node.'
    ),
)
@click.option(
    '--output_format',
    default=_FORMAT_MARKDOWN,
    type=click.Choice([_FORMAT_MARKDOWN, _FORMAT_JSON]),
    help=(
        'The format to print the output in. Defaults to markdown. The other'
        ' supported format is `json`.'
    ),
)
@click.option(
    '--sudo',
    default=False,
    is_flag=True,
    help=(
        'Run remote commands with sudo. Note: This is sometimes necessary to'
        ' fetch configs on nodes/pods/containers with restricted permissions.'
    ),
)
@click.option(
    '--verbose',
    '-v',
    default=False,
    is_flag=True,
    help='Enable verbose logging.',
)
@click.pass_context
def cli(
    ctx: click.Context,
    machine_type: str,
    nodes: str,
    skip_diff: bool,
    run_async: bool,
    project: str | None,
    zone: str | None,
    workload_container: str | None,
    output_format: str,
    sudo: bool,
    verbose: bool,
):
  """Run a configcheck on a cluster."""
  orchestrator = ctx.obj['orchestrator']
  if project is None:
    project = _get_gcloud_config_value('project')
  if nodes:
    node_list = nodes.split(',')
  else:

    def _slurm_unsupported():
      # Slurm support does not exist yet; raise so the run actually aborts.
      raise click.Abort('configcheck is not yet supported for Slurm clusters')

    node_list = common.run_for_orchestrator(
        orchestrator=orchestrator,
        gke_function=lambda: _get_k8s_nodes(machine_type),
        slurm_function=_slurm_unsupported,
    )

  match machine_type:
    case 'a3-ultragpu-8g':
      dependency_parsers = _A3_ULTRAGPU_8G_DEPENDENCY_PARSERS
    case 'a3-megagpu-8g':
      dependency_parsers = _A3_MEGAGPU_8G_DEPENDENCY_PARSERS
    case 'a3-highgpu-8g':
      dependency_parsers = _A3_HIGHGPU_8G_DEPENDENCY_PARSERS
    case _:
      raise click.Abort(
          f'Unsupported machine type: {machine_type}. Supported machine'
          f' types: {_SUPPORTED_MACHINE_TYPES}'
      )

  if run_async:
    click.echo(
        click.style(
            'WARNING: Running configcheck in async mode is experimental. Your'
            ' terminal may be reset as part of this process.',
            fg='red',
            bold=True,
        )
    )
    node_data = asyncio.run(
        _fetch_node_configs_async(
            project=project,
            zone=zone,
            node_list=node_list,
            static_dependency_parsers=dependency_parsers,
            workload_container=workload_container,
            sudo=sudo,
            verbose=verbose,
        )
    )
  else:
    node_data = _fetch_node_configs(
        project=project,
        zone=zone,
        node_list=node_list,
        static_dependency_parsers=dependency_parsers,
        workload_container=workload_container,
        sudo=sudo,
        verbose=verbose,
    )

  if skip_diff:
    df = _get_config_matrix(node_data)
  else:
    golden_config = golden_config_parser.get_golden_configs(
        dependency_parsers=dependency_parsers
        + dependencies.get_dynamic_dependency_parsers(
            node_name='golden', zone=zone, workload_container='golden'
        ),
        machine_type=machine_type,
    )[0]
    node_diffs = [
        config_differ.diff_configs(
            experiment=node,
            golden=golden_config,
        )
        for node in node_data
    ]
    df = _get_diff_matrix(node_diffs)

  output_data = None
  if output_format == _FORMAT_MARKDOWN:
    output_data = df.to_markdown(tablefmt='pipe')
  elif output_format == _FORMAT_JSON:
    output_data = df.to_json()
  click.echo(output_data)
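# Example invocations, assuming the command is mounted under the
# `cluster_diag` entry point as the module docstring describes (the parent
# command may require additional options, e.g. the orchestrator it stores in
# ctx.obj):
#
#   cluster_diag configcheck a3-megagpu-8g
#   cluster_diag configcheck a3-ultragpu-8g --nodes node-0,node-1 --skip_diff
#   cluster_diag configcheck a3-highgpu-8g --output_format json --sudo -v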