in tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java [130:252]
/**
 * Main loop of the worker resource heartbeat thread.
 *
 * <p>If a pull service is configured, first polls it (every 100 ms, for at most 30 seconds)
 * until it reports a positive port; the JVM is terminated with a non-zero status when that
 * deadline passes. Afterwards, until {@code stopped} is set or the thread is interrupted, each
 * iteration builds a {@code NodeHeartbeat} from the worker's addresses, ports and current
 * server status, sends it to the resource tracker, and stores the cluster resource summary
 * of the response in the worker context. Iterations are separated by a wait of up to
 * 10 seconds on the {@code WorkerHeartbeatThread} monitor.
 */
public void run() {
  LOG.info("Worker Resource Heartbeat Thread start.");

  int sendDiskInfoCount = 0;
  int pullServerPort = 0;

  if (context.getPullService() != null) {
    long startTime = System.currentTimeMillis();
    while (true) {
      pullServerPort = context.getPullService().getPort();
      if (pullServerPort > 0) {
        break;
      }
      // The pull server has not reported a port yet; poll again shortly.
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        // Treat an interrupt as a request to stop, exactly like the heartbeat loop below,
        // instead of silently ignoring it and continuing to poll.
        LOG.info("Worker Resource Heartbeat Thread stopped.");
        Thread.currentThread().interrupt();
        return;
      }
      if (System.currentTimeMillis() - startTime > 30 * 1000) {
        LOG.fatal("Too long pull server init.");
        // Non-zero status: this is an abnormal termination, not a clean shutdown.
        System.exit(1);
      }
    }
  }

  // NOTE(review): hostName stays null when neither the worker manager service nor the query
  // master manager service is available; confirm that setTajoWorkerHost() tolerates null.
  String hostName = null;
  int peerRpcPort = 0;
  int queryMasterPort = 0;
  int clientPort = 0;

  if (context.getTajoWorkerManagerService() != null) {
    hostName = context.getTajoWorkerManagerService().getBindAddr().getHostName();
    peerRpcPort = context.getTajoWorkerManagerService().getBindAddr().getPort();
  }
  if (context.getQueryMasterManagerService() != null) {
    // When both services exist, the query master's bind host overrides the worker manager's.
    hostName = context.getQueryMasterManagerService().getBindAddr().getHostName();
    queryMasterPort = context.getQueryMasterManagerService().getBindAddr().getPort();
  }
  if (context.getTajoWorkerClientService() != null) {
    clientPort = context.getTajoWorkerClientService().getBindAddr().getPort();
  }
  if (context.getPullService() != null) {
    pullServerPort = context.getPullService().getPort();
  }

  while (!stopped.get()) {
    // Disk usage is collected on the first iteration and then once every 11 iterations
    // (the counter runs 0..10 before being reset at the bottom of the loop).
    if (sendDiskInfoCount == 0 && diskDeviceInfos != null) {
      getDiskUsageInfos();
    }

    TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
        TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
            .setMaxHeap(Runtime.getRuntime().maxMemory())
            .setFreeHeap(Runtime.getRuntime().freeMemory())
            .setTotalHeap(Runtime.getRuntime().totalMemory())
            .build();

    TajoMasterProtocol.ServerStatusProto serverStatus =
        TajoMasterProtocol.ServerStatusProto.newBuilder()
            .addAllDisk(diskInfos)
            .setRunningTaskNum(
                context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks())
            .setSystem(systemInfo)
            .setDiskSlots(workerDiskSlots)
            .setMemoryResourceMB(workerMemoryMB)
            .setJvmHeap(jvmHeap)
            .setQueryMasterMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isQueryMasterMode()))
            .setTaskRunnerMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isTaskRunnerMode()))
            .build();

    NodeHeartbeat heartbeatProto = NodeHeartbeat.newBuilder()
        .setTajoWorkerHost(hostName)
        .setTajoQueryMasterPort(queryMasterPort)
        .setPeerRpcPort(peerRpcPort)
        .setTajoWorkerClientPort(clientPort)
        .setTajoWorkerHttpPort(context.getHttpPort())
        .setTajoWorkerPullServerPort(pullServerPort)
        .setServerStatus(serverStatus)
        .build();

    NettyClientBase rmClient = null;
    try {
      CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
          new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();

      rmClient = connectionPool.getConnection(
          context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true);
      TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
      resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);

      // Wait at most two seconds for the response before giving up on this heartbeat.
      TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
      if (response != null) {
        TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
        // Only overwrite the known cluster size when the summary reports at least one worker.
        if (clusterResourceSummary.getNumWorkers() > 0) {
          context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
        }
        context.setClusterResource(clusterResourceSummary);
      } else {
        if (callBack.getController().failed()) {
          // Surfaced through the generic handler below so the failure is logged.
          throw new ServiceException(callBack.getController().errorText());
        }
      }
    } catch (InterruptedException e) {
      break;
    } catch (TimeoutException te) {
      LOG.warn("Heartbeat response is being delayed.");
    } catch (Exception e) {
      // A failed heartbeat must not kill the thread; log it and retry on the next cycle.
      LOG.error(e.getMessage(), e);
    } finally {
      // rmClient is still null when getConnection() itself failed; nothing to release then.
      if (rmClient != null) {
        connectionPool.releaseConnection(rmClient);
      }
    }

    try {
      // Wait on the same monitor that is locked, so a notify on the thread object
      // can end the pause early.
      synchronized (WorkerHeartbeatThread.this) {
        WorkerHeartbeatThread.this.wait(10 * 1000);
      }
    } catch (InterruptedException e) {
      break;
    }

    sendDiskInfoCount++;
    if (sendDiskInfoCount > 10) {
      sendDiskInfoCount = 0;
    }
  }

  LOG.info("Worker Resource Heartbeat Thread stopped.");
}