in tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java [165:259]
/**
 * Initializes the worker: registers a JVM shutdown hook, resolves the RPC ports,
 * creates and registers the worker's child services, optionally starts the info
 * web server, delegates to {@code super.init(conf)}, and finally sets up the
 * master/catalog connection and the heartbeat service according to the run mode
 * ({@code yarnContainerMode}, {@code queryMasterMode}, {@code taskRunnerMode}).
 *
 * @param conf configuration for this worker; it is cast to {@code TajoConf}, so
 *             passing any other type fails with a {@code ClassCastException}
 */
public void init(Configuration conf) {
  Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));

  this.systemConf = (TajoConf) conf;
  RackResolver.init(systemConf);

  this.connPool = RpcConnectionPool.getPool(systemConf);
  this.workerContext = new WorkerContext();

  // Configured ports are only kept when the configured resource manager class name
  // contains TajoWorkerResourceManager's class name; otherwise every port is reset to 0.
  String resourceManagerClassName = systemConf.getVar(ConfVars.RESOURCE_MANAGER_CLASS);
  boolean randomPort = !resourceManagerClassName.contains(TajoWorkerResourceManager.class.getName());

  int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort();
  int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort();
  int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort();

  if (randomPort) {
    // NOTE(review): port 0 presumably lets each server bind to an arbitrary free port -- confirm.
    clientPort = 0;
    peerRpcPort = 0;
    qmManagerPort = 0;
    systemConf.setIntVar(ConfVars.PULLSERVER_PORT, 0);
  }

  // querymaster worker
  tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
  addService(tajoWorkerClientService);

  queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort);
  addService(queryMasterManagerService);

  // taskrunner worker
  taskRunnerManager = new TaskRunnerManager(workerContext);
  addService(taskRunnerManager);

  tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
  addService(tajoWorkerManagerService);

  if (!yarnContainerMode) {
    if (taskRunnerMode) {
      pullService = new TajoPullServerService();
      addService(pullService);
    }

    // The info web server, deletion service and temp-dir cleanup are skipped when the
    // TAJO_TEST setting is "TRUE" (case-insensitive).
    if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) {
      try {
        httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
        if (queryMasterMode && !taskRunnerMode) {
          //If QueryMaster and TaskRunner run on single host, http port conflicts
          httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort();
        }
        webServer = StaticHttpServer.getInstance(this, "worker", null, httpPort,
            true, null, systemConf, null);
        webServer.start();
        // Re-read the port from the server; it may differ from the requested one.
        httpPort = webServer.getPort();
        LOG.info("Worker info server started:" + httpPort);

        deletionService = new DeletionService(getMountPath().size(), 0);
        if (systemConf.getBoolVar(ConfVars.WORKER_TEMPORAL_DIR_CLEANUP)) {
          getWorkerContext().cleanupTemporalDirectories();
        }
      } catch (IOException e) {
        // Best effort: initialization continues without the info server.
        // NOTE(review): on this path deletionService is never assigned -- confirm
        // that later users tolerate a null deletionService.
        LOG.error(e.getMessage(), e);
      }
    }
  }

  // The ports logged here are the requested values (0 when reset above), not values
  // read back from the RPC services.
  LOG.info("Tajo Worker started: queryMaster=" + queryMasterMode + " taskRunner=" + taskRunnerMode +
      ", qmRpcPort=" + qmManagerPort +
      ",yarnContainer=" + yarnContainerMode + ", clientPort=" + clientPort +
      ", peerRpcPort=" + peerRpcPort + ":" + qmManagerPort + ",httpPort=" + httpPort);

  super.init(conf);

  if (yarnContainerMode && queryMasterMode) {
    // In this mode cmdArgs[1] is parsed as the query id and cmdArgs[2] as the master address.
    tajoMasterAddress = NetUtils.createSocketAddr(cmdArgs[2]);
    connectToCatalog();

    QueryId queryId = TajoIdUtils.parseQueryId(cmdArgs[1]);
    queryMasterManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
        queryId, TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
  } else if (yarnContainerMode && taskRunnerMode) { //TaskRunner mode
    taskRunnerManager.startTask(cmdArgs);
  } else {
    tajoMasterAddress = NetUtils.createSocketAddr(systemConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
    workerResourceTrackerAddr = NetUtils.createSocketAddr(systemConf.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
    connectToCatalog();
  }

  // The heartbeat service is created after super.init(conf), so it is initialized
  // explicitly here before being registered.
  workerHeartbeatThread = new WorkerHeartbeatService(workerContext);
  workerHeartbeatThread.init(conf);
  addIfService(workerHeartbeatThread);
}