synchronized void determineMinHeldContainers()

in tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java [1880:1952]


  /**
   * Rebuilds {@code sessionMinHeldContainers} with the ids of at most
   * {@code sessionNumMinHeldContainers} of the currently held containers.
   *
   * <p>The set is always cleared first. If {@code sessionNumMinHeldContainers}
   * is not positive, it is left empty. If the number of held containers does
   * not exceed the minimum, all of them are selected. Otherwise the selection
   * is made in two round-robin passes:
   * <ol>
   *   <li>a per-rack quota is computed by handing out one slot per rack in
   *       turn until the minimum is reached or every rack is exhausted;</li>
   *   <li>containers are then picked one per node in turn, each pick consuming
   *       one slot of its rack's quota; a node is dropped from consideration
   *       once it has no containers left or its rack's quota is used up.</li>
   * </ol>
   *
   * <p>The iteration order of racks and nodes is that of a hash map, so which
   * particular containers are selected is not specified.
   */
  synchronized void determineMinHeldContainers() {
    sessionMinHeldContainers.clear();
    if (sessionNumMinHeldContainers <= 0) {
      return;
    }

    if (heldContainers.size() <= sessionNumMinHeldContainers) {
      // Everything fits within the minimum, so every held container is kept.
      // Return right away: in this case each rack's quota would equal its
      // container count, so the distribution passes below would only
      // re-select the same containers.
      // NOTE(review): relies on heldContainers being keyed by the same id that
      // HeldContainer.getContainer().getId() returns - confirm.
      sessionMinHeldContainers.addAll(heldContainers.keySet());
      LOG.info("Holding on to " + sessionMinHeldContainers.size() + " containers");
      return;
    }

    // Index the held containers: number held per rack, and the containers per node.
    Map<String, AtomicInteger> rackHeldNumber = Maps.newHashMap();
    Map<String, List<HeldContainer>> nodeHeldContainers = Maps.newHashMap();
    for (HeldContainer heldContainer : heldContainers.values()) {
      AtomicInteger count = rackHeldNumber.get(heldContainer.getRack());
      if (count == null) {
        count = new AtomicInteger(0);
        rackHeldNumber.put(heldContainer.getRack(), count);
      }
      count.incrementAndGet();
      List<HeldContainer> nodeContainers = nodeHeldContainers.get(heldContainer.getNode());
      if (nodeContainers == null) {
        nodeContainers = Lists.newLinkedList();
        nodeHeldContainers.put(heldContainer.getNode(), nodeContainers);
      }
      nodeContainers.add(heldContainer);
    }

    // Per-rack quota of containers to keep; starts at zero for every known rack.
    Map<String, AtomicInteger> rackToHoldNumber = Maps.newHashMap();
    for (String rack : rackHeldNumber.keySet()) {
      rackToHoldNumber.put(rack, new AtomicInteger(0));
    }

    // distribute evenly across racks
    // the loop assigns 1 container per rack over all racks
    int containerCount = 0;
    while (containerCount < sessionNumMinHeldContainers && !rackHeldNumber.isEmpty()) {
      Iterator<Entry<String, AtomicInteger>> iter = rackHeldNumber.entrySet().iterator();
      while (containerCount < sessionNumMinHeldContainers && iter.hasNext()) {
        Entry<String, AtomicInteger> entry = iter.next();
        if (entry.getValue().decrementAndGet() >= 0) {
          // rack still had an unassigned container. give it one more slot.
          containerCount++;
          rackToHoldNumber.get(entry.getKey()).incrementAndGet();
        } else {
          // rack has no more containers to offer. stop visiting it.
          iter.remove();
        }
      }
    }

    // distribute containers evenly across nodes while not exceeding rack limit
    // the loop assigns 1 container per node over all nodes
    containerCount = 0;
    while (containerCount < sessionNumMinHeldContainers && !nodeHeldContainers.isEmpty()) {
      Iterator<Entry<String, List<HeldContainer>>> iter = nodeHeldContainers.entrySet().iterator();
      while (containerCount < sessionNumMinHeldContainers && iter.hasNext()) {
        List<HeldContainer> nodeContainers = iter.next().getValue();
        if (nodeContainers.isEmpty()) {
          // node is empty. remove it.
          iter.remove();
          continue;
        }
        // take the most recently added container of this node
        HeldContainer heldContainer = nodeContainers.remove(nodeContainers.size() - 1);
        if (rackToHoldNumber.get(heldContainer.getRack()).decrementAndGet() >= 0) {
          // rack can hold a container
          containerCount++;
          sessionMinHeldContainers.add(heldContainer.getContainer().getId());
        } else {
          // rack limit reached. remove node.
          iter.remove();
        }
      }
    }

    LOG.info("Holding on to " + sessionMinHeldContainers.size() + " containers");
  }