cli/launch_helm.py (123 lines of code) (raw):
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Useful functions for running parts of Cluster Health Scanner.
This library is intended to be used by the Cluster Health Scanner CLI. It
emulates what a user would do when running CHS manually.
"""
import json
import shlex
import subprocess
def _run_command(
    command: str,
    check: bool = False,
) -> subprocess.CompletedProcess[str]:
  """Runs a command line through the shell and captures its output.

  Args:
    command: The command line handed to the shell.
    check: When True, a non-zero exit status raises
      subprocess.CalledProcessError. Defaults to False, in which case the
      caller is expected to inspect the returned object.

  Returns:
    The CompletedProcess for the finished command, with stdout and stderr
    captured as text.
  """
  return subprocess.run(
      command,
      capture_output=True,
      check=check,
      shell=True,
      text=True,
  )
def _generate_helm_command(
hc_type: str,
chart_name: str,
release_name: str,
namespace: str | None = None,
values_file: str | None = None,
set_values: dict[str, str] | None = None,
) -> str:
"""Generates a Helm command for installing a chart.
Args:
hc_type: The type of health check to deploy.
chart_name: The name of the Helm chart.
release_name: The name of the Helm release.
namespace: The namespace to deploy the chart to.
values_file: The path to a YAML file containing values to override.
set_values: A dictionary of values to override.
Returns:
A list of strings representing the Helm command.
"""
helm_install_command: str = f'helm install {release_name} {chart_name} '
if namespace:
helm_install_command += f'-n {namespace} '
if values_file:
helm_install_command += f'-f {values_file} '
if set_values:
for k, v in set_values.items():
helm_install_command += f'--set {k}={v} '
# Turn off all health checks
helm_install_command += '--set health_checks.gpu_healthcheck.run_check=false '
helm_install_command += (
'--set health_checks.nccl_healthcheck.run_check=false '
)
helm_install_command += (
'--set health_checks.neper_healthcheck.run_check=false '
)
helm_install_command += (
'--set health_checks.straggler_healthcheck.run_check=false '
)
helm_install_command += (
'--set health_checks.tinymax_healthcheck.run_check=false '
)
helm_install_command += (
f'--set health_checks.{hc_type}_healthcheck.run_check=true '
)
return helm_install_command
def deploy_health_runner(
    hr_release_name: str,
    hc_type: str,
    wait: int,
    values_file: str | None = None,
    hc_release_name_base: str | None = None,
    additional_helm_env_vars: dict[str, str] | None = None,
    dry_run: bool = False,
) -> str:
  """Installs the Health Runner chart and returns the name of its pod.

  Args:
    hr_release_name: The name of the health runner release.
    hc_type: The type of health check to deploy.
    wait: The timeout in minutes, passed to the health check as its
      TIMEOUT_MINUTES env value.
    values_file: The relative path to a YAML file containing values to override.
    hc_release_name_base: The unique ID to use for health check release names.
      If None or empty, the Health Runner's default is left in place.
    additional_helm_env_vars: Additional `--set` key/value pairs. They are
      applied after TIMEOUT_MINUTES (so they may override it) and before
      HELM_RELEASE_NAME_BASE.
    dry_run: If True, the install command will be returned but not executed.

  Returns:
    The name of the health runner pod. If dry_run is True, this will be the
    command that would have been run.

  Raises:
    json.JSONDecodeError: If `helm status` or `kubectl get pod` did not print
      JSON (for example because the preceding command failed).
    KeyError, IndexError: If the release reports no Job, or no pod exists for
      the Job yet.
  """
  # The release is installed into this namespace, so every follow-up query
  # must target it too instead of the kube-context's current namespace.
  namespace = 'default'
  env_prefix = f'health_checks.{hc_type}_healthcheck.env'

  specific_set_values: dict[str, str] = {
      f'{env_prefix}.TIMEOUT_MINUTES': str(wait),
  }
  if additional_helm_env_vars:
    specific_set_values.update(additional_helm_env_vars)
  # If hc_release_name_base is provided, set the HC helm release name to it
  if hc_release_name_base:
    specific_set_values[f'{env_prefix}.HELM_RELEASE_NAME_BASE'] = (
        hc_release_name_base
    )

  helm_install_command = _generate_helm_command(
      hc_type=hc_type,
      chart_name='deploy/helm/health_runner',
      release_name=hr_release_name,
      namespace=namespace,
      values_file=values_file,
      set_values=specific_set_values,
  )
  if dry_run:
    return helm_install_command

  _run_command(helm_install_command)

  # Ask Helm which resources the release created, to find the runner's Job.
  helm_status = _run_command(
      f'helm status {hr_release_name} -n {namespace} --show-resources -o json'
  )
  helm_resources = json.loads(helm_status.stdout)
  hr_job_metadata = helm_resources['info']['resources']['v1/Job'][0]['metadata']
  hr_job_name = hr_job_metadata['name']
  # Prefer the namespace the Job itself reports; fall back to the release
  # namespace when the field is absent.
  hr_job_namespace = hr_job_metadata.get('namespace') or namespace

  # Pods created by a Job carry a `job-name` label holding the Job's name.
  pod_listing = _run_command(
      f'kubectl get pod -n {hr_job_namespace} -l job-name={hr_job_name} -o json'
  )
  hr_pod = json.loads(pod_listing.stdout)
  return hr_pod['items'][0]['metadata']['name']
def setup_k8s_cluster(
    launch_label: str,
    launch_label_value: str,
    results_labels: list[str],
    nodes: list[str] | None = None,
) -> None:
  """Sets up cluster/nodes as necessary before setting up and running CHS.

  Three labelling steps are issued in order:
    1. The launch label is removed from every node in the cluster.
    2. The result labels are removed from the selected nodes.
    3. The launch label is (re)applied to the selected nodes.

  Args:
    launch_label: The label a node must have for the health check to run.
    launch_label_value: The value of the launch label.
    results_labels: All labels that the healthcheck writes to the node. If
      empty, step 2 is skipped.
    nodes: The nodes to set up. If None or empty, all nodes will be set up.
  """
  node_selector = ' '.join(nodes) if nodes else '--all'
  label_nodes = 'kubectl label nodes'

  # Remove past launch labels on all nodes (only current nodes will run)
  _run_command(f'{label_nodes} --all {launch_label}-')

  # Remove past result labels on the selected nodes (can block node affinity).
  # With nothing to remove, the command would carry no label argument at all,
  # so skip the call instead of issuing an invalid invocation.
  if results_labels:
    label_removals = ' '.join(f'{label}-' for label in results_labels)
    _run_command(f'{label_nodes} {node_selector} {label_removals}')

  # Add labels only for the nodes to be tested
  _run_command(
      f'{label_nodes} {node_selector} {launch_label}={launch_label_value}'
  )
def cleanup_k8s_cluster(
    hr_release_name: str,
    launch_label: str,
    nodes: list[str] | None = None,
    namespace: str = 'default',
) -> None:
  """Uninstalls the health runner Helm release and removes the launch label.

  Args:
    hr_release_name: The name of the health runner release.
    launch_label: The label a node must have for the health check to run.
    nodes: The nodes to clean up. If None or empty, all nodes will be cleaned
      up.
    namespace: The namespace the release was installed into. Defaults to
      'default', which is the namespace deploy_health_runner installs to.
  """
  # Uninstall the helm chart. The namespace is passed explicitly so the
  # release is found even when the kube-context points at another namespace.
  _run_command(f'helm uninstall {hr_release_name} -n {namespace}')

  # Drop the launch label so the nodes are no longer selected for checks.
  nodes_to_cleanup = ' '.join(nodes) if nodes else '--all'
  _run_command(f'kubectl label nodes {nodes_to_cleanup} {launch_label}-')