in aios/sql/python/local_search_starter.py [0:0]
def _startSearcher(self, targetInfos):
count = 0
f = open(self.pidFile, 'w')
httpArpcPort_list = []
for targetInfo in targetInfos:
zoneName = targetInfo[0]
partId = targetInfo[1]
replicaId = targetInfo[2]
roleName = self.genRoleName(targetInfo)
rundir = os.path.join(self.localSearchDir, roleName)
if not os.path.exists(rundir):
os.system("mkdir %s" % rundir)
targetCfg = os.path.join(rundir, zoneName + "_%d_search_service_%d.cfg" % (partId, self.portStart))
cmd = "cp %s %s" % (self.searchCfg, targetCfg)
print cmd
os.system("cp %s %s" % (self.searchCfg, targetCfg))
kmonServiceName = self.serviceName
if '^' in self.serviceName:
# override tags['zone'], tags['role'], tags['host']
kmonServiceName = self.serviceName + '@zone^{}@role^{}@host^{}'.format(zoneName, partId, roleName)
startCmd = self.startCmdTemplate % (self.binPath, self.libPath, "searcher", self.alogConfigPath,
self.binaryPath, targetCfg, 0, 0, 0, kmonServiceName, self.amonPath,
zoneName, 0, self.ip, zoneName, zoneName, partId, self.config_loader)
if self.searcherQueue:
startCmd += " --env extraTaskQueues=" + self.searcherQueue
if self.searcherQueueSize:
startCmd += " --env queueSize=" + str(self.searcherQueueSize)
if self.threadNumScaleFactor:
startCmd += " --env threadNumScaleFactor=" + str(self.threadNumScaleFactor)
if self.searcherThreadNum:
startCmd += " --env threadNum=" + str(self.searcherThreadNum)
if self.naviThreadNum:
startCmd += " --env naviThreadNum=" + str(self.naviThreadNum)
if self.aggName:
startCmd += " --env defaultAgg=" + self.aggName
if self.paraSearchWays:
startCmd += " --env paraSearchWays=" + self.paraSearchWays
if self.basicTuringBizNames:
startCmd += " --env basicTuringBizNames=" + self.basicTuringBizNames
if self.kmonSinkAddress:
startCmd += " --env kmonitorSinkAddress=" + self.kmonSinkAddress
if self.enableMultiPartition:
startCmd += " --env enableMultiPartition=true"
if self.forceTabletLoad:
startCmd += " --env force_tablet_load=true"
if self.allowFollowWrite:
startCmd += " --env ALLOW_FOLLOWER_WRITE=true"
if self.enableLocalAccess:
startCmd += " --env enableLocalAccess=true"
else:
if self.enableUpdateCatalog and not self.disableSql:
startCmd += " --env enableUpdateCatalog=true"
if self.disableSql:
startCmd += " --env disableSql=true"
if self.onlySql:
startCmd += " --env onlySql=true"
if (self.tabletInfos):
if self.mode:
startCmd += " --env mode=" + self.mode
if self.zkRoot:
startCmd += " --env zk_root=" + self.zkRoot
if self.leaderElectionStrategyType:
startCmd += " --env leader_election_strategy_type=" + self.leaderElectionStrategyType
if self.leaderElectionConfig:
startCmd += " --env leader_election_config=" + "'" + self.leaderElectionConfig + "'"
if self.versionSyncConfig:
startCmd += " --env version_sync_config=" + "'" + self.versionSyncConfig + "'"
startCmd += ' -d -n 1>>%s 2>>%s ' % (os.path.join(self.localSearchDir,
"{}.stdout.out".format(roleName)),
os.path.join(self.localSearchDir,
"{}.stderr.out".format(roleName)))
os.chdir(rundir)
print "start searcher cmd: %s" % startCmd
os.system(startCmd)
httpArpcPort_list.append((rundir, roleName, startCmd))
count = count + 1
time.sleep(0.1)
terminator = TimeoutTerminator(10)
while not terminator.is_timeout():
start = True
for info in httpArpcPort_list:
rundir = info[0]
pids = self.getPids(rundir)
if len(pids) != 1:
start = False
if start:
break
time.sleep(0.1)
if terminator.is_timeout():
print "start searcher [%s] timeout, cmd [%s], rundir[%s]" % (roleName, startCmd, rundir)
for info in httpArpcPort_list:
rundir = info[0]
roleName = info[1]
startCmd = info[2]
pids = self.getPids(rundir)
print pids
if len(pids) != 1:
print "start searcher process [%s] failed, pids [%s] len(pid) != 1, cmd [%s], rundir[%s]" % (roleName, ','.join(pids), startCmd, rundir)
f.close()
return False
else:
print "start searcher process [%s] success, pid [%d]" % (roleName, pids[0])
pid = pids[0]
f.write("%d %s\n" % (pid, rundir))
if not self.wait_load(rundir):
print "wait load failed [{}]".format(rundir)
return False
http_port, arpc_port, grpc_port = self.get_listen_ports(rundir)
item = PortListItem()
item.ports = (http_port, arpc_port, grpc_port)
item.role = roleName
self.searcher_port_list.append(item)
f.close()
return True