void preemptIfNeeded()

in tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java [1027:1160]


  /**
   * Tries to make room for the highest priority pending task request when that
   * request does not fit into the currently free resources
   * ({@code totalResources - allocatedResources}).
   * <p>
   * The steps, in order:
   * <ol>
   * <li>If any held (delayed) container is a reused container, or a new container
   * with fewer than 3 assignment attempts, do nothing and let the assignment loop
   * deal with it.</li>
   * <li>Otherwise, if there are new, unassigned held containers, release the one
   * with the lowest priority and re-send one outstanding request at that
   * priority, because the RM considers the original request serviced
   * (see TEZ-915).</li>
   * <li>Otherwise, choose a running task whose priority is lower than the pending
   * request and whose container signature is not an exact match. Prefer the
   * lowest task priority and, on ties, the last one seen. Then ask the app client
   * to preempt its container.</li>
   * </ol>
   * Scheduler state is read and modified while holding this object's monitor.
   * The {@code appClientDelegate.preemptContainer} upcall is made only after the
   * monitor has been released.
   */
  void preemptIfNeeded() {
    ContainerId preemptedContainer = null;
    synchronized (this) {
      Resource freeResources = Resources.subtract(totalResources,
        allocatedResources);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() +
          " cpu:" + allocatedResources.getVirtualCores() + 
          " delayedContainers: " + delayedContainerManager.delayedContainers.size());
      }
      assert freeResources.getMemory() >= 0;

      // Find the pending request with the highest priority (first one wins on ties).
      CookieContainerRequest highestPriRequest = null;
      for (CookieContainerRequest request : taskRequests.values()) {
        if (highestPriRequest == null
            || isHigherPriority(request.getPriority(), highestPriRequest.getPriority())) {
          highestPriRequest = request;
        }
      }
      if (highestPriRequest != null &&
         !fitsIn(highestPriRequest.getCapability(), freeResources)) {
        // The highest priority request will not fit in the existing free
        // resources, so free up some more.
        // TODO this is subject to error wrt RM resource normalization

        // This request must have been considered for matching with all existing
        // containers when the request was made.
        Container lowestPriNewContainer = null;
        // Before preempting running tasks, check whether an unused held
        // container can be released instead.
        for (HeldContainer heldContainer : delayedContainerManager.delayedContainers) {
          if (!heldContainer.isNew()) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Reused container exists. Wait for assignment loop to release it. "
                  + heldContainer.getContainer().getId());
            }
            return;
          }
          if (heldContainer.geNumAssignmentAttempts() < 3) {
            // we haven't tried to assign this container at node/rack/ANY yet
            if (LOG.isDebugEnabled()) {
              LOG.debug("Brand new container. Wait for assignment loop to match it. "
                  + heldContainer.getContainer().getId());
            }
            return;
          }
          Container container = heldContainer.getContainer();
          if (lowestPriNewContainer == null ||
              isHigherPriority(lowestPriNewContainer.getPriority(), container.getPriority())){
            // there is a lower priority new container
            lowestPriNewContainer = container;
          }
        }

        if (lowestPriNewContainer != null) {
          LOG.info("Preempting new container: " + lowestPriNewContainer.getId() +
              " with priority: " + lowestPriNewContainer.getPriority() + 
              " to free resource for request: " + highestPriRequest +
              " . Current free resources: " + freeResources);
          releaseUnassignedContainers(Collections.singletonList(lowestPriNewContainer));
          // We are returning an unused resource back to the RM. The RM thinks it
          // has serviced our initial request and will not re-allocate this back
          // to us anymore. So we need to ask for this again. If there is no
          // outstanding request at that priority then it's fine to not ask again.
          // See TEZ-915 for more details.
          Object taskToResend = null;
          CookieContainerRequest requestToResend = null;
          for (Map.Entry<Object, CookieContainerRequest> entry : taskRequests.entrySet()) {
            if (entry.getValue().getPriority().equals(lowestPriNewContainer.getPriority())) {
              taskToResend = entry.getKey();
              requestToResend = entry.getValue();
              break;
            }
          }
          // Re-send outside the iteration. deallocateTask/allocateTask presumably
          // modify taskRequests, which must not happen while iterating it.
          if (requestToResend != null) {
            LOG.info("Resending request for task again: " + taskToResend);
            deallocateTask(taskToResend, true);
            allocateTask(taskToResend, requestToResend.getCapability(),
                (requestToResend.getNodes() == null ? null :
                  requestToResend.getNodes().toArray(
                      new String[requestToResend.getNodes().size()])),
                (requestToResend.getRacks() == null ? null :
                  requestToResend.getRacks().toArray(
                      new String[requestToResend.getRacks().size()])),
                requestToResend.getPriority(),
                requestToResend.getCookie().getContainerSignature(),
                requestToResend.getCookie().getAppCookie());
          }

          return;
        }

        // Reaching here means every held container was new with >= 3 attempts
        // (none were found), so the delayed list must be empty. This assert is a
        // no-op in production but can help identify invalid assumptions in tests.
        assert delayedContainerManager.delayedContainers.isEmpty();

        // There are no reused or new containers to release, so try to preempt
        // running containers.
        Map.Entry<Object, Container> preemptedEntry = null;
        // Priority of the task running in preemptedEntry's container. Tracked
        // separately because candidates are ranked by the priority of the task
        // they run. That priority may differ from the container's own allocation
        // priority, for example when a container has been reused.
        Priority preemptedTaskPriority = null;
        for (Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
          HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
          CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
          Priority taskPriority = lastTaskInfo.getPriority();
          Object signature = lastTaskInfo.getCookie().getContainerSignature();
          if (!isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
            // running task has higher or same priority
            continue;
          }
          if (containerSignatureMatcher.isExactMatch(
              highestPriRequest.getCookie().getContainerSignature(),
              signature)) {
            // exact match with different priorities
            continue;
          }
          if (preemptedTaskPriority == null ||
              !isHigherPriority(taskPriority, preemptedTaskPriority)) {
            // keep the lower task priority, or on a tie the one seen later
            preemptedEntry = entry;
            preemptedTaskPriority = taskPriority;
          }
        }
        if (preemptedEntry != null) {
          // found something to preempt
          LOG.info("Preempting task: " + preemptedEntry.getKey() +
              " to free resource for request: " + highestPriRequest +
              " . Current free resources: " + freeResources);
          preemptedContainer = preemptedEntry.getValue().getId();
          // the app client will be notified after the container is killed
          // and we get its completed container status
        }
      }
    }

    // upcall outside locks
    if (preemptedContainer != null) {
      appClientDelegate.preemptContainer(preemptedContainer);
    }
  }