cli/status.py (61 lines of code) (raw):
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Status check for the healthscan command.
This check provides the current healthscan result status of a cluster.
"""
import subprocess
import click
import common
import check
import gke_check
NAME = "status"
_DESCRIPTION = (
"A check to provide the current healthscan result status of a cluster."
)
def get_check_for_orchestrator(
orchestrator: str,
machine_type: str,
nodes: list[str],
) -> check.Check:
"""Returns the appropriate check for the given orchestrator."""
match orchestrator:
case "gke":
return GkeStatus(machine_type=machine_type, nodes=nodes)
case _:
raise ValueError(f"Unsupported orchestrator: {orchestrator}")
class GkeStatus(gke_check.GkeCheck):
"""A check to provide the current healthscan result status of a cluster."""
_SUPPORTED_MACHINE_TYPES = common.SUPPORTED_MACHINE_TYPES
_custom_cols = (
"NODE:.metadata.name,"
"TINYMAX_RESULT:.metadata.labels.aiinfra/tinymax-healthcheck-result,"
"NEPER_RESULT:.metadata.labels.aiinfra/neper-healthcheck-result,"
"GPU_RESULT:.metadata.labels.aiinfra/gpu-healthcheck-result,"
"NCCL_RESULT:.metadata.labels.aiinfra/nccl-healthcheck-result"
)
def __init__(self, machine_type: str, nodes: list[str]):
super().__init__(
name=NAME,
description=_DESCRIPTION,
machine_type=machine_type,
supported_machine_types=self._SUPPORTED_MACHINE_TYPES,
launch_label=None,
results_labels=None,
nodes=nodes,
)
def _status(self) -> str:
"""Get the current healthscan status of a GKE cluster."""
command = (
f"kubectl get nodes -o custom-columns={self._custom_cols} "
f"-l node.kubernetes.io/instance-type={self.machine_type}"
)
return subprocess.run(
command,
shell=True,
text=True,
check=False,
capture_output=True,
).stdout
def set_up(self) -> None:
"""Set up for the status check."""
# No setup is needed for the status check.
def clean_up(self) -> None:
"""Clean up after the status check."""
# No cleanup is needed for the status check.
def run(
self,
timeout_sec: int | None = None,
startup_sec: int | None = None,
) -> str | None:
"""Run the status check.
Args:
timeout_sec: The timeout in seconds for the check.
startup_sec: The time in seconds to wait for the health runner to start.
Returns:
The status of the cluster as a string.
"""
click.echo("Performing status check...")
status = self._status()
click.echo(status)
return status