aios/sql/python/local_search_starter.py (1,280 lines of code) (raw):
#!/bin/env python
import re
import sys
import os
import copy
import socket
from optparse import OptionParser
import tempfile
import subprocess
import json
import time
import socket
import shutil
import httplib
import errno
from datetime import datetime
from itertools import product
class TimeoutTerminator:
    """Track a wall-clock deadline measured from the moment of construction."""

    def __init__(self, timeout):
        """Record the timeout in seconds and derive the absolute deadline."""
        created_at = time.time()
        self.timeout = timeout
        self.start = created_at
        self.due = created_at + timeout

    def left_time(self):
        """Return the seconds remaining until the deadline (negative once passed)."""
        now = time.time()
        return self.due - now

    def is_timeout(self):
        """Return True when no time is left before the deadline."""
        remaining = self.left_time()
        return remaining <= 0

    def raw_timeout(self):
        """Return the timeout value that was passed to the constructor."""
        return self.timeout

    def start_str(self):
        """Return the construction time as a UTC 'YYYY-mm-dd HH:MM:SS.ffffff' string."""
        started = datetime.utcfromtimestamp(self.start)
        return started.strftime('%Y-%m-%d %H:%M:%S.%f')
class PortListItem:
    """Plain record pairing a role name with its port list; both start unset."""

    def __init__(self):
        # Both fields are filled in later by whoever allocates the ports.
        self.ports = self.role = None
class LocalSearchStartCmd(object):
'''
local_search_starter.py
{-i index_dir | --index=index_dir}
{-c config_dir | --config=config_dir}
{-p port_list | --port=port_list}
{-z zone_name | --zone=zone_name}
{-b binary_path | --binary=binary_path}
{-t timeout | --timeout=timeout}
{-l preload | --preload=preload}
{-g agg_name | --agg=agg_name}
{-w para_search_ways | --para_ways=para_search_ways}
{-S disable_sql | --disableSql=sql_flag}
{-W disable_sql_warmup | --disableSqlWarmup=sql_warmup_flag}
{-m multi_biz | --multiBiz=multi_biz}
{-M model_biz | --modelBiz=model_biz}
{-L local_biz_service | --localBizService=localBizService}
options:
-i index_dir, --index=index_dir : required, index dir
-c config_dir, --config=config_dir : required, config path,
-p port_list, --port=port_list : optional, port list, http port is first port, arpc port is second port, default 12000, if only one port is designed, next port is start +1 (total port may use start + n*3 )
-z zone_name, --zone=zone_name : optional, special zone to start
-j auxiliary_table, --tables=auxiliary_table : optional, special auxiliary table to load
-b binary_path, --binary=binary_path : optional, special binary path to load
-t timeout, --timeout=timeout : optional, special timeout load [default 300]
-l preload, --preload=preload : optional, special lib to load
-s serviceName, --serviceName=serviceName : optional, serviceName [default sql_suez_local_search]
-a amonPath, --amonPath=amonPath : optional, amon path [default sql_suez_local_search]
-g agg_name, --aggName=aggName : optional, aggName [default default_agg_4]
--basicTuringBizNames=names : optional, common turing biz names, example [biz1,biz2]
--tabletInfos=tabletInfos : optional, tabletInfos [default empty str]
--basicTuringPrefix=prefix : optional, common turing request http prefix, example [/common]
-w para_search_ways, --paraSearchWays=paraSearchWays : optional, paraSearchWays [default 2,4]
-S disable_sql, --disableSql=true : optional, disableSql [default false]
-W disable_sql_warmup, --disableSqlWarmup=true : optional, disableSqlWarmup [default false]
-d dailyrun mode, --dailyrunMode=true : optional, enable dailyrun mode
-m multi_biz, --multiBiz=multiBiz : optional, enable multi_biz
-M model_biz, --modelBiz=modelBiz : optional, modelBiz list
-L local_biz_service, --localBizService=localBizService : optional, [default false]
--enableMultiPartition : optional, enableMultiPartition [default false]
--enableLocalAccess : optional, enableLocalAccess [default false]
--onlySql : optional, onlySql [default false]
--enableLocalCatalog : optional, enableLocalCatalog [default false]
--enableUpdateCatalog : optional, enableUpdateCatalog [default false]
--zk_root : optional, zk_root for leader election [default LOCAL]
--mode : optional, mode for table direct write
--leader_election_strategy_type : optional, leader election type
--leader_election_config : optional, leader election config
--version_sync_config : optional, version sync config
--disableCodeGen : optional, disable indexlib codegen
--disableTurbojet : optional, disable turbojet
--searcherLocalSubscribe : optional, searcher will subscribe each other
--enablePublishTableTopoInfo : optional, searcher will publish topo info by table
--force_tablet_load : optional, searcher will load with tablet
examples:
./local_search_starter.py -i /path/to/index -c path/to/config
./local_search_starter.py -i /path/to/index -c path/to/config -p 12345
./local_search_starter.py -i /path/to/index -c path/to/config -p 12345,22345,32345
./local_search_starter.py -i /path/to/index -c path/to/config -z zone1,zone2 -j table1:t1,t3;table2:t3
'''
def __init__(self):
self.parser = OptionParser(usage=self.__doc__)
def usage(self):
print self.__doc__ % {'prog': sys.argv[0]}
def addOptions(self):
self.parser.add_option('-i', '--index', action='store', dest='indexPath')
self.parser.add_option('-c', '--config', action='store', dest='configPath')
self.parser.add_option('-p', '--port', action='store', dest='portList')
self.parser.add_option('', '--qrsHttpArpcBindPort', dest='qrsHttpArpcBindPort', type='int', default=0)
self.parser.add_option('', '--qrsArpcBindPort', dest='qrsArpcBindPort', type='int', default=0)
self.parser.add_option('-z', '--zone', action='store', dest='zoneName')
self.parser.add_option('-j', '--tables', action='store', dest='atables')
self.parser.add_option('-b', '--binary', action='store', dest='binaryPath')
self.parser.add_option('-t', '--timeout', action='store', dest='timeout', type='int', default=300)
self.parser.add_option('-l', '--preload', action='store', dest='preload', default='asan')
self.parser.add_option(
'-s',
'--serviceName',
action='store',
dest='serviceName',
default="sql_suez_local_search")
self.parser.add_option('-a', '--amonPath', action='store', dest='amonPath', default="sql_suez_local_search")
self.parser.add_option('-g', '--aggName', action='store', dest='aggName')
self.parser.add_option('', '--basicTuringBizNames', action='store', dest='basicTuringBizNames')
self.parser.add_option('', '--tabletInfos', action='store', dest='tabletInfos')
self.parser.add_option('', '--basicTuringPrefix', action='store', dest='basicTuringPrefix')
self.parser.add_option('-w', '--paraSearchWays', action='store', dest='paraSearchWays')
self.parser.add_option('-S', '--disableSql', action='store_true', dest='disableSql', default=False)
self.parser.add_option('-W', '--disableSqlWarmup', action='store_true', dest='disableSqlWarmup', default=False)
self.parser.add_option('', '--qrsExtraQueue', action='store', dest='qrsQueue')
self.parser.add_option('', '--searcherExtraQueue', action='store', dest='searcherQueue')
self.parser.add_option('', '--searcherThreadNum', action='store', dest='searcherThreadNum', type='int')
self.parser.add_option('', '--searcherQueueSize', action='store', dest='searcherQueueSize', type='int')
self.parser.add_option('', '--naviThreadNum', action='store', dest='naviThreadNum', type='int')
self.parser.add_option('', '--threadNumScaleFactor', action='store', dest='threadNumScaleFactor', type='float')
self.parser.add_option('', '--qrsThreadNum', action='store', dest='qrsThreadNum', type='int')
self.parser.add_option('', '--qrsQueueSize', action='store', dest='qrsQueueSize', type='int')
self.parser.add_option('-d', '--dailyrunMode', action='store_true', dest='dailyrunMode', default=False)
self.parser.add_option(
'',
'--enableMultiPartition',
action='store_true',
dest='enableMultiPartition',
default=False)
self.parser.add_option('', '--enableLocalAccess', action='store_true', dest='enableLocalAccess', default=False)
self.parser.add_option('', '--onlySql', action='store_true', dest='onlySql', default=False)
self.parser.add_option(
'',
'--enableLocalCatalog',
action='store_true',
dest='enableLocalCatalog',
default=False)
self.parser.add_option(
'',
'--enableUpdateCatalog',
action='store_true',
dest='enableUpdateCatalog',
default=False)
self.parser.add_option('-m', '--multiBiz', action='store_true', dest='multiBiz')
self.parser.add_option('-M', '--modelBiz', action='store', dest='modelBiz', default='')
self.parser.add_option('-L', '--localBizService', action='store_true', dest='localBizService')
# xxxx://invalid/lhubic/wuhf65/aecyg6
self.parser.add_option('', '--kmonSinkAddress', action='store', dest='kmonSinkAddress',
default='127.0.0.1')
self.parser.add_option('', '--specialCatalogList', action='store', dest='specialCatalogList')
self.parser.add_option('', '--zk_root', action='store', dest='zkRoot', default='LOCAL')
self.parser.add_option('', '--mode', action='store', dest='mode', default='rw')
self.parser.add_option('', '--leader_election_strategy_type', action='store', dest='leaderElectionStrategyType')
self.parser.add_option('', '--leader_election_config', action='store', dest='leaderElectionConfig')
self.parser.add_option('', '--version_sync_config', action='store', dest='versionSyncConfig')
self.parser.add_option('', '--searcherReplica', type=int, dest='searcherReplica', default=1)
self.parser.add_option('', '--disableCodeGen', action='store_true', dest='disableCodeGen', default=False)
self.parser.add_option('', '--disableTurbojet', action='store_true', dest='disableTurbojet', default=False)
self.parser.add_option('', '--searcherSubscribeConfig', action='store', dest='searcherSubscribeConfig', )
self.parser.add_option(
'',
'--searcherLocalSubscribe',
action='store_true',
dest='searcherLocalSubscribe',
default=False)
self.parser.add_option(
'',
'--enablePublishTableTopoInfo',
action='store_true',
dest='enablePublishTableTopoInfo',
default=False)
self.parser.add_option('', '--force_tablet_load', action='store_true', dest='forceTabletLoad', default=False)
self.parser.add_option('', '--allow_follow_write', action='store_true', dest='allowFollowWrite', default=False)
def parseParams(self, optionList):
self.optionList = optionList
self.addOptions()
(options, args) = self.parser.parse_args(optionList)
self.options = options
ret = self.checkOptionsValidity(options)
if not ret:
print "ERROR: checkOptionsValidity Failed!"
return False
self.initMember(options)
return True
def checkOptionsValidity(self, options):
if options.indexPath is None or options.indexPath == '':
print "ERROR: index path must be specified"
return False
if options.configPath is None or options.configPath == '':
print "ERROR: config path must be specified"
return False
if options.multiBiz is None or options.multiBiz == '':
self.enableMultiBiz = False
else:
self.enableMultiBiz = True
return True
def initMember(self, options):
# Copy the parsed command-line options onto the instance and derive the
# paths, port bookkeeping, environment/command template and working
# directory layout used by the start/stop methods.
#
# Side effects visible in this block: lists <configPath>/table, runs
# "mkdir" for the local_search_<portStart> directory when it is missing,
# creates a "binary" symlink inside it and resolves the local host name.
#
# NOTE(review): the leading indentation of this block is not visible to the
# reviewer, so comments that depend on nesting are hedged.
# --- plain copies of option values ---
self.searcherReplica = options.searcherReplica
self.qrsQueue = options.qrsQueue
self.searcherQueue = options.searcherQueue
self.qrsThreadNum = options.qrsThreadNum
self.qrsQueueSize = options.qrsQueueSize
self.searcherThreadNum = options.searcherThreadNum
self.naviThreadNum = options.naviThreadNum
self.searcherQueueSize = options.searcherQueueSize
self.threadNumScaleFactor = options.threadNumScaleFactor
self.indexPath = options.indexPath
self.aggName = options.aggName
self.basicTuringBizNames = options.basicTuringBizNames
self.tabletInfos = options.tabletInfos
self.basicTuringPrefix = options.basicTuringPrefix
self.paraSearchWays = options.paraSearchWays
self.disableSql = options.disableSql
self.disableSqlWarmup = options.disableSqlWarmup
self.enableMultiPartition = options.enableMultiPartition
self.enableLocalAccess = options.enableLocalAccess
self.onlySql = options.onlySql
self.enableLocalCatalog = options.enableLocalCatalog
self.enableUpdateCatalog = options.enableUpdateCatalog
self.dailyrunMode = options.dailyrunMode
self.multiBiz = options.multiBiz
# modelBiz arrives as a comma-separated string; the option default '' yields {''}.
self.modelBiz = set(options.modelBiz.split(','))
self.localBizService = options.localBizService
self.kmonSinkAddress = options.kmonSinkAddress
self.specialCatalogList = options.specialCatalogList
self.zkRoot = options.zkRoot
self.mode = options.mode
self.leaderElectionStrategyType = options.leaderElectionStrategyType
self.leaderElectionConfig = options.leaderElectionConfig
self.versionSyncConfig = options.versionSyncConfig
self.disableCodeGen = options.disableCodeGen
self.disableTurbojet = options.disableTurbojet
self.searcherLocalSubscribe = options.searcherLocalSubscribe
self.enablePublishTableTopoInfo = options.enablePublishTableTopoInfo
self.forceTabletLoad = options.forceTabletLoad
self.allowFollowWrite = options.allowFollowWrite
# Relative index/config paths are resolved against the current working directory.
if not self.indexPath.startswith('/'):
self.indexPath = os.path.join(os.getcwd(), self.indexPath)
self.configPath = options.configPath
if not self.configPath.startswith('/'):
self.configPath = os.path.join(os.getcwd(), self.configPath)
self.onlineConfigPath = os.path.join(self.configPath, "bizs")
self.offlineConfigPath = os.path.join(self.configPath, "table")
# The numerically largest entry under <configPath>/table is used as the
# table config version; int() raises ValueError for a non-numeric entry.
tableVersions = sorted(map(lambda x: int(x), os.listdir(self.offlineConfigPath)))
if len(tableVersions) == 0:
print "table version count is 0, path [%s]" % self.offlineConfigPath
else:
self.offlineConfigPath = os.path.join(self.offlineConfigPath, str(tableVersions[-1]))
self.qrsHttpArpcBindPort = options.qrsHttpArpcBindPort
self.qrsArpcBindPort = options.qrsArpcBindPort
self.portList = []
# NOTE(review): options.portList is None when -p is not given (the option
# has no default), so .split raises AttributeError here even though the
# usage text calls -p optional - confirm whether a default is intended.
self.origin_port_list = map(lambda x: int(x), options.portList.split(","))
self.searcher_port_list = []
self.qrs_port_list = None
self.portStart = 12000
# NOTE(review): self.portList was set to [] a few lines above and is not
# modified in between, so this condition is false here: portStart stays
# 12000 and portList becomes [12000] regardless of -p. Confirm intent
# before changing; the directory name below depends on portStart.
if len(self.portList) > 0:
self.portStart = int(self.portList[0])
else:
self.portList = [12000]
# -z: comma-separated zone names.
if options.zoneName:
self.specialZones = options.zoneName.split(",")
else:
self.specialZones = []
# Maps zone name -> auxiliary table spec; every requested zone starts with "".
self.atableList = {}
for zone in self.specialZones:
self.atableList[zone] = ""
# -j: "zone:tables" items separated by ';'. An item without ':' raises
# IndexError on tmp[1].
if options.atables:
azones = options.atables.split(";")
for azone in azones:
tmp = azone.split(":")
self.atableList[tmp[0]] = tmp[1]
# Binary root: the -b value (resolved against cwd when relative); the
# fallback is a path six levels above this script's real directory.
if options.binaryPath:
self.binaryPath = options.binaryPath
if not options.binaryPath.startswith('/'):
self.binaryPath = os.path.join(os.getcwd(), options.binaryPath)
else:
curdir = os.path.split(os.path.realpath(__file__))[0]
self.binaryPath = os.path.join(curdir, "../../../../../../")
self.binaryPath = os.path.abspath(self.binaryPath)
self.timeout = options.timeout
self.preload = options.preload
self.serviceName = options.serviceName
self.amonPath = options.amonPath
# Library and executable search paths; most entries are rooted at binaryPath.
self.libPath = "/usr/ali/java/jre/lib/amd64/server:/usr/local/java/jdk/jre/lib/amd64/server:" + self.binaryPath + "/home/admin/sap/lib64/:" + self.binaryPath + "/home/admin/diamond-client4cpp/lib:" + self.binaryPath + "/home/admin/eagleeye-core/lib/:" + self.binaryPath + "/usr/local/cuda-10.1/lib64/stubs:" + \
self.binaryPath + "/usr/local/cuda-10.1/lib64/:" + self.binaryPath + "/usr/local/lib64:" + self.binaryPath + "/usr/lib:" + self.binaryPath + "/usr/lib64:" + self.binaryPath + "/usr/local/lib:" + "/home/admin/isearch5_data/AliWS-1.4.0.0/usr/local/lib:" + "/usr/local/lib64:"
self.binPath = self.binaryPath + "/home/admin/sap/bin/:" + self.binaryPath + "/usr/local/bin/:" + \
self.binaryPath + "/usr/bin/:" + self.binaryPath + "/bin/:" + "/usr/local/bin:/usr/bin:/bin"
# startCmdTemplate is assembled piecewise: environment assignments first,
# then the ha_sql command line with %-placeholders that _startQrs fills in.
self.startCmdTemplate = "/bin/env HIPPO_DP2_SLAVE_PORT=19715"
# Preload handling: "asan" adds LSAN/ASAN options, "llalloc" preloads
# libllalloc.so from the binary root; presumably any other non-empty value
# is used verbatim as LD_PRELOAD (the pairing of the else is not visible).
if self.preload:
if self.preload == "asan":
self.startCmdTemplate += " LSAN_OPTIONS=\"suppressions=%s/usr/local/etc/sql/leak_suppression\" ASAN_OPTIONS=\"detect_odr_violation=0 abort_on_error=1 detect_leaks=1\"" % (
self.binaryPath)
elif self.preload == "llalloc":
self.startCmdTemplate += " LD_PRELOAD=%s/usr/local/lib64/libllalloc.so" % self.binaryPath
else:
self.startCmdTemplate += " LD_PRELOAD=%s" % self.preload
self.startCmdTemplate += " JAVA_HOME=/usr/local/java HADOOP_HOME=/usr/local/hadoop/hadoop CLASSPATH=/usr/local/hadoop/hadoop/etc/hadoop:/usr/local/hadoop/hadoop/share/hadoop/common/lib/*:/usr/local/hadoop/hadoop/share/hadoop/common/*:/usr/local/hadoop/hadoop/share/hadoop/hdfs:/usr/local/hadoop/hadoop/share/hadoop/hdfs/lib/*:/usr/local/hadoop/hadoop/share/hadoop/hdfs/*:/usr/local/hadoop/hadoop/share/hadoop/yarn/lib/*:/usr/local/hadoop/hadoop/share/hadoop/yarn/*:/usr/local/hadoop/hadoop/share/hadoop/mapreduce/lib/*:/usr/local/hadoop/hadoop/share/hadoop/mapreduce/*:/usr/local/hadoop/hadoop/contrib/capacity-scheduler/*.jar "
# Optional feature switches are appended as extra environment assignments.
# NOTE(review): whether IS_TEST_MODE=true is conditional on dailyrunMode
# cannot be determined without the original indentation - confirm.
if self.dailyrunMode:
self.startCmdTemplate += " DAILY_RUN_MODE=true"
self.startCmdTemplate += " IS_TEST_MODE=true"
if self.disableCodeGen:
self.startCmdTemplate += " DISABLE_CODEGEN=true"
if self.disableTurbojet:
self.startCmdTemplate += " DISABLE_TURBOJET=true"
if self.enablePublishTableTopoInfo:
self.startCmdTemplate += " ENABLE_PUBLISH_TABLE_TOPO_INFO=true"
self.startCmdTemplate += " HIPPO_APP_INST_ROOT=" + self.binaryPath + \
" HIPPO_APP_WORKDIR=" + os.getcwd() + " TJ_RUNTIME_TEMP_DIR=" + self.binaryPath
# The placeholders in this line are positional; _startQrs supplies them with
# a tuple, so their order and count must stay in sync with that call.
self.startCmdTemplate += " PATH=$JAVA_HOME/bin:%s LD_LIBRARY_PATH=%s ha_sql --env roleType=%s -l %s -r %s -c %s --port arpc:%d --port http:%d --env httpPort=%d --env gigGrpcPort=0 --env serviceName=%s --env amonitorPath=%s/%s --env port=%d --env ip=%s --env userName=admin --env decodeUri=true --env haCompatible=true --env zoneName=%s --env roleName=%s_partition_%d --env partId=0 --env decisionLoopInterval=10000 --env dpThreadNum=1 --env loadThreadNum=4 --env load_biz_thread_num=4 --env kmonitorNormalSamplePeriod=1 --env naviPoolModeAsan=1 --env naviDisablePerf=1 --env WORKER_IDENTIFIER_FOR_CARBON= --env gigGrpcThreadNum=5 --env GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS=500 --env FAST_CLEAN_INC_VERSION=false --env navi_config_loader=%s"
if self.localBizService:
self.startCmdTemplate += " --env localBizService=true"
# Fixed locations of the bundled config files, relative to the binary root.
self.alogConfigPath = os.path.join(self.binaryPath, "usr/local/etc/sql/sql_alog.conf")
self.searchCfg = os.path.join(self.binaryPath, "usr/local/etc/sql/search_server.cfg")
self.qrsCfg = os.path.join(self.binaryPath, "usr/local/etc/sql/qrs_server.cfg")
self.config_loader = os.path.join(self.binaryPath,
"usr/local/lib/python/site-packages/sql/sql_config_loader.py")
# Address advertised to the workers: whatever the local host name resolves to.
self.ip = socket.gethostbyname(socket.gethostname())
# Filled by loadSearcherTarget with one local subscription entry per topo item.
self.gigInfos = {}
cwd = os.getcwd()
# Working directory for this run, named after portStart.
self.localSearchDir = os.path.join(cwd, "local_search_%d" % self.portStart)
if not os.path.exists(self.localSearchDir):
os.system("mkdir %s" % self.localSearchDir)
# <localSearchDir>/binary links to the binary root. os.path.exists() is False
# for a dangling symlink, in which case symlink() reports EEXIST and the
# stale entry is removed and recreated; other errors propagate.
binarySymbol = "%s/binary" % (self.localSearchDir)
if not os.path.exists(binarySymbol):
try:
os.symlink(self.binaryPath, binarySymbol)
except OSError as e:
if e.errno == errno.EEXIST:
os.remove(binarySymbol)
os.symlink(self.binaryPath, binarySymbol)
else:
raise e
self.pidFile = os.path.join(self.localSearchDir, "pid")
self.portFile = os.path.join(self.localSearchDir, "ports")
self.qrsPortFile = os.path.join(cwd, 'qrs_port')
self.searcherPortFile = os.path.join(cwd, 'searcher_port')
# valid_log compares log timestamps against this to ignore older log lines.
self.start_time = datetime.now()
# Constant sent as "target_version" in every heartbeat target.
self.targetVersion = 1651870394
def run(self):
# Drive start_once() and, on success, persist the allocated ports as JSON.
# Returns the status value produced by start_once().
tryTimes = 1
ret = -1
status = None
# tryTimes starts at 1 and is decremented before the attempt, so the body
# runs once; a ret of 1 ("retry") cannot trigger a second attempt unless
# tryTimes is raised.
while tryTimes > 0:
tryTimes -= 1
ret, status = self.start_once()
if ret == 1:
# retry
continue
else:
break
# NOTE(review): indentation is not visible here; presumably both port-file
# writes below are gated on ret == 0 - confirm against the original file.
if ret == 0:
# qrs ports: a single JSON object with http_arpc/arpc/grpc ports.
if self.qrs_port_list:
with open(self.qrsPortFile, 'w') as f:
f.write(json.dumps(self.__format_port_list(self.qrs_port_list), indent=4))
# searcher ports: a JSON list of [role, ports-object] pairs.
if len(self.searcher_port_list) > 0:
port_map = [(x.role, self.__format_port_list(x.ports)) for x in self.searcher_port_list]
with open(self.searcherPortFile, 'w') as f:
f.write(json.dumps(port_map, indent=4))
return status
def __format_port_list(self, port_list):
return {
'http_arpc_port': port_list[0],
'arpc_port': port_list[1],
'grpc_port': port_list[2]
}
def start_once(self):
# Run one complete start sequence and report the outcome.
# Returns (code, status): code is 0 on success, otherwise the failing
# step's code; status is a 3-tuple whose middle element is an error message
# ("" on success).
# One deadline (self.timeout seconds) is shared by the target-loading steps.
terminator = TimeoutTerminator(self.timeout)
searcherTargetInfos = None
# Kill any processes recorded by a previous run before starting new ones.
if not self.stopAll():
return -1, ("", "stop exist pid failed", -1)
# Without local access the searcher processes are started here; with local
# access no searcher is started and at most one zone is allowed.
if not self.enableLocalAccess:
searcher_ret, searcherTargetInfos = self.startSearcher()
if searcher_ret != 0:
return searcher_ret, ("", "start searcher failed", -1)
else:
zoneNames = self._getNeedStartZoneName()
if len(zoneNames) > 1:
return -1, ("", "local access mode only support one zone, now zone names " + str(zoneNames), -1)
qrs_ret = self.startQrs()
if qrs_ret != 0:
return qrs_ret, ("", "start qrs failed", -1)
# Searcher targets are loaded with whatever time is left on the deadline.
if not self.enableLocalAccess:
ret = self.loadSearcherTarget(searcherTargetInfos, terminator.left_time())
if ret != 0:
return ret, ("", "load searcher target failed", -1)
# NOTE(review): indentation is not visible here; presumably the local
# subscribe step only runs when searchers were started (i.e. it is nested
# under "not self.enableLocalAccess") - confirm against the original file.
if self.searcherLocalSubscribe:
ret = self.subscribeLocalSearchService()
if ret != 0:
return ret, ("", "add local subscribe failed", -1)
# The qrs target is loaded last, again bounded by the remaining time.
ret = self.loadQrsTarget(terminator.left_time())
if ret != 0:
return ret, ("", "load qrs target failed", -1)
return 0, ("", "", 0)
def getLocalSubscribeConfig(self, zoneName, grpcPort):
    """Build a local subscription entry for the ``<zoneName>.default_sql`` biz.

    The entry describes a single partition (part 0 of 1, version 123) on
    the address the local host name resolves to, reachable on ``grpcPort``
    (converted with int()) and marked as supporting heartbeat.
    """
    entry = {
        "biz_name": zoneName + ".default_sql",
        "part_count": 1,
        "part_id": 0,
        "version": 123,
    }
    entry["ip"] = socket.gethostbyname(socket.gethostname())
    entry["support_heartbeat"] = True
    entry["grpc_port"] = int(grpcPort)
    return entry
def subscribeLocalSearchService(self):
    """Make every searcher subscribe to all searchers, then reload the targets.

    One local subscription entry is built per generated target info, using
    the grpc port (index 2) of the searcher port list at the same position.
    The resulting config is installed as ``service_info.cm2_config`` on every
    target, and the targets are pushed again with a fresh ``self.timeout``
    deadline. Returns the result of ``loadSearcherTarget``.
    """
    zone_names = self._getNeedStartZoneName()
    targetInfos = self._genTargetInfos(zone_names, self.searcherReplica)
    local_entries = []
    for position, info in enumerate(targetInfos):
        grpc_port = self._getSearcherPortList(position)[2]
        local_entries.append(self.getLocalSubscribeConfig(info[0], grpc_port))
    # The same config object is shared by all targets.
    shared_config = {"local": local_entries}
    for info in targetInfos:
        info[3]['service_info']['cm2_config'] = shared_config
    deadline = TimeoutTerminator(self.timeout)
    return self.loadSearcherTarget(targetInfos, deadline.left_time())
def stopAll(self, timeout=120):
    """Kill the processes recorded in the pid file and wait until they are gone.

    Each line of ``self.pidFile`` is expected to be ``"<pid> <rundir>"``;
    lines with fewer than two space-separated fields are ignored. A recorded
    pid is only killed when ``self.getPids(rundir)`` still reports it.

    Args:
        timeout: seconds to wait for the killed processes to disappear.

    Returns:
        True when nothing needed killing or everything was killed (the pid
        file is then removed); False when the wait timed out.
    """
    terminator = TimeoutTerminator(timeout)
    needKillPids = []
    if os.path.exists(self.pidFile):
        # Use a context manager so the pid file handle is always released.
        with open(self.pidFile) as pid_file:
            for line in pid_file:
                parts = line.strip('\n').split(" ")
                if len(parts) < 2:
                    continue
                rundir = parts[1]
                pids = self.getPids(rundir)
                pid = int(parts[0])
                if pid in pids:
                    needKillPids.append((pid, rundir))
    if not needKillPids:
        return True
    for pid, _rundir in needKillPids:
        cmd = "kill -9 %s" % pid
        print("clean exist process, cmd [%s]" % cmd)
        os.system(cmd)
    # Poll until no recorded pid is defunct and no process is left for any
    # recorded run directory, or the deadline passes.
    while True:
        if terminator.is_timeout():
            return False
        allKilled = True
        defunctPids = self.getDefunctPids()
        for pid, rundir in needKillPids:
            if pid in defunctPids:
                allKilled = False
            if len(self.getPids(rundir)) != 0:
                allKilled = False
        if allKilled:
            cmd = "rm %s" % self.pidFile
            print("remove pid file [%s]" % self.pidFile)
            os.system(cmd)
            break
        time.sleep(0.1)
    return True
def startSearcher(self):
    """Generate the searcher targets and launch the searcher processes.

    Returns ``(0, targetInfos)`` when ``_startSearcher`` succeeds and
    ``(-1, None)`` (after printing the targets) when it fails.
    """
    zone_names = self._getNeedStartZoneName()
    target_infos = self._genTargetInfos(zone_names, replica=self.searcherReplica)
    if self._startSearcher(target_infos):
        print("wait searcher target load")
        return 0, target_infos
    # Same output as printing the two items separated by a space.
    print("%s %s" % ("start searcher with table info failed: ", json.dumps(target_infos)))
    return -1, None
def startQrs(self):
if not self._startQrs():
return -1
print "wait qrs target load"
return 0
def curl(self, address, method, request, curl_type='POST', timeout=20):
    """Send ``request`` as JSON over HTTP, retrying until it works or time runs out.

    Args:
        address: ``"host:port"`` of the server.
        method: request path, e.g. ``"/HeartbeatService/heartbeat"``.
        request: JSON-serialisable request body.
        curl_type: HTTP verb.
        timeout: overall budget in seconds across all attempts.

    Returns:
        ``(retCode, data, reason, status)``. ``retCode`` is 0 for an HTTP 200
        response and -1 otherwise. When no attempt produced a response
        before the deadline the result is ``(-1, '', <last error>, 418)``.
    """
    terminator = TimeoutTerminator(timeout)
    while True:
        conn = None
        try:
            left = terminator.left_time()
            conn = httplib.HTTPConnection(address, timeout=left)
            conn.request(curl_type, method, json.dumps(request))
            # Cap the wait for the response at 10 seconds per attempt.
            conn.sock.settimeout(10 if left > 10 else left)
            response = conn.getresponse()
            data = response.read()
            retCode = 0 if response.status == 200 else -1
            return retCode, data, response.reason, response.status
        except Exception as e:
            # Any failure (typically the server not listening yet) is
            # retried until the overall deadline has passed.
            if terminator.is_timeout():
                return -1, '', str(e), 418
        finally:
            # Release the socket of every attempt, successful or not.
            if conn is not None:
                conn.close()
        # Short pause between attempts instead of spinning on a refused
        # connection; never sleep past the deadline.
        time.sleep(max(0, min(0.1, terminator.left_time())))
def createRuntimedirLink(self, zoneDirName):
zoneDir = os.path.join(self.localSearchDir, zoneDirName)
if not os.path.exists(zoneDir):
os.makedirs(zoneDir)
runtimeDir = os.path.join(zoneDir, "runtimedata")
if not os.path.exists(runtimeDir):
try:
os.symlink(self.indexPath, runtimeDir)
except Exception as e:
print "create link failed, src %s, dst %s, e[%s]" % (self.indexPath, runtimeDir, str(e))
raise e
return runtimeDir
def genOnlineConfigPath(self, config, biz):
onlineConfig = os.path.join(self.onlineConfigPath, biz)
configVersions = sorted(map(lambda x: int(x), os.listdir(onlineConfig)))
if len(configVersions) == 0:
print "config version count is 0, path [%s]" % onlineConfig
else:
onlineConfig = os.path.join(onlineConfig, str(configVersions[-1]))
return onlineConfig
def createConfigLink(self, zoneName, prefix, bizName, config):
pos = config.rfind('/')
version = config[pos + 1:]
rundir = os.path.join(self.localSearchDir, zoneName)
bizConfigDir = os.path.join(rundir, "zone_config", prefix, bizName)
if not os.path.exists(bizConfigDir):
os.makedirs(bizConfigDir)
fakeConfigPath = os.path.join(bizConfigDir, version)
try:
os.path.exists(fakeConfigPath) and shutil.rmtree(fakeConfigPath)
cmd = 'cp -r "{}" "{}"'.format(config, fakeConfigPath)
os.system(cmd)
except Exception as e:
print "copy config dir failed, src %s, dst %s, e[%s]" % (config, fakeConfigPath, str(e))
raise e
doneFile = os.path.join(fakeConfigPath, "suez_deploy.done")
if not os.path.exists(doneFile):
open(doneFile, 'a').close()
return config
def loadQrsTarget(self, timeout=300):
    """Push the qrs target via heartbeat until qrs echoes its signature.

    Builds the biz info (one entry per biz in multi-biz mode, otherwise a
    single 'default' entry from the first biz directory), the local
    subscription list and the target document. It then polls
    ``/HeartbeatService/heartbeat`` on the qrs http-arpc port.

    Args:
        timeout: seconds allowed for the whole load.

    Returns:
        0 when qrs reports the requested signature; the non-zero state from
        ``check_log_file`` when the qrs log shows a failure; -1 when a
        heartbeat request fails or the timeout expires.
    """
    terminator = TimeoutTerminator(timeout)
    bizs = os.listdir(self.onlineConfigPath)
    bizInfo = {}
    if self.enableMultiBiz:
        for biz in bizs:
            onlineConfig = self.genOnlineConfigPath(self.onlineConfigPath, biz)
            bizInfo[biz] = {
                "config_path": self.createConfigLink('qrs', 'biz', biz, onlineConfig)
            }
            if biz in self.modelBiz:
                bizInfo[biz]["custom_biz_info"] = {
                    "biz_type": "model_biz"
                }
    else:
        onlineConfig = self.genOnlineConfigPath(self.onlineConfigPath, bizs[0])
        bizInfo['default'] = {
            "config_path": self.createConfigLink('qrs', 'biz', 'default', onlineConfig)
        }

    tableInfos = {}
    zoneName = "qrs"
    portList = self.getQrsPortList()
    httpArpcPort = portList[0]
    arpcPort = portList[1]
    grpcPort = portList[2]
    address = "%s:%d" % (self.ip, httpArpcPort)
    arpcAddress = "%s:%d" % (self.ip, arpcPort)
    if self.enableLocalAccess:
        # Local access: qrs loads the zone's tables itself and subscribes
        # only to its own endpoint.
        zoneNames = self._getNeedStartZoneName()
        targetInfos = self._genTargetInfos(zoneNames, 1, True)
        tableInfos = targetInfos[0][3]["table_info"]
        zoneName = zoneNames[0]
        local_sub_config = [self.getLocalSubscribeConfig("qrs", grpcPort)]
    else:
        # Copy, so appending below does not depend on the dict view type.
        local_sub_config = list(self.gigInfos.values())

    # add default subscribe config for external table case
    externalTableConfig = {
        "biz_name": "default.external",
        "part_count": 1,
        "part_id": 0,
        "version": 0,
        "ip": self.ip,
        "tcp_port": arpcPort,
    }
    if grpcPort != 0:
        externalTableConfig["grpc_port"] = grpcPort
    externalTableConfig["support_heartbeat"] = False
    local_sub_config.append(externalTableConfig)

    target = {
        "service_info": {
            "cm2_config": {
                "local": local_sub_config
            },
            "part_count": 1,
            "part_id": 0,
            "zone_name": zoneName
        },
        "biz_info": bizInfo,
        "table_info": tableInfos,
        "clean_disk": False,
        "catalog_address": arpcAddress,
        "target_version": self.targetVersion
    }
    targetStr = json.dumps(target)
    # The serialised target doubles as the signature qrs must echo back.
    requestSig = targetStr
    targetRequest = {
        "signature": requestSig,
        "customInfo": targetStr,
        "globalCustomInfo": json.dumps({"customInfo": targetStr})
    }
    # Initialised up front: the timeout report below reads it even when no
    # response carrying a signature was ever received.
    lastRespSig = ""
    log_file = os.path.join(self.localSearchDir, "qrs", 'logs/sql.log')
    while True:
        timeout = terminator.left_time()
        if timeout <= 0:
            break
        log_state = self.check_log_file(log_file)
        if log_state != 0:
            return log_state
        retCode, out, err, status = self.curl(
            address, "/HeartbeatService/heartbeat", targetRequest, timeout=timeout)
        if retCode != 0:  # qrs core
            print("set qrs target [{}] failed, address[{}] ret[{}] out[{}] err[{}] status[{}] left[{}]s".format(
                targetStr, address, retCode, out, err, status, terminator.left_time()))
            return -1
        response = json.loads(out)
        if "signature" not in response:
            print("set qrs target response invalid [{}], continue...".format(out))
        else:
            lastRespSig = response["signature"]
            if lastRespSig == requestSig:
                print("start local search success\nqrs is ready for search, http arpc grpc port: %s %s %s" % (
                    httpArpcPort, arpcPort, grpcPort))
                if self.enableLocalAccess:
                    time.sleep(1)
                return 0
        # Pause between polls on every path, including invalid responses.
        time.sleep(0.1)
    print('load qrs target timeout [{}]s left[{}]s resp[{}] request[{}]'.format(
        terminator.raw_timeout(), terminator.left_time(), lastRespSig, requestSig))
    return -1
def check_log_file(self, log_file):
if os.path.isfile(log_file):
expectLog = '\'has been stopped\''
cmd = 'grep %s %s' % (expectLog, log_file)
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
out, err = p.communicate()
splited = filter(bool, out.split('\n'))
if self.has_log(expectLog, splited):
return 1
expectLog = '\'initBiz failed\''
cmd = 'grep %s %s' % (expectLog, log_file)
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
out, err = p.communicate()
splited = filter(bool, out.split('\n'))
if self.has_log(expectLog, splited):
return -1
return 0
def has_log(self, pattern, log_array):
for line in reversed(log_array):
if pattern in line:
return self.valid_log(line)
return False
def valid_log(self, content):
splited = filter(bool, content.split(']'))
if len(splited) < 2:
return False
timestamp = splited[0]
if timestamp[0] != '[':
return False
timestamp = timestamp[1:]
log_time = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f')
return (log_time - self.start_time).total_seconds() > 0
def loadSearcherTarget(self, targetInfos, timeout=300):
    """Push each searcher target via heartbeat until all searchers are ready.

    For every entry of ``targetInfos`` (indexable as zone name at [0] and
    target document at [3]) the target is completed with the qrs catalog
    address and target version, and posted to the searcher's
    ``/HeartbeatService/heartbeat``. A searcher counts as ready once it
    echoes the request signature; its published topo info is then recorded
    in ``self.gigInfos`` as local subscription entries.

    Args:
        targetInfos: sequence of target infos, one per searcher role.
        timeout: seconds allowed for the whole load.

    Returns:
        0 when every searcher is ready; the non-zero state from
        ``check_log_file`` when a searcher log shows a failure; -1 when a
        heartbeat request fails or the timeout expires.
    """
    self.gigInfos = {}
    terminator = TimeoutTerminator(timeout)
    readyTarget = set()
    # Bound up front: the timeout report below reads both even when the
    # deadline has already passed or targetInfos is empty.
    zoneName = ''
    targetStr = ''
    while True:
        timeout = terminator.left_time()
        if timeout <= 0:
            break
        for count, targetInfo in enumerate(targetInfos):
            portList = self._getSearcherPortList(count)
            zoneName = targetInfo[0]
            roleName = self.genRoleName(targetInfo)
            if roleName in readyTarget:
                continue
            target = targetInfo[3]
            if self.options.searcherSubscribeConfig:
                target['service_info']['cm2_config'] = json.loads(self.options.searcherSubscribeConfig)
            httpArpcPort = portList[0]
            arpcPort = portList[1]
            grpcPort = portList[2]
            qrsArpcPort = self.getQrsPortList()[1]
            target["catalog_address"] = "%s:%d" % (self.ip, qrsArpcPort)
            target["target_version"] = self.targetVersion
            targetStr = json.dumps(target)
            # The serialised target doubles as the signature to be echoed.
            requestSig = targetStr
            targetRequest = {
                "signature": requestSig,
                "customInfo": targetStr,
                "globalCustomInfo": json.dumps({"customInfo": targetStr})
            }
            log_file = os.path.join(self.localSearchDir, roleName, 'logs/sql.log')
            log_state = self.check_log_file(log_file)
            if log_state != 0:
                return log_state
            address = "%s:%d" % (self.ip, httpArpcPort)
            retCode, out, err, status = self.curl(address, "/HeartbeatService/heartbeat",
                                                  targetRequest, timeout=timeout)
            if retCode != 0:  # binary core
                print("set searcher target [{}] failed. role[{}] address[{}] ret[{}] out[{}] err[{}] status[{}] left[{}]s".format(
                    targetRequest, roleName, address, retCode, out, err, status, terminator.left_time()))
                return -1
            response = json.loads(out)
            if "signature" not in response:
                print("set searcher target response invalid [{}] role [{}], continue...".format(out, roleName))
                continue
            if response["signature"] == requestSig:
                serviceInfo = json.loads(response["serviceInfo"])
                # topo_info is a '|'-separated list of
                # "biz:part_count:part_id:version[:...]" items.
                infos = serviceInfo["cm2"]["topo_info"].strip('|').split('|')
                for info in infos:
                    splitInfo = info.split(':')
                    localConfig = {
                        "biz_name": splitInfo[0],
                        "part_count": int(splitInfo[1]),
                        "part_id": int(splitInfo[2]),
                        "version": int(splitInfo[3]),
                        "ip": self.ip,
                        "tcp_port": arpcPort,
                    }
                    if grpcPort != 0:
                        localConfig["grpc_port"] = grpcPort
                    localConfig["support_heartbeat"] = True
                    gigKey = roleName + "_" + splitInfo[0] + "_" + str(splitInfo[2])
                    self.gigInfos[gigKey] = localConfig
                readyTarget.add(roleName)
                print("searcher [%s] is ready for search, topo [%s]" % (roleName, json.dumps(localConfig)))
                if len(targetInfos) == len(readyTarget):
                    print("all searcher is ready.")
                    return 0
        time.sleep(0.1)
    print('load searcher [{}] target [{}] timeout [{}]s left [{}]s readyTarget[{}]'.format(
        zoneName,
        targetStr,
        terminator.raw_timeout(),
        terminator.left_time(),
        readyTarget))
    return -1
def _startQrs(self):
# Launch the "qrs" process for this local deployment and record its pid
# and listen ports.
#
# Visible steps: make sure <localSearchDir>/qrs exists, copy self.qrsCfg
# into it, fill self.startCmdTemplate, append one "--env key=value" option
# for each setting that is truthy, run the command through os.system, and
# then poll getPids() for up to 5 seconds until exactly one process is found.
#
# Returns False when the number of matching processes is not exactly one,
# True otherwise.
# Side effects: changes the process working directory to the run dir,
# appends "<pid> <rundir>" to self.pidFile, sets self.qrs_port_list and
# overwrites self.portFile with the first two ports from get_listen_ports().
zoneName = "qrs"
partId = 0
rundir = os.path.join(self.localSearchDir, zoneName)
if not os.path.exists(rundir):
os.system("mkdir %s" % rundir)
# The service config copy is named after self.portStart.
targetCfg = os.path.join(rundir, "qrs_service_%d.cfg" % (self.portStart))
os.system("cp %s %s" % (self.qrsCfg, targetCfg))
# NOTE(review): the meaning of each positional slot is defined by
# self.startCmdTemplate, which is declared elsewhere in the file.
startCmd = self.startCmdTemplate % (self.binPath, self.libPath, "qrs", self.alogConfigPath,
self.binaryPath, targetCfg, 0, 0,
self.qrsHttpArpcBindPort, self.serviceName, self.amonPath,
zoneName, self.qrsArpcBindPort, self.ip, zoneName, zoneName,
partId, self.config_loader)
# These two environment options are appended unconditionally.
startCmd += " --env LIBHDFS_OPTS=-Xrs "
startCmd += " --env JAVA_TOOL_OPTIONS=-Djdk.lang.processReaperUseDefaultStackSize=true "
# Optional settings: each is forwarded only when the attribute is truthy.
if self.qrsQueue:
startCmd += " --env extraTaskQueues=" + self.qrsQueue
if self.qrsQueueSize:
startCmd += " --env queueSize=" + str(self.qrsQueueSize)
if self.qrsThreadNum:
startCmd += " --env threadNum=" + str(self.qrsThreadNum)
if self.naviThreadNum:
startCmd += " --env naviThreadNum=" + str(self.naviThreadNum)
if self.threadNumScaleFactor:
startCmd += " --env threadNumScaleFactor=" + str(self.threadNumScaleFactor)
if self.aggName:
startCmd += " --env defaultAgg=" + self.aggName
if self.basicTuringBizNames:
startCmd += " --env basicTuringBizNames=" + self.basicTuringBizNames
if self.basicTuringPrefix:
startCmd += " --env basicTuringPrefix=" + self.basicTuringPrefix
if self.paraSearchWays:
startCmd += " --env paraSearchWays=" + self.paraSearchWays
if self.disableSql:
startCmd += " --env disableSql=true"
if self.disableSqlWarmup:
startCmd += " --env disableSqlWarmup=true"
if self.kmonSinkAddress:
startCmd += " --env kmonitorSinkAddress=" + self.kmonSinkAddress
if self.specialCatalogList:
startCmd += " --env specialCatalogList=" + str(self.specialCatalogList)
if self.forceTabletLoad:
startCmd += " --env force_tablet_load=true"
if self.enableLocalAccess:
startCmd += " --env enableLocalAccess=true"
startCmd += " --env rewriteLocalBizNameType=sql"
if self.enableMultiPartition:
startCmd += " --env enableMultiPartition=true"
# The catalog options are suppressed when SQL is disabled.
if self.enableUpdateCatalog and not self.disableSql:
startCmd += " --env enableUpdateCatalog=true"
if self.enableLocalCatalog and not self.disableSql:
startCmd += " --env enableLocalCatalog=true"
if self.onlySql:
startCmd += " --env onlySql=true"
if (self.tabletInfos):
if self.mode:
startCmd += " --env mode=" + self.mode
if self.zkRoot:
startCmd += " --env zk_root=" + self.zkRoot
if self.leaderElectionStrategyType:
startCmd += " --env leader_election_strategy_type=" + self.leaderElectionStrategyType
# The two config values below are wrapped in single quotes for the shell.
if self.leaderElectionConfig:
startCmd += " --env leader_election_config=" + "'" + self.leaderElectionConfig + "'"
if self.versionSyncConfig:
startCmd += " --env version_sync_config=" + "'" + self.versionSyncConfig + "'"
# stdout/stderr of the launched command are appended (>>) to files under
# self.localSearchDir.
startCmd += ' -d -n 1>>%s 2>>%s ' % (os.path.join(self.localSearchDir, "qrs.stdout.out"),
os.path.join(self.localSearchDir, "qrs.stderr.out"))
os.chdir(rundir)
print "start qrs cmd: %s" % startCmd
os.system(startCmd)
time.sleep(0.1)
# Poll for at most 5 seconds until getPids() reports exactly one process.
terminator = TimeoutTerminator(5)
while not terminator.is_timeout():
pids = self.getPids(rundir)
if len(pids) == 1:
break
time.sleep(0.1)
# NOTE(review): `pids` is only assigned inside the loop above; if the loop
# body never ran, this check would raise NameError.
if len(pids) != 1:
print "start qrs process failed, cmd [%s]" % startCmd
return False
else:
print "start qrs process success, pid [%d]" % pids[0]
pid = pids[0]
# 'a+' appends to the pid file rather than truncating it.
f = open(self.pidFile, 'a+')
f.write("%d %s\n" % (pid, rundir))
f.close()
# NOTE(review): the result of wait_load() is ignored here, whereas
# _startSearcher returns False when it fails -- confirm this is intended.
self.wait_load(rundir)
http_port, arpc_port, grpc_port = self.get_listen_ports(rundir)
self.qrs_port_list = (http_port, arpc_port, grpc_port)
# Only the first two ports are written to the port file; grpc_port is not.
with open(self.portFile, 'w') as portFile:
portFile.write('%s %s\n' % (http_port, arpc_port))
return True
def _startSearcher(self, targetInfos):
# Launch one "searcher" process per entry of targetInfos, then verify each
# process, wait for it to load and record its pid and listen ports.
#
# targetInfos: sequence whose items are indexed as [0]=zone name,
# [1]=partition id, [2]=replica id (genRoleName() reads the same item).
#
# Returns False when a role does not have exactly one matching process or
# when wait_load() fails for it; True otherwise.
# Side effects: opens self.pidFile with mode 'w' and writes
# "<pid> <rundir>" lines, changes the working directory, and appends a
# PortListItem per role to self.searcher_port_list.
# NOTE(review): `count` is incremented but not read in this function.
count = 0
f = open(self.pidFile, 'w')
# Holds (rundir, roleName, startCmd) tuples for every launched role.
httpArpcPort_list = []
for targetInfo in targetInfos:
zoneName = targetInfo[0]
partId = targetInfo[1]
# NOTE(review): replicaId is assigned but not used below.
replicaId = targetInfo[2]
roleName = self.genRoleName(targetInfo)
rundir = os.path.join(self.localSearchDir, roleName)
if not os.path.exists(rundir):
os.system("mkdir %s" % rundir)
targetCfg = os.path.join(rundir, zoneName + "_%d_search_service_%d.cfg" % (partId, self.portStart))
# `cmd` is only printed; the copy itself is issued by the os.system call
# below, which rebuilds the same string.
cmd = "cp %s %s" % (self.searchCfg, targetCfg)
print cmd
os.system("cp %s %s" % (self.searchCfg, targetCfg))
kmonServiceName = self.serviceName
if '^' in self.serviceName:
# override tags['zone'], tags['role'], tags['host']
kmonServiceName = self.serviceName + '@zone^{}@role^{}@host^{}'.format(zoneName, partId, roleName)
# NOTE(review): the meaning of each positional slot is defined by
# self.startCmdTemplate, which is declared elsewhere in the file.
startCmd = self.startCmdTemplate % (self.binPath, self.libPath, "searcher", self.alogConfigPath,
self.binaryPath, targetCfg, 0, 0, 0, kmonServiceName, self.amonPath,
zoneName, 0, self.ip, zoneName, zoneName, partId, self.config_loader)
# Optional settings: each is forwarded only when the attribute is truthy.
if self.searcherQueue:
startCmd += " --env extraTaskQueues=" + self.searcherQueue
if self.searcherQueueSize:
startCmd += " --env queueSize=" + str(self.searcherQueueSize)
if self.threadNumScaleFactor:
startCmd += " --env threadNumScaleFactor=" + str(self.threadNumScaleFactor)
if self.searcherThreadNum:
startCmd += " --env threadNum=" + str(self.searcherThreadNum)
if self.naviThreadNum:
startCmd += " --env naviThreadNum=" + str(self.naviThreadNum)
if self.aggName:
startCmd += " --env defaultAgg=" + self.aggName
if self.paraSearchWays:
startCmd += " --env paraSearchWays=" + self.paraSearchWays
if self.basicTuringBizNames:
startCmd += " --env basicTuringBizNames=" + self.basicTuringBizNames
if self.kmonSinkAddress:
startCmd += " --env kmonitorSinkAddress=" + self.kmonSinkAddress
if self.enableMultiPartition:
startCmd += " --env enableMultiPartition=true"
if self.forceTabletLoad:
startCmd += " --env force_tablet_load=true"
if self.allowFollowWrite:
startCmd += " --env ALLOW_FOLLOWER_WRITE=true"
if self.enableLocalAccess:
startCmd += " --env enableLocalAccess=true"
else:
if self.enableUpdateCatalog and not self.disableSql:
startCmd += " --env enableUpdateCatalog=true"
if self.disableSql:
startCmd += " --env disableSql=true"
if self.onlySql:
startCmd += " --env onlySql=true"
if (self.tabletInfos):
if self.mode:
startCmd += " --env mode=" + self.mode
if self.zkRoot:
startCmd += " --env zk_root=" + self.zkRoot
if self.leaderElectionStrategyType:
startCmd += " --env leader_election_strategy_type=" + self.leaderElectionStrategyType
# The two config values below are wrapped in single quotes for the shell.
if self.leaderElectionConfig:
startCmd += " --env leader_election_config=" + "'" + self.leaderElectionConfig + "'"
if self.versionSyncConfig:
startCmd += " --env version_sync_config=" + "'" + self.versionSyncConfig + "'"
# stdout/stderr of the launched command are appended (>>) to per-role
# files under self.localSearchDir.
startCmd += ' -d -n 1>>%s 2>>%s ' % (os.path.join(self.localSearchDir,
"{}.stdout.out".format(roleName)),
os.path.join(self.localSearchDir,
"{}.stderr.out".format(roleName)))
os.chdir(rundir)
print "start searcher cmd: %s" % startCmd
os.system(startCmd)
httpArpcPort_list.append((rundir, roleName, startCmd))
count = count + 1
time.sleep(0.1)
# Poll for at most 10 seconds until every run dir has exactly one process.
terminator = TimeoutTerminator(10)
while not terminator.is_timeout():
start = True
for info in httpArpcPort_list:
rundir = info[0]
pids = self.getPids(rundir)
if len(pids) != 1:
start = False
if start:
break
time.sleep(0.1)
# A timeout is only reported here; the per-role check below decides the
# return value. roleName/startCmd/rundir still hold whatever the loops
# above assigned last.
if terminator.is_timeout():
print "start searcher [%s] timeout, cmd [%s], rundir[%s]" % (roleName, startCmd, rundir)
for info in httpArpcPort_list:
rundir = info[0]
roleName = info[1]
startCmd = info[2]
pids = self.getPids(rundir)
print pids
if len(pids) != 1:
# NOTE(review): getPids() appends int values, so ','.join(pids) raises
# TypeError whenever pids is non-empty (i.e. more than one match).
print "start searcher process [%s] failed, pids [%s] len(pid) != 1, cmd [%s], rundir[%s]" % (roleName, ','.join(pids), startCmd, rundir)
f.close()
return False
else:
print "start searcher process [%s] success, pid [%d]" % (roleName, pids[0])
pid = pids[0]
f.write("%d %s\n" % (pid, rundir))
# NOTE(review): this failure path returns without f.close(), unlike the
# failure path above.
if not self.wait_load(rundir):
print "wait load failed [{}]".format(rundir)
return False
http_port, arpc_port, grpc_port = self.get_listen_ports(rundir)
item = PortListItem()
item.ports = (http_port, arpc_port, grpc_port)
item.role = roleName
self.searcher_port_list.append(item)
f.close()
return True
def wait_load(self, rundir, timeout=60):
    """Poll ``<rundir>/logs/sql.log`` until the expected startup lines appear.

    Every 0.1 seconds the log is grepped for the heartbeat ``getTarget``
    line; once the latest such line passes ``self.valid_log`` the log is
    grepped for the "initWatchThread sucess" line, which must also pass
    ``self.valid_log``.

    Args:
        rundir: run directory of the process whose log is inspected.
        timeout: maximum number of seconds to keep polling.

    Returns:
        True as soon as both lines are accepted, False after the timeout
        (a diagnostic line is printed in that case).
    """
    deadline = TimeoutTerminator(timeout)
    heartbeat_cmd = 'grep "{}" {}/logs/sql.log | tail -1'.format(
        'HeartbeatManager.cpp -- getTarget', rundir)
    # wait sap server ready
    watch_cmd = 'grep %s %s/logs/sql.log | tail -1' % ('\"initWatchThread sucess\"', rundir)

    while not deadline.is_timeout():
        time.sleep(0.1)
        heartbeat_proc = subprocess.Popen(heartbeat_cmd, shell=True, stdout=subprocess.PIPE)
        heartbeat_line = heartbeat_proc.communicate()[0].strip()
        if not heartbeat_line or not self.valid_log(heartbeat_line):
            continue
        watch_proc = subprocess.Popen(watch_cmd, shell=True, stdout=subprocess.PIPE)
        watch_line = watch_proc.communicate()[0]
        if self.valid_log(watch_line):
            return True

    print("wait load timeout [{}]s left[{}]s start[{}] for [{}]".format(
        deadline.raw_timeout(),
        deadline.left_time(),
        deadline.start_str(),
        rundir))
    return False
def get_listen_ports(self, rundir):
    """Return ``(http_arpc_port, arpc_port, grpc_port)`` read from the log of *rundir*.

    Each element comes from the matching ``get_listen_*_ports`` method.
    """
    http_arpc_port = self.get_listen_http_arpc_ports(rundir)
    arpc_port = self.get_listen_arpc_ports(rundir)
    grpc_port = self.get_listen_grpc_ports(rundir)
    return http_arpc_port, arpc_port, grpc_port
def get_listen_arpc_ports(self, rundir):
    """Return the arpc listen port logged in ``<rundir>/logs/sql.log``.

    The last log line containing the arpc "listen port" message is taken
    and the number between ``listen port [`` and ``]`` is returned. Returns
    0 when no such line exists or ``self.valid_log`` rejects it.
    """
    keyword = 'gigRpcServer initArpcServer success, arpc listen port'
    grep_cmd = 'grep "{}" {}/logs/sql.log | tail -1'.format(keyword, rundir)
    proc = subprocess.Popen(grep_cmd, shell=True, stdout=subprocess.PIPE)
    last_line = proc.communicate()[0].strip()
    if not last_line or not self.valid_log(last_line):
        return 0
    after_marker = last_line.split('listen port [')[1]
    return int(after_marker.split(']')[0])
def get_listen_http_arpc_ports(self, rundir):
    """Return the httpArpc listen port logged in ``<rundir>/logs/sql.log``.

    The last log line containing the httpArpc "listen port" message is taken
    and the number between ``listen port [`` and ``]`` is returned. Returns
    0 when no such line exists or ``self.valid_log`` rejects it.
    """
    keyword = 'gigRpcServer initHttpArpcServer success, httpArpc listen port'
    grep_cmd = 'grep "{}" {}/logs/sql.log | tail -1'.format(keyword, rundir)
    proc = subprocess.Popen(grep_cmd, shell=True, stdout=subprocess.PIPE)
    last_line = proc.communicate()[0].strip()
    if not last_line or not self.valid_log(last_line):
        return 0
    after_marker = last_line.split('listen port [')[1]
    return int(after_marker.split(']')[0])
def get_listen_grpc_ports(self, rundir):
    """Return the grpc listen port logged in ``<rundir>/logs/sql.log``.

    The last log line containing the grpc "listen port" message is taken
    and the number between ``listen port [`` and ``]`` is returned. Returns
    0 when no such line exists or ``self.valid_log`` rejects it.
    """
    keyword = 'gigRpcServer initGrpcServer success, grpc listen port'
    grep_cmd = 'grep "{}" {}/logs/sql.log | tail -1'.format(keyword, rundir)
    proc = subprocess.Popen(grep_cmd, shell=True, stdout=subprocess.PIPE)
    last_line = proc.communicate()[0].strip()
    if not last_line or not self.valid_log(last_line):
        return 0
    after_marker = last_line.split('listen port [')[1]
    return int(after_marker.split(']')[0])
def _getSearcherPortList(self, pos):
    """Return the ``ports`` attribute of the searcher entry at index *pos*."""
    entry = self.searcher_port_list[pos]
    return entry.ports
def getQrsPortList(self):
    """Return the object stored in ``self.qrs_port_list``."""
    qrs_ports = self.qrs_port_list
    return qrs_ports
def _getArpcPortList(self):
    """Return the second element of every 3-item entry in ``self.portList``."""
    return [arpc for (_first, arpc, _third) in self.portList]
def _getHttpArpcPortList(self):
    """Return the first element of every 3-item entry in ``self.portList``."""
    return [http_arpc for (http_arpc, _second, _third) in self.portList]
def _genCatalogTable(self, dbName, tableGroupName, tableName, tableMode, tableVersion):
    """Build the catalog description dict of one table with a single partition.

    The partition is named after *tableVersion*, which is also stored as its
    ``fullVersion``; *tableMode* is stored as the table's ``mode``.
    """
    table_id = {
        "dbName": dbName,
        "tgName": tableGroupName,
        "tableName": tableName
    }
    partition_id = {
        "dbName": dbName,
        "tgName": tableGroupName,
        "tableName": tableName,
        "partitionName": tableVersion
    }
    only_partition = {
        "id": partition_id,
        "meta": {"fullVersion": tableVersion}
    }
    catalog_table = {
        "id": table_id,
        "meta": {"mode": tableMode},
        "partitions": [only_partition]
    }
    return catalog_table
# tabletInfos (JSON) = {tableName: {"generationId": gid, "partitions": ["0_65535"], "has_offline_index": bool, tableMode: 1}}
def _genTargetInfos(self, zoneNames, replica, inQrs=False):
# Build the target description for each (zone, replica, partition) combination.
#
# zoneNames: iterable of zone names to generate targets for.
# replica:   number of replicas; range(replica) is combined with
#            range(partCnt) through itertools.product.
# inQrs:     when true, the directory name passed to the link helpers is
#            "qrs" instead of the role name.
#
# Returns a list of (zoneName, partId, replicaId, targetInfo) tuples, where
# targetInfo is a dict with "table_info", "service_info", "biz_info" and
# "clean_disk" keys.
# Raises Exception when multi-partition is disabled and a table's partition
# count is neither 1 nor the zone's partition count.
# NOTE(review): dict.has_key() exists only in Python 2.
tabletInfos = {}
if self.tabletInfos:
tabletInfos = json.loads(self.tabletInfos)
targetInfos = []
for zoneName in zoneNames:
tableGroup = {
"id": {
"tgName": zoneName,
"dbName": zoneName
},
"meta": {
"shardCount": 1,
"replicaCount": 0
},
"tables": []
}
tableGroupTables = {}
# Tables handled for this zone: the configured auxiliary tables (comma
# separated in self.atableList) plus the table named after the zone itself.
atables = []
if self.atableList.has_key(zoneName):
atables = self.atableList[zoneName].split(",")
atables.append(zoneName)
zoneGid = None
partitions = None
has_offline_index = True
has_realtime = False
if tabletInfos.has_key(zoneName):
has_offline_index = tabletInfos[zoneName]['has_offline_index']
has_realtime = True
# With an offline index the generation and partitions are discovered on
# disk; otherwise they are taken from tabletInfos.
if has_offline_index:
zoneGid = self._getMaxGenerationId(self.indexPath, zoneName)
partitions = self._getPartitions(self.indexPath, zoneName, zoneGid)
else:
zoneGid = tabletInfos[zoneName]["generationId"]
partitions = tabletInfos[zoneName]["partitions"]
fullPartition = "0_65535"
partCnt = len(partitions)
if self.enableMultiPartition:
partCnt = 1
maxPartCnt = len(partitions)
# NOTE(review): this adds a "shard_count" key, while the literal above
# initialises "shardCount" -- confirm which key consumers expect.
tableGroup["meta"]["shard_count"] = partCnt
for replicaId, partId in product(range(replica), range(partCnt)):
tableInfos = {}
for tableName in atables:
# Empty names (e.g. from splitting an empty string) are skipped.
if not tableName:
continue
tablePartition = []
curTableGid = None
curTablePartitions = None
has_offline_index = True
has_realtime = False
if tabletInfos.has_key(tableName):
has_offline_index = tabletInfos[tableName]['has_offline_index']
has_realtime = True
if has_offline_index:
curTableGid = self._getMaxGenerationId(self.indexPath, tableName)
curTablePartitions = self._getPartitions(self.indexPath, tableName, curTableGid)
else:
curTableGid = tabletInfos[tableName]["generationId"]
curTablePartitions = tabletInfos[tableName]["partitions"]
# Without multi-partition a table either has a single partition (the full
# range is used) or as many partitions as the zone (the partition at
# index partId is used); anything else is rejected.
if not self.enableMultiPartition:
curTablePartitionCnt = len(curTablePartitions)
if curTablePartitionCnt == 1:
tablePartition.append(fullPartition)
elif curTablePartitionCnt == maxPartCnt:
tablePartition.append(partitions[partId])
else:
raise Exception(
"table %s : len(curTablePartitions)(%d) != maxPartCnt(%d)" %
(tableName, curTablePartitionCnt, maxPartCnt))
else:
tablePartition = curTablePartitions
tableGid = curTableGid
zoneDirName = self.genRoleName((zoneName, partId, replicaId))
if inQrs:
zoneDirName = "qrs"
tableMode = 0 if not has_realtime else 1 # TM_READ_WRITE
tableType = 3 if not has_realtime else 2 # SPT_TABLET
tmp = {
tableGid: {
"table_mode": tableMode,
"table_type": tableType,
"index_root": self.createRuntimedirLink(zoneDirName),
"config_path": self.createConfigLink(
zoneDirName,
'table',
tableName,
self.offlineConfigPath),
"total_partition_count": len(curTablePartitions),
"partitions": {}}}
# inc_version stays -1 when the table has no offline index.
for partition in tablePartition:
incVersion = -1
if has_offline_index:
incVersion = self._getMaxIndexVersion(self.indexPath, tableName, tableGid, partition)
tmp[tableGid]["partitions"][partition] = {
"inc_version": incVersion
}
tableInfos[tableName] = tmp
tableGroupTables[tableName] = self._genCatalogTable(
zoneName, zoneName, tableName, tableMode, tableGid)
# Each entry of the online config directory is treated as one biz; without
# multi-biz only the first listed entry is used, under the name 'default'.
bizs = os.listdir(self.onlineConfigPath)
bizInfo = {}
if self.enableMultiBiz:
for biz in bizs:
onlineConfig = self.genOnlineConfigPath(self.onlineConfigPath, biz)
bizInfo[biz] = {
"config_path": self.createConfigLink(zoneDirName, 'biz', biz, onlineConfig)
}
if biz in self.modelBiz:
bizInfo[biz]["custom_biz_info"] = {
"biz_type": "model_biz"
}
else:
onlineConfig = self.genOnlineConfigPath(self.onlineConfigPath, bizs[0])
bizInfo['default'] = {
"config_path": self.createConfigLink(zoneDirName, 'biz', 'default', onlineConfig)
}
targetInfo = {
"table_info": tableInfos,
"service_info": {
"part_count": partCnt,
"part_id": partId,
"zone_name": zoneName,
"version": zoneGid
},
"biz_info": bizInfo,
"clean_disk": False
}
targetInfos.append((zoneName, partId, replicaId, targetInfo))
# NOTE(review): `database` (and the tableGroup added to it) is assembled
# here but is not part of the returned value and is not stored on self in
# this function.
database = {
"dbName": zoneName,
"description": "",
"tableGroups": [],
"udxfs": {},
"tvfs": {}
}
for tableName, catalogTable in tableGroupTables.items():
tableGroup["tables"].append(catalogTable)
database["tableGroups"].append(tableGroup)
return targetInfos
def _getMaxGenerationId(self, indexPath, tableName):
    """Return the largest N among the ``generation_N`` entries of a table dir.

    Entries of ``<indexPath>/<tableName>`` that are not named
    ``generation_<digits>`` (stray files, temp dirs, ...) are ignored instead
    of aborting the scan with IndexError/ValueError.

    Args:
        indexPath: root directory that contains one directory per table.
        tableName: name of the table directory to inspect.

    Returns:
        The highest generation id as an int.

    Raises:
        IndexError: when no ``generation_<digits>`` entry exists (the same
            exception type an empty directory produced before).
        OSError: propagated from os.listdir when the directory is missing.
    """
    tableDir = os.path.join(indexPath, tableName)
    prefix = 'generation_'
    generationIds = []
    for entry in os.listdir(tableDir):
        suffix = entry[len(prefix):]
        if entry.startswith(prefix) and suffix.isdigit():
            generationIds.append(int(suffix))
    if not generationIds:
        raise IndexError("no generation_<id> entry found under [%s]" % tableDir)
    # max() avoids sorting and does not depend on map() returning a list.
    return max(generationIds)
def _getMaxIndexVersion(self, path, clusterName, generationId, partition):
    """Return the highest N among the ``version.N`` files of one partition.

    The directory scanned is
    ``<path>/<clusterName>/generation_<generationId>/partition_<partition>``.
    Files starting with ``version.`` whose second dot-separated field is not
    numeric are skipped instead of raising ValueError.

    Args:
        path: index root directory.
        clusterName: table/cluster directory name.
        generationId: generation number (converted with str()).
        partition: partition range string such as "0_65535".

    Returns:
        The largest version number as an int, or -1 when there is none.

    Raises:
        OSError: propagated from os.listdir when the directory is missing.
    """
    partitionDir = os.path.join(
        path, clusterName, 'generation_' + str(generationId), 'partition_' + partition)
    versions = []
    for fileName in os.listdir(partitionDir):
        if not fileName.startswith('version.'):
            continue
        versionField = fileName.split('.')[1]
        if versionField.isdigit():
            versions.append(int(versionField))
    # A real list is built so the emptiness check works on Python 2 and 3.
    return max(versions) if versions else -1
def _getPartitions(self, path, clusterName, generationId):
    """List the partition ranges of one generation, ordered by range start.

    Only entries of ``<path>/<clusterName>/generation_<generationId>`` named
    ``partition_<from>_<to>`` with numeric bounds are considered; other
    entries (including unrelated names with two underscores) are ignored
    instead of raising IndexError/ValueError.

    Args:
        path: index root directory.
        clusterName: table/cluster directory name.
        generationId: generation number (converted with str()).

    Returns:
        A list of "<from>_<to>" strings sorted by the integer value of <from>.

    Raises:
        OSError: propagated from os.listdir when the directory is missing.
    """
    generationDir = os.path.join(path, clusterName, 'generation_' + str(generationId))
    partitionPattern = re.compile(r'^partition_(\d+)_(\d+)$')
    found = []
    for entry in os.listdir(generationDir):
        matched = partitionPattern.match(entry)
        if matched:
            found.append((int(matched.group(1)), entry[len('partition_'):]))
    # sort by from of partitions (stable, so ties keep listing order)
    found.sort(key=lambda item: item[0])
    return [rangeName for _, rangeName in found]
def _getSchema(self, path, clusterName, generationId, partition):
    """Load and return the parsed ``schema.json`` of one partition.

    The file is read from
    ``<path>/<clusterName>/generation_<generationId>/partition_<partition>/schema.json``.
    """
    generation_dir = 'generation_' + str(generationId)
    partition_dir = 'partition_' + partition
    schema_file = os.path.join(path, clusterName, generation_dir, partition_dir, "schema.json")
    with open(schema_file, "r") as handle:
        return json.load(handle)
def _getTableType(self, path, clusterName, generationId, partition):
    """Return the ``table_type`` entry of the partition's schema."""
    return self._getSchema(path, clusterName, generationId, partition)["table_type"]
def _getNeedStartZoneName(self):
    """Return the zone names that should be started.

    Zones are the entries of the ``zones`` directory of every biz found
    under ``self.onlineConfigPath``, de-duplicated. When
    ``self.specialZones`` is non-empty only the zones listed in it are kept.
    """
    discovered = []
    for biz_name in os.listdir(self.onlineConfigPath):
        biz_config = self.genOnlineConfigPath(self.onlineConfigPath, biz_name)
        discovered.extend(os.listdir(os.path.join(biz_config, 'zones')))
    unique_zones = list(set(discovered))
    if len(self.specialZones) == 0:
        return unique_zones
    return [zone for zone in unique_zones if zone in self.specialZones]
def getDefunctPids(self):
    """Return the pids of ``ha_sql`` processes that ``ps`` lists as defunct.

    The pid is taken from the second whitespace-separated column of each
    matching ``ps uxww`` line.
    """
    ps_cmd = 'ps uxww | grep ha_sql | grep defunct| grep -v grep'
    ps_proc = subprocess.Popen(ps_cmd, shell=True, stdout=subprocess.PIPE)
    listing = ps_proc.communicate()[0]
    return [int(row.split()[1]) for row in listing.splitlines()]
def getPids(self, rundir):
    """Return the pids of ``ha_sql`` processes whose ``ps`` line contains *rundir*.

    The pid is taken from the second whitespace-separated column of each
    matching ``ps uxww`` line.
    """
    ps_cmd = 'ps uxww | grep ha_sql| grep "%s"| grep -v grep' % rundir
    ps_proc = subprocess.Popen(ps_cmd, shell=True, stdout=subprocess.PIPE)
    listing = ps_proc.communicate()[0]
    return [int(row.split()[1]) for row in listing.splitlines()]
def getUnusedPort(self, lackPort=1):
    """Ask the OS for *lackPort* currently free TCP ports.

    Each port is obtained by binding a listening socket to port 0. All
    sockets stay open until every port has been collected, so the returned
    ports are distinct, and they are closed before returning. The ports are
    therefore only free at the time of the call; nothing reserves them
    afterwards.

    Args:
        lackPort: number of ports to find.

    Returns:
        A list of *lackPort* port numbers.

    Raises:
        socket.error: propagated from socket creation, bind or listen; every
            socket opened so far is still closed.
    """
    sockets = []
    ports = []
    try:
        for _ in range(lackPort):
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # Track the socket first so it is closed even if bind/listen fails.
            sockets.append(s)
            s.bind(('', 0))
            s.listen(1)
            ports.append(s.getsockname()[1])
    finally:
        for s in sockets:
            s.close()
    return ports
def genRoleName(self, targetInfo):
    """Return ``<zone>_p<partId>_r<replicaId>`` from the first three items of *targetInfo*."""
    name_fields = {
        'zoneName': targetInfo[0],
        'partId': targetInfo[1],
        'replicaId': targetInfo[2],
    }
    return '{zoneName}_p{partId}_r{replicaId}'.format(**name_fields)
if __name__ == '__main__':
    # Script entry point: parse the command line, run the starter and exit
    # with the code it reports (-1 on bad arguments).
    cmd = LocalSearchStartCmd()
    # parseParams is only attempted when at least two arguments were given.
    if len(sys.argv) < 3 or not cmd.parseParams(sys.argv):
        cmd.usage()
        sys.exit(-1)
    data, error, code = cmd.run()
    if code == 0:
        sys.exit(0)
    if error:
        print(error)
    sys.exit(code)