in azext_edge/edge/providers/check/base/deployment.py [0:0]
def _check_k8s_version(as_list: bool = False) -> Dict[str, Any]:
    """Evaluate whether the connected Kubernetes server meets MIN_K8S_VERSION.

    Queries the cluster version endpoint, builds a ``major.minor`` string from
    the response and compares it against ``MIN_K8S_VERSION``. The outcome is
    recorded on a ``CheckManager`` as both a target evaluation and a display
    line.

    An error evaluation carrying ``UNABLE_TO_DETERMINE_VERSION_MSG`` is
    recorded (instead of raising) when:
      * the version API call fails with ``ApiException``,
      * ``packaging`` cannot be imported, or
      * the reported version cannot be parsed.

    Args:
        as_list: Forwarded unchanged to ``CheckManager.as_dict``.

    Returns:
        The dictionary produced by ``CheckManager.as_dict``.
    """
    import re

    from kubernetes.client.models import VersionInfo

    from ..common import MIN_K8S_VERSION

    version_client = client.VersionApi()
    target_k8s_version = "k8s"
    check_manager = CheckManager(check_name="evalK8sVers", check_desc="Evaluate Kubernetes server")
    check_manager.add_target(
        target_name=target_k8s_version,
        conditions=[f"(k8s version)>={MIN_K8S_VERSION}"],
    )

    def _record_undetermined(error: Exception) -> None:
        # Single place for every "could not work out the version" outcome so
        # the API-failure and parse-failure paths report identically.
        logger.debug(str(error))
        api_error_text = UNABLE_TO_DETERMINE_VERSION_MSG
        check_manager.add_target_eval(
            target_name=target_k8s_version,
            status=CheckTaskStatus.error.value,
            value=api_error_text,
        )
        check_manager.add_display(
            target_name=target_k8s_version,
            display=Padding(api_error_text, (0, 0, 0, 8)),
        )

    try:
        from packaging import version

        version_details: VersionInfo = version_client.get_code()
    except (ApiException, ImportError) as ae:
        _record_undetermined(ae)
        return check_manager.as_dict(as_list)

    major_version = version_details.major
    minor_version = version_details.minor
    # Keep the raw server-reported string for display and the recorded value.
    semver = f"{major_version}.{minor_version}"

    # The server may decorate a component with a non-numeric suffix (for
    # example a minor of "27+"), which is not a valid PEP 440 version. Strip
    # trailing non-digits so only the numeric part is compared.
    comparable_semver = ".".join(re.sub(r"\D+$", "", str(part)) for part in (major_version, minor_version))

    try:
        detected_version = version.parse(comparable_semver)
    except version.InvalidVersion as ive:
        # Still unparsable after normalization (e.g. an empty component).
        _record_undetermined(ive)
        return check_manager.as_dict(as_list)

    if detected_version >= version.parse(MIN_K8S_VERSION):
        semver_status = CheckTaskStatus.success.value
        semver_colored = f"[green]v{semver}[/green]"
    else:
        semver_status = CheckTaskStatus.error.value
        semver_colored = f"[red]v{semver}[/red]"

    k8s_semver_text = (
        f"Require [bright_blue]k8s[/bright_blue] >=[cyan]{MIN_K8S_VERSION}[/cyan] detected {semver_colored}."
    )
    check_manager.add_target_eval(target_name=target_k8s_version, status=semver_status, value=semver)
    check_manager.add_display(
        target_name=target_k8s_version,
        display=Padding(k8s_semver_text, (0, 0, 0, 8)),
    )

    return check_manager.as_dict(as_list)