aios/tools/hape/hape_libs/clusters/bs/bs.py (61 lines of code) (raw):

import os import json import requests from hape_libs.clusters.cluster_base import * from .bs_util import BsAdminService from hape_libs.utils.logger import Logger class BsCluster(ClusterBase): def __init__(self, key, domain_config): super(BsCluster, self).__init__(key, domain_config) def get_leader_ip_address(self): Logger.debug("Try to parse bs admin leader from zk") try: data = self._master.admin_zk_fs_wrapper.get("admin/LeaderElection/leader_election0000000000") return json.loads(data)["httpAddress"] except: Logger.info("Bs admin is not started by now") return None def is_ready(self): address = self.get_leader_http_address() if address == None: return False try: response = requests.get("{}/__method__".format(address)) return response.text.find("AdminService") != -1 except: return False def get_admin_service(self): leader_http_address = self.get_leader_http_address() if leader_http_address != None: admin_service = BsAdminService(leader_http_address) return admin_service else: return None def get_status(self, table = None): if not self.is_ready(): Logger.error("Bs admin not ready") return None admin_service = self.get_admin_service() # processors_status_map = admin_service.get_processors_status() workers_status_map = self._master.get_containers_status() status_list = [] for key, worker_status_list in workers_status_map.items(): for worker_status in worker_status_list: processor_status = ClusterProcessorStatus.from_hippo_worker_info(worker_status) if key == "admin": processor_status.processorName = self._global_config.get_master_command(self._key) processor_status.processorStatus = ProcessorStatusType.RUNNING status_list.append(processor_status) else: if table != None and worker_status.role.find("."+table+".") == -1: continue processor_status = ClusterProcessorStatus.from_hippo_worker_info(worker_status) processor_status.processorName = self._global_config.get_worker_command(self._key) processor_status.processorStatus = ProcessorStatusType.RUNNING status_list.append(processor_status) cluster_status = ClusterStatus( leaderAddress = self.get_leader_http_address(), serviceZk = self._global_config.get_service_appmaster_zk_address(self._key), hippoZk = self._global_config.get_service_hippo_zk_address(self._key), processors = status_list ) return attr.asdict(cluster_status)