private List chooseWorkers()

in tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java [353:501]


  private List<AllocatedWorkerResource> chooseWorkers(WorkerResourceRequest resourceRequest) {
    List<AllocatedWorkerResource> selectedWorkers = new ArrayList<AllocatedWorkerResource>();

    int allocatedResources = 0;

    TajoMasterProtocol.ResourceRequestPriority resourceRequestPriority
        = resourceRequest.request.getResourceRequestPriority();

    if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
      synchronized(rmContext) {
        List<String> randomWorkers = new ArrayList<String>(rmContext.getWorkers().keySet());
        Collections.shuffle(randomWorkers);

        int numContainers = resourceRequest.request.getNumContainers();
        int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer();
        int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer();
        float diskSlot = Math.max(resourceRequest.request.getMaxDiskSlotPerContainer(),
            resourceRequest.request.getMinDiskSlotPerContainer());

        int liveWorkerSize = randomWorkers.size();
        Set<String> insufficientWorkers = new HashSet<String>();
        boolean stop = false;
        boolean checkMax = true;
        while(!stop) {
          if(allocatedResources >= numContainers) {
            break;
          }

          if(insufficientWorkers.size() >= liveWorkerSize) {
            if(!checkMax) {
              break;
            }
            insufficientWorkers.clear();
            checkMax = false;
          }
          int compareAvailableMemory = checkMax ? maxMemoryMB : minMemoryMB;

          for(String eachWorker: randomWorkers) {
            if(allocatedResources >= numContainers) {
              stop = true;
              break;
            }

            if(insufficientWorkers.size() >= liveWorkerSize) {
              break;
            }

            Worker worker = rmContext.getWorkers().get(eachWorker);
            WorkerResource workerResource = worker.getResource();
            if(workerResource.getAvailableMemoryMB() >= compareAvailableMemory) {
              int workerMemory;
              if(workerResource.getAvailableMemoryMB() >= maxMemoryMB) {
                workerMemory = maxMemoryMB;
              } else {
                workerMemory = workerResource.getAvailableMemoryMB();
              }
              AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
              allocatedWorkerResource.worker = worker;
              allocatedWorkerResource.allocatedMemoryMB = workerMemory;
              if(workerResource.getAvailableDiskSlots() >= diskSlot) {
                allocatedWorkerResource.allocatedDiskSlots = diskSlot;
              } else {
                allocatedWorkerResource.allocatedDiskSlots = workerResource.getAvailableDiskSlots();
              }

              workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
                  allocatedWorkerResource.allocatedMemoryMB);

              selectedWorkers.add(allocatedWorkerResource);

              allocatedResources++;
            } else {
              insufficientWorkers.add(eachWorker);
            }
          }
        }
      }
    } else {
      synchronized(rmContext) {
        List<String> randomWorkers = new ArrayList<String>(rmContext.getWorkers().keySet());
        Collections.shuffle(randomWorkers);

        int numContainers = resourceRequest.request.getNumContainers();
        float minDiskSlots = resourceRequest.request.getMinDiskSlotPerContainer();
        float maxDiskSlots = resourceRequest.request.getMaxDiskSlotPerContainer();
        int memoryMB = Math.max(resourceRequest.request.getMaxMemoryMBPerContainer(),
            resourceRequest.request.getMinMemoryMBPerContainer());

        int liveWorkerSize = randomWorkers.size();
        Set<String> insufficientWorkers = new HashSet<String>();
        boolean stop = false;
        boolean checkMax = true;
        while(!stop) {
          if(allocatedResources >= numContainers) {
            break;
          }

          if(insufficientWorkers.size() >= liveWorkerSize) {
            if(!checkMax) {
              break;
            }
            insufficientWorkers.clear();
            checkMax = false;
          }
          float compareAvailableDisk = checkMax ? maxDiskSlots : minDiskSlots;

          for(String eachWorker: randomWorkers) {
            if(allocatedResources >= numContainers) {
              stop = true;
              break;
            }

            if(insufficientWorkers.size() >= liveWorkerSize) {
              break;
            }

            Worker worker = rmContext.getWorkers().get(eachWorker);
            WorkerResource workerResource = worker.getResource();
            if(workerResource.getAvailableDiskSlots() >= compareAvailableDisk) {
              float workerDiskSlots;
              if(workerResource.getAvailableDiskSlots() >= maxDiskSlots) {
                workerDiskSlots = maxDiskSlots;
              } else {
                workerDiskSlots = workerResource.getAvailableDiskSlots();
              }
              AllocatedWorkerResource allocatedWorkerResource = new AllocatedWorkerResource();
              allocatedWorkerResource.worker = worker;
              allocatedWorkerResource.allocatedDiskSlots = workerDiskSlots;

              if(workerResource.getAvailableMemoryMB() >= memoryMB) {
                allocatedWorkerResource.allocatedMemoryMB = memoryMB;
              } else {
                allocatedWorkerResource.allocatedMemoryMB = workerResource.getAvailableMemoryMB();
              }
              workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
                  allocatedWorkerResource.allocatedMemoryMB);

              selectedWorkers.add(allocatedWorkerResource);

              allocatedResources++;
            } else {
              insufficientWorkers.add(eachWorker);
            }
          }
        }
      }
    }
    return selectedWorkers;
  }