python/graphscope/deploy/kubernetes/utils.py (288 lines of code) (raw):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
import re
import sys
import threading
import time
from queue import Queue
from kubernetes import client as kube_client
from kubernetes import config as kube_config
from kubernetes.client.rest import ApiException as K8SApiException
from graphscope.framework.errors import K8sError
# Module-level logger obtained under the name "graphscope"; every function and
# watcher thread in this file logs through it.
logger = logging.getLogger("graphscope")
def resolve_api_client(k8s_config_file=None):
    """Get ApiClient from predefined locations.

    The order of resolution is as follows:

        1. load from the kubernetes config file or,
        2. load from the in-cluster configuration or,
        3. set the api address from env if `KUBE_API_ADDRESS` exists.

    Args:
        k8s_config_file (str): Path to kubernetes config file. It is passed
            unchanged to `kube_config.load_kube_config`.

    Raises:
        RuntimeError: K8s api client resolve failed, i.e. none of the three
            locations above yielded a configuration.

    Returns:
        A kubernetes ApiClient object. For the first two locations it is
        built from the configuration that was just loaded; for the third it
        is built from a `Configuration` whose host is `KUBE_API_ADDRESS`.
    """
    try:
        # load from kubernetes config file
        kube_config.load_kube_config(k8s_config_file)
    except Exception:  # pylint: disable=broad-except
        # Any loading failure means "try the next location". `Exception`
        # (rather than a bare except) lets KeyboardInterrupt and SystemExit
        # propagate instead of being mistaken for a missing config.
        try:
            # load from incluster configuration
            kube_config.load_incluster_config()
        except Exception as exc:  # pylint: disable=broad-except
            if "KUBE_API_ADDRESS" in os.environ:
                # try to load from env `KUBE_API_ADDRESS`
                config = kube_client.Configuration()
                config.host = os.environ["KUBE_API_ADDRESS"]
                return kube_client.ApiClient(config)
            raise RuntimeError("Resolve kube api client failed.") from exc
    return kube_client.ApiClient()
def parse_readable_memory(value):
    """Validate a human readable memory size such as ``4Gi``.

    Args:
        value: Memory size. It is converted with ``str()`` and stripped of
            surrounding whitespace before being validated.

    Raises:
        ValueError: If the text before the two-character suffix is not a
            finite, non-negative number, or if the suffix is not one of
            'Ki', 'Mi' and 'Gi'.

    Returns:
        str: The stripped string form of `value`.
    """
    value = str(value).strip()
    num, suffix = value[:-2], value[-2:]
    try:
        number = float(num)
    except ValueError as e:
        raise ValueError(f"Argument cannot be interpreted as a number: {value}") from e
    # float() happily parses "nan", "inf" and negative numbers, none of which
    # is a usable memory size. The chained comparison is False for NaN too.
    if not 0 <= number < float("inf"):
        raise ValueError(
            f"Memory size must be a finite, non-negative number: {value}"
        )
    if suffix not in ["Ki", "Mi", "Gi"]:
        raise ValueError(f"Memory suffix must be one of 'Ki', 'Mi' and 'Gi': {value}")
    return value
def try_to_read_namespace_from_context():
    """Return the namespace recorded in the active kube-config context.

    This is a best-effort lookup: any failure while listing the contexts is
    treated as "no namespace available" rather than reported to the caller.

    Returns:
        str or None: The `namespace` entry of the active context, or None if
        there are no contexts, the active context has no namespace, or the
        contexts could not be read.
    """
    try:
        contexts, active_context = kube_config.list_kube_config_contexts()
        if contexts and "namespace" in active_context["context"]:
            return active_context["context"]["namespace"]
    except Exception:  # pylint: disable=broad-except
        # Deliberately swallowed (see docstring). `Exception` instead of a
        # bare except keeps KeyboardInterrupt / SystemExit propagating.
        pass
    return None
def wait_for_deployment_complete(
    api_client, namespace, name, pods_watcher=None, timeout_seconds=60
):
    """Block until a deployment is fully rolled out, polling once per second.

    Args:
        api_client: ApiClient
            A kubernetes ApiClient object used to build the Core and Apps APIs.
        namespace: str
            Namespace of the deployment.
        name: str
            Deployment name.
        pods_watcher: optional
            Object exposing an `exception` attribute holding either None or a
            `(type, value, traceback)` triple; a recorded triple is re-raised
            here.
        timeout_seconds: int
            How long to keep polling before giving up.

    Raises:
        K8sError: A container of a matching pod is not ready and has already
            been restarted at least once.
        TimeoutError: The deployment did not become ready in time.
        Exception: Whatever exception `pods_watcher` recorded.

    Returns:
        True once updated, total and available replicas all equal the desired
        replica count and the observed generation has caught up.
    """
    pod_api = kube_client.CoreV1Api(api_client)
    deployment_api = kube_client.AppsV1Api(api_client)
    began_at = time.time()

    while time.time() - began_at < timeout_seconds:
        time.sleep(1)

        # Re-raise a failure captured by the watcher, keeping the traceback
        # that was recorded alongside it.
        recorded = None if pods_watcher is None else pods_watcher.exception
        if recorded is not None:
            exc_type, exc_value, exc_tb = recorded
            if exc_value is None:
                exc_value = exc_type()
            if exc_value.__traceback__ is not exc_tb:
                raise exc_value.with_traceback(exc_tb)
            raise exc_value

        deployment = deployment_api.read_namespaced_deployment_status(
            namespace=namespace, name=name
        )
        status = deployment.status
        # Kept as a short-circuiting `and` chain: the generation comparison is
        # only evaluated once all replica counters already match.
        rolled_out = (
            status.updated_replicas == deployment.spec.replicas
            and status.replicas == deployment.spec.replicas
            and status.available_replicas == deployment.spec.replicas
            and status.observed_generation >= deployment.metadata.generation
        )
        if rolled_out:
            return True

        # Not ready yet: look for containers that have already crashed.
        label_selector = ",".join(
            [
                f"{key}={val}"
                for key, val in deployment.spec.selector.match_labels.items()
            ]
        )
        matching_pods = pod_api.list_namespaced_pod(
            namespace=namespace, label_selector=label_selector
        )
        for pod in matching_pods.items:
            statuses = pod.status.container_statuses
            if statuses is None:
                continue
            if any(not cs.ready and cs.restart_count > 0 for cs in statuses):
                raise K8sError("Deployment {} start failed.".format(name))

    raise TimeoutError("Waiting timeout for deployment {}".format(name))
class KubernetesPodWatcher(object):
    """Class for watching events and logs of kubernetes pod.

    After `start()`, two threads poll the API server roughly once per second:
    one lists the events whose involved object is the pod, the other reads
    the pod's log. Every message not seen before is put on a queue (see
    `poll`) and logged. A "Failed" event stops the event thread and is
    exposed through the `exception` property.
    """

    def __init__(self, api_client, namespace, pod, container=None, queue=None):
        """Create a watcher; no thread is started until `start()` is called.

        Args:
            api_client: A kubernetes ApiClient object.
            namespace (str): Namespace the pod lives in.
            pod: Pod object; its `metadata.name` identifies the pod to watch.
            container (str, optional): Container whose log is read.
            queue (optional): Queue receiving the messages. A fresh `Queue`
                is created when omitted.
        """
        self._api_client = api_client
        self._core_api = kube_client.CoreV1Api(api_client)
        self._app_api = kube_client.AppsV1Api(api_client)

        self._namespace = namespace
        self._pod = pod
        self._container = container

        self._pod_name = pod.metadata.name

        if queue is None:
            self._lines = Queue()
        else:
            self._lines = queue

        self._stream_event_thread = None
        self._stream_log_thread = None
        # True until start() is called and again after stop(); both polling
        # loops exit when they observe it.
        self._stopped = True
        # `sys.exc_info()` triple of the error built from "Failed" events.
        self._exc_info = None

    @property
    def exception(self):
        """The `(type, value, traceback)` triple of a recorded failure, or None."""
        return self._exc_info

    def _stream_event_impl(self, simple=False):
        """Poll the pod's events until stopped or a "Failed" event shows up.

        Args:
            simple (bool): Forwarded to the logger as `extra={"simple": ...}`.
        """
        field_selector = "involvedObject.name=" + self._pod_name

        # Every poll returns the full event list again, so remember what has
        # been forwarded. A set keeps the membership test O(1).
        seen_messages = set()
        while not self._stopped:
            time.sleep(1)
            try:
                events = self._core_api.list_namespaced_event(
                    namespace=self._namespace,
                    field_selector=field_selector,
                    timeout_seconds=2,
                )
            except K8SApiException:
                # Ignore API errors; the next iteration polls again.
                continue

            error_message = []
            for event in events.items:
                msg = f"{self._pod_name}: {event.message}"
                if msg and msg not in seen_messages:
                    seen_messages.add(msg)
                    self._lines.put(msg)
                    logger.info(msg, extra={"simple": simple})
                    if event.reason == "Failed":
                        error_message.append(f"Kubernetes event error: {msg}")
            if error_message:
                # Raise and immediately catch so that sys.exc_info() yields a
                # complete (type, value, traceback) triple for `exception`.
                try:
                    raise K8sError(
                        "Error when launching Coordinator on kubernetes cluster: \n"
                        + "\n".join(error_message)
                    )
                except K8sError:
                    self._exc_info = sys.exc_info()
                    return

    def _stream_log_impl(self, simple=False):
        """Poll the pod's log until stopped, forwarding lines not seen before.

        Args:
            simple (bool): Forwarded to the logger as `extra={"simple": ...}`.
        """
        # The whole log is fetched on every poll, so remember the lines that
        # were already forwarded. A set keeps the membership test O(1); with a
        # list the repeated scan grows quadratically with the log size.
        seen_lines = set()
        while not self._stopped:
            time.sleep(1)
            try:
                logs = self._core_api.read_namespaced_pod_log(
                    namespace=self._namespace,
                    name=self._pod_name,
                    container=self._container,
                )
            except K8SApiException:
                # Ignore API errors; the next iteration polls again.
                continue

            for msg in logs.split("\n"):
                if msg and msg not in seen_lines:
                    seen_lines.add(msg)
                    self._lines.put(msg)
                    logger.info(msg, extra={"simple": simple})

    def poll(self, block=True, timeout_seconds=None):
        """Fetch the next message from the queue.

        Args:
            block (bool): Passed to the queue's `get` as `block`.
            timeout_seconds: Passed to the queue's `get` as `timeout`.

        Returns:
            The next queued message.
        """
        return self._lines.get(block=block, timeout=timeout_seconds)

    def start(self):
        """Start the event thread, wait one second, then start the log thread."""
        self._stopped = False

        self._stream_event_thread = threading.Thread(
            target=self._stream_event_impl, args=()
        )
        self._stream_event_thread.start()

        time.sleep(1)

        # The log thread runs with simple=True, the event thread with the
        # default simple=False.
        self._stream_log_thread = threading.Thread(
            target=self._stream_log_impl, args=(True,)
        )
        self._stream_log_thread.start()

    def stop(self, timeout_seconds=60):
        """Ask both threads to stop and join them; a no-op if not running.

        Args:
            timeout_seconds: Join timeout applied to each thread in turn.

        Raises:
            TimeoutError: At least one thread is still alive after joining.
        """
        if not self._stopped:
            self._stopped = True
            self._stream_event_thread.join(timeout=timeout_seconds)
            self._stream_log_thread.join(timeout=timeout_seconds)
            if (
                self._stream_event_thread.is_alive()
                or self._stream_log_thread.is_alive()
            ):
                raise TimeoutError(
                    "Pod watcher thread joined timeout: {}.".format(self._pod_name)
                )
def get_service_endpoints(  # noqa: C901
    api_client,
    namespace,
    name,
    service_type,
    timeout_seconds=60,
    query_port=None,
):
    """Get service endpoint by service name and service type.

    Args:
        api_client: ApiClient
            A kubernetes ApiClient object, initialized with the client args.
        namespace: str
            Namespace of the service belongs to.
        name: str
            Service name.
        service_type: str
            Service type. Valid options are NodePort, LoadBalancer and ClusterIP.
        timeout_seconds: int
            Raise TimeoutError after the duration, only used in LoadBalancer type.
        query_port: int, optional
            If given, only service ports whose `port` equals this value are
            used. Defaults to None, which selects every port of the service.

    Raises:
        TimeoutError: If the underlying cloud-provider doesn't support the LoadBalancer
            service type.
        K8sError: The service type is not one of (NodePort, LoadBalancer, ClusterIP). Or
            the service has no endpoint.

    Returns: A list of endpoint.
        If service type is LoadBalancer, format with <load_balancer_ip>:<port>. And
        if service type is NodePort, format with <host_ip>:<node_port>, And
        if service type is ClusterIP, format with <cluster_ip>:<port>
    """
    start_time = time.time()

    core_api = kube_client.CoreV1Api(api_client)
    svc = core_api.read_namespaced_service(name=name, namespace=namespace)

    def wanted(port):
        # A service port is selected when no filter is set or it matches it.
        return query_port is None or port.port == query_port

    ips = []
    ports = []
    if service_type == "NodePort":
        # Only NodePort needs the backing pods (for their host IPs), so the
        # pod listing lives here. A service without a selector yields no IPs
        # and ends in the K8sError below instead of an AttributeError.
        if svc.spec.selector:
            selector = ",".join([f"{k}={v}" for k, v in svc.spec.selector.items()])
            pods = core_api.list_namespaced_pod(
                namespace=namespace, label_selector=selector
            )
            # Pods without a host IP would otherwise produce "None:<port>".
            ips = [
                pod.status.host_ip
                for pod in pods.items
                if pod.status.host_ip is not None
            ]
        ports = [port.node_port for port in svc.spec.ports if wanted(port)]
    elif service_type == "LoadBalancer":
        # Poll once per second until the load balancer reports an ingress.
        while True:
            svc = core_api.read_namespaced_service(name=name, namespace=namespace)
            if svc.status.load_balancer.ingress is None:
                if time.time() - start_time > timeout_seconds:
                    raise TimeoutError(
                        "LoadBalancer service type is not supported yet."
                    )
                time.sleep(1)
                continue
            for ingress in svc.status.load_balancer.ingress:
                # Prefer the hostname, fall back to the IP.
                if ingress.hostname is not None:
                    ips.append(ingress.hostname)
                else:
                    ips.append(ingress.ip)
            ports = [port.port for port in svc.spec.ports if wanted(port)]
            break
    elif service_type == "ClusterIP":
        ips.append(svc.spec.cluster_ip)
        ports = [port.port for port in svc.spec.ports if wanted(port)]
    else:
        raise K8sError("Service type {0} is not supported yet".format(service_type))

    if not ips or not ports:
        raise K8sError(f"Get {service_type} service {name} failed.")

    # Every address is combined with every selected port.
    return [f"{ip}:{port}" for ip in ips for port in ports]
def get_kubernetes_object_info(api_client, target):
    """Describe a kubernetes API object (i.e. List, Service, etc) by name and kind.

    Args:
        api_client: ApiClient
            A kubernetes ApiClient object. It is accepted but not used by
            this function.
        target: A valid kubernetes object exposing `metadata.name` and `kind`.

    Returns:
        dict: A single-entry mapping from the object's name to its kind.
    """
    object_name = target.metadata.name
    object_kind = target.kind
    return {object_name: object_kind}
def delete_kubernetes_object(
    api_client, target, verbose=False, wait=False, timeout_seconds=60, **kwargs
):
    """Perform a delete action on valid kubernetes API object (i.e. List, Service, etc).

    Args:
        api_client: ApiClient
            A kubernetes ApiClient object, initialized with the client args.
        target: A valid kubernetes object.
        verbose: bool, optional
            If True, print confirmation from the delete action. Defaults to False.
        wait: bool, optional
            Waiting for delete object. Only takes effect when the resolved API
            class offers a `read_namespaced_<kind>` method. Defaults to False.
        timeout_seconds: int, optional
            If waiting for delete timeout, just print a error message and stop
            waiting. Defaults to 60.
        **kwargs: Extra keyword arguments forwarded to the delete call (and to
            the read call used while waiting). `name` is always overwritten
            with the target's name.

    Returns:
        Status: Return status for calls kubernetes delete method, or None if
        the delete call raised an ApiException (treated as "already deleted").
    """
    # "apps/v1" -> ("apps", "v1"); a bare "v1" belongs to the core group.
    group, _, version = target.api_version.partition("/")
    if version == "":
        version = group
        group = "core"
    # Take care for the case e.g. api_type is "apiextensions.k8s.io"
    # Only replace the last instance
    group = "".join(group.rsplit(".k8s.io", 1))
    # convert group name from DNS subdomain format to
    # python class name convention
    group = "".join(word.capitalize() for word in group.split("."))
    fcn_to_call = f"{group}{version.capitalize()}Api"
    k8s_api = getattr(kube_client, fcn_to_call)(
        api_client
    )  # pylint: disable=not-callable

    # CamelCase kind -> snake_case, as used in the client's method names.
    kind = target.kind
    kind = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", kind)
    kind = re.sub("([a-z0-9])([A-Z])", r"\1_\2", kind).lower()

    try:
        # Expect the user to create namespaced objects more often
        kwargs["name"] = target.metadata.name
        if hasattr(k8s_api, "delete_namespaced_{0}".format(kind)):
            # Decide which namespace we are going to put the object in, if any
            kwargs["namespace"] = target.metadata.namespace
            resp = getattr(k8s_api, "delete_namespaced_{0}".format(kind))(**kwargs)
        else:
            kwargs.pop("namespace", None)
            resp = getattr(k8s_api, "delete_{0}".format(kind))(**kwargs)
    except K8SApiException:
        # Object already deleted.
        return None

    # waiting for delete
    if wait:
        start_time = time.time()
        read_object = getattr(k8s_api, "read_namespaced_{0}".format(kind), None)
        if read_object is not None:
            while True:
                try:
                    read_object(**kwargs)
                except K8SApiException as ex:
                    # 404 means the object is gone, which is the goal; any
                    # other status is logged. Either way waiting ends.
                    if ex.status != 404:
                        logger.exception(
                            "Deleting %s, %s, failed",
                            kind,
                            target.metadata.name,
                        )
                    break
                else:
                    time.sleep(1)
                    if time.time() - start_time > timeout_seconds:
                        logger.info(
                            "Deleting %s, %s, timeout",
                            kind,
                            target.metadata.name,
                        )
                        # Give up waiting; without this the loop would
                        # never terminate while the object still exists.
                        break
    if verbose:
        msg = "{0}/{1} deleted.".format(kind, target.metadata.name)
        if hasattr(resp, "status"):
            msg += " status='{0}'".format(str(resp.status))
        logger.info(msg)
    return resp