in java/yarn-extras/src/main/java/org/apache/impala/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java [270:466]
/**
 * Re-reads the allocation file and hands the resulting configuration to the
 * reload listener.
 *
 * <p>Does nothing when {@code allocFile} is {@code null}. All settings are
 * first collected into local collections; the listener is only notified once
 * the whole file has been parsed and {@code verifyConfiguration} has accepted
 * the result. {@code lastSuccessfulReload} and {@code lastReloadAttemptFailed}
 * are only updated after verification has passed, so a configuration that is
 * rejected by verification is not recorded as a successful reload.
 *
 * @throws IOException declared for failures while reading the allocation file
 * @throws ParserConfigurationException if the XML document builder cannot be
 *     created
 * @throws SAXException declared for XML parse failures
 * @throws AllocationConfigurationException if the top-level element is not
 *     {@code <allocations>}, a value element has no text content, the default
 *     scheduling policy is FIFO, a root queue is configured alongside other
 *     top-level queues, or a callee rejects the configuration
 */
public synchronized void reloadAllocations() throws IOException,
    ParserConfigurationException, SAXException,
    AllocationConfigurationException {
  if (allocFile == null) {
    return;
  }
  LOG.info("Loading allocation file " + allocFile);

  // Create some temporary hashmaps to hold the new allocs, and we only save
  // them in our fields if we have parsed the entire allocs file successfully.
  Map<String, Resource> minQueueResources = new HashMap<>();
  Map<String, Resource> maxQueueResources = new HashMap<>();
  Map<String, Resource> maxChildQueueResources = new HashMap<>();
  Map<String, Integer> queueMaxApps = new HashMap<>();
  Map<String, Integer> userMaxApps = new HashMap<>();
  Map<String, Float> queueMaxAMShares = new HashMap<>();
  Map<String, ResourceWeights> queueWeights = new HashMap<>();
  Map<String, SchedulingPolicy> queuePolicies = new HashMap<>();
  Map<String, Long> minSharePreemptionTimeouts = new HashMap<>();
  Map<String, Long> fairSharePreemptionTimeouts = new HashMap<>();
  Map<String, Float> fairSharePreemptionThresholds = new HashMap<>();
  Map<String, Map<QueueACL, AccessControlList>> queueAcls = new HashMap<>();
  Map<String, Map<String, Integer>> userQueryLimits = new HashMap<>();
  Map<String, Map<String, Integer>> groupQueryLimits = new HashMap<>();
  Map<String, Boolean> onlyCoordinators = new HashMap<>();
  Set<String> nonPreemptableQueues = new HashSet<>();

  // Defaults used when the file does not override them.
  int userMaxAppsDefault = Integer.MAX_VALUE;
  int queueMaxAppsDefault = Integer.MAX_VALUE;
  Resource queueMaxResourcesDefault = Resources.unbounded();
  float queueMaxAMShareDefault = 0.5f;
  long defaultFairSharePreemptionTimeout = Long.MAX_VALUE;
  long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
  float defaultFairSharePreemptionThreshold = 0.5f;
  SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
  QueuePlacementPolicy newPlacementPolicy = null;

  // Remember all queue names so we can display them on web UI, etc.
  // configuredQueues is segregated based on whether it is a leaf queue
  // or a parent queue. This information is used for creating queues
  // and also for making queue placement decisions (QueuePlacementRule.java).
  Map<FSQueueType, Set<String>> configuredQueues = new HashMap<>();
  for (FSQueueType queueType : FSQueueType.values()) {
    configuredQueues.put(queueType, new HashSet<String>());
  }

  // Read and parse the allocations file.
  DocumentBuilderFactory docBuilderFactory =
      DocumentBuilderFactory.newInstance();
  docBuilderFactory.setIgnoringComments(true);
  DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
  Document doc = builder.parse(allocFile);
  Element root = doc.getDocumentElement();
  if (!"allocations".equals(root.getTagName())) {
    throw new AllocationConfigurationException("Bad fair scheduler config " +
        "file: top-level element not <allocations>");
  }

  // First pass over the top-level children: global defaults are applied
  // immediately, queue elements and the placement policy are only collected
  // here and processed afterwards.
  NodeList elements = root.getChildNodes();
  List<Element> queueElements = new ArrayList<>();
  Element placementPolicyElement = null;
  for (int i = 0; i < elements.getLength(); i++) {
    Node node = elements.item(i);
    if (!(node instanceof Element)) {
      continue;
    }
    Element element = (Element) node;
    switch (element.getTagName()) {
      case "queue":
      case "pool":
        queueElements.add(element);
        break;
      case "user": {
        String userName = element.getAttribute("name");
        NodeList fields = element.getChildNodes();
        for (int j = 0; j < fields.getLength(); j++) {
          Node fieldNode = fields.item(j);
          if (!(fieldNode instanceof Element)) {
            continue;
          }
          Element field = (Element) fieldNode;
          if ("maxRunningApps".equals(field.getTagName())) {
            userMaxApps.put(userName,
                Integer.parseInt(readTrimmedElementText(field)));
          }
        }
        break;
      }
      case "queueMaxResourcesDefault":
        queueMaxResourcesDefault =
            FairSchedulerConfiguration.parseResourceConfigValue(
                readTrimmedElementText(element));
        break;
      case "userMaxAppsDefault":
        userMaxAppsDefault = Integer.parseInt(readTrimmedElementText(element));
        break;
      case "defaultFairSharePreemptionTimeout":
        // The file value is scaled by 1000 before being stored
        // (presumably seconds to milliseconds -- TODO confirm).
        defaultFairSharePreemptionTimeout =
            Long.parseLong(readTrimmedElementText(element)) * 1000L;
        break;
      case "fairSharePreemptionTimeout":
        // Only applied while the default is still at its initial
        // Long.MAX_VALUE, so it never overrides a value that
        // defaultFairSharePreemptionTimeout has already set.
        if (defaultFairSharePreemptionTimeout == Long.MAX_VALUE) {
          defaultFairSharePreemptionTimeout =
              Long.parseLong(readTrimmedElementText(element)) * 1000L;
        }
        break;
      case "defaultMinSharePreemptionTimeout":
        defaultMinSharePreemptionTimeout =
            Long.parseLong(readTrimmedElementText(element)) * 1000L;
        break;
      case "defaultFairSharePreemptionThreshold": {
        float threshold = Float.parseFloat(readTrimmedElementText(element));
        // Clamp into [0.0, 1.0].
        defaultFairSharePreemptionThreshold =
            Math.max(Math.min(threshold, 1.0f), 0.0f);
        break;
      }
      case "queueMaxAppsDefault":
        queueMaxAppsDefault = Integer.parseInt(readTrimmedElementText(element));
        break;
      case "queueMaxAMShareDefault":
        // Capped at 1.0 only; no lower bound is applied here.
        queueMaxAMShareDefault =
            Math.min(Float.parseFloat(readTrimmedElementText(element)), 1.0f);
        break;
      case "defaultQueueSchedulingPolicy":
      case "defaultQueueSchedulingMode": {
        String text = readTrimmedElementText(element);
        if (text.equalsIgnoreCase("FIFO")) {
          throw new AllocationConfigurationException("Bad fair scheduler "
              + "config file: defaultQueueSchedulingPolicy or "
              + "defaultQueueSchedulingMode can't be FIFO.");
        }
        defaultSchedPolicy = SchedulingPolicy.parse(text);
        break;
      }
      case "queuePlacementPolicy":
        placementPolicyElement = element;
        break;
      default:
        LOG.warn("Bad element in allocations file: " + element.getTagName());
        break;
    }
  }

  // Load queue elements. A root queue can either be included or omitted. If
  // it's included, all other queues must be inside it.
  for (Element element : queueElements) {
    String parent = ROOT_POOL_NAME;
    if (element.getAttribute("name").equalsIgnoreCase(ROOT_POOL_NAME)) {
      if (queueElements.size() > 1) {
        throw new AllocationConfigurationException("If configuring root queue,"
            + " no other queues can be placed alongside it.");
      }
      parent = null;
    }
    loadQueue(parent, element, minQueueResources, maxQueueResources,
        maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares,
        queueWeights, queuePolicies, minSharePreemptionTimeouts,
        fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
        userQueryLimits, groupQueryLimits, configuredQueues, onlyCoordinators,
        nonPreemptableQueues);
  }

  // Load placement policy and pass it configured queues
  Configuration conf = getConfig();
  if (placementPolicyElement != null) {
    newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
        configuredQueues, conf);
  } else {
    newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
        configuredQueues);
  }

  // Set the min/fair share preemption timeout for the root queue
  if (!minSharePreemptionTimeouts.containsKey(ROOT_POOL_NAME)) {
    minSharePreemptionTimeouts.put(ROOT_POOL_NAME,
        defaultMinSharePreemptionTimeout);
  }
  if (!fairSharePreemptionTimeouts.containsKey(ROOT_POOL_NAME)) {
    fairSharePreemptionTimeouts.put(ROOT_POOL_NAME,
        defaultFairSharePreemptionTimeout);
  }

  // Set the fair share preemption threshold for the root queue
  if (!fairSharePreemptionThresholds.containsKey(ROOT_POOL_NAME)) {
    fairSharePreemptionThresholds.put(ROOT_POOL_NAME,
        defaultFairSharePreemptionThreshold);
  }

  AllocationConfiguration info = new AllocationConfiguration(minQueueResources,
      maxQueueResources, maxChildQueueResources, queueMaxApps, userMaxApps,
      queueWeights, queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
      queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies,
      defaultSchedPolicy, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
      fairSharePreemptionThresholds, queueAcls, userQueryLimits, groupQueryLimits,
      onlyCoordinators, newPlacementPolicy, configuredQueues, nonPreemptableQueues);

  // Verify first: the reload must not be recorded as successful if the
  // assembled configuration is rejected.
  verifyConfiguration(info);
  lastSuccessfulReload = clock.getTime();
  lastReloadAttemptFailed = false;
  LOG.info("Completed loading allocation file " + allocFile);
  reloadListener.onReload(info);
}

/**
 * Returns the trimmed text held by the first child node of {@code element}.
 *
 * @param element the value element to read
 * @return the first child's text data with surrounding whitespace removed
 * @throws AllocationConfigurationException if the element has no children or
 *     its first child is not a text node (for example {@code <tag/>})
 */
private static String readTrimmedElementText(Element element)
    throws AllocationConfigurationException {
  Node firstChild = element.getFirstChild();
  // instanceof is false for null, so this covers both the empty element and
  // an element whose first child is something other than text.
  if (!(firstChild instanceof Text)) {
    throw new AllocationConfigurationException("Bad fair scheduler config "
        + "file: element <" + element.getTagName() + "> has no text value");
  }
  return ((Text) firstChild).getData().trim();
}