void runInternal()

in gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java [291:427]
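One pass of the autoscaling loop: it computes per-Helix-tag container demand from all in-progress workflows, tracks idle and stuck participants, releases containers whose tasks have been stuck beyond the configured threshold, and finally requests the maximum container bundle observed in the recent sliding window.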


    void runInternal() {
      Set<String> inUseInstances = new HashSet<>();
      // helixInstancesContainingStuckTasks maintains the set of helix instances/participants containing tasks that are
      // stuck in any of the configured states.
      final Set<String> helixInstancesContainingStuckTasks = new HashSet<>();

      YarnContainerRequestBundle yarnContainerRequestBundle = new YarnContainerRequestBundle();
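      // The bundle accumulates, per Helix instance tag, the total container count and resource
      // profile (memory/cores) that the active workflows require in this round.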
      for (Map.Entry<String, WorkflowConfig> workFlowEntry : taskDriver.getWorkflows().entrySet()) {
        WorkflowContext workflowContext = taskDriver.getWorkflowContext(workFlowEntry.getKey());
        WorkflowConfig workflowConfig = workFlowEntry.getValue();

        // Only allocate for active workflows. Workflows marked for deletion are ignored, but their existing
        // containers are not released until they have been idle for maxIdleTimeInMinutesBeforeScalingDown.
        if (workflowContext == null ||
            TargetState.DELETE.equals(workflowConfig.getTargetState()) ||
            !workflowContext.getWorkflowState().equals(TaskState.IN_PROGRESS)) {
          continue;
        }

        log.debug("Workflow name {} config {} context {}", workFlowEntry.getKey(), workFlowEntry.getValue(),
            workflowContext);

        JobDag jobDag = workflowConfig.getJobDag();
        Set<String> jobs = jobDag.getAllNodes();

        // sum up the number of partitions
        for (String jobName : jobs) {
          JobContext jobContext = taskDriver.getJobContext(jobName);
          JobConfig jobConfig = taskDriver.getJobConfig(jobName);
          Resource resource = Resource.newInstance(this.defaultContainerMemoryMbs, this.defaultContainerCores);
          int numPartitions = 0;
          String jobTag = defaultHelixInstanceTags;
          if (jobContext != null) {
            log.debug("JobContext {} num partitions {}", jobContext, jobContext.getPartitionSet().size());

            inUseInstances.addAll(jobContext.getPartitionSet().stream()
                .map(i -> getInuseParticipantForHelixPartition(jobContext, i))
                .filter(Objects::nonNull).collect(Collectors.toSet()));

            if (enableDetectionStuckTask) {
              // If this feature is disabled, the set helixInstancesContainingStuckTasks remains empty.
              helixInstancesContainingStuckTasks.addAll(jobContext.getPartitionSet().stream()
                  .map(helixPartition -> getParticipantInGivenStateForHelixPartition(jobContext, helixPartition, taskStates))
                  .filter(Objects::nonNull).collect(Collectors.toSet()));
            }

            numPartitions = jobContext.getPartitionSet().size();
            // Job level config for helix instance tags takes precedence over other tag configurations
            if (jobConfig != null) {
              if (!Strings.isNullOrEmpty(jobConfig.getInstanceGroupTag())) {
                jobTag = jobConfig.getInstanceGroupTag();
              }
              Map<String, String> jobCommandConfigMap = jobConfig.getJobCommandConfigMap();
              if (jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)) {
                resource.setMemory(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)));
              }
              if (jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)) {
                resource.setVirtualCores(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)));
              }
            }
          }
          // Compute the container count as the ceiling of the number of partitions divided by the number
          // of partitions per container, scaled by a constant over-provision factor.
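          // For example (hypothetical values): numPartitions=8, partitionsPerContainer=4,
          // overProvisionFactor=1.2 -> ceil((8 / 4.0) * 1.2) = ceil(2.4) = 3 containers.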
          int containerCount = (int) Math.ceil(((double)numPartitions / this.partitionsPerContainer) * this.overProvisionFactor);
          yarnContainerRequestBundle.add(jobTag, containerCount, resource);
          log.info("jobName={}, jobTag={}, numPartitions={}, targetNumContainers={}",
              jobName, jobTag, numPartitions, containerCount);
        }
      }
      // Find all participants in this cluster whose names carry the YARN instance prefix. The full Helix
      // instance list can also include the cluster manager and, potentially, a replanner instance, which
      // the prefix filter excludes.
      Set<String> allParticipants = HelixUtils.getParticipants(helixDataAccessor, HELIX_YARN_INSTANCE_NAME_PREFIX);

      final Set<Container> containersToRelease = new HashSet<>();

      // Find all joined participants not in use in this round of inspection and start (or continue)
      // tracking their idle time. Instances idle beyond the tolerance are left out of inUseInstances
      // and so become eligible for release.
      for (String participant : allParticipants) {
        if (!inUseInstances.contains(participant)) {
          instanceIdleSince.putIfAbsent(participant, System.currentTimeMillis());
          if (!isInstanceUnused(participant)) {
            inUseInstances.add(participant);
          }
        } else {
          // A previously idle instance is now detected to be in use.
          // Remove it from the idle-tracking map if present.
          instanceIdleSince.remove(participant);
        }

        if (helixInstancesContainingStuckTasks.contains(participant)) {
          instanceStuckSince.putIfAbsent(participant, System.currentTimeMillis());
          if (isInstanceStuck(participant)) {
            // Release the corresponding container, since a Helix task on it has been stuck for too long.
            log.info("Instance {} has a Helix partition that has been stuck for {} minutes; container release enabled: {}",
                participant,
                TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis() - instanceStuckSince.get(participant)),
                enableReleasingContainerHavingStuckTask);

            // Look up the YARN container backing this Helix participant.
            Optional<Container> container = yarnService.getContainerInfoGivenHelixParticipant(participant);
            instanceStuckSince.remove(participant);
            String containerId = "";
            if (container.isPresent()) {
              if (enableReleasingContainerHavingStuckTask) {
                containersToRelease.add(container.get());
              }
              containerId = container.get().getId().toString();
            } else {
              log.warn("Container information for participant {} is not found", participant);
            }

            if (this.yarnService.getEventSubmitter().isPresent()) {
              // Emit a GTE (Gobblin Tracking Event) recording the stuck partition and its container.
              this.yarnService.getEventSubmitter().get().submit(GobblinYarnEventConstants.EventNames.HELIX_PARTITION_STUCK,
                  GobblinHelixConstants.HELIX_INSTANCE_NAME_KEY, participant,
                  GobblinYarnMetricTagNames.CONTAINER_ID, containerId);
            }
          }
        } else {
          instanceStuckSince.remove(participant);
        }
      }

      // Release the containers of stuck instances in a single batch request.
      if (!containersToRelease.isEmpty()) {
        this.yarnService.getEventBus().post(new ContainerReleaseRequest(containersToRelease, true));
      }

      slidingWindowReservoir.add(yarnContainerRequestBundle);
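      // The reservoir keeps the most recent request bundles; requesting the window maximum (below)
      // smooths transient dips in demand so the cluster does not thrash on scale-down.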


      log.debug("There are {} containers being requested in total, tag-count map {}, tag-resource map {}",
          yarnContainerRequestBundle.getTotalContainers(), yarnContainerRequestBundle.getHelixTagContainerCountMap(),
          yarnContainerRequestBundle.getHelixTagResourceMap());

      this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(), inUseInstances);
    }
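
The idle/stuck checks above (isInstanceUnused, isInstanceStuck) are not shown in this excerpt. A minimal sketch of what they presumably do, comparing the tracked timestamps against the configured thresholds; maxTimeBeforeReleasingStuckContainerMs is a hypothetical field name, while maxIdleTimeInMinutesBeforeScalingDown is referenced in the comments above:

    // Sketch, not the actual implementation: an instance is "unused" once it has been idle
    // longer than the scale-down tolerance.
    private boolean isInstanceUnused(String participant) {
      return System.currentTimeMillis() - instanceIdleSince.get(participant)
          > TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
    }

    // Sketch: an instance is "stuck" once one of its tasks has stayed in a configured
    // task state longer than the release threshold (hypothetical field name).
    private boolean isInstanceStuck(String participant) {
      return System.currentTimeMillis() - instanceStuckSince.get(participant)
          > maxTimeBeforeReleasingStuckContainerMs;
    }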