cli/straggler_check.py (73 lines of code) (raw):
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Check for running a Pipeline Parallelism straggler detection test on a cluster."""
import click
import common
import check
import gke_check
NAME = 'straggler'
_DESCRIPTION = (
'Runs a Pipeline Parallelism straggler detection test on a cluster.'
)
def get_check_for_orchestrator(
orchestrator: str,
machine_type: str,
nodes: list[str],
run_only_on_available_nodes: bool,
dry_run: bool = False,
) -> check.Check:
"""Returns the appropriate check for the given orchestrator."""
match orchestrator:
case 'gke':
return GkeStragglerCheck(
machine_type=machine_type,
nodes=nodes,
run_only_on_available_nodes=run_only_on_available_nodes,
dry_run=dry_run,
)
case _:
raise ValueError(f'Unsupported orchestrator: {orchestrator}')
class GkeStragglerCheck(gke_check.GkeCheck):
"""Runs a Pipeline Parallelism Straggler Detection test on a cluster."""
# Explicitly exclude not supported machine types
_SUPPORTED_MACHINE_TYPES = frozenset(
machine_type
for machine_type in common.SUPPORTED_MACHINE_TYPES
if machine_type not in ['a4-highgpu-8g']
)
launch_label = 'aiinfra/straggler-healthcheck-test'
results_labels = [
'aiinfra/straggler-healthcheck-runtime-sec',
]
def __init__(
self,
machine_type: str,
nodes: list[str],
run_only_on_available_nodes: bool = False,
dry_run: bool = False,
**kwargs,
):
super().__init__(
name=NAME,
description=_DESCRIPTION,
machine_type=machine_type,
supported_machine_types=self._SUPPORTED_MACHINE_TYPES,
launch_label=self.launch_label,
results_labels=self.results_labels,
nodes=nodes,
run_only_on_available_nodes=run_only_on_available_nodes,
timeout_sec=15 * 60,
dry_run=dry_run,
container_name='straggler-detection-test',
**kwargs,
)
def get_check_pod(self) -> str | None:
"""Get the name of the canonical pod for the check."""
pod_names = [
pod.metadata.name
for pod in self._v1.list_pod_for_all_namespaces().items
if 'chs-hc-straggler-node0' in pod.metadata.name
]
pod = None
if pod_names:
pod = pod_names[0]
return pod
def clean_up(self):
if self.check_logs:
last_line = self.check_logs.splitlines()[-1]
if 'Results at' in last_line:
click.echo(last_line)
super().clean_up()