in helix-core/src/main/java/org/apache/helix/task/TaskDriver.java [382:533]
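  /**
   * Appends a batch of jobs to the given job queue. If the queue is at capacity, expired jobs
   * are purged first; if it is still full after the cleanup, the enqueue fails. Job configs are
   * written before the queue's DAG is updated, and are rolled back if the DAG update fails.
   *
   * @param queue       the name of the job queue
   * @param jobs        the job names, parallel to {@code jobBuilders}
   * @param jobBuilders the builders for the job configurations to enqueue
   */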
public void enqueueJobs(final String queue, final List<String> jobs,
final List<JobConfig.Builder> jobBuilders) {
// Get the job queue config and capacity
WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
if (workflowConfig == null) {
throw new IllegalArgumentException("Queue " + queue + " config does not yet exist!");
}
if (workflowConfig.isTerminable()) {
throw new IllegalArgumentException(queue + " is not a queue!");
}
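    // Note: the capacity guards below treat a non-positive capacity as "unbounded".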
final int capacity = workflowConfig.getCapacity();
int queueSize = workflowConfig.getJobDag().size();
if (capacity > 0 && queueSize >= capacity) {
      // If the queue is full, try to clean up expired and completed jobs to free up space.
WorkflowContext workflowContext = TaskUtil.getWorkflowContext(_propertyStore, queue);
if (workflowContext != null) {
Set<String> expiredJobs =
TaskUtil.getExpiredJobs(_accessor, _propertyStore, workflowConfig, workflowContext);
if (!TaskUtil.removeJobsFromWorkflow(_accessor, _propertyStore, queue, expiredJobs, true)) {
LOG.warn("Failed to clean up expired and completed jobs from queue {}", queue);
}
}
      // Re-read the config and re-check the queue size after the cleanup attempt.
      workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
      if (workflowConfig == null) {
        throw new HelixException(
            String.format("Queue %s config was removed while enqueueing!", queue));
      }
      if (workflowConfig.getJobDag().size() >= capacity) {
        throw new HelixException(String.format("Failed to enqueue job, queue %s is full.", queue));
      }
}
    // Fail the operation if adding the new jobs would cause the queue to reach its capacity limit
    workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
    if (capacity > 0 && workflowConfig.getJobDag().size() + jobs.size() >= capacity) {
      throw new IllegalStateException(
          String.format("Queue %s has already reached its max capacity %d, failed to add %s",
              queue, capacity, jobs.toString()));
    }
    // Each enqueued job adds one job config ZNode, so validate against the whole batch size.
    validateZKNodeLimitation(jobBuilders.size());
final List<JobConfig> jobConfigs = new ArrayList<>();
final List<String> namespacedJobNames = new ArrayList<>();
final List<String> jobTypeList = new ArrayList<>();
try {
for (int i = 0; i < jobBuilders.size(); i++) {
// Create the job to ensure that it validates
JobConfig jobConfig = jobBuilders.get(i).setWorkflow(queue).build();
String namespacedJobName = TaskUtil.getNamespacedJobName(queue, jobs.get(i));
        // Add the job config first, before the queue's DAG references it.
addJobConfig(namespacedJobName, jobConfig);
jobConfigs.add(jobConfig);
namespacedJobNames.add(namespacedJobName);
jobTypeList.add(jobConfig.getJobType());
}
    } catch (HelixException e) {
      LOG.error("Failed to add job configs {}. Remove them all!", jobs.toString());
      for (String job : jobs) {
        String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
        TaskUtil.removeJobConfig(_accessor, namespacedJobName);
      }
      // Rethrow so the DAG update below never runs against job configs that were just removed.
      throw e;
    }
    // Update the job DAG to append the new jobs to the end of the queue.
DataUpdater<ZNRecord> updater = currentData -> {
if (currentData == null) {
// For some reason, the WorkflowConfig for this JobQueue doesn't exist
// In this case, we cannot proceed and must alert the user
throw new HelixException(
String.format("enqueueJobs DataUpdater: JobQueue %s config is not found!", queue));
}
// Add the node to the existing DAG
JobDag jobDag = JobDag
.fromJson(currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
Set<String> allNodes = jobDag.getAllNodes();
if (capacity > 0 && allNodes.size() + jobConfigs.size() >= capacity) {
        // Remove the previously added job configs if adding the new jobs would exceed the
        // capacity limit. Removing them here is necessary so that concurrent enqueue threads
        // cannot leave the queue over capacity.
for (String job : jobs) {
String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
TaskUtil.removeJobConfig(_accessor, namespacedJobName);
}
        throw new IllegalStateException(
            String.format("Queue %s has already reached its max capacity %d, failed to add %s",
                queue, capacity, jobs.toString()));
}
String lastNodeName = null;
for (int i = 0; i < namespacedJobNames.size(); i++) {
String namespacedJobName = namespacedJobNames.get(i);
if (allNodes.contains(namespacedJobName)) {
throw new IllegalStateException(String
.format("Could not add to queue %s, job %s already exists", queue, jobs.get(i)));
}
jobDag.addNode(namespacedJobName);
        // Append the node to the end of the queue: its parent is either the job appended in the
        // previous iteration or, for the first new job, the existing tail (a node with no
        // children) found by the scan below.
        String candidate = null;
if (lastNodeName == null) {
for (String node : allNodes) {
if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) {
candidate = node;
break;
}
}
} else {
candidate = lastNodeName;
}
if (candidate != null) {
jobDag.addParentToChild(candidate, namespacedJobName);
lastNodeName = namespacedJobName;
}
}
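      // At this point each new job has been chained behind the previous tail, preserving the
      // FIFO execution order of the queue.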
      // Record each job's type (when present), keyed by the namespaced job name
      Map<String, String> jobTypes =
          currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
      for (int i = 0; i < jobTypeList.size(); i++) {
        String jobType = jobTypeList.get(i);
        if (jobType != null) {
          if (jobTypes == null) {
            jobTypes = new HashMap<>();
          }
          jobTypes.put(namespacedJobNames.get(i), jobType);
        }
      }
if (jobTypes != null) {
currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
}
// Save the updated DAG
try {
currentData.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(),
jobDag.toJson());
} catch (Exception e) {
throw new IllegalStateException(
String.format("Could not add jobs %s to queue %s", jobs.toString(), queue), e);
}
return currentData;
};
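    // BaseDataAccessor#update applies the updater in a read-modify-write loop with a version
    // check, so concurrent enqueues serialize on the queue's WorkflowConfig ZNode.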
String path = _accessor.keyBuilder().resourceConfig(queue).getPath();
boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
if (!status) {
LOG.error("Failed to update WorkflowConfig, remove all jobs {}", jobs.toString());
for (String job : jobs) {
        TaskUtil.removeJobConfig(_accessor, TaskUtil.getNamespacedJobName(queue, job));
}
throw new HelixException("Failed to enqueue job");
}
}
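For reference, a minimal usage sketch (not from the Helix sources): the queue name "myQueue", the job names, the task commands, and the helixManager variable are all hypothetical, and the sketch assumes the queue was already created and that the commands are registered task types:

import java.util.Arrays;
import org.apache.helix.HelixManager;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TaskDriver;

// Hypothetical usage: append two jobs to an existing queue in one batch.
TaskDriver driver = new TaskDriver(helixManager); // helixManager: a connected HelixManager

JobConfig.Builder backup = new JobConfig.Builder().setCommand("BackupTask");
JobConfig.Builder report = new JobConfig.Builder().setCommand("ReportTask");

// Job names and builders are parallel lists; names are namespaced under the queue internally.
driver.enqueueJobs("myQueue", Arrays.asList("backupJob", "reportJob"),
    Arrays.asList(backup, report));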