in hyracks-fullstack/hyracks/hyracks-hdfs/src/main/java/org/apache/hyracks/hdfs/scheduler/RackAwareNcCollectionBuilder.java [48:213]
/**
 * Builds an {@link INcCollection} that hands out free slots on node controllers (NCs) based on the
 * position of each NC in the cluster topology.
 * <p>
 * Every NC is mapped to a topology path (a list of integers filled in by
 * {@code topology.lookupNetworkTerminal}). NCs whose IP address is not found in the topology share the
 * sentinel path {@code [Integer.MIN_VALUE]}. Free slots ({@code slotLimit - workloads[i]}) are aggregated
 * per path.
 *
 * @param ncNameToNcInfos
 *            NC name to NC info; used to obtain the network address of each NC
 * @param ipToNcMapping
 *            not used by this implementation
 * @param ncNameToIndex
 *            NC name to its index in {@code workloads}
 * @param NCs
 *            NC names; {@code NCs[i]} corresponds to {@code workloads[i]}
 * @param workloads
 *            current number of assigned slots per NC; read (not modified) by the returned collection
 * @param slotLimit
 *            maximum number of slots per NC
 * @return a collection that selects an NC with a free slot for a given input split
 * @throws IllegalStateException
 *             wrapping any exception raised while building the collection
 */
public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
        final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
        final int[] workloads, final int slotLimit) {
    try {
        // Topology path of NCs[i], resolved once and reused for both maps below.
        final List<List<Integer>> ncPaths = new ArrayList<>(NCs.length);
        final Map<List<Integer>, List<String>> pathToNCs = new HashMap<>();
        for (String NC : NCs) {
            List<Integer> path = new ArrayList<>();
            String ipAddress = InetAddress
                    .getByAddress(ncNameToNcInfos.get(NC).getNetworkAddress().lookupIpAddress()).getHostAddress();
            topology.lookupNetworkTerminal(ipAddress, path);
            if (path.isEmpty()) {
                // the hyracks nc is not in the defined cluster: group it under a sentinel path
                path.add(Integer.MIN_VALUE);
                LOGGER.info(NC + "'s IP address is not in the cluster toplogy file!");
            }
            ncPaths.add(path);
            pathToNCs.computeIfAbsent(path, k -> new ArrayList<>()).add(NC);
        }

        // Paths ordered element by element; when one path is a prefix of the other, the shorter sorts first.
        final TreeMap<List<Integer>, IntWritable> availableIpsToSlots = new TreeMap<>((l1, l2) -> {
            int commonLength = Math.min(l1.size(), l2.size());
            for (int i = 0; i < commonLength; i++) {
                int cmp = Integer.compare(l1.get(i), l2.get(i));
                if (cmp != 0) {
                    return cmp;
                }
            }
            return Integer.compare(l1.size(), l2.size());
        });

        // Sum the free slots of all NCs that share a path; NCs at or above the limit contribute nothing.
        for (int i = 0; i < workloads.length; i++) {
            if (workloads[i] < slotLimit) {
                int freeSlots = slotLimit - workloads[i];
                List<Integer> path = ncPaths.get(i);
                IntWritable availableSlot = availableIpsToSlots.get(path);
                if (availableSlot == null) {
                    availableIpsToSlots.put(path, new IntWritable(freeSlots));
                } else {
                    availableSlot.set(availableSlot.get() + freeSlots);
                }
            }
        }

        return new INcCollection() {

            /**
             * Picks an NC with a free slot for the given split and consumes one slot of the NC's path.
             *
             * @return the name of the selected NC, or {@code null} if the split could not be scheduled
             * @throws IllegalStateException
             *             wrapping any exception raised during the lookup
             */
            @Override
            public String findNearestAvailableSlot(InputSplit split) {
                try {
                    String[] locs = split.getLocations();
                    int minDistance = Integer.MAX_VALUE;
                    List<Integer> currentCandidatePath = null;
                    // A null or empty location array means "no locality information": use the fallback.
                    if (locs != null && locs.length > 0) {
                        for (String loc : locs) {
                            /*
                             * get all the IP addresses from the name
                             */
                            InetAddress[] allIps = InetAddress.getAllByName(loc);
                            boolean inTopology = false;
                            for (InetAddress ip : allIps) {
                                List<Integer> splitPath = new ArrayList<>();
                                boolean inCluster = topology.lookupNetworkTerminal(ip.getHostAddress(), splitPath);
                                if (!inCluster) {
                                    continue;
                                }
                                inTopology = true;
                                /*
                                 * neighbouring path in sort order: the greatest key <= splitPath,
                                 * or the least key >= splitPath if there is none
                                 */
                                List<Integer> candidatePath = availableIpsToSlots.floorKey(splitPath);
                                if (candidatePath == null) {
                                    candidatePath = availableIpsToSlots.ceilingKey(splitPath);
                                }
                                if (candidatePath != null && availableIpsToSlots.get(candidatePath).get() > 0) {
                                    int distance = distance(splitPath, candidatePath);
                                    if (minDistance > distance) {
                                        minDistance = distance;
                                        currentCandidatePath = candidatePath;
                                    }
                                }
                            }
                            if (!inTopology) {
                                LOGGER.info(loc + "'s IP address is not in the cluster toplogy file!");
                                /*
                                 * The machine is not in the topology file, so no distance can be computed.
                                 * Fall back to any free path, but never replace a candidate that was
                                 * already chosen by distance for another location of this split.
                                 */
                                if (currentCandidatePath == null) {
                                    currentCandidatePath = firstPathWithFreeSlot();
                                }
                            }
                        }
                    } else {
                        currentCandidatePath = firstPathWithFreeSlot();
                    }
                    if (currentCandidatePath != null && !currentCandidatePath.isEmpty()) {
                        /*
                         * Update the entry of the selected path; drop it once it has no slots left
                         */
                        IntWritable availableSlot = availableIpsToSlots.get(currentCandidatePath);
                        availableSlot.set(availableSlot.get() - 1);
                        if (availableSlot.get() == 0) {
                            availableIpsToSlots.remove(currentCandidatePath);
                        }
                        /*
                         * Return the first NC on that path which is still below the slot limit
                         */
                        List<String> candidateNcs = pathToNCs.get(currentCandidatePath);
                        for (String candidate : candidateNcs) {
                            int ncIndex = ncNameToIndex.get(candidate);
                            if (workloads[ncIndex] < slotLimit) {
                                return candidate;
                            }
                        }
                    }
                    /* not scheduled */
                    return null;
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }

            /**
             * NOTE(review): this returns the number of distinct topology paths that still have free
             * slots, not the total number of free slots. Confirm that callers only compare it to zero.
             */
            @Override
            public int numAvailableSlots() {
                return availableIpsToSlots.size();
            }

            /** Returns the first path (in sort order) that still has a free slot, or null if none. */
            private List<Integer> firstPathWithFreeSlot() {
                for (Entry<List<Integer>, IntWritable> entry : availableIpsToSlots.entrySet()) {
                    if (entry.getValue().get() > 0) {
                        return entry.getKey();
                    }
                }
                return null;
            }

            /**
             * Folds the element-wise absolute differences of two paths into one number, weighting
             * earlier elements more heavily (each step multiplies the running value by 100).
             * Elements beyond the common length contribute their absolute value.
             * <p>
             * NOTE(review): plain int arithmetic; values may overflow for long paths or when a path
             * holds the Integer.MIN_VALUE sentinel. Left unchanged; verify whether this matters.
             */
            private int distance(List<Integer> splitPath, List<Integer> candidatePath) {
                int commonLength = Math.min(splitPath.size(), candidatePath.size());
                int distance = 0;
                for (int i = 0; i < commonLength; i++) {
                    distance = distance * 100 + Math.abs(splitPath.get(i) - candidatePath.get(i));
                }
                List<Integer> restElements = splitPath.size() > candidatePath.size() ? splitPath : candidatePath;
                for (int i = commonLength; i < restElements.size(); i++) {
                    distance = distance * 100 + Math.abs(restElements.get(i));
                }
                return distance;
            }
        };
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
}