in tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java [710:823]
/**
 * Assigns leaf tasks to the given container task requests, preferring data locality.
 *
 * <p>The requests are shuffled and then processed until either no leaf tasks or no requests
 * remain. For each request:
 * <ol>
 *   <li>If the resource allocator no longer knows its container, the task runner is sent
 *       {@code stopTaskRunnerReq} and the request is dropped.</li>
 *   <li>If neither the container's host name nor its normalized form has an entry in
 *       {@code leafTaskHostMapping} while other requests are still pending, the request is
 *       deferred to a remote-request list. That list is only drained once the input list is
 *       empty.</li>
 *   <li>Otherwise a task is chosen by, in order: disk/host-local allocation
 *       ({@code allocateLocalTask}), rack-local allocation ({@code allocateRackTask}), and finally
 *       an arbitrary remaining leaf task.</li>
 * </ol>
 *
 * <p>When host-local allocation fails for a container on a known host, that container is switched
 * to the remote volume. If the host's remote concurrency then exceeds a tail limit derived from
 * {@code remainingScheduledObjectNum()}, the container is stopped and released instead of being
 * given work.
 *
 * <p>On a successful assignment a {@code TaskAttemptAssignedEvent} is dispatched, the attempt is
 * recorded in {@code assignedRequest}, {@code scheduledObjectNum} is reduced by the task's
 * fragment count, and the request is answered through its callback.
 *
 * @param taskRequests pending task requests; this list is shuffled and drained in place
 * @throws IllegalStateException if no task could be selected for a request that reached the
 *     assignment step
 */
public void assignToLeafTasks(LinkedList<TaskRequestEvent> taskRequests) {
  Collections.shuffle(taskRequests);
  LinkedList<TaskRequestEvent> remoteTaskRequests = new LinkedList<TaskRequestEvent>();

  TaskRequestEvent taskRequest;
  while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || !remoteTaskRequests.isEmpty())) {
    taskRequest = taskRequests.pollFirst();
    if (taskRequest == null) { // only deferred (remote) task requests are left
      taskRequest = remoteTaskRequests.pollFirst();
    }
    ContainerId containerId = taskRequest.getContainerId();

    // If the container is no longer alive, ignore the request and stop its task runner.
    ContainerProxy container = context.getMasterContext().getResourceAllocator()
        .getContainer(containerId);
    if (container == null) {
      taskRequest.getCallback().run(stopTaskRunnerReq);
      continue;
    }

    // Resolve the host of the requesting container, falling back to its normalized form.
    String host = container.getTaskHostName();
    if (!leafTaskHostMapping.containsKey(host)) {
      host = NetUtils.normalizeHost(host);
      if (!leafTaskHostMapping.containsKey(host) && !taskRequests.isEmpty()) {
        // This means one of:
        //  * no blocks reside on this node, or
        //  * all blocks on this node are consumed and this task runner needs a remote task.
        // Defer it to the remote request list so requests with local data are served first.
        remoteTaskRequests.add(taskRequest);
        continue;
      }
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
          "containerId=" + containerId);
    }

    //////////////////////////////////////////////////////////////////////
    // disk or host-local allocation
    //////////////////////////////////////////////////////////////////////
    QueryUnitAttemptId attemptId = allocateLocalTask(host, containerId);

    if (attemptId == null) { // no local task could be found
      HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);
      if (hostVolumeMapping != null) {
        if (!hostVolumeMapping.isRemote(containerId)) {
          // move this container's concurrency accounting to the remote volume
          hostVolumeMapping.decreaseConcurrency(containerId);
          hostVolumeMapping.increaseConcurrency(containerId, HostVolumeMapping.REMOTE);
        }
        // Remote concurrency management for tail tasks. hostVolumeMapping != null guarantees
        // leafTaskHostMapping is non-empty, so the divisor is never zero.
        int tailLimit = Math.max(remainingScheduledObjectNum() / (leafTaskHostMapping.size() * 2), 1);
        if (hostVolumeMapping.getRemoteConcurrency() > tailLimit) {
          // too many remote readers on this host: release the container instead
          hostVolumeMapping.decreaseConcurrency(containerId);
          taskRequest.getCallback().run(stopTaskRunnerReq);
          subQuery.releaseContainer(containerId);
          continue;
        }
      }

      //////////////////////////////////////////////////////////////////////
      // rack-local allocation
      //////////////////////////////////////////////////////////////////////
      attemptId = allocateRackTask(host);

      //////////////////////////////////////////////////////////////////////
      // random node allocation
      //////////////////////////////////////////////////////////////////////
      if (attemptId == null && leafTaskNum() > 0) {
        synchronized (leafTasks) {
          // Re-check under the lock: leafTaskNum() above was read without holding it, and the
          // synchronized block implies leafTasks may be modified concurrently. Without this,
          // iterator().next() could throw NoSuchElementException.
          if (leafTasks.size() > 0) {
            attemptId = leafTasks.iterator().next();
            leafTasks.remove(attemptId);
            rackLocalAssigned++;
            totalAssigned++;
            LOG.info(String.format("Assigned Local/Remote/Total: (%d/%d/%d), Locality: %.2f%%,",
                hostLocalAssigned, rackLocalAssigned, totalAssigned,
                ((double) hostLocalAssigned / (double) totalAssigned) * 100));
          }
        }
      }
    }

    if (attemptId == null) {
      throw new IllegalStateException("Illegal State: no leaf task could be assigned to container "
          + containerId + " of execution block " + taskRequest.getExecutionBlockId());
    }

    QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
    QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
        attemptId,
        new ArrayList<FragmentProto>(task.getAllFragments()),
        "",
        false,
        task.getLogicalPlan().toJson(),
        context.getMasterContext().getQueryContext(),
        subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
    if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
      taskAssign.setInterQuery();
    }

    context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
        containerId, host, container.getTaskPort()));
    assignedRequest.add(attemptId);
    scheduledObjectNum -= task.getAllFragments().size();
    taskRequest.getCallback().run(taskAssign.getProto());
  }
}