in helix-core/src/main/java/org/apache/helix/task/AssignableInstanceManager.java [80:182]
/**
 * Rebuilds every AssignableInstance and every restored TaskAssignResult from scratch.
 *
 * <p>Steps:
 * <ol>
 * <li>Clears the cached AssignableInstance map and TaskAssignResult map.</li>
 * <li>Creates an AssignableInstance for each live instance that has a non-null
 * InstanceConfig.</li>
 * <li>For each job that has a non-null JobConfig and JobContext, whose workflow has a non-null
 * WorkflowConfig and WorkflowContext, and whose workflow state and job state are not
 * terminal/stopped (per {@code isResourceTerminalOrStopped}), restores a TaskAssignResult for
 * each INIT or RUNNING task assigned to one of the instances created in step 2. Successful
 * results are cached by task id.</li>
 * <li>Recomputes the global thread-based capacity.</li>
 * </ol>
 *
 * @param clusterConfig cluster config passed to each new AssignableInstance
 * @param taskDataCache source of the job/workflow configs and contexts used to restore task
 *          assignments
 * @param liveInstances live instances keyed by instance name
 * @param instanceConfigs instance configs keyed by instance name; live instances whose config is
 *          missing (or mapped to null) are skipped
 */
public void buildAssignableInstances(ClusterConfig clusterConfig, TaskDataCache taskDataCache,
    Map<String, LiveInstance> liveInstances, Map<String, InstanceConfig> instanceConfigs) {
  // Reset all cached information
  _assignableInstanceMap.clear();
  _taskAssignResultMap.clear();

  // Create all AssignableInstance objects based on what's in liveInstances
  for (Map.Entry<String, LiveInstance> liveInstanceEntry : liveInstances.entrySet()) {
    String instanceName = liveInstanceEntry.getKey();
    LiveInstance liveInstance = liveInstanceEntry.getValue();
    // A single get() + null check handles both an absent key and a key mapped to null; the
    // latter would otherwise throw an NPE on instanceConfig.getInstanceName() below
    InstanceConfig instanceConfig = instanceConfigs.get(instanceName);
    if (instanceConfig == null) {
      LOG.debug("No InstanceConfig found for live instance; skipping it. Instance: {}",
          instanceName);
      continue; // Ill-formatted input; skip over this instance
    }
    AssignableInstance assignableInstance =
        new AssignableInstance(clusterConfig, instanceConfig, liveInstance);
    _assignableInstanceMap.put(instanceConfig.getInstanceName(), assignableInstance);
    LOG.debug("AssignableInstance created for instance: {}", instanceName);
  }

  // Update task profiles by traversing all JobContexts
  Map<String, JobConfig> jobConfigMap = taskDataCache.getJobConfigMap();
  for (Map.Entry<String, JobConfig> jobConfigEntry : jobConfigMap.entrySet()) {
    String jobName = jobConfigEntry.getKey();
    JobConfig jobConfig = jobConfigEntry.getValue();
    JobContext jobContext = taskDataCache.getJobContext(jobName);
    if (jobConfig == null || jobContext == null) {
      LOG.debug(
          "JobConfig or JobContext for this job is null. Skipping this job! Job name: {}, JobConfig: {}, JobContext: {}",
          jobName, jobConfig, jobContext);
      continue; // Ignore this job if either the config or context is null
    }

    // First, check that the workflow and job are in valid states. This is important because
    // sometimes aborted jobs do not get a proper update of their task states, meaning there could
    // be INIT and RUNNING tasks we want to ignore
    String workflowName = jobConfig.getWorkflow();
    WorkflowConfig workflowConfig = taskDataCache.getWorkflowConfig(workflowName);
    WorkflowContext workflowContext = taskDataCache.getWorkflowContext(workflowName);
    if (workflowConfig == null || workflowContext == null) {
      // Without both the workflow config and context this job's tasks are not considered
      // scheduled, so skip this job
      LOG.debug(
          "WorkflowConfig or WorkflowContext for this job's workflow is null. Skipping this job! Job name: {}, Workflow name: {}",
          jobName, workflowName);
      continue;
    }
    TaskState workflowState = workflowContext.getWorkflowState();
    TaskState jobState = workflowContext.getJobState(jobName);
    if (isResourceTerminalOrStopped(workflowState) || isResourceTerminalOrStopped(jobState)) {
      continue;
    }

    String quotaType = jobConfig.getJobType();
    if (quotaType == null) {
      quotaType = AssignableInstance.DEFAULT_QUOTA_TYPE;
    }

    // Each integer represents a task in this job (this is NOT taskId)
    Set<Integer> taskIndices = jobContext.getPartitionSet();
    for (int taskIndex : taskIndices) {
      TaskPartitionState taskState = jobContext.getPartitionState(taskIndex);
      // Only INIT or RUNNING tasks consume instance resources. STOPPED means it's been cancelled,
      // so it will be re-assigned and therefore does not use instances' resources
      if (taskState != TaskPartitionState.INIT && taskState != TaskPartitionState.RUNNING) {
        continue;
      }
      String assignedInstance = jobContext.getAssignedParticipant(taskIndex);
      String taskId = jobContext.getTaskIdForPartition(taskIndex);
      if (taskId == null) {
        // For targeted tasks, taskId will be null
        // We instead use pName (see FixedTargetTaskAssignmentCalculator)
        taskId = String.format("%s_%s", jobConfig.getJobId(), taskIndex);
      }
      if (assignedInstance == null) {
        LOG.debug(
            "This task's TaskContext does not have an assigned instance! Task will be ignored. "
                + "Job: {}, TaskId: {}, TaskIndex: {}",
            jobContext.getName(), taskId, taskIndex);
        continue;
      }
      AssignableInstance assignableInstance = _assignableInstanceMap.get(assignedInstance);
      if (assignableInstance == null) {
        LOG.debug(
            "While building AssignableInstance map, discovered that the instance a task is assigned to is no "
                + "longer a LiveInstance! TaskAssignResult will not be created and no resource will be taken "
                + "up for this task. Job: {}, TaskId: {}, TaskIndex: {}, Instance: {}",
            jobContext.getName(), taskId, taskIndex, assignedInstance);
        continue;
      }
      TaskConfig taskConfig = jobConfig.getTaskConfig(taskId);
      TaskAssignResult taskAssignResult =
          assignableInstance.restoreTaskAssignResult(taskId, taskConfig, quotaType);
      if (taskAssignResult.isSuccessful()) {
        _taskAssignResultMap.put(taskId, taskAssignResult);
        LOG.debug("TaskAssignResult restored for taskId: {}, assigned on instance: {}", taskId,
            assignedInstance);
      } else {
        // Previously this case was silently dropped; surface it for diagnosis
        LOG.debug(
            "TaskAssignResult restoration was unsuccessful and will not be cached. TaskId: {}, Instance: {}, Result: {}",
            taskId, assignedInstance, taskAssignResult);
      }
    }
  }
  LOG.info(
      "AssignableInstanceManager built AssignableInstances from scratch based on contexts in TaskDataCache due to Controller switch or ClusterConfig change.");
  computeGlobalThreadBasedCapacity();
}