public synchronized boolean requestTargetNumberOfContainers()

in gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java [491:600]


  /**
   * Reconciles the containers tracked by this service with the counts requested in
   * {@code yarnContainerRequestBundle}.
   *
   * <p>The method performs the following steps, in order:
   * <ol>
   *   <li>Returns {@code false} immediately if {@code startupInProgress} is set.</li>
   *   <li>Drops containers recorded in {@code removedContainerID} from {@code containerMap} and decrements the
   *       per-helix-tag allocated count for each one found.</li>
   *   <li>For every helix tag in the bundle, requests additional containers when the desired count exceeds the
   *       allocated count plus the outstanding requests matching the tag's resource.</li>
   *   <li>Marks for release every container whose helix participant has not been live for at least
   *       {@code YarnAutoScalingManager.DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES} minutes.
   *       This check does not consult {@code inUseInstances}.</li>
   *   <li>If the total target is still lower than the allocated count after accounting for those containers,
   *       marks further containers for release, skipping any whose participant id is in
   *       {@code inUseInstances}.</li>
   *   <li>Posts a single {@code ContainerReleaseRequest} on the event bus if anything was marked, and records the
   *       bundle as the current request.</li>
   * </ol>
   *
   * @param yarnContainerRequestBundle supplies the total target container count, the desired count per helix tag,
   *                                   and the resource associated with each helix tag
   * @param inUseInstances helix participant ids whose containers must not be released when shrinking
   * @return {@code false} if the service is still starting up and nothing was done, {@code true} otherwise
   */
  public synchronized boolean requestTargetNumberOfContainers(YarnContainerRequestBundle yarnContainerRequestBundle, Set<String> inUseInstances) {
    LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances count is {}, container map size is {}",
        yarnContainerRequestBundle.getTotalContainers(), inUseInstances.size(), this.containerMap.size());
    if (startupInProgress) {
      LOGGER.warn("YarnService is still starting up. Unable to request containers from yarn until YarnService is finished starting up.");
      return false;
    }

    // Correct the containerMap first, as there are cases where handleContainerCompletion() is called before
    // onContainersAllocated(). Iterate over a snapshot of the removed ids so that entries can be deleted from
    // removedContainerID inside the loop without mutating the key set that is being iterated.
    List<ContainerId> removedIdsSnapshot = new ArrayList<>(this.removedContainerID.keySet());
    for (ContainerId removedId : removedIdsSnapshot) {
      ContainerInfo containerInfo = this.containerMap.remove(removedId);
      // Ids with no matching entry in containerMap are left in removedContainerID for a later call.
      if (containerInfo != null) {
        String helixTag = containerInfo.getHelixTag();
        allocatedContainerCountMap.putIfAbsent(helixTag, new AtomicInteger(0));
        this.allocatedContainerCountMap.get(helixTag).decrementAndGet();
        this.removedContainerID.remove(removedId);
      }
    }

    int numTargetContainers = yarnContainerRequestBundle.getTotalContainers();
    // YARN can allocate more than the requested number of containers, compute additional allocations and deallocations
    // based on the max of the requested and actual allocated counts.
    // Represents the number of containers allocated across all helix tags.
    int totalAllocatedContainers = this.containerMap.size();
    int totalContainersInContainerCountMap = 0;
    for (AtomicInteger count : allocatedContainerCountMap.values()) {
      totalContainersInContainerCountMap += count.get();
    }
    if (totalContainersInContainerCountMap != totalAllocatedContainers) {
      // The two bookkeeping structures disagree; this is only reported, not repaired.
      LOGGER.warn("Container number mismatch in containerMap and allocatedContainerCountMap, "
          + "we have {} containers in containerMap while {} in allocatedContainerCountMap",
          totalAllocatedContainers, totalContainersInContainerCountMap);
    }

    // Request additional containers if the desired count is higher than the max of the current allocation or previously
    // requested amount. Note that there may be in-flight or additional allocations after numContainers has been computed
    // so overshooting can occur, but periodic calls to this method will make adjustments towards the target.
    for (Map.Entry<String, Integer> entry : yarnContainerRequestBundle.getHelixTagContainerCountMap().entrySet()) {
      String currentHelixTag = entry.getKey();
      int desiredContainerCount = entry.getValue();
      Resource resourceForHelixTag = yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag);

      // Calculate requested container count based on adding allocated count and outstanding ContainerRequests in Yarn
      allocatedContainerCountMap.putIfAbsent(currentHelixTag, new AtomicInteger(0));
      int allocatedContainersForHelixTag = allocatedContainerCountMap.get(currentHelixTag).get();
      int outstandingContainerRequests = getMatchingRequestsCount(resourceForHelixTag);
      int requestedContainerCount = allocatedContainersForHelixTag + outstandingContainerRequests;
      int numContainersNeeded = desiredContainerCount - requestedContainerCount;
      LOGGER.info("Container counts for helixTag={} (allocatedContainers={}, outstandingContainerRequests={}, desiredContainerCount={}, numContainersNeeded={})",
          currentHelixTag, allocatedContainersForHelixTag, outstandingContainerRequests, desiredContainerCount, numContainersNeeded);

      if (numContainersNeeded > 0) {
        requestContainers(numContainersNeeded, resourceForHelixTag);
      }
    }

    // Iterate through all containers allocated and check whether the corresponding helix instance is still LIVE within the helix cluster.
    // A container that has a bad connection to zookeeper will be dropped from the Helix cluster if the disconnection is greater than the specified timeout.
    // In these cases, we want to release the container to get a new container because these containers won't be assigned tasks by Helix.
    List<Container> containersToRelease = new ArrayList<>();
    Set<ContainerId> idleContainerIdsToRelease = new HashSet<>();
    // Loop-invariant idle threshold, computed once.
    long maxIdleMillis =
        TimeUnit.MINUTES.toMillis(YarnAutoScalingManager.DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES);
    for (Map.Entry<ContainerId, ContainerInfo> entry : this.containerMap.entrySet()) {
      ContainerId containerId = entry.getKey();
      ContainerInfo containerInfo = entry.getValue();
      if (!HelixUtils.isInstanceLive(helixManager, containerInfo.getHelixParticipantId())) {
        // Read the clock once so the recorded start time and the comparison use the same instant.
        long now = System.currentTimeMillis();
        containerIdleSince.putIfAbsent(containerId, now);
        if (now - containerIdleSince.get(containerId) >= maxIdleMillis) {
          LOGGER.info("Releasing Container {} because the assigned participant {} has been in-active for more than {} minutes",
              containerId, containerInfo.getHelixParticipantId(), YarnAutoScalingManager.DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES);
          containersToRelease.add(containerInfo.getContainer());
          idleContainerIdsToRelease.add(containerId);
        }
      } else {
        // The participant is live again; forget any previously recorded idle start time.
        containerIdleSince.remove(containerId);
      }
    }

    // If the total desired is lower than the currently allocated amount then release free containers.
    // This is based on the currently allocated amount since containers may still be in the process of being allocated
    // and assigned work. Resizing based on numRequestedContainers at this point may release a container right before
    // or soon after it is assigned work.
    if (numTargetContainers < totalAllocatedContainers - idleContainerIdsToRelease.size()) {
      // Total number of containers to release, including the idle ones already collected above.
      int numToShutdown = totalAllocatedContainers - numTargetContainers;

      LOGGER.info("Shrinking number of containers by {} because numTargetContainers < totalAllocatedContainers - idleContainersToRelease ({} < {} - {})",
          totalAllocatedContainers - numTargetContainers - idleContainerIdsToRelease.size(), numTargetContainers, totalAllocatedContainers, idleContainerIdsToRelease.size());

      // Look for eligible containers to release. If a container is in use then it is not released.
      // Containers already selected as idle are skipped so they are not added twice.
      for (Map.Entry<ContainerId, ContainerInfo> entry : this.containerMap.entrySet()) {
        ContainerInfo containerInfo = entry.getValue();
        if (!inUseInstances.contains(containerInfo.getHelixParticipantId()) && !idleContainerIdsToRelease.contains(entry.getKey())) {
          containersToRelease.add(containerInfo.getContainer());
        }

        if (containersToRelease.size() >= numToShutdown) {
          break;
        }
      }

      LOGGER.info("Shutting down {} containers. containersToRelease={}", containersToRelease.size(), containersToRelease);
    }

    if (!containersToRelease.isEmpty()) {
      this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
    }
    this.yarnContainerRequest = yarnContainerRequestBundle;
    LOGGER.info("Current tag-container desired count:{}, tag-container allocated: {}",
        yarnContainerRequestBundle.getHelixTagContainerCountMap(), this.allocatedContainerCountMap);
    return true;
  }