in gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java [291:427]
/**
 * Runs a single auto-scaling pass.
 *
 * <p>The pass does the following, in order:
 * <ol>
 *   <li>Walks every workflow known to {@code taskDriver}; for each workflow that is in progress and not
 *       marked for deletion, visits each job in its DAG, records the Helix participants that currently
 *       host its partitions, and adds a container request (tag, count, resource) for the job to a
 *       {@link YarnContainerRequestBundle}.</li>
 *   <li>Walks every participant in the cluster to maintain the {@code instanceIdleSince} and
 *       {@code instanceStuckSince} tracking maps, and collects containers whose participant is reported
 *       stuck by {@code isInstanceStuck}.</li>
 *   <li>Posts a {@link ContainerReleaseRequest} for the collected containers (if any), adds the bundle to
 *       {@code slidingWindowReservoir}, and asks {@code yarnService} for the reservoir's max bundle
 *       together with the set of in-use instances.</li>
 * </ol>
 */
void runInternal() {
  Set<String> inUseInstances = new HashSet<>();
  // helixInstancesContainingStuckTasks maintains the set of helix instances/participants containing tasks that are
  // stuck in any of the configured states.
  final Set<String> helixInstancesContainingStuckTasks = new HashSet<>();
  YarnContainerRequestBundle yarnContainerRequestBundle = new YarnContainerRequestBundle();

  for (Map.Entry<String, WorkflowConfig> workFlowEntry : taskDriver.getWorkflows().entrySet()) {
    WorkflowContext workflowContext = taskDriver.getWorkflowContext(workFlowEntry.getKey());
    WorkflowConfig workflowConfig = workFlowEntry.getValue();

    // Only allocate for active workflows. Those marked for deletion are ignored but the existing containers won't
    // be released until maxIdleTimeInMinutesBeforeScalingDown.
    // The state comparison is written constant-first so that a missing workflow state skips the workflow
    // instead of throwing a NullPointerException.
    if (workflowContext == null
        || TargetState.DELETE.equals(workflowConfig.getTargetState())
        || !TaskState.IN_PROGRESS.equals(workflowContext.getWorkflowState())) {
      continue;
    }

    log.debug("Workflow name {} config {} context {}", workFlowEntry.getKey(), workFlowEntry.getValue(),
        workflowContext);

    JobDag jobDag = workflowConfig.getJobDag();
    Set<String> jobs = jobDag.getAllNodes();

    // Build one container request per job in the workflow.
    for (String jobName : jobs) {
      JobContext jobContext = taskDriver.getJobContext(jobName);
      JobConfig jobConfig = taskDriver.getJobConfig(jobName);
      // Start from the default container size and tag; job-level settings may override them below.
      Resource resource = Resource.newInstance(this.defaultContainerMemoryMbs, this.defaultContainerCores);
      int numPartitions = 0;
      String jobTag = defaultHelixInstanceTags;

      if (jobContext != null) {
        // Take the partition set once and reuse it for logging, both participant lookups and the count.
        Set<Integer> partitions = jobContext.getPartitionSet();
        log.debug("JobContext {} num partitions {}", jobContext, partitions.size());

        inUseInstances.addAll(partitions.stream()
            .map(i -> getInuseParticipantForHelixPartition(jobContext, i))
            .filter(Objects::nonNull).collect(Collectors.toSet()));

        if (enableDetectionStuckTask) {
          // if feature is not enabled the set helixInstancesContainingStuckTasks will always be empty
          helixInstancesContainingStuckTasks.addAll(partitions.stream()
              .map(helixPartition -> getParticipantInGivenStateForHelixPartition(jobContext, helixPartition, taskStates))
              .filter(Objects::nonNull).collect(Collectors.toSet()));
        }

        numPartitions = partitions.size();

        // Job level config for helix instance tags takes precedence over other tag configurations
        if (jobConfig != null) {
          if (!Strings.isNullOrEmpty(jobConfig.getInstanceGroupTag())) {
            jobTag = jobConfig.getInstanceGroupTag();
          }

          // A job without a command config map must not abort the whole pass, so the map is null-checked
          // and the default resource is kept when it is absent.
          Map<String, String> jobCommandConfigMap = jobConfig.getJobCommandConfigMap();
          if (jobCommandConfigMap != null) {
            if (jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)) {
              resource.setMemory(
                  Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)));
            }
            if (jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)) {
              resource.setVirtualCores(
                  Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)));
            }
          }
        }
      }

      // Compute the container count as the ceiling of (number of partitions / partitions per container),
      // scaled by the constant overprovision factor. The scaling is applied before the ceiling is taken.
      int containerCount =
          (int) Math.ceil(((double) numPartitions / this.partitionsPerContainer) * this.overProvisionFactor);
      yarnContainerRequestBundle.add(jobTag, containerCount, resource);
      log.info("jobName={}, jobTag={}, numPartitions={}, targetNumContainers={}",
          jobName, jobTag, numPartitions, containerCount);
    }
  }

  // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
  // and potentially replanner-instance.
  Set<String> allParticipants = HelixUtils.getParticipants(helixDataAccessor, HELIX_YARN_INSTANCE_NAME_PREFIX);
  final Set<Container> containersToRelease = new HashSet<>();

  for (String participant : allParticipants) {
    if (!inUseInstances.contains(participant)) {
      // Participant hosts no partition in this round: remember when it was first seen idle (an existing
      // timestamp is kept), and keep treating it as in use until isInstanceUnused reports otherwise.
      instanceIdleSince.putIfAbsent(participant, System.currentTimeMillis());
      if (!isInstanceUnused(participant)) {
        inUseInstances.add(participant);
      }
    } else {
      // A previously idle instance is now detected to be in use.
      // Remove this instance if existed in the tracking map.
      instanceIdleSince.remove(participant);
    }

    if (helixInstancesContainingStuckTasks.contains(participant)) {
      // Remember when the participant was first seen with a stuck partition.
      instanceStuckSince.putIfAbsent(participant, System.currentTimeMillis());
      if (isInstanceStuck(participant)) {
        // release the corresponding container as the helix task is stuck for a long time
        log.info("Instance {} has some helix partition that is stuck for {} minutes, "
                + "releasing the container enabled : {}", participant,
            TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis() - instanceStuckSince.get(participant)),
            enableReleasingContainerHavingStuckTask);

        // get container of the helix participant
        Optional<Container> container = yarnService.getContainerInfoGivenHelixParticipant(participant);
        // Drop the tracking entry once the participant has been reported as stuck.
        instanceStuckSince.remove(participant);

        // The event carries an empty container id when no container is known for the participant.
        String containerId = "";
        if (container.isPresent()) {
          if (enableReleasingContainerHavingStuckTask) {
            containersToRelease.add(container.get());
          }
          containerId = container.get().getId().toString();
        } else {
          log.warn("Container information for participant {} is not found", participant);
        }

        if (this.yarnService.getEventSubmitter().isPresent()) {
          // send GTE
          this.yarnService.getEventSubmitter().get().submit(GobblinYarnEventConstants.EventNames.HELIX_PARTITION_STUCK,
              GobblinHelixConstants.HELIX_INSTANCE_NAME_KEY, participant,
              GobblinYarnMetricTagNames.CONTAINER_ID, containerId);
        }
      }
    } else {
      // No stuck partition on this participant in this round: forget any earlier stuck timestamp.
      instanceStuckSince.remove(participant);
    }
  }

  // release the containers
  if (!containersToRelease.isEmpty()) {
    this.yarnService.getEventBus().post(new ContainerReleaseRequest(containersToRelease, true));
  }

  slidingWindowReservoir.add(yarnContainerRequestBundle);

  log.debug("There are {} containers being requested in total, tag-count map {}, tag-resource map {}",
      yarnContainerRequestBundle.getTotalContainers(), yarnContainerRequestBundle.getHelixTagContainerCountMap(),
      yarnContainerRequestBundle.getHelixTagResourceMap());

  // Request the max bundle held by the sliding window rather than only this round's bundle.
  this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(), inUseInstances);
}