aios/tools/hape/hape_libs/clusters/suez/service/scheduler_service.py (52 lines of code) (raw):
import sys
import os
import requests
import traceback
import json
import attr
from hape_libs.utils.logger import Logger
from hape_libs.clusters.cluster_base import *
class SchedulerService(object):
def __init__(self, address):
self._http_address = address
Logger.info("visit scheduler service in address {}".format(self._http_address))
def get_processors_status(self):
Logger.debug("get scheduler status of havenask admin")
response = requests.post(self._http_address + "/SchedulerService/getSystemInfo")
Logger.debug(response.text)
sys_info_str = response.json()["systemInfoStr"]
result = self.extract_processors_status(sys_info_str)
return result
def extract_processors_status(self, system_info_str):
status_map = {}
try:
system_info_dict = json.loads(system_info_str)
for group in system_info_dict:
role_statuses = system_info_dict[group]['roleStatuses']
status_map[group] = {}
for role in role_statuses:
replica_nodes_status = role_statuses[role]['nextInstanceInfo'].get('replicaNodes', [])
node_status_list = self.extract_replica_nodes_status(role, replica_nodes_status)
status = ShardGroupRoleStatus(processorList = node_status_list, readyForCurVersion=role_statuses[role]['readyForCurVersion'])
status_map[group][role] = status
except:
Logger.error("Failed to extract processors status")
Logger.error(traceback.format_exc())
return status_map
def extract_replica_nodes_status(self, role, replica_nodes_status):
status_list = []
for node_status in replica_nodes_status:
processor_status = ShardGroupProcessorStatus()
processor_status.workerStatus = node_status["workerStatus"]
if len(node_status.get("processStatus", [])) == 1:
processor_status.processorStatus = ProcessorStatusType.RUNNING if node_status["processStatus"][0]["status"] == "PS_RUNNING" else ProcessorStatusType.NOT_RUNNING
processor_status.replicaId = node_status["replicaNodeId"]
processor_status.signature = node_status["signature"]
processor_status.targetSignature = node_status["targetSignature"]
processor_status.readyForCurVersion = node_status["readyForCurVersion"]
processor_status.ip = node_status["ip"]
processor_status.role = role
status_list.append(processor_status)
if len(replica_nodes_status) == 0:
status_list.append(ShardGroupProcessorStatus())
return status_list