in tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java [215:335]
/**
 * Requests worker resources for {@code event}'s execution block through the
 * TajoMaster RPC stub and forwards the granted containers to the query's event handler.
 *
 * <p>Flow:
 * <ol>
 *   <li>Build an allocation request from the default per-task memory/disk settings and
 *       send it asynchronously via {@code allocateWorkerResources}.</li>
 *   <li>Poll the callback in 3-second slices until a response arrives or this allocator
 *       is stopped.</li>
 *   <li>Convert each allocated resource into a {@link TajoWorkerContainer}. If the
 *       sub-query is missing or no longer running, the containers are released and the
 *       method returns; otherwise a {@link SubQueryContainerAllocationEvent} is fired.</li>
 *   <li>If fewer containers than required were granted, a new
 *       {@link ContainerAllocationEvent} is posted for the shortfall.</li>
 * </ol>
 *
 * <p>When the allocator is stopped before any response arrives the method returns
 * without posting a follow-up request.
 */
public void run() {
  LOG.info("Start TajoWorkerAllocationThread");

  CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
      new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
  TajoMasterProtocol.WorkerResourceAllocationRequest request = buildAllocationRequest();

  RpcConnectionPool connPool = RpcConnectionPool.getPool(queryTaskContext.getConf());
  NettyClientBase tmClient = null;
  try {
    tmClient = connPool.getConnection(
        queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
        TajoMasterProtocol.class, true);
    TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
    masterClientService.allocateWorkerResources(null, request, callBack);
  } catch (Exception e) {
    // NOTE(review): if the request could not be sent, callBack is never completed by this
    // method and the loop below keeps polling until the allocator is stopped. Behavior is
    // kept as-is; confirm whether a retry or failure event is wanted here.
    LOG.error(e.getMessage(), e);
  } finally {
    // The call above is asynchronous; the connection is handed back to the pool while
    // the response is still pending.
    connPool.releaseConnection(tmClient);
  }

  TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
  while (!stopped.get()) {
    try {
      response = callBack.get(3, TimeUnit.SECONDS);
      break;
    } catch (InterruptedException e) {
      if (stopped.get()) {
        // Preserve the interrupt status for whoever owns this thread.
        Thread.currentThread().interrupt();
        return;
      }
      // Interrupted while still active: keep waiting for the response.
    } catch (TimeoutException e) {
      LOG.info("No available worker resource for " + event.getExecutionBlockId());
    }
  }

  if (response == null && stopped.get()) {
    // Stopped while waiting: do not post another allocation request. This mirrors the
    // interrupted-and-stopped path above.
    return;
  }

  int numAllocatedContainers = 0;
  if (response != null) {
    List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources =
        response.getWorkerAllocatedResourceList();
    ExecutionBlockId executionBlockId = event.getExecutionBlockId();

    List<Container> containers = new ArrayList<Container>();
    for (TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource : allocatedResources) {
      containers.add(toWorkerContainer(executionBlockId, eachAllocatedResource));
    }

    // A missing sub-query is handled like a non-running one so that the containers that
    // were just granted are released instead of being lost to a NullPointerException.
    SubQuery subQuery = queryTaskContext.getSubQuery(executionBlockId);
    if (subQuery == null || !SubQuery.isRunningState(subQuery.getState())) {
      releaseContainers(executionBlockId, containers);
      return;
    }

    if (!containers.isEmpty()) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("SubQueryContainerAllocationEvent fire:" + executionBlockId);
      }
      queryTaskContext.getEventHandler().handle(
          new SubQueryContainerAllocationEvent(executionBlockId, containers));
    }
    numAllocatedContainers = allocatedResources.size();
  }

  if (event.getRequiredNum() > numAllocatedContainers) {
    // Ask again for the containers that were not granted this round.
    ContainerAllocationEvent shortRequestEvent = new ContainerAllocationEvent(
        event.getType(), event.getExecutionBlockId(), event.getPriority(),
        event.getResource(),
        event.getRequiredNum() - numAllocatedContainers,
        event.isLeafQuery(), event.getProgress()
    );
    queryTaskContext.getEventHandler().handle(shortRequestEvent);
  }
  LOG.info("Stop TajoWorkerAllocationThread");
}

/**
 * Builds the allocation request for {@code event}: min and max memory/disk per container
 * are both set to the configured task defaults, with MEMORY as the request priority.
 */
private TajoMasterProtocol.WorkerResourceAllocationRequest buildAllocationRequest() {
  //TODO consider task's resource usage pattern
  int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
  float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);

  return TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
      .setMinMemoryMBPerContainer(requiredMemoryMB)
      .setMaxMemoryMBPerContainer(requiredMemoryMB)
      .setNumContainers(event.getRequiredNum())
      .setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY)
      .setMinDiskSlotPerContainer(requiredDiskSlots)
      .setMaxDiskSlotPerContainer(requiredDiskSlots)
      .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
      .build();
}

/**
 * Converts one allocated resource from the response into a {@link TajoWorkerContainer}
 * carrying its container id, node id and worker description.
 */
private TajoWorkerContainer toWorkerContainer(
    ExecutionBlockId executionBlockId,
    TajoMasterProtocol.WorkerAllocatedResource allocatedResource) {
  NodeId nodeId = NodeId.newInstance(allocatedResource.getWorkerHost(),
      allocatedResource.getPeerRpcPort());

  TajoWorkerContainerId containerId = new TajoWorkerContainerId();
  containerId.setApplicationAttemptId(
      ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(),
          allocatedResource.getContainerId().getAppAttemptId().getAttemptId()));
  containerId.setId(allocatedResource.getContainerId().getId());

  WorkerResource workerResource = new WorkerResource();
  workerResource.setMemoryMB(allocatedResource.getAllocatedMemoryMB());
  workerResource.setDiskSlots(allocatedResource.getAllocatedDiskSlots());

  Worker worker = new Worker(null, workerResource);
  worker.setHostName(nodeId.getHost());
  worker.setPeerRpcPort(nodeId.getPort());
  worker.setQueryMasterPort(allocatedResource.getQueryMasterPort());
  worker.setPullServerPort(allocatedResource.getWorkerPullServerPort());

  TajoWorkerContainer container = new TajoWorkerContainer();
  container.setId(containerId);
  container.setNodeId(nodeId);
  container.setWorkerResource(worker);
  return container;
}

/**
 * Releases the given containers through {@code TajoContainerProxy.releaseWorkerResource}.
 * Failures are logged and not rethrown.
 */
private void releaseContainers(ExecutionBlockId executionBlockId, List<Container> containers) {
  try {
    List<ContainerId> containerIds = new ArrayList<ContainerId>();
    for (Container eachContainer : containers) {
      containerIds.add(eachContainer.getId());
    }
    TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds);
  } catch (Exception e) {
    LOG.error(e.getMessage(), e);
  }
}