in gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java [491:600]
/**
 * Moves the set of YARN containers held by this service towards the target described by {@code yarnContainerRequestBundle}.
 *
 * <p>In order, this method:
 * <ol>
 *   <li>removes from {@code containerMap} every container already recorded in {@code removedContainerID}, decrementing the
 *       allocated count of its helix tag;</li>
 *   <li>for every helix tag in the bundle, requests {@code desired - (allocated + outstanding matching requests)} new
 *       containers when that number is positive;</li>
 *   <li>selects for release every container whose Helix participant has not been live for at least
 *       {@code YarnAutoScalingManager.DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES} minutes;</li>
 *   <li>if the total target is below the allocated total minus those non-live containers, additionally selects containers
 *       whose participant is not in {@code inUseInstances}, until {@code allocated - target} containers are selected;</li>
 *   <li>posts one {@code ContainerReleaseRequest} on the event bus for all selected containers, if any.</li>
 * </ol>
 * Overshooting is possible (allocations may be in flight); periodic calls are expected to converge on the target.
 *
 * @param yarnContainerRequestBundle desired container count and container resource per helix tag
 * @param inUseInstances helix participant ids whose containers must not be released when shrinking
 * @return {@code false} (and does nothing) while startup is still in progress, {@code true} otherwise
 */
public synchronized boolean requestTargetNumberOfContainers(YarnContainerRequestBundle yarnContainerRequestBundle, Set<String> inUseInstances) {
  LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances count is {}, container map size is {}",
      yarnContainerRequestBundle.getTotalContainers(), inUseInstances.size(), this.containerMap.size());
  if (startupInProgress) {
    LOGGER.warn("YarnService is still starting up. Unable to request containers from yarn until YarnService is finished starting up.");
    return false;
  }

  // Correct the containerMap first, as there are cases where handleContainerCompletion() is called before
  // onContainersAllocated().
  reconcileRemovedContainers();

  int numTargetContainers = yarnContainerRequestBundle.getTotalContainers();
  // YARN can allocate more than the requested number of containers, so additional allocations and deallocations are
  // computed from the actual allocated count. This is the number of containers allocated across all helix tags.
  int totalAllocatedContainers = this.containerMap.size();

  int totalContainersInContainerCountMap = 0;
  for (AtomicInteger count : this.allocatedContainerCountMap.values()) {
    totalContainersInContainerCountMap += count.get();
  }
  if (totalContainersInContainerCountMap != totalAllocatedContainers) {
    LOGGER.warn("Container number mismatch in containerMap and allocatedContainerCountMap, "
        + "we have {} containers in containerMap while {} in allocatedContainerCountMap",
        totalAllocatedContainers, totalContainersInContainerCountMap);
  }

  requestMissingContainersPerHelixTag(yarnContainerRequestBundle);

  // Containers whose Helix participant has been dropped from the cluster (e.g. after a long ZooKeeper disconnection)
  // won't be assigned tasks by Helix, so they are released to make room for fresh containers.
  List<Container> containersToRelease = new ArrayList<>();
  Set<ContainerId> idleContainerIdsToRelease = new HashSet<>();
  collectContainersWithNonLiveParticipants(containersToRelease, idleContainerIdsToRelease);

  // If the total desired is lower than the currently allocated amount then release free containers.
  // This is based on the currently allocated amount since containers may still be in the process of being allocated
  // and assigned work. Resizing based on numRequestedContainers at this point may release a container right before
  // or soon after it is assigned work.
  if (numTargetContainers < totalAllocatedContainers - idleContainerIdsToRelease.size()) {
    // containersToRelease already holds the non-live containers, so this bound counts them too.
    int numToShutdown = totalAllocatedContainers - numTargetContainers;
    LOGGER.info("Shrinking number of containers by {} because numTargetContainers < totalAllocatedContainers - idleContainersToRelease ({} < {} - {})",
        totalAllocatedContainers - numTargetContainers - idleContainerIdsToRelease.size(), numTargetContainers,
        totalAllocatedContainers, idleContainerIdsToRelease.size());
    // Look for eligible containers to release. If a container is in use then it is not released.
    for (Map.Entry<ContainerId, ContainerInfo> entry : this.containerMap.entrySet()) {
      ContainerInfo containerInfo = entry.getValue();
      if (!inUseInstances.contains(containerInfo.getHelixParticipantId())
          && !idleContainerIdsToRelease.contains(entry.getKey())) {
        containersToRelease.add(containerInfo.getContainer());
      }
      if (containersToRelease.size() >= numToShutdown) {
        break;
      }
    }
    LOGGER.info("Shutting down {} containers. containersToRelease={}", containersToRelease.size(), containersToRelease);
  }

  if (!containersToRelease.isEmpty()) {
    this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
  }
  this.yarnContainerRequest = yarnContainerRequestBundle;
  LOGGER.info("Current tag-container desired count:{}, tag-container allocated: {}",
      yarnContainerRequestBundle.getHelixTagContainerCountMap(), this.allocatedContainerCountMap);
  return true;
}

/**
 * Drops every container recorded in {@code removedContainerID} from {@code containerMap} and decrements its helix tag's
 * allocated count. Ids not present in {@code containerMap} are left in {@code removedContainerID} for a later call.
 * Only called from {@link #requestTargetNumberOfContainers}, i.e. while holding this object's monitor.
 */
private void reconcileRemovedContainers() {
  // NOTE(review): removes from removedContainerID while iterating its key set; this assumes it is a concurrent map
  // (weakly consistent iterators) — confirm against the field declaration.
  for (ContainerId removedId : this.removedContainerID.keySet()) {
    ContainerInfo containerInfo = this.containerMap.remove(removedId);
    if (containerInfo != null) {
      this.allocatedContainerCountMap.computeIfAbsent(containerInfo.getHelixTag(), tag -> new AtomicInteger(0))
          .decrementAndGet();
      this.removedContainerID.remove(removedId);
    }
  }
}

/**
 * Requests, for each helix tag in the bundle, enough new containers for allocated plus outstanding matching requests to
 * reach the desired count for that tag. Never releases containers.
 */
private void requestMissingContainersPerHelixTag(YarnContainerRequestBundle yarnContainerRequestBundle) {
  for (Map.Entry<String, Integer> entry : yarnContainerRequestBundle.getHelixTagContainerCountMap().entrySet()) {
    String currentHelixTag = entry.getKey();
    int desiredContainerCount = entry.getValue();
    Resource resourceForHelixTag = yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag);
    // Requested count = containers already allocated for the tag + ContainerRequests still outstanding in YARN.
    int allocatedContainersForHelixTag =
        this.allocatedContainerCountMap.computeIfAbsent(currentHelixTag, tag -> new AtomicInteger(0)).get();
    int outstandingContainerRequests = getMatchingRequestsCount(resourceForHelixTag);
    int requestedContainerCount = allocatedContainersForHelixTag + outstandingContainerRequests;
    int numContainersNeeded = desiredContainerCount - requestedContainerCount;
    LOGGER.info("Container counts for helixTag={} (allocatedContainers={}, outstandingContainerRequests={}, desiredContainerCount={}, numContainersNeeded={})",
        currentHelixTag, allocatedContainersForHelixTag, outstandingContainerRequests, desiredContainerCount, numContainersNeeded);
    if (numContainersNeeded > 0) {
      requestContainers(numContainersNeeded, resourceForHelixTag);
    }
  }
}

/**
 * Records when each container's Helix participant was first seen non-live, clears that record once the participant is
 * live again, and adds to the two output collections every container non-live for at least the default max idle time.
 *
 * @param containersToRelease receives the containers to release
 * @param idleContainerIdsToRelease receives the ids of those same containers
 */
private void collectContainersWithNonLiveParticipants(List<Container> containersToRelease,
    Set<ContainerId> idleContainerIdsToRelease) {
  long maxIdleMillis =
      TimeUnit.MINUTES.toMillis(YarnAutoScalingManager.DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES);
  for (Map.Entry<ContainerId, ContainerInfo> entry : this.containerMap.entrySet()) {
    ContainerId containerId = entry.getKey();
    ContainerInfo containerInfo = entry.getValue();
    if (HelixUtils.isInstanceLive(helixManager, containerInfo.getHelixParticipantId())) {
      containerIdleSince.remove(containerId);
      continue;
    }
    // One timestamp per check, so "idle since" and "now" agree when the container is first seen non-live.
    long now = System.currentTimeMillis();
    containerIdleSince.putIfAbsent(containerId, now);
    if (now - containerIdleSince.get(containerId) >= maxIdleMillis) {
      LOGGER.info("Releasing Container {} because the assigned participant {} has been in-active for more than {} minutes",
          containerId, containerInfo.getHelixParticipantId(), YarnAutoScalingManager.DEFAULT_MAX_CONTAINER_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES);
      containersToRelease.add(containerInfo.getContainer());
      idleContainerIdsToRelease.add(containerId);
    }
  }
}