private void assignLeafTasks()

in tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java [332:438]


  /**
   * Assigns scheduled leaf fragments to the containers behind the given task requests.
   *
   * <p>The requests are shuffled and then served one by one while scheduled fragments remain.
   * For each request whose container is still known to the resource allocator, a new empty
   * query unit is created and filled with fragments chosen in this order of preference:
   * <ol>
   *   <li>fragments local to the container's host and disk,</li>
   *   <li>fragments local to the container's host,</li>
   *   <li>if nothing was picked so far, a single rack-local fragment,</li>
   *   <li>failing that, a single random fragment.</li>
   * </ol>
   * Host-local fragments are accumulated up to the size returned by {@code adjustTaskSize()}.
   * The locality counters ({@code diskLocalAssigned}, {@code hostLocalAssigned},
   * {@code rackLocalAssigned}) are updated accordingly and a {@code T_SCHEDULE} event is
   * emitted for every task created.
   *
   * @param taskRequests pending task requests; the list is shuffled in place
   * @throws IllegalStateException if no fragment at all could be selected for a request
   */
  private void assignLeafTasks(List<TaskRequestEvent> taskRequests) {
    Collections.shuffle(taskRequests);

    for (TaskRequestEvent taskRequest : taskRequests) {
      if (scheduledFragments.size() <= 0) {
        break;
      }

      LOG.debug("assignToLeafTasks: " + taskRequest.getExecutionBlockId() + "," +
          "containerId=" + taskRequest.getContainerId());
      ContainerProxy container = context.getMasterContext().getResourceAllocator().
          getContainer(taskRequest.getContainerId());

      // The allocator has no container for this request; nothing can be assigned to it.
      if (container == null) {
        continue;
      }

      String host = container.getTaskHostName();
      QueryUnitAttemptScheduleContext queryUnitContext = new QueryUnitAttemptScheduleContext(container.containerID,
          host, taskRequest.getCallback());
      QueryUnit task = SubQuery.newEmptyQueryUnit(context, queryUnitContext, subQuery, nextTaskId++);

      List<FragmentPair> fragmentPairs = new ArrayList<FragmentPair>();
      boolean diskLocal = false;
      long assignedFragmentSize = 0;
      long taskSize = adjustTaskSize();
      LOG.info("Adjusted task size: " + taskSize);

      // host local, disk local
      String normalized = NetUtils.normalizeHost(host);
      // A host without a balancer entry simply has no disk id; dereferencing the missing
      // entry would otherwise abort the whole scheduling pass with a NullPointerException.
      Integer diskId = null;
      if (hostDiskBalancerMap.get(normalized) != null) {
        diskId = hostDiskBalancerMap.get(normalized).getDiskId(container.containerID);
      }

      if (diskId != null && diskId != -1) {
        assignedFragmentSize =
            collectHostLocalFragments(host, diskId, taskSize, assignedFragmentSize, fragmentPairs);
        diskLocal = !fragmentPairs.isEmpty();
      }

      if (assignedFragmentSize < taskSize) {
        // host local
        int pairsBefore = fragmentPairs.size();
        assignedFragmentSize =
            collectHostLocalFragments(host, null, taskSize, assignedFragmentSize, fragmentPairs);
        // NOTE(review): as before, a task that receives any host-local (non disk-local)
        // fragment is counted as host-local even if disk-local fragments were also assigned.
        if (fragmentPairs.size() > pairsBefore) {
          diskLocal = false;
        }
      }

      if (fragmentPairs.isEmpty()) {
        // rack local
        FragmentPair fragmentPair = scheduledFragments.getRackLocalFragment(host);

        // random
        if (fragmentPair == null) {
          fragmentPair = scheduledFragments.getRandomFragment();
        } else {
          rackLocalAssigned++;
        }

        if (fragmentPair != null) {
          fragmentPairs.add(fragmentPair);
          scheduledFragments.removeFragment(fragmentPair);
        }
      } else if (diskLocal) {
        diskLocalAssigned++;
      } else {
        hostLocalAssigned++;
      }

      if (fragmentPairs.isEmpty()) {
        throw new IllegalStateException("No fragment could be assigned to container "
            + container.containerID + " on host " + host);
      }

      LOG.info("host: " + host + " disk id: " + diskId + " fragment num: " + fragmentPairs.size());

      task.setFragment(fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()]));
      subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE));
    }
  }

  /**
   * Moves host-local fragment pairs from {@code scheduledFragments} into {@code fragmentPairs}
   * until none is available, the next pair would exceed {@code taskSize}, the scheduled
   * fragments run out, or the accumulated size reaches {@code taskSize}.
   *
   * @param host                 host name used for the locality lookup
   * @param diskId               disk to restrict the lookup to, or {@code null} for any disk
   *                             of the host
   * @param taskSize             upper bound for the accumulated fragment size
   * @param assignedFragmentSize size accumulated before this call
   * @param fragmentPairs        receives the selected pairs
   * @return the accumulated fragment size after this call
   */
  private long collectHostLocalFragments(String host, Integer diskId, long taskSize,
                                         long assignedFragmentSize, List<FragmentPair> fragmentPairs) {
    do {
      FragmentPair fragmentPair = (diskId == null)
          ? scheduledFragments.getHostLocalFragment(host)
          : scheduledFragments.getHostLocalFragment(host, diskId);
      if (fragmentPair == null || fragmentPair.getLeftFragment() == null) {
        break;
      }

      // getEndKey() serves as the fragment's size measure here (presumably its length —
      // confirm against the fragment implementation).
      // NOTE(review): only the left fragment is considered for the limit check, although
      // the right fragment's size is added to the total below — confirm this is intended.
      if (assignedFragmentSize + fragmentPair.getLeftFragment().getEndKey() > taskSize) {
        break;
      }

      fragmentPairs.add(fragmentPair);
      assignedFragmentSize += fragmentPair.getLeftFragment().getEndKey();
      if (fragmentPair.getRightFragment() != null) {
        assignedFragmentSize += fragmentPair.getRightFragment().getEndKey();
      }
      scheduledFragments.removeFragment(fragmentPair);
    } while (scheduledFragments.size() > 0 && assignedFragmentSize < taskSize);

    return assignedFragmentSize;
  }