in myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java [203:246]
protected void handleContainerAllocation(RMNode rmNode) {
String host = rmNode.getNodeID().getHost();
ConsumedOffer consumedOffer = offerLifecycleMgr.drainConsumedOffer(host);
if (consumedOffer == null) {
LOGGER.debug("No offer consumed for {}", host);
return;
}
Node node = nodeStore.getNode(host);
Set<RMContainer> containersBeforeSched = node.getContainerSnapshot();
Set<RMContainer> containersAfterSched = new HashSet<>(node.getNode().getRunningContainers());
Set<RMContainer> containersAllocatedByMesosOffer = (containersBeforeSched == null) ? containersAfterSched : Sets.difference(
containersAfterSched, containersBeforeSched);
if (containersAllocatedByMesosOffer.isEmpty()) {
LOGGER.debug("No containers allocated using Mesos offers for host: {}", host);
for (Protos.Offer offer : consumedOffer.getOffers()) {
offerLifecycleMgr.declineOffer(offer);
}
decrementNodeCapacity(rmNode, OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers()));
} else {
LOGGER.debug("Containers allocated using Mesos offers for host: {} count: {}", host, containersAllocatedByMesosOffer.size());
// Identify the Mesos tasks that need to be launched
List<Protos.TaskInfo> tasks = Lists.newArrayList();
Resource resUsed = Resource.newInstance(0, 0);
for (RMContainer newContainer : containersAllocatedByMesosOffer) {
tasks.add(getTaskInfoForContainer(newContainer, consumedOffer, node));
resUsed = Resources.add(resUsed, newContainer.getAllocatedResource());
}
// Reduce node capacity to account for unused offers
Resource resOffered = OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers());
Resource resUnused = Resources.subtract(resOffered, resUsed);
decrementNodeCapacity(rmNode, resUnused);
myriadDriver.getDriver().launchTasks(consumedOffer.getOfferIds(), tasks);
}
// No need to hold on to the snapshot anymore
node.removeContainerSnapshot();
}