aios/tools/hape/hape_libs/config/global_config.py (226 lines of code) (raw):

from ctypes import ArgumentError import attr import os import getpass import socket import sys from hape_libs.utils.logger import Logger def _get_hape_root(): here = os.path.dirname(os.path.realpath(__file__)) candidates = [ "../../", "../../../../../../../" ## when hape_libs in usr/local/lib/python/site-packages ] file = "hape" for path in candidates: hape_root = os.path.realpath(os.path.join(here, path)) full_path = os.path.realpath(os.path.join(here, path, file)) if os.path.exists(full_path): Logger.info("find hape under {}, set as hape root".format(hape_root)) return hape_root return hape_root def _get_local_host(): try: hostname = socket.gethostname() local_ip = socket.gethostbyname(hostname) except socket.error: local_ip = "127.0.0.1" return local_ip def _iplist_converter(value): return value.split(';') if value else [_get_local_host()] def is_not_none(instance, attribute, value): if value is None: raise ValueError("The '{}' field is required and cannot be None".format(attribute.name)) @attr.s class MasterConfigBase(object): serviceName = attr.ib(attr.NOTHING) image = attr.ib(attr.NOTHING) adminCpu = attr.ib(attr.NOTHING) adminMem = attr.ib(attr.NOTHING) adminIpList = attr.ib(converter = _iplist_converter, default=[]) k8sNamespace = attr.ib(default=None) serviceZkAddr = attr.ib(init=False) workerCandidateMap = attr.ib(init=False) serviceMasterZkAddr = attr.ib(init=False) serviceHippoZkAddr = attr.ib(init=False) @attr.s class SwiftConfig(MasterConfigBase): workerIpList = attr.ib(converter = _iplist_converter, default=[]) brokerCount = attr.ib(default=1) swiftDataStoreRoot = attr.ib(init=False) def __attrs_post_init__(self): self.workerCandidateMap = { "default": self.workerIpList } self.brokerCount = int(self.brokerCount) @attr.s class BSConfig(MasterConfigBase): workerIpList = attr.ib(converter = _iplist_converter, default=[]) def __attrs_post_init__(self): self.workerCandidateMap = { "default": self.workerIpList } @attr.s class HavenaskConfig(MasterConfigBase): allowMultiSlotInOne = attr.ib(default='true') qrsIpList = attr.ib(converter = _iplist_converter, default=[]) searcherIpList = attr.ib(converter = _iplist_converter, default=[]) adminHttpPort = attr.ib(default=45700) qrsHttpPort = attr.ib(default=45800) qrsReplicaCount = attr.ib(default=1) searcherReplicaCount = attr.ib(default=1) dbStoreRoot = attr.ib(init=False) # enableVirtualIp = attr.ib(default=False) suezClusterStoreRoot = attr.ib(init=False) offlineTable = attr.ib(default='false') searcherMode = attr.ib(default='{"key": "--env", "value": "mode=rw"},') def __attrs_post_init__(self): self.workerCandidateMap = { "qrs": self.qrsIpList, "searcher": self.searcherIpList } self.qrsReplicaCount = int(self.qrsReplicaCount) self.searcherReplicaCount = int(self.searcherReplicaCount) if self.offlineTable == 'true': self.searcherMode = '' # if isinstance(self.enableVirtualIp, unicode): # self.enableVirtualIp = self.enableVirtualIp == "true" @attr.s class CommonConfig(object): binaryPath = attr.ib(attr.NOTHING) dataStoreRoot = attr.ib(attr.NOTHING) processorMode = attr.ib(default='docker', validator=lambda ins, attri, value: value in {"proc", "docker", "k8s"}) domainZkRoot = attr.ib(default=None) hadoopHome = attr.ib(default='/usr/local/hadoop/hadoop/') javaHome = attr.ib(default='/opt/taobao/java') kubeConfigPath = attr.ib(default=os.path.expanduser("~/.kube/config")) isDomainZkSet = attr.ib(default=False) c2K8sNamespace = attr.ib(default="havenask-master") c2ClusterName = attr.ib(default="k8s-havenask") k8sNamespace = attr.ib(default="havenask-worker") catalogName = attr.ib(default="catalog") retryCommonWaitTimes = attr.ib(default = 40) enableKmonitorPrometheusSink = attr.ib(default = "false") kmonitorSinkAddress = attr.ib(default = None) kmonitorSinkHost = attr.ib(default="127.0.0.1") kmonitorSinkPort = attr.ib(default="9091") kubeShardImage = attr.ib(default="registry.cn-hangzhou.aliyuncs.com/havenask/kubeshard:v0.0.4") def __attrs_post_init__(self): if self.domainZkRoot == None: self.domainZkRoot = "zfs://{}:2181/havenask".format(_get_local_host()) self.isDomainZkSet = False else: self.isDomainZkSet = True if self.processorMode == "k8s": self.retryCommonWaitTimes = int(self.retryCommonWaitTimes) * 4 if self.kmonitorSinkAddress != None: self.enableKmonitorPrometheusSink = 'true' splits = self.kmonitorSinkAddress.split(":") if len(splits) != 2: raise ArgumentError("kmonitorSinkAddress address requires format: <host>:<port>") self.kmonitorSinkHost, self.kmonitorSinkPort = splits class DefaultVaribles: def __init__(self): self.user= getpass.getuser() self.user_home = os.path.expanduser("~") self.local_host = _get_local_host() self.hape_root = _get_hape_root() def to_dict(self): return { "user": self.user, "user_home": self.user_home, "local_host": self.local_host, "hape_root": self.hape_root } default_variables = DefaultVaribles() @attr.s(init=False) class GlobalConfig(object): common = attr.ib() # type: CommonConfig swift = attr.ib() # type: SwiftConfig bs = attr.ib() # type: BSConfig havenask = attr.ib() # type: HavenaskConfig default_variables = attr.ib(init=False) # type: DefaultVaribles def __init__(self, **kwargs): required_fields = ['common', 'swift', 'bs', 'havenask'] missing_fields = [field for field in required_fields if field not in kwargs] if missing_fields: raise ValueError("Missing required configuration fields: {}".format(', '.join(missing_fields))) self.common = CommonConfig(**kwargs['common']) self.swift = SwiftConfig(**kwargs['swift']) self.bs = BSConfig(**kwargs.get('bs', {})) self.havenask = HavenaskConfig(**kwargs['havenask']) self.fill() def fill(self): self.havenask.dbStoreRoot = os.path.join(self.common.dataStoreRoot, "database_store") self.havenask.suezClusterStoreRoot = os.path.join(self.common.dataStoreRoot, "suez_cluster_store") self.swift.swiftDataStoreRoot = os.path.join(self.common.dataStoreRoot, "havenask/swift_data_root") self.default_variables = default_variables for key in ["bs", "havenask", "swift"]: self.add_role_full_zk_path(key) def add_role_full_zk_path(self, key): common_config = self.common domainZkRoot = common_config.domainZkRoot service_name = self.__getattribute__(key).__getattribute__("serviceName") zk_func_map = { "serviceZkAddr": lambda root: os.path.join(root, service_name), "serviceMasterZkAddr": lambda root: os.path.join(root, service_name, "appmaster"), "serviceHippoZkAddr": lambda root: os.path.join(root, service_name, "hippo"), } for zk_key, zk_func in zk_func_map.items(): zk_path = zk_func(domainZkRoot) self.__getattribute__(key).__setattr__(zk_key, zk_path) def get_appmaster_base_config(self, key, subkey): config = self.__getattribute__(key) service_name = config.__getattribute__(subkey) return service_name def get_service_zk_address(self, key): return self.get_appmaster_base_config(key, "serviceZkAddr") def get_service_appmaster_zk_address(self, key): return self.get_appmaster_base_config(key, "serviceMasterZkAddr") def get_service_hippo_zk_address(self, key): return self.get_appmaster_base_config(key, "serviceHippoZkAddr") def get_appmaster_service_name(self, key): return self.get_appmaster_base_config(key, "serviceName") def get_worker_candidate_maps(self, key): return self.get_appmaster_base_config(key, "workerCandidateMap") def get_admin_candidate_ips(self, key): return self.get_appmaster_base_config(key, "adminIpList") def get_k8s_workers_crds(self, key = None): map = { "swift": [ ["CarbonJob", self.common.k8sNamespace, self.default_variables.user + "-" + self.get_appmaster_service_name("swift")] ], "bs": [ ["CarbonJob", self.common.k8sNamespace, self.get_appmaster_service_name("bs")] ], "havenask": [ ["ShardGroup", self.common.k8sNamespace, "qrs"], ["ShardGroup", self.common.k8sNamespace, "database"] ] } if key != None: return map[key] else: return map def get_k8s_workers_apphash(self, key): map = { "swift": self.default_variables.user + "_" + self.get_appmaster_service_name(key), "bs": self.get_appmaster_service_name(key), "havenask": self.get_appmaster_service_name(key) } return map[key] def get_worker_command(self, key): map = { "swift": "swift_broker", "bs": "build_service_worker", "havenask": "ha_sql" } return map[key] def get_master_command(self, key): map = { "swift": "swift_admin", "bs": "bs_admin_worker", "havenask": "suez_admin_worker" } return map[key] def get_docker_container_prefix(self, key): service_name = self.get_appmaster_service_name(key) container_name = "havenask_container" + (("_" + self.default_variables.user) if key == "swift" else "") + "_" + service_name return container_name