boolean preemptIfNeeded()

in tez-dag/src/main/java/org/apache/tez/dag/app/rm/YarnTaskSchedulerService.java [1164:1417]


  /**
   * Checks whether the highest priority pending task request is starved for
   * resources and, if so, frees capacity for it. Capacity is freed in two
   * stages: first by releasing new (never used) containers that are sitting
   * in the delayed container queue, and only if none are available, by asking
   * the context to preempt running containers of the lowest priority found
   * among the current task allocations.
   *
   * All bookkeeping is done while holding this object's monitor. The actual
   * {@code preemptContainer} upcalls are made after the monitor is released.
   *
   * @return {@code false} only when running containers eligible for preemption
   *         were found but fewer than {@code numHeartbeatsBetweenPreemptions}
   *         heartbeats have elapsed since {@code heartbeatAtLastPreemption};
   *         {@code true} in every other case (preemption disabled, nothing
   *         pending, request fits, nothing to preempt, or preemption issued).
   *         NOTE(review): presumably the caller uses this value to decide
   *         whether to advance {@code heartbeatAtLastPreemption} - confirm
   *         against the call site.
   */
  boolean preemptIfNeeded() {
    if (preemptionPercentage == 0) {
      // turned off
      return true;
    }
    // filled in under the lock, consumed by the upcall loop after the lock is released
    ContainerId[] preemptedContainers = null;
    int numPendingRequestsToService = 0;
    synchronized (this) {
      Resource freeResources = this.getAvailableResources();
      // always log state at debug level; otherwise log it at info level only
      // once every 50 heartbeats to keep the log volume down
      if (LOG.isDebugEnabled()) {
        LOG.debug(constructPreemptionPeriodicLog(freeResources));
      } else {
        if (numHeartbeats % 50 == 1) {
          LOG.info(constructPreemptionPeriodicLog(freeResources));
        }
      }
      assert freeResources.getMemory() >= 0;
  
      // single pass over the pending requests to find the highest priority
      // one and count how many requests share that priority
      CookieContainerRequest highestPriRequest = null;
      int numHighestPriRequests = 0;
      for(CookieContainerRequest request : taskRequests.values()) {
        if(highestPriRequest == null) {
          highestPriRequest = request;
          numHighestPriRequests = 1;
        } else if(isHigherPriority(request.getPriority(),
                                     highestPriRequest.getPriority())){
          // found a strictly higher priority - restart the count
          highestPriRequest = request;
          numHighestPriRequests = 1;
        } else if (request.getPriority().equals(highestPriRequest.getPriority())) {
          numHighestPriRequests++;
        }
      }
      
      if (highestPriRequest == null) {
        // nothing pending
        resetHighestWaitingPriority(null);
        return true;
      }
      
      // reset the wait time when waiting priority changes to prevent carry over of the value
      if (highestWaitingRequestPriority == null ||
          !highestPriRequest.getPriority().equals(highestWaitingRequestPriority)) {
        resetHighestWaitingPriority(highestPriRequest.getPriority());
      }
      
      long currTime = System.currentTimeMillis();
      // a start time of 0 is treated as "not waiting yet", so start the clock now
      // (presumably resetHighestWaitingPriority zeroes it - confirm in that helper)
      if (highestWaitingRequestWaitStartTime == 0) {
        highestWaitingRequestWaitStartTime = currTime;
      }

      // true once the highest priority request has waited longer than preemptionMaxWaitTime
      boolean preemptionWaitDeadlineCrossed = 
          (currTime - highestWaitingRequestWaitStartTime) > preemptionMaxWaitTime ? true : false;

      // before the deadline, a request that fits in the free resources needs no
      // preemption. After the deadline this shortcut is skipped on purpose so
      // that the priority inversion check below always runs.
      if(!preemptionWaitDeadlineCrossed && 
          fitsIn(highestPriRequest.getCapability(), freeResources)) {
        LOG.debug("{} fits in free resources", highestPriRequest);
        if (numHeartbeats % 50 == 1) {
          LOG.info(highestPriRequest + " fits in free resources");
        }
        return true;
      }
      
      if (preemptionWaitDeadlineCrossed) {
        // check if anything lower priority is running - priority inversion
        // this check could have been done earlier but in the common case
        // this would be unnecessary since there are usually requests pending
        // in the normal case without priority inversion. So do this expensive
        // iteration now
        boolean lowerPriRunning = false;
        for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
          // NOTE(review): assumes every allocated container has a HeldContainer
          // entry with non-null last task info - neither is null-checked here
          HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
          CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
          Priority taskPriority = lastTaskInfo.getPriority();
          Object signature = lastTaskInfo.getCookie().getContainerSignature();
          if(isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
            // lower priority task is running
            if (containerSignatureMatcher.isExactMatch(
                highestPriRequest.getCookie().getContainerSignature(),
                signature)) {
              // exact match with different priorities - not counted as an
              // inversion (presumably because such a container could be reused
              // by the pending request once its task finishes - confirm)
              continue;
            }
            lowerPriRunning = true;
            break;
          }
        }
        if (!lowerPriRunning) {
          // nothing lower priority running
          // normal case of many pending request without priority inversion
          resetHighestWaitingPriority(null);
          return true;
        }
        LOG.info("Preemption deadline crossed at pri: " + highestPriRequest.getPriority()
            + " numRequests: " + numHighestPriRequests + ". "
            + constructPreemptionPeriodicLog(freeResources));
      }
      
      // highest priority request will not fit in existing free resources
      // free up some more
      // TODO this is subject to error wrt RM resource normalization
      
      // only a fraction of the pending highest priority requests is serviced
      // per pass, as computed by scaleDownByPreemptionPercentage
      numPendingRequestsToService = scaleDownByPreemptionPercentage(numHighestPriRequests,
          preemptionPercentage);

      if (numPendingRequestsToService < 1) {
        // nothing to preempt - reset preemption last heartbeat
        return true;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to service " + numPendingRequestsToService + " out of total "
            + numHighestPriRequests + " pending requests at pri: "
            + highestPriRequest.getPriority());
      }
      // Stage 1: release new containers held in the delayed queue, at most one
      // per request being serviced
      int newContainersReleased = 0;
      for (int i=0; i<numPendingRequestsToService; ++i) {
        // This request must have been considered for matching with all existing 
        // containers when request was made.
        Container lowestPriNewContainer = null;
        // Scan the delayed containers for the lowest priority new container.
        // The whole method bails out if any delayed container is a reused one
        // or has not yet been through enough assignment attempts.
        for (HeldContainer heldContainer : delayedContainerManager.delayedContainers) {
          if (!heldContainer.isNew()) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Reused container exists. Wait for assignment loop to release it. "
                  + heldContainer.getContainer().getId());
            }
            return true;
          }
          if (heldContainer.geNumAssignmentAttempts() < 3) {
            // we haven't tried to assign this container at node/rack/ANY
            if (LOG.isDebugEnabled()) {
              LOG.debug("Brand new container. Wait for assignment loop to match it. "
                  + heldContainer.getContainer().getId());
            }
            return true;
          }
          Container container = heldContainer.getContainer();
          if (lowestPriNewContainer == null ||
              isHigherPriority(lowestPriNewContainer.getPriority(), container.getPriority())){
            // there is a lower priority new container
            lowestPriNewContainer = container;
          }
        }
        
        if (lowestPriNewContainer != null) {
          LOG.info("Preempting new container: " + lowestPriNewContainer.getId() +
              " with priority: " + lowestPriNewContainer.getPriority() + 
              " to free resource for request: " + highestPriRequest +
              " . Current free resources: " + freeResources);
          newContainersReleased++;
          releaseUnassignedContainers(Collections.singletonList(lowestPriNewContainer));
          // We are returning an unused resource back to the RM. The RM thinks it 
          // has serviced our initial request and will not re-allocate this back
          // to us anymore. So we need to ask for this again. If there is no
          // outstanding request at that priority then its fine to not ask again.
          // See TEZ-915 for more details
          maybeRescheduleContainerAtPriority(lowestPriNewContainer.getPriority());

          // come back and free more new containers if needed
          continue;
        }
        // no new container was found on this pass; the remaining iterations
        // rescan the delayed containers and are expected to find nothing either
      }
      // requests covered by released new containers need no running-task preemption
      numPendingRequestsToService -= newContainersReleased;
      if (numPendingRequestsToService < 1) {
        return true;
      }

      // there are no reused or new containers to release. try to preempt running containers
      // this assert will be a no-op in production but can help identify 
      // invalid assumptions during testing
      assert delayedContainerManager.delayedContainers.isEmpty();
      if (!delayedContainerManager.delayedContainers.isEmpty()) {
        LOG.warn("Expected delayed containers to be empty. "
            + constructPreemptionPeriodicLog(freeResources));
      }
      
      // Stage 2: find the lowest priority among running tasks that are lower
      // priority than the pending request, and count the tasks at that priority
      Priority preemptedTaskPriority = null;
      int numEntriesAtPreemptedPriority = 0;
      for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
        HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
        CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
        Priority taskPriority = lastTaskInfo.getPriority();
        Object signature = lastTaskInfo.getCookie().getContainerSignature();
        if(!isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
          // higher or same priority
          continue;
        }
        if (containerSignatureMatcher.isExactMatch(
            highestPriRequest.getCookie().getContainerSignature(),
            signature)) {
          // exact match with different priorities
          continue;
        }
        if (preemptedTaskPriority == null ||
            !isHigherPriority(taskPriority, preemptedTaskPriority)) {
          // keep the lower priority
          if (taskPriority.equals(preemptedTaskPriority)) {
            numEntriesAtPreemptedPriority++;
          } else {
            // this is at a lower priority than existing
            // (also taken for the first candidate, when there is no existing one)
            numEntriesAtPreemptedPriority = 1;
          }
          preemptedTaskPriority = taskPriority;
        }
      }
      if(preemptedTaskPriority != null) {
        // never preempt more containers than there are candidates or pending
        // requests, and never more than what stage 1 left outstanding
        int newNumPendingRequestsToService = scaleDownByPreemptionPercentage(Math.min(
            numEntriesAtPreemptedPriority, numHighestPriRequests), preemptionPercentage);
        numPendingRequestsToService = Math.min(newNumPendingRequestsToService,
            numPendingRequestsToService);
        if (numPendingRequestsToService < 1) {
          return true;
        }
        // wait for enough heartbeats since this request became active for preemption
        if ((numHeartbeats - heartbeatAtLastPreemption) < numHeartbeatsBetweenPreemptions) {
          // stop incrementing lastpreemption heartbeat count
          return false;
        }
        LOG.info("Trying to service " + numPendingRequestsToService + " out of total "
            + numHighestPriRequests + " pending requests at pri: "
            + highestPriRequest.getPriority() + " by preempting from "
            + numEntriesAtPreemptedPriority + " running tasks at priority: " + preemptedTaskPriority);
        // found something to preempt. get others of the same priority
        preemptedContainers = new ContainerId[numPendingRequestsToService];
        int currIndex = 0;
        for (Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
          HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
          CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
          Priority taskPriority = lastTaskInfo.getPriority();
          Container container = entry.getValue();
          if (preemptedTaskPriority.equals(taskPriority)) {
            // taskAllocations map will iterate from oldest to newest assigned containers
            // keep the N newest containersIds with the matching priority
            // the array is used as a ring buffer: the modulo index makes later
            // (newer) matches overwrite earlier (older) ones
            preemptedContainers[currIndex++ % numPendingRequestsToService] = container.getId();
          }
        }
        // app client will be notified after the container is killed
        // and we get its completed container status
      }
    }
    
    // upcall outside locks
    if (preemptedContainers != null) {
      // numPendingRequestsToService equals the array length at this point
      for(int i=0; i<numPendingRequestsToService; ++i) {
        ContainerId cId = preemptedContainers[i];
        // guards against unfilled ring buffer slots
        if (cId != null) {
          LOG.info("Preempting container: " + cId + " currently allocated to a task.");
          getContext().preemptContainer(cId);
        }
      }
    }
    return true;
  }