public void run()

in tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java [130:252]


    /**
     * Heartbeat loop of the worker: reports this worker's status to the resource tracker
     * until {@code stopped} is set or the thread is interrupted.
     *
     * <p>Steps:
     * <ol>
     *   <li>If a pull service is configured, poll it every 100 ms until it reports a positive
     *       port. If that takes longer than 30 seconds the JVM is terminated.</li>
     *   <li>Collect the host name and the ports of the services available in {@code context}
     *       once; they are reused for every heartbeat.</li>
     *   <li>Loop: build a {@code NodeHeartbeat}, send it, apply the cluster resource summary of
     *       the response to {@code context}, then wait up to 10 seconds. {@code getDiskUsageInfos()}
     *       is called on the first round and then on every 11th round, provided
     *       {@code diskDeviceInfos} is not null.</li>
     * </ol>
     *
     * <p>An interrupt at any blocking point is treated as a stop request: the interrupt status is
     * restored and the method returns.
     */
    public void run() {
      LOG.info("Worker Resource Heartbeat Thread start.");
      int sendDiskInfoCount = 0;
      int pullServerPort = 0;
      if (context.getPullService() != null) {
        long startTime = System.currentTimeMillis();
        while (true) {
          pullServerPort = context.getPullService().getPort();
          if (pullServerPort > 0) {
            break;
          }
          //waiting while pull server init
          try {
            Thread.sleep(100);
          } catch (InterruptedException e) {
            // Do not swallow the interrupt: handle it the same way as the heartbeat loop below,
            // i.e. as a request to stop this thread.
            Thread.currentThread().interrupt();
            LOG.info("Worker Resource Heartbeat Thread interrupted while waiting for pull server init.");
            return;
          }
          if (System.currentTimeMillis() - startTime > 30 * 1000) {
            // NOTE(review): the message says "push server" although the pull server is awaited,
            // and exit status 0 conventionally signals a normal termination. Both are kept
            // unchanged here because they are externally observable - confirm and fix separately.
            LOG.fatal("Too long push server init.");
            System.exit(0);
          }
        }
      }

      String hostName = null;
      int peerRpcPort = 0;
      int queryMasterPort = 0;
      int clientPort = 0;

      if (context.getTajoWorkerManagerService() != null) {
        hostName = context.getTajoWorkerManagerService().getBindAddr().getHostName();
        peerRpcPort = context.getTajoWorkerManagerService().getBindAddr().getPort();
      }
      // When both services exist, the query master's host name overrides the worker manager's.
      if (context.getQueryMasterManagerService() != null) {
        hostName = context.getQueryMasterManagerService().getBindAddr().getHostName();
        queryMasterPort = context.getQueryMasterManagerService().getBindAddr().getPort();
      }
      if (context.getTajoWorkerClientService() != null) {
        clientPort = context.getTajoWorkerClientService().getBindAddr().getPort();
      }
      if (context.getPullService() != null) {
        pullServerPort = context.getPullService().getPort();
      }

      while (!stopped.get()) {
        // Counter is 0 on the first round and again after every reset below.
        if (sendDiskInfoCount == 0 && diskDeviceInfos != null) {
          getDiskUsageInfos();
        }

        Runtime runtime = Runtime.getRuntime();
        TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
            TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
                .setMaxHeap(runtime.maxMemory())
                .setFreeHeap(runtime.freeMemory())
                .setTotalHeap(runtime.totalMemory())
                .build();

        TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
            .addAllDisk(diskInfos)
            .setRunningTaskNum(
                context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks())
            .setSystem(systemInfo)
            .setDiskSlots(workerDiskSlots)
            .setMemoryResourceMB(workerMemoryMB)
            .setJvmHeap(jvmHeap)
            .setQueryMasterMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isQueryMasterMode()))
            .setTaskRunnerMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isTaskRunnerMode()))
            .build();

        NodeHeartbeat heartbeatProto = NodeHeartbeat.newBuilder()
            .setTajoWorkerHost(hostName)
            .setTajoQueryMasterPort(queryMasterPort)
            .setPeerRpcPort(peerRpcPort)
            .setTajoWorkerClientPort(clientPort)
            .setTajoWorkerHttpPort(context.getHttpPort())
            .setTajoWorkerPullServerPort(pullServerPort)
            .setServerStatus(serverStatus)
            .build();

        NettyClientBase rmClient = null;
        try {
          CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
              new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();

          rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true);
          TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
          resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);

          // Wait at most 2 seconds for the answer; a timeout is only logged and the loop goes on.
          TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
          if (response != null) {
            TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
            if (clusterResourceSummary.getNumWorkers() > 0) {
              context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
            }
            context.setClusterResource(clusterResourceSummary);
          } else if (callBack.getController().failed()) {
            // Caught and logged by the generic handler below.
            throw new ServiceException(callBack.getController().errorText());
          }
        } catch (InterruptedException e) {
          // Keep the interrupt visible to whoever inspects this thread after the loop ends.
          Thread.currentThread().interrupt();
          break;
        } catch (TimeoutException te) {
          LOG.warn("Heartbeat response is being delayed.");
        } catch (Exception e) {
          LOG.error(e.getMessage(), e);
        } finally {
          // rmClient is still null when getConnection itself failed.
          if (rmClient != null) {
            connectionPool.releaseConnection(rmClient);
          }
        }

        try {
          synchronized (WorkerHeartbeatThread.this) {
            // Re-check under the monitor so that a stop signalled after the loop-head check
            // does not cost a full 10 second wait; wait on the very object that is locked.
            if (!stopped.get()) {
              WorkerHeartbeatThread.this.wait(10 * 1000);
            }
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
        }
        sendDiskInfoCount++;

        if (sendDiskInfoCount > 10) {
          sendDiskInfoCount = 0;
        }
      }

      LOG.info("Worker Resource Heartbeat Thread stopped.");
    }