protected void handleContainerAllocation()

in myriad-scheduler/src/main/java/org/apache/myriad/scheduler/fgs/YarnNodeCapacityManager.java [203:246]


  /**
   * Reconciles the Mesos offers consumed for a node with the containers now running on it.
   *
   * <p>Drains the consumed offers recorded for the node's host and compares the node's
   * currently running containers against the stored container snapshot. Containers that
   * are running but absent from the snapshot are treated as allocated from the offers:
   * a Mesos task is built for each and launched against the consumed offer ids, and the
   * offered resources that were not used are subtracted from the node's capacity. When
   * there are no such containers, every offer is declined and the full offered amount is
   * subtracted from the node's capacity. The container snapshot is removed afterwards.
   *
   * <p>If no offers were consumed for the host the method returns without doing anything.
   * If offers were consumed but the node store has no entry for the host, the offers are
   * released the same way as when nothing was allocated, instead of failing with a
   * {@link NullPointerException} after the offers have already been drained.
   *
   * @param rmNode the YARN node whose host is used to look up consumed offers and whose
   *               capacity is adjusted
   */
  protected void handleContainerAllocation(RMNode rmNode) {
    String host = rmNode.getNodeID().getHost();

    ConsumedOffer consumedOffer = offerLifecycleMgr.drainConsumedOffer(host);
    if (consumedOffer == null) {
      LOGGER.debug("No offer consumed for {}", host);
      return;
    }

    Node node = nodeStore.getNode(host);
    if (node == null) {
      // The offers are already drained at this point, so they must be handed back here
      // or they would be neither declined nor used.
      // NOTE(review): assumes a missing node store entry means the node is no longer
      // tracked; confirm that releasing the offers is the desired recovery.
      LOGGER.warn("No node found in node store for host: {}, releasing consumed offers", host);
      releaseConsumedOffer(rmNode, consumedOffer);
      return;
    }

    Set<RMContainer> containersBeforeSched = node.getContainerSnapshot();
    Set<RMContainer> containersAfterSched = new HashSet<>(node.getNode().getRunningContainers());

    // Without a snapshot there is nothing to subtract, so every running container counts as new.
    Set<RMContainer> containersAllocatedByMesosOffer = (containersBeforeSched == null) ? containersAfterSched : Sets.difference(
        containersAfterSched, containersBeforeSched);

    if (containersAllocatedByMesosOffer.isEmpty()) {
      LOGGER.debug("No containers allocated using Mesos offers for host: {}", host);
      releaseConsumedOffer(rmNode, consumedOffer);
    } else {
      LOGGER.debug("Containers allocated using Mesos offers for host: {} count: {}", host, containersAllocatedByMesosOffer.size());

      // Identify the Mesos tasks that need to be launched
      List<Protos.TaskInfo> tasks = Lists.newArrayList();
      Resource resUsed = Resource.newInstance(0, 0);

      for (RMContainer newContainer : containersAllocatedByMesosOffer) {
        tasks.add(getTaskInfoForContainer(newContainer, consumedOffer, node));
        resUsed = Resources.add(resUsed, newContainer.getAllocatedResource());
      }

      // Reduce node capacity to account for unused offers
      Resource resOffered = OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers());
      Resource resUnused = Resources.subtract(resOffered, resUsed);
      decrementNodeCapacity(rmNode, resUnused);
      myriadDriver.getDriver().launchTasks(consumedOffer.getOfferIds(), tasks);
    }

    // No need to hold on to the snapshot anymore
    node.removeContainerSnapshot();
  }

  /**
   * Declines every offer in the consumed offer and subtracts the offered resources from
   * the node's capacity.
   *
   * @param rmNode        the YARN node whose capacity is reduced
   * @param consumedOffer the drained offers to decline
   */
  private void releaseConsumedOffer(RMNode rmNode, ConsumedOffer consumedOffer) {
    for (Protos.Offer offer : consumedOffer.getOffers()) {
      offerLifecycleMgr.declineOffer(offer);
    }
    decrementNodeCapacity(rmNode, OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers()));
  }