public void assignToLeafTasks()

in tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java [710:823]


    /**
     * Hands out pending leaf tasks to the given task requests.
     *
     * <p>The loop runs while {@code leafTasks} is non-empty and at least one request is left,
     * either in {@code taskRequests} or in the local list of deferred ("remote") requests.
     * For each request the following steps are applied in order:
     * <ol>
     *   <li>The container of the request is looked up; if it is unknown, the request is
     *       answered with {@code stopTaskRunnerReq} and skipped.</li>
     *   <li>If the container's host (raw or normalized) has no entry in
     *       {@code leafTaskHostMapping} and other requests are still waiting, the request is
     *       deferred to the remote list and revisited once {@code taskRequests} is drained.</li>
     *   <li>A task is chosen via {@code allocateLocalTask}, then {@code allocateRackTask},
     *       and finally by taking an arbitrary element of {@code leafTasks}.</li>
     *   <li>The chosen attempt is announced with a {@code TaskAttemptAssignedEvent} and the
     *       request callback is invoked with the task assignment.</li>
     * </ol>
     *
     * <p>NOTE(review): requests that are still parked in the local remote list when
     * {@code leafTasks} runs empty are dropped without their callback being invoked.
     * Confirm that callers / task runners tolerate this (e.g. by re-requesting).
     *
     * @param taskRequests pending task requests; the list is shuffled and consumed in place
     * @throws RuntimeException if no task attempt could be selected for a request
     */
    public void assignToLeafTasks(LinkedList<TaskRequestEvent> taskRequests) {
      Collections.shuffle(taskRequests);
      LinkedList<TaskRequestEvent> remoteTaskRequests = new LinkedList<TaskRequestEvent>();

      TaskRequestEvent taskRequest;
      while (leafTasks.size() > 0 && (!taskRequests.isEmpty() || !remoteTaskRequests.isEmpty())) {
        taskRequest = taskRequests.pollFirst();
        if(taskRequest == null) { // if there are only remote task requests
          taskRequest = remoteTaskRequests.pollFirst();
        }

        // Read the container id once; it is needed for the lookup, logging,
        // concurrency bookkeeping and the assignment event below.
        ContainerId containerId = taskRequest.getContainerId();

        // checking if this container is still alive.
        // If not, ignore the task request and stop the task runner
        ContainerProxy container = context.getMasterContext().getResourceAllocator()
            .getContainer(containerId);
        if(container == null) {
          taskRequest.getCallback().run(stopTaskRunnerReq);
          continue;
        }

        // getting the hostname of requested node
        String host = container.getTaskHostName();

        // if no worker is matched to the hostname of the task request
        if(!leafTaskHostMapping.containsKey(host)){
          host = NetUtils.normalizeHost(host);

          if(!leafTaskHostMapping.containsKey(host) && !taskRequests.isEmpty()){
            // this case means one of either cases:
            // * there are no blocks which reside in this node.
            // * all blocks which reside in this node are consumed, and this task runner requests a remote task.
            // In this case, we transfer the task request to the remote task request list, and skip the followings.
            // The !taskRequests.isEmpty() guard keeps a deferred request from being deferred
            // forever once only remote requests are left.
            remoteTaskRequests.add(taskRequest);
            continue;
          }
        }

        // Guarded so the message is not concatenated on every iteration when debug is off.
        if (LOG.isDebugEnabled()) {
          LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
              "containerId=" + containerId);
        }

        //////////////////////////////////////////////////////////////////////
        // disk or host-local allocation
        //////////////////////////////////////////////////////////////////////
        QueryUnitAttemptId attemptId = allocateLocalTask(host, containerId);

        if (attemptId == null) { // if a local task cannot be found
          HostVolumeMapping hostVolumeMapping = leafTaskHostMapping.get(host);

          if(hostVolumeMapping != null) {
            if(!hostVolumeMapping.isRemote(containerId)){
              // assign to remote volume
              hostVolumeMapping.decreaseConcurrency(containerId);
              hostVolumeMapping.increaseConcurrency(containerId, HostVolumeMapping.REMOTE);
            }
            // this part is remote concurrency management of the tail tasks.
            // leafTaskHostMapping contains at least the entry for this host here,
            // so the divisor is never zero.
            int tailLimit = Math.max(remainingScheduledObjectNum() / (leafTaskHostMapping.size() * 2), 1);

            if(hostVolumeMapping.getRemoteConcurrency() > tailLimit){
              // too many remote tasks on this host: release the container
              hostVolumeMapping.decreaseConcurrency(containerId);
              taskRequest.getCallback().run(stopTaskRunnerReq);
              subQuery.releaseContainer(containerId);
              continue;
            }
          }

          //////////////////////////////////////////////////////////////////////
          // rack-local allocation
          //////////////////////////////////////////////////////////////////////
          attemptId = allocateRackTask(host);

          //////////////////////////////////////////////////////////////////////
          // random node allocation
          //////////////////////////////////////////////////////////////////////
          if (attemptId == null && leafTaskNum() > 0) {
            synchronized (leafTasks){
              // Re-check under the lock: the unsynchronized leafTaskNum() test above may be
              // stale if leafTasks was drained in the meantime, in which case
              // iterator().next() would throw NoSuchElementException.
              if (!leafTasks.isEmpty()) {
                attemptId = leafTasks.iterator().next();
                leafTasks.remove(attemptId);
                // A randomly picked task is counted in rackLocalAssigned, which the
                // log line below reports as "Remote".
                rackLocalAssigned++;
                totalAssigned++;
                LOG.info(String.format("Assigned Local/Remote/Total: (%d/%d/%d), Locality: %.2f%%,",
                    hostLocalAssigned, rackLocalAssigned, totalAssigned,
                    ((double) hostLocalAssigned / (double) totalAssigned) * 100));
              }
            }
          }
        }

        if (attemptId != null) {
          QueryUnit task = subQuery.getQueryUnit(attemptId.getQueryUnitId());
          // Copy the fragments once; the copy is sent with the request and its size is
          // used for the scheduled-object bookkeeping below.
          ArrayList<FragmentProto> fragments = new ArrayList<FragmentProto>(task.getAllFragments());
          int fragmentCount = fragments.size();
          QueryUnitRequest taskAssign = new QueryUnitRequestImpl(
              attemptId,
              fragments,
              "",
              false,
              task.getLogicalPlan().toJson(),
              context.getMasterContext().getQueryContext(),
              subQuery.getDataChannel(), subQuery.getBlock().getEnforcer());
          if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) {
            taskAssign.setInterQuery();
          }

          context.getMasterContext().getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
              containerId,
              host, container.getTaskPort()));
          assignedRequest.add(attemptId);

          scheduledObjectNum -= fragmentCount;
          taskRequest.getCallback().run(taskAssign.getProto());
        } else {
          // No local, rack-local or random task could be selected for this request.
          throw new RuntimeException("Illegal State!!!!!!!!!!!!!!!!!!!!!");
        }
      }
    }