in tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java [1880:1952]
/**
 * Recomputes {@code sessionMinHeldContainers}, the ids of held containers that
 * are to be kept as the session's minimum set of held containers.
 *
 * <p>The previous selection is always cleared first. If
 * {@code sessionNumMinHeldContainers} is not positive, nothing is selected.
 * If the number of currently held containers does not exceed that minimum,
 * every held container is selected. Otherwise the selection is spread out:
 * <ol>
 *   <li>a per-rack quota is built by granting one slot per rack in round-robin
 *       order (never more slots than containers held on that rack) until the
 *       minimum is reached or all racks are exhausted;</li>
 *   <li>containers are then taken one per node in round-robin order, dropping
 *       any node whose rack quota has already been used up.</li>
 * </ol>
 * The number of selected containers is logged at INFO.
 */
synchronized void determineMinHeldContainers() {
  sessionMinHeldContainers.clear();
  if (sessionNumMinHeldContainers <= 0) {
    return;
  }
  if (heldContainers.size() <= sessionNumMinHeldContainers) {
    // Every held container fits within the minimum, so there is nothing to
    // choose between; running the rack/node spreading below would only
    // re-add the same ids.
    sessionMinHeldContainers.addAll(heldContainers.keySet());
    LOG.info("Holding on to " + sessionMinHeldContainers.size() + " containers");
    return;
  }

  // Group the held containers: how many are held per rack, and which are held per node.
  Map<String, AtomicInteger> rackHeldNumber = Maps.newHashMap();
  Map<String, List<HeldContainer>> nodeHeldContainers = Maps.newHashMap();
  for (HeldContainer heldContainer : heldContainers.values()) {
    String rack = heldContainer.getRack();
    AtomicInteger count = rackHeldNumber.get(rack);
    if (count == null) {
      count = new AtomicInteger(0);
      rackHeldNumber.put(rack, count);
    }
    count.incrementAndGet();

    String node = heldContainer.getNode();
    List<HeldContainer> nodeContainers = nodeHeldContainers.get(node);
    if (nodeContainers == null) {
      // Only appended to and removed from at the tail, so an array list suffices.
      nodeContainers = Lists.newArrayList();
      nodeHeldContainers.put(node, nodeContainers);
    }
    nodeContainers.add(heldContainer);
  }

  // Per-rack quota of containers to hold; every rack starts at zero.
  Map<String, AtomicInteger> rackToHoldNumber = Maps.newHashMap();
  for (String rack : rackHeldNumber.keySet()) {
    rackToHoldNumber.put(rack, new AtomicInteger(0));
  }

  // Distribute the minimum evenly across racks: each pass grants one slot to
  // every rack that still has unassigned held containers. rackHeldNumber is
  // consumed here (counts decremented, exhausted racks removed).
  int containerCount = 0;
  while (containerCount < sessionNumMinHeldContainers && !rackHeldNumber.isEmpty()) {
    Iterator<Entry<String, AtomicInteger>> iter = rackHeldNumber.entrySet().iterator();
    while (containerCount < sessionNumMinHeldContainers && iter.hasNext()) {
      Entry<String, AtomicInteger> entry = iter.next();
      if (entry.getValue().decrementAndGet() >= 0) {
        containerCount++;
        rackToHoldNumber.get(entry.getKey()).incrementAndGet();
      } else {
        // No unassigned held containers remain on this rack.
        iter.remove();
      }
    }
  }

  // Pick containers evenly across nodes without exceeding each rack's quota:
  // each pass takes one container from every node that still has one.
  containerCount = 0;
  while (containerCount < sessionNumMinHeldContainers && !nodeHeldContainers.isEmpty()) {
    Iterator<Entry<String, List<HeldContainer>>> iter = nodeHeldContainers.entrySet().iterator();
    while (containerCount < sessionNumMinHeldContainers && iter.hasNext()) {
      List<HeldContainer> nodeContainers = iter.next().getValue();
      if (nodeContainers.isEmpty()) {
        // Node has no containers left; drop it.
        iter.remove();
        continue;
      }
      HeldContainer heldContainer = nodeContainers.remove(nodeContainers.size() - 1);
      if (rackToHoldNumber.get(heldContainer.getRack()).decrementAndGet() >= 0) {
        // The rack still has quota for this container.
        containerCount++;
        sessionMinHeldContainers.add(heldContainer.getContainer().getId());
      } else {
        // Rack quota used up; quotas only shrink, so nothing more can be
        // taken from this node.
        iter.remove();
      }
    }
  }
  LOG.info("Holding on to " + sessionMinHeldContainers.size() + " containers");
}