in src/health_runner/health_runner.py [0:0]
def run_health_check(sleep_time: int) -> None:
  """Deploy health-check Helm releases and wait for their jobs to finish.

  Reads configuration from module-level constants and environment variables,
  installs one Helm release per test iteration, waits (up to the timeout)
  for every created Kubernetes job to complete, and then performs post-run
  cleanup.

  Args:
    sleep_time: Maximum number of minutes to wait for the deployed jobs to
      complete before cleaning up.
  """
  # PREPARATION
  ensure_env_variables(
      required_envs={
          "DRY_RUN",
          "HELM_CHART",  # Must be defined since can't assume health check type
      },
  )
  # Helm chart settings must be defined in the YAML configuration.
  helm_chart_path = _HELM_CHART
  helm_chart_version = _HELM_CHART_VERSION
  helm_install_flags = _HELM_INSTALL_FLAGS

  helm_values: dict[str, str] = {}

  # Resolve target nodes: an explicit HOSTS_CSV list wins; otherwise query
  # the cluster for its GPU nodes. The literal "nil" is treated as unset.
  hosts_csv_env = os.environ.get("HOSTS_CSV", "nil")
  if hosts_csv_env != "nil":
    node_names = hosts_csv_env.split(",")
  else:
    node_names = (
        checker_common.run_command(_K_NAME_GPU_NODES_IN_CLUSTER_COMMAND)
        .stdout.strip()
        .split("\n")
    )

  # N_NODES (if set) overrides the count derived from the node list.
  n_nodes_env = os.environ.get("N_NODES", "nil")
  if n_nodes_env == "nil":
    num_nodes = len(node_names)
  else:
    num_nodes = int(n_nodes_env)

  # Escape commas so the CSV survives Helm's --set value parsing.
  node_names_csv = r"\,".join(node_names)

  # Determine number of tests to run
  num_tests = determine_test_iterations(num_nodes=num_nodes)
  logging.info("Creating %d tests...", num_tests)

  # Pass Node Names & Number of Nodes to all health checks
  helm_values["health_check.env.HOSTS_CSV"] = f'"{node_names_csv}"'
  helm_values["health_check.env.N_NODES"] = str(num_nodes)

  # Pass all other environment variables to health checks
  for key, value in os.environ.items():
    if key.startswith(_HC_ENV_PREFIX):
      # Strip the _HC_ENV_PREFIX prefix and convert to Helm value format
      helm_key = f"health_check.env.{key[len(_HC_ENV_PREFIX):]}"
      helm_values[helm_key] = f'"{value}"'

  # RUN HC
  release_names = []
  for i in range(num_tests):
    # If Helm release name is not unique, it will not install the release
    short_guid = str(uuid.uuid4())[:8]
    hc_release_name_suffix = f"{i}-{short_guid}"
    if _HELM_RELEASE_NAME_BASE:
      unique_release_name = (
          f"{_HELM_RELEASE_NAME_BASE}-{hc_release_name_suffix}"
      )
    else:
      unique_release_name = f"chs-hc-{hc_release_name_suffix}"
    release_names.append(unique_release_name)
    helm_values["job.name"] = f"chs-hc-{i}-{short_guid}"
    cleanup_functions.extend(
        checker_common.create_helm_release(
            helm_path=_HELM,
            release_name=unique_release_name,
            chart=helm_chart_path,
            values=helm_values,
            chart_version=helm_chart_version,
            helm_install_flags=helm_install_flags,
        )
    )
    # Count of tests deployed should start at 1 to make it clear
    logging.info("Deployed test %d (%d of %d total)", i, i + 1, num_tests)

  logging.info(
      "Waiting for maximum of %s minutes before cleaning up...",
      sleep_time,
  )

  # Helm releases & associated jobs are logged for reference outside of HR
  release_jobs = checker_common.get_created_jobs(release_names)
  # NOTE(review): the pairing below assumes get_created_jobs returns jobs in
  # the same order as release_names — confirm against its implementation.
  jobs_and_releases: list[tuple[str, str]] = list(
      zip(release_jobs, release_names)
  )
  logging.info(
      "Helm charts and associated jobs: %s",
      jobs_and_releases,
  )

  # Sleep until all jobs are complete or timeout is reached
  checker_common.wait_till_jobs_complete(
      job_v1=client.BatchV1Api(),
      jobs_to_monitor=release_jobs,
      timeout_seconds=(sleep_time * 60),
      check_interval=10,
  )
  post_run_cleanup()