# aios/tools/hape/hape_libs/clusters/suez/suez.py
import requests
import attr
from .service.catalog_manager import CatalogManager
from .service.suez_cluster_manager import SuezClusterManager
from .service.scheduler_service import SchedulerService
from hape_libs.utils.logger import Logger
from hape_libs.config import *
from hape_libs.appmaster import HippoMasterPlan, KeyValuePair
from hape_libs.clusters.cluster_base import *
from hape_libs.common import HapeCommon
from hape_libs.utils.fs_wrapper import FsWrapper
# NOTE: Retry is referenced below; assumed to live alongside the other utils helpers.
from hape_libs.utils.retry import Retry
class SuezCluster(ClusterBase):
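    # Drives a havenask (suez) cluster on top of ClusterBase: bootstraps the catalog
    # and cluster services after the admin starts, prepares storage roots, and
    # reports cluster and table readiness.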
def __init__(self, key, domain_config): #type:(str, DomainConfig)->None
super(SuezCluster, self).__init__(key, domain_config)
self.catalog_manager = self.get_catalog_manager()
self.suez_cluster_manager = self.get_suez_cluster_manager()
    def get_catalog_manager(self):
        leader_http_address = self.get_leader_http_address()
        if leader_http_address is None:
            return None
        return CatalogManager(leader_http_address, self._domain_config)
    def get_scheduler_service(self):
        leader_http_address = self.get_leader_http_address()
        if leader_http_address is None:
            return None
        return SchedulerService(leader_http_address)
    def get_suez_cluster_manager(self):
        leader_http_address = self.get_leader_http_address()
        if leader_http_address is None:
            return None
        return SuezClusterManager(leader_http_address,
                                  self._global_config.get_service_appmaster_zk_address(HapeCommon.SWIFT_KEY),
                                  self._global_config.get_service_appmaster_zk_address(HapeCommon.HAVENASK_KEY),
                                  self._domain_config)
def is_ready(self):
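        # The admin is considered ready once both the catalog service and the
        # cluster service respond without errors.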
try:
address = self.get_leader_http_address()
Logger.debug("Check catalog service in address: {}".format(address))
response = requests.post("{}/CatalogService/listCatalog".format(address), timeout=2)
data = response.json()
Logger.debug("Catalog service response: {}".format(str(data)))
            if len(data['status']) != 0:
                Logger.debug("Catalog service in havenask admin is not ready yet")
                return False
Logger.debug("Check cluster service in address: {}".format(address))
response = requests.post("{}/ClusterService/getClusterDeployment".format(address), timeout=2)
data = response.text
Logger.debug("Cluster service response: {}".format(str(data)))
            if "SERVICE_NOT_READY" not in data:
                return True
            else:
                Logger.debug("Cluster service in havenask admin is not ready yet")
                return False
        except Exception as e:
            Logger.debug("Readiness check failed: {}".format(e))
            return False
    def before_master_start(self, hippo_plan): #type: (HippoMasterPlan) -> bool
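        # Make sure the store roots exist and expose the uploaded table config
        # templates to the admin process through environment variables.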
hadoop_home = self._global_config.common.hadoopHome
binary_path = self._global_config.common.binaryPath
extend_attrs = {
"hadoop_home": hadoop_home,
"binary_path": binary_path
}
        for store_root in (self._global_config.havenask.dbStoreRoot,
                           self._global_config.havenask.suezClusterStoreRoot,
                           self._global_config.common.dataStoreRoot):
            fs_wrapper = FsWrapper(store_root, extend_attrs=extend_attrs)
            if not fs_wrapper.exists(""):
                fs_wrapper.mkdir("")
process = hippo_plan.processLaunchContext.processes[0]
template_addr = self._domain_config.template_config.upload_direct_table_templ()
process.envs.append(KeyValuePair(key='DIRECT_WRITE_TEMPLATE_CONFIG_PATH', value=template_addr))
template_addr = self._domain_config.template_config.upload_offline_table_templ()
process.envs.append(KeyValuePair(key='BS_TEMPLATE_CONFIG_PATH', value=template_addr))
return True
def after_master_start(self):
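        # The admin leader is now resolvable, so the service handles can be
        # (re)created before bootstrapping the default catalog and clusters.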
self.catalog_manager = self.get_catalog_manager()
self.catalog_manager.create_default_catalog()
self.suez_cluster_manager = self.get_suez_cluster_manager()
self.suez_cluster_manager.create_or_update_default_clusters()
    def stop(self, is_delete=False, only_admin=False):
        super(SuezCluster, self).stop(is_delete=is_delete, only_admin=only_admin)
hadoop_home = self._global_config.common.hadoopHome
binary_path = self._global_config.common.binaryPath
extend_attrs = {
"hadoop_home": hadoop_home,
"binary_path": binary_path
}
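        # On deletion, also clean up the persisted store roots and uploaded templates.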
if is_delete:
FsWrapper(self._global_config.havenask.dbStoreRoot, extend_attrs=extend_attrs).rm("")
FsWrapper(self._global_config.havenask.suezClusterStoreRoot, extend_attrs=extend_attrs).rm("")
FsWrapper(self._global_config.common.dataStoreRoot, extend_attrs=extend_attrs).rm("templates")
def get_leader_ip_address(self):
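        # The last line of the leader election znode contains the leader's address.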
Logger.debug("Try to parse suez admin leader from zk")
        try:
            data = self._master.admin_zk_fs_wrapper.get("admin/LeaderElection/leader_election0000000000")
            return data.split("\n")[-1]
        except Exception:
            Logger.info("Suez admin has not started yet")
            return None
def wait_cluster_ready(self):
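        # Poll get_status() until the cluster reports READY or retries are exhausted.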
        def check_ready():
            status = self.get_status()
            return status is not None and status['clusterStatus'] == "READY"
succ = Retry.retry(check_ready, limit=self._domain_config.global_config.common.retryCommonWaitTimes, check_msg="Wait havenask cluster ready")
return succ
def get_status(self, **kwargs):
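        # Merges the scheduler's processor view with the hippo container view into
        # {"admin": [...], "<group>": {<role>: [...]}} and wraps the result in a
        # ShardGroupClusterStatus with an overall READY/NOT_READY verdict.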
if not self.is_ready():
Logger.error("Havenask admin not ready")
return None
scheduler_service = self.get_scheduler_service()
processors_status_map = scheduler_service.get_processors_status()
workers_status_map = self._master.get_containers_status()
status_map = {"admin": [], "database": {}, "qrs": {}}
        if len(processors_status_map) != 0:
            for group, role_map in processors_status_map.items():
                for role in role_map:
                    if role not in status_map[group]:
                        status_map[group][role] = []
for key, worker_status_list in workers_status_map.items():
for worker_status in worker_status_list:
if worker_status.role == "admin":
processor_status = ClusterProcessorStatus.from_hippo_worker_info(worker_status)
processor_status.processorName = self._global_config.get_master_command(self._key)
processor_status.processorStatus = ProcessorStatusType.RUNNING
status_map["admin"].append(processor_status)
else:
                    is_found = False
                    group, role = worker_status.role.split(".")
                    role_status = processors_status_map[group][role]
                    for processor_status in role_status.processorList:
                        if processor_status.ip == worker_status.ip and processor_status.role == role:
                            processor_status.merge_from_hippo_worker_info(worker_status)
                            processor_status.processorName = self._global_config.get_worker_command(HapeCommon.HAVENASK_KEY)
                            is_found = True
                            status_map[group][role].append(processor_status)
                            break
                    if not is_found:
                        processor_status = ShardGroupProcessorStatus.from_hippo_worker_info(worker_status)
                        status_map[group][role].append(processor_status)
is_cluster_ready = True
hint = ""
for group, role_map in status_map.items():
if group == "admin":
continue
if len(role_map) == 0:
is_cluster_ready = False
hint = "Group {} has no containers, maybe lack of candidate nodes".format(group)
break
for role, node_list in role_map.items():
if len(node_list) == 0:
is_cluster_ready = False
hint = "Group {} Role {} has no containers, maybe lack of candidate nodes".format(group, role)
break
                if not processors_status_map[group][role].readyForCurVersion:
                    is_cluster_ready = False
                    hint = "Group {} Role {} has unready containers, or maybe lack of candidate nodes".format(group, role)
break
cluster_status = ShardGroupClusterStatus(
serviceZk = self._global_config.get_service_appmaster_zk_address(self._key),
hippoZk = self._global_config.get_service_hippo_zk_address(self._key),
sqlClusterInfo = status_map,
leaderAddress = self.get_leader_http_address(),
hint = hint,
clusterStatus = "READY" if is_cluster_ready else "NOT_READY"
)
return attr.asdict(cluster_status)
def get_ready_tables(self):
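        # A table counts as ready when every qrs worker serves it and enough searcher
        # processors (max shard count * replica count) report it in their signatures.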
## collect tables ready in qrs
qrs_worker_ips, tables_collect = self.collect_qrs_tables()
ready_tables = set()
max_shard = 1
for table in tables_collect:
if len(tables_collect[table]) == len(qrs_worker_ips):
shard = self.catalog_manager.get_table_shard_count(table)
ready_tables.add((shard, table))
max_shard = max(max_shard, shard)
## check tables all ready in database
result = []
        status = self.get_status()
        if status is None:
            return []
        status = status["sqlClusterInfo"]
for shard, table in ready_tables:
replica = self._global_config.havenask.searcherReplicaCount
ready_counts = 0
for role, processor_status_list in status["database"].items():
for processor_status in processor_status_list:
                    if (table + ".") in processor_status['signature']:
ready_counts += 1
if ready_counts == max_shard * replica:
result.append(table)
else:
Logger.debug("Table {} not ready in all searchers and qrs, you can use `gs havenask` to find reason".format(table))
return result
def collect_qrs_tables(self):
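        # Query /sqlClientInfo on every qrs worker and record, per table, which
        # workers currently serve it.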
qrs_worker_ips = self.get_qrs_ips()
        if qrs_worker_ips is None:
return [], {}
Logger.debug("Start to collect table in all qrs")
Logger.debug("Qrs iplist: {}".format(qrs_worker_ips))
        qrs_http_port = self._global_config.havenask.qrsHttpPort
tables_collect = {}
for ip in qrs_worker_ips:
try:
response = requests.post("http://{}:{}/sqlClientInfo".format(ip, qrshttpPort))
data = response.json()
tables = data['result']["default"][HapeCommon.DEFAULT_DATABASE]['tables']
for table in tables:
if table.endswith("summary_"):
continue
if table not in tables_collect:
tables_collect[table] = []
tables_collect[table].append(ip)
            except Exception as e:
                Logger.debug("Failed to fetch table info from qrs {}: {}".format(ip, e))
return qrs_worker_ips, tables_collect
def get_qrs_ips(self):
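        # Wait until the expected number of qrs replicas is running, then return their IPs.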
qrs_count = self._global_config.havenask.qrsReplicaCount
def check_qrs_replicas():
qrs_worker_ips = self.parse_qrs_ip()
if len(qrs_worker_ips) != qrs_count:
Logger.info("Qrs expect to have {} replicas started, now {}".format(qrs_count, len(qrs_worker_ips)))
return False
else:
return True
        check_msg = "qrs achieve target replicas"
        ready = Retry.retry(check_qrs_replicas, check_msg=check_msg, limit=50)
if not ready:
return None
else:
return self.parse_qrs_ip()
def parse_qrs_ip(self):
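        # Collect the IPs of all containers whose role name marks them as qrs workers.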
result = []
workers_status_map = self._master.get_containers_status()
        for role, worker_status_list in workers_status_map.items():
            if role.startswith("qrs"):
                for worker_status in worker_status_list:
                    result.append(worker_status.ip)
return result