public void enqueueJobs()

in helix-core/src/main/java/org/apache/helix/task/TaskDriver.java [382:533]


  public void enqueueJobs(final String queue, final List<String> jobs,
      final List<JobConfig.Builder> jobBuilders) {

    // Get the job queue config and capacity
    WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
    if (workflowConfig == null) {
      throw new IllegalArgumentException("Queue " + queue + " config does not yet exist!");
    }
    if (workflowConfig.isTerminable()) {
      throw new IllegalArgumentException(queue + " is not a queue!");
    }

    final int capacity = workflowConfig.getCapacity();
    int queueSize = workflowConfig.getJobDag().size();
    if (capacity > 0 && queueSize >= capacity) {
      // if queue is full, Helix will try to clean up the expired job to free more space.
      WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, queue);
      if (workflowContext != null) {
        Set<String> expiredJobs =
            TaskUtil.getExpiredJobs(_accessor, _propertyStore, workflowConfig, workflowContext);
        if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, expiredJobs, true)) {
          LOG.warn("Failed to clean up expired and completed jobs from queue {}", queue);
        }
      }
      workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
      if (workflowConfig.getJobDag().size() >= capacity) {
        throw new HelixException(String.format( "Failed to enqueue job, queue %s is full.", queue));
      }
    }

    // Fail the operation if adding new jobs will cause the queue to reach its capacity limit
    workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
    if (workflowConfig.getJobDag().size() + jobs.size() >= capacity) {
      throw new IllegalStateException(
          String.format("Queue %s already reaches its max capacity %d, failed to add %s", queue,
              capacity, jobs.toString()));
    }

    validateZKNodeLimitation(1);
    final List<JobConfig> jobConfigs = new ArrayList<>();
    final List<String> namespacedJobNames = new ArrayList<>();
    final List<String> jobTypeList = new ArrayList<>();

    try {
      for (int i = 0; i < jobBuilders.size(); i++) {
        // Create the job to ensure that it validates
        JobConfig jobConfig = jobBuilders.get(i).setWorkflow(queue).build();
        String namespacedJobName = TaskUtil.getNamespacedJobName(queue, jobs.get(i));

        // add job config first.
        addJobConfig(namespacedJobName, jobConfig);
        jobConfigs.add(jobConfig);
        namespacedJobNames.add(namespacedJobName);
        jobTypeList.add(jobConfig.getJobType());
      }
    } catch (HelixException e) {
      LOG.error("Failed to add job configs {}. Remove them all!", jobs.toString());
      for (String job : jobs) {
        String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
        TaskUtil.removeJobConfig(_accessor, namespacedJobName);
      }
    }

    // update the job dag to append the job to the end of the queue.
    DataUpdater<ZNRecord> updater = currentData -> {
      if (currentData == null) {
        // For some reason, the WorkflowConfig for this JobQueue doesn't exist
        // In this case, we cannot proceed and must alert the user
        throw new HelixException(
            String.format("enqueueJobs DataUpdater: JobQueue %s config is not found!", queue));
      }

      // Add the node to the existing DAG
      JobDag jobDag = JobDag
          .fromJson(currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
      Set<String> allNodes = jobDag.getAllNodes();
      if (capacity > 0 && allNodes.size() + jobConfigs.size() >= capacity) {
        // Remove previously added jobConfigs if adding new jobs will cause exceeding capacity
        // limit. Removing the job configs is necessary to avoid multiple threads adding jobs at the
        // same time and cause overcapacity queue
        for (String job : jobs) {
          String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
          TaskUtil.removeJobConfig(_accessor, namespacedJobName);
        }
        throw new IllegalStateException(
            String.format("Queue %s already reaches its max capacity %d, failed to add %s", queue,
                capacity, jobs.toString()));
      }

      String lastNodeName = null;
      for (int i = 0; i < namespacedJobNames.size(); i++) {
        String namespacedJobName = namespacedJobNames.get(i);
        if (allNodes.contains(namespacedJobName)) {
          throw new IllegalStateException(String
              .format("Could not add to queue %s, job %s already exists", queue, jobs.get(i)));
        }
        jobDag.addNode(namespacedJobName);

        // Add the node to the end of the queue
        String candidate = null;
        if (lastNodeName == null) {
          for (String node : allNodes) {
            if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) {
              candidate = node;
              break;
            }
          }
        } else {
          candidate = lastNodeName;
        }
        if (candidate != null) {
          jobDag.addParentToChild(candidate, namespacedJobName);
          lastNodeName = namespacedJobName;
        }
      }

      // Add job type if job type is not null
      Map<String, String> jobTypes =
          currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
      for (String jobType : jobTypeList) {
        if (jobType != null) {
          if (jobTypes == null) {
            jobTypes = new HashMap<>();
          }
          jobTypes.put(queue, jobType);
        }
      }

      if (jobTypes != null) {
        currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
      }
      // Save the updated DAG
      try {
        currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(),
            jobDag.toJson());
      } catch (Exception e) {
        throw new IllegalStateException(
            String.format("Could not add jobs %s to queue %s", jobs.toString(), queue), e);
      }
      return currentData;
    };

    String path = _accessor.keyBuilder().resourceConfig(queue).getPath();
    boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
    if (!status) {
      LOG.error("Failed to update WorkflowConfig, remove all jobs {}", jobs.toString());
      for (String job : jobs) {
        TaskUtil.removeJobConfig(_accessor, job);
      }
      throw new HelixException("Failed to enqueue job");
    }
  }