aios/tools/hape/hape_libs/appmaster/k8s/k8s_client.py (213 lines of code) (raw):
# -*- coding: utf-8 -*-
import io
import traceback
import json
import functools
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from hape_libs.clusters.cluster_base import *
from hape_libs.utils.logger import Logger
from hape_libs.utils.retry import Retry
class K8sClient():
def __init__(self, binary_path, kube_config_path):
## 暂时解决bazel下python2对于yaml库以及k8s内的yaml库兼容性问题,引入lib/yaml
import sys
sys.path = [binary_path + "/usr/local/lib/python/site-packages/lib"] + sys.path
import lib.yaml as yaml
import kubernetes as k8s
self.yaml = yaml
self.k8s = k8s
self.k8s.config.load_kube_config(kube_config_path)
configuration = self.k8s.client.Configuration().get_default_copy()
configuration.verify_ssl = False
api_client = self.k8s.client.ApiClient(configuration=configuration)
self._core_v1 = self.k8s.client.CoreV1Api(api_client)
self._rbac_v1 = self.k8s.client.RbacAuthorizationV1Api(api_client)
self._apps_v1 = self.k8s.client.AppsV1Api(api_client)
self._apiextensions_v1 = self.k8s.client.ApiextensionsV1Api(api_client)
self._custom_objects_api = self.k8s.client.CustomObjectsApi(api_client)
self._priorityclass_api = self.k8s.client.SchedulingV1Api(api_client)
self._version_api = self.k8s.client.VersionApi(api_client)
self._resources_api_map = {
"CustomResourceDefinition": {
"read": self._apiextensions_v1.read_custom_resource_definition,
"create": self._apiextensions_v1.create_custom_resource_definition,
"delete": self._apiextensions_v1.delete_custom_resource_definition,
"namespace_required": False
},
"Namespace": {
"read": self._core_v1.read_namespace,
"create": self._core_v1.create_namespace,
"delete": self._core_v1.delete_namespace,
"namespace_required": False
},
"PriorityClass": {
"read": self._priorityclass_api.read_priority_class,
"create": self._priorityclass_api.create_priority_class,
"delete": self._priorityclass_api.delete_priority_class,
"namespace_required": False
},
"ServiceAccount": {
"read": self._core_v1.read_namespaced_service_account,
"create": self._core_v1.create_namespaced_service_account,
"delete": self._core_v1.delete_namespaced_service_account,
"namespace_required": True
},
"ClusterRoleBinding": {
"read": self._rbac_v1.read_cluster_role_binding,
"create": self._rbac_v1.create_cluster_role_binding,
"delete": self._rbac_v1.delete_cluster_role_binding,
"namespace_required": False
},
"Secret": {
"read": self._core_v1.read_namespaced_secret,
"create": self._core_v1.create_namespaced_secret,
"delete": self._core_v1.delete_namespaced_secret,
"namespace_required": True
},
"Deployment": {
"read": self._apps_v1.read_namespaced_deployment,
"create": self._apps_v1.create_namespaced_deployment,
"delete": self._apps_v1.delete_namespaced_deployment,
"list": self._apps_v1.list_namespaced_deployment,
"namespace_required": True
},
"Pod": {
"list": self._core_v1.list_namespaced_pod,
"namespace_required": True
},
"Service": {
"read": self._core_v1.read_namespaced_service,
"create": self._core_v1.create_namespaced_service,
"delete": self._core_v1.delete_namespaced_service,
"namespace_required": True
},
"ConfigMap": {
"read": self._core_v1.read_namespaced_config_map,
"create": self._core_v1.create_namespaced_config_map,
"delete": self._core_v1.delete_namespaced_config_map,
"namespace_required": True
},
"ShardGroup": {
"read": functools.partial(self._custom_objects_api.get_namespaced_custom_object, group="carbon.taobao.com", version="v1", plural="shardgroups"),
"delete": functools.partial(self._custom_objects_api.delete_namespaced_custom_object, group="carbon.taobao.com", version="v1", plural="shardgroups"),
"list": functools.partial(self._custom_objects_api.list_namespaced_custom_object, group="carbon.taobao.com", version="v1", plural="shardgroups"),
"namespace_required": True
},
"CarbonJob": {
"read": functools.partial(self._custom_objects_api.get_namespaced_custom_object, group="carbon.taobao.com", version="v1", plural="carbonjobs"),
"delete": functools.partial(self._custom_objects_api.delete_namespaced_custom_object, group="carbon.taobao.com", version="v1", plural="carbonjobs"),
"list": functools.partial(self._custom_objects_api.list_namespaced_custom_object, group="carbon.taobao.com", version="v1", plural="carbonjobs"),
"namespace_required": True
},
"WorkerNode": {
"list": functools.partial(self._custom_objects_api.list_namespaced_custom_object, group="carbon.taobao.com", version="v1", plural="workernodes"),
"namespace_required": True
},
"DaemonSet": {
"read": self._apps_v1.read_namespaced_daemon_set,
"create": self._apps_v1.create_namespaced_daemon_set,
"delete": self._apps_v1.delete_namespaced_daemon_set,
"namespace_required": True
}
}
def create_by_yaml(self, content):
docs = self.yaml.safe_load_all(content)
for doc in docs:
if not self.create_resource(doc):
return False
return True
def read_resource(self, kind, name, namespace):
if name == None:
Logger.error("K8s resource requires name".format(name))
return None
if kind == None:
Logger.error("K8s resource {} requires kind".format(kind))
return None
if kind not in self._resources_api_map:
Logger.error("K8s resource {} with kind {} is not supported by hape by now, you can create by hand".format(name, kind))
return None
try:
resource_api = self._resources_api_map[kind]
method, ns_required = resource_api["read"], resource_api["namespace_required"]
if ns_required and namespace == None:
Logger.error("K8s resource {} requires namespace".format(name))
return None
if ns_required:
obj = method(name=name, namespace=namespace)
else:
obj = method(name=name)
return obj
except Exception as e:
Logger.debug(traceback.format_exc())
return None
def list_resources(self, kind, namespace, label_selector = None):
if kind not in self._resources_api_map:
Logger.error("K8s resource kind {} not supported by hape, you can list by hand".format(kind))
return []
resource_api = self._resources_api_map[kind]
method = resource_api["list"]
if label_selector != None:
objs = method(namespace=namespace, label_selector = label_selector)
else:
objs = method(namespace=namespace)
return objs.items
def create_resource(self, doc):
metadata = doc.get("metadata", {})
name = metadata.get("name", None)
namespace = metadata.get("namespace", None)
kind = doc.get("kind", None)
metainfo = "kind: {}, name: {} namespace: {}".format(kind, name, namespace)
if self.read_resource(kind, name, namespace) != None:
Logger.info("K8s resource already exists, kind:{} namespace:{} name:{}".format(kind, namespace, name))
return True
Logger.info("K8s resource not exists, will create it, {}".format(name, metainfo))
Logger.debug("K8s resource {} spec: {}".format(name, self.yaml.dump(doc)))
resource_api = self._resources_api_map[kind]
method, ns_required = resource_api["create"] , resource_api['namespace_required']
try:
if not ns_required:
method(body=doc)
else:
method(body=doc, namespace=namespace)
Logger.info("Create k8s resource succeed, {}".format(metainfo))
return True
except Exception as e:
Logger.error("Failed to create k8s resource, {}".format(metainfo))
Logger.error(traceback.format_exc())
return False
def delete_resource(self, kind, name, namespace = None):
metainfo = "kind: {}, namespace: {}, name: {}".format(kind, namespace, name)
if kind not in self._resources_api_map:
Logger.error("K8s resource kind {} not supported by hape, you can delete by hand".format(kind))
return False
if not self.read_resource(kind, name, namespace) != None:
return True
Logger.info("Begin to delete K8s resource, {}".format(metainfo))
try:
resource_api = self._resources_api_map[kind]
ns_required, method = resource_api["namespace_required"], resource_api["delete"]
if ns_required:
if namespace == None:
Logger.error("Failed to delete k8s resource, namespace is required, {}".format(metainfo))
return False
method(namespace=namespace,name=name)
else:
method(name=name)
Logger.info("K8s resource deleted successfully, {}".format(metainfo))
def check_resource_deleted():
return self.read_resource(kind=kind, name=name, namespace=namespace) == None
succ = Retry.retry(check_resource_deleted, check_msg="K8s resource deleted, {}".format(metainfo), limit=100)
return succ
except Exception as e:
Logger.error("Failed to delete k8s resource, {}".format(metainfo))
Logger.error("You can delete it by hand")
Logger.error(traceback.format_exc())
return False
def check_connections(self):
try:
version_info = self._version_api.get_code()
Logger.info("Connected to Kubernetes cluster")
return True
except Exception as e:
Logger.error("Failed to connect to Kubernetes cluster:", e)
Logger.error(traceback.format_exc())
return False