public void init()

in tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java [165:259]


  /**
   * Initializes this worker: wires up the RPC services, optionally starts the
   * info web server, initializes the registered child services through
   * {@code super.init(conf)}, and finally performs the mode-specific bootstrap.
   *
   * <p>Sequence, in order:
   * <ol>
   *   <li>Registers a JVM shutdown hook running a new {@code ShutdownHook}.</li>
   *   <li>Stores {@code conf} as the system configuration and creates the RPC
   *       connection pool and the {@code WorkerContext}.</li>
   *   <li>Resolves the RPC ports. When the configured resource manager class name
   *       does not contain the name of {@code TajoWorkerResourceManager}, every RPC
   *       port and the pull server port are forced to {@code 0}.</li>
   *   <li>Creates and registers the client, query-master-manager, task-runner-manager
   *       and worker-manager services.</li>
   *   <li>Outside of YARN container mode: registers the pull server when in task
   *       runner mode and, unless the {@code TAJO_TEST} property is {@code "TRUE"},
   *       starts the info web server, creates the {@code DeletionService} and
   *       optionally cleans up temporal directories.</li>
   *   <li>Calls {@code super.init(conf)}.</li>
   *   <li>Depending on the mode, reports the query master launch, starts a task, or
   *       resolves the master / resource tracker addresses and connects to the catalog.</li>
   *   <li>Creates, initializes and registers the heartbeat service.</li>
   * </ol>
   *
   * <p>An {@link IOException} raised while setting up the web server, deletion
   * service or temporal directory cleanup is logged and not propagated; the
   * remaining initialization still runs.
   *
   * @param conf the configuration to initialize with; it is cast to {@code TajoConf},
   *             so passing any other type fails with a {@link ClassCastException}
   */
  public void init(Configuration conf) {
    Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));

    this.systemConf = (TajoConf)conf;
    RackResolver.init(systemConf);

    this.connPool = RpcConnectionPool.getPool(systemConf);
    this.workerContext = new WorkerContext();

    String resourceManagerClassName = systemConf.getVar(ConfVars.RESOURCE_MANAGER_CLASS);

    // Configured ports are only honoured when the resource manager class name
    // contains TajoWorkerResourceManager; otherwise all ports are set to 0.
    boolean randomPort = !resourceManagerClassName.contains(TajoWorkerResourceManager.class.getName());

    int clientPort = systemConf.getSocketAddrVar(ConfVars.WORKER_CLIENT_RPC_ADDRESS).getPort();
    int peerRpcPort = systemConf.getSocketAddrVar(ConfVars.WORKER_PEER_RPC_ADDRESS).getPort();
    int qmManagerPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_RPC_ADDRESS).getPort();

    if(randomPort) {
      // NOTE(review): port 0 presumably lets each server bind to a free port - confirm.
      clientPort = 0;
      peerRpcPort = 0;
      qmManagerPort = 0;
      systemConf.setIntVar(ConfVars.PULLSERVER_PORT, 0);
    }

    // querymaster worker
    tajoWorkerClientService = new TajoWorkerClientService(workerContext, clientPort);
    addService(tajoWorkerClientService);

    queryMasterManagerService = new QueryMasterManagerService(workerContext, qmManagerPort);
    addService(queryMasterManagerService);

    // taskrunner worker
    taskRunnerManager = new TaskRunnerManager(workerContext);
    addService(taskRunnerManager);

    tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, peerRpcPort);
    addService(tajoWorkerManagerService);

    if(!yarnContainerMode) {
      if(taskRunnerMode) {
        pullService = new TajoPullServerService();
        addService(pullService);
      }

      // The info web server and the deletion service are skipped when the
      // TAJO_TEST property is set to "TRUE" (case-insensitive).
      if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE")) {
        try {
          httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_INFO_ADDRESS).getPort();
          if(queryMasterMode && !taskRunnerMode) {
            // A QueryMaster-only worker uses its own info address: if QueryMaster and
            // TaskRunner run on a single host, a shared http port would conflict.
            httpPort = systemConf.getSocketAddrVar(ConfVars.WORKER_QM_INFO_ADDRESS).getPort();
          }
          webServer = StaticHttpServer.getInstance(this ,"worker", null, httpPort ,
              true, null, systemConf, null);
          webServer.start();
          // Re-read the port from the server after start and keep that value.
          httpPort = webServer.getPort();
          LOG.info("Worker info server started:" + httpPort);

          deletionService = new DeletionService(getMountPath().size(), 0);
          if(systemConf.getBoolVar(ConfVars.WORKER_TEMPORAL_DIR_CLEANUP)){
            getWorkerContext().cleanupTemporalDirectories();
          }
        } catch (IOException e) {
          // Deliberately best-effort: the failure is logged and initialization continues.
          LOG.error(e.getMessage(), e);
        }
      }
    }

    // The RPC ports logged here are the values passed to the service constructors
    // above (0 when randomPort is set).
    LOG.info("Tajo Worker started: queryMaster=" + queryMasterMode +
        ", taskRunner=" + taskRunnerMode +
        ", yarnContainer=" + yarnContainerMode +
        ", clientPort=" + clientPort +
        ", peerRpcPort=" + peerRpcPort +
        ", qmRpcPort=" + qmManagerPort +
        ", httpPort=" + httpPort);

    // Every service that super.init(conf) should handle must be added before this call.
    super.init(conf);

    if(yarnContainerMode && queryMasterMode) {
      // QueryMaster in a YARN container: cmdArgs[2] is parsed as the master address
      // and cmdArgs[1] as the query id.
      tajoMasterAddress = NetUtils.createSocketAddr(cmdArgs[2]);
      connectToCatalog();

      QueryId queryId = TajoIdUtils.parseQueryId(cmdArgs[1]);
      queryMasterManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
          queryId, TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
    } else if(yarnContainerMode && taskRunnerMode) { //TaskRunner mode
      taskRunnerManager.startTask(cmdArgs);
    } else {
      // Any other mode: addresses come from the configuration.
      tajoMasterAddress = NetUtils.createSocketAddr(systemConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS));
      workerResourceTrackerAddr = NetUtils.createSocketAddr(systemConf.getVar(ConfVars.RESOURCE_TRACKER_RPC_ADDRESS));
      connectToCatalog();
    }

    // The heartbeat service is created after super.init(conf), so it is initialized
    // explicitly here before being registered.
    workerHeartbeatThread = new WorkerHeartbeatService(workerContext);
    workerHeartbeatThread.init(conf);
    addIfService(workerHeartbeatThread);
  }