in helix-core/src/main/java/org/apache/helix/controller/rebalancer/strategy/crushMapping/CRUSHPlacementAlgorithm.java [84:192]
/**
 * Selects up to {@code count} distinct nodes of the requested type from the tree under
 * {@code parent} for the given input.
 *
 * One selection round is run per requested node. In each round a {@code Selector} is built for
 * the current node and asked to select with {@code (input, rPrime)}, where
 * {@code rPrime = r + offset + failure}. A result whose type does not match {@code type}
 * (case-insensitive) becomes the new current node and selection continues beneath it. A result
 * of the right type is accepted unless it is already selected, is rejected by
 * {@code nodePredicate}, or is reported by {@code nodeIsOut}; in those cases {@code failure} is
 * incremented and the round either reselects on the same node or restarts from {@code parent}.
 * A round gives up once {@code MAX_LOOPBACK_COUNT} restarts from {@code parent} have been used.
 *
 * When {@code keepOffset} is set, the starting offset is read from {@code roundOffset} for this
 * input and the last computed {@code rPrime} is written back before returning.
 *
 * @param parent node from which every selection round starts
 * @param input value handed to {@code Selector.select}; also the key into {@code roundOffset}
 * @param count number of nodes requested
 * @param type type of node to select, compared with {@code equalsIgnoreCase}
 * @param nodePredicate filter applied to type-matching candidates; a candidate is rejected when
 *        {@code apply} returns false
 * @return the selected nodes in selection order; holds fewer than {@code count} entries when one
 *         or more rounds gave up
 */
public List<Node> select(Node parent, long input, int count, String type,
Predicate<Node> nodePredicate) {
// a shortfall is only logged; selection still proceeds
int childCount = parent.getChildrenCount(type);
if (childCount < count) {
logger.error(count + " nodes of type " + type +
" were requested but the tree has only " + childCount + " nodes!");
}
List<Node> selected = new ArrayList<Node>(count);
// when keepOffset is set, start from the offset stored in roundOffset for this input
// (storing 0 if there is none yet); otherwise always start from 0
Integer offset;
if (keepOffset) {
offset = roundOffset.get(input);
if (offset == null) {
offset = 0;
roundOffset.put(input, offset);
}
} else {
offset = 0;
}
// declared outside the loop because its last value is written back to roundOffset below
int rPrime = 0;
// one round per requested node; failure and loopbackCount start from 0 in every round
for (int r = 1; r <= count; r++) {
int failure = 0;
// number of times we had to loop back to the origin
int loopbackCount = 0;
// set when this round gives up without selecting a node
boolean escape = false;
boolean retryOrigin;
Node out = null;
// outer loop: each pass restarts the descent from parent with a fresh rejected set
do {
retryOrigin = false; // initialize at the outset
Node in = parent;
Set<Node> rejected = new HashSet<Node>();
boolean retryNode;
// inner loop: each pass performs one Selector.select call on the current node
do {
retryNode = false; // initialize at the outset
// every failed attempt so far shifts rPrime, so the next select call uses a new value
rPrime = r + offset + failure;
logger.trace("{}.select({}, {})", new Object[] {in, input, rPrime});
Selector selector = new Selector(in);
out = selector.select(input, rPrime);
if (!out.getType().equalsIgnoreCase(type)) {
logger.trace("selected output {} for data {} didn't match the type {}: walking down " +
"the hierarchy...", new Object[] {out, input, type});
in = out; // walk down the hierarchy
retryNode = true; // stay within the node and walk down the tree
} else { // type matches
// the predicate is evaluated before the duplicate check
boolean predicateRejected = !nodePredicate.apply(out);
if (selected.contains(out) || predicateRejected) {
if (predicateRejected) {
logger.trace("{} was rejected by the node predicate for data {}: rejecting and " +
"increasing rPrime", out, input);
// only predicate rejections are recorded here; duplicates are already in selected
rejected.add(out);
} else { // already selected
logger.trace("{} was already selected for data {}: rejecting and increasing rPrime",
out, input);
}
// we need to see if we have selected all possible nodes from this parent, in which
// case we should loop back to the origin and start over
if (allChildNodesEliminated(in, selected, rejected)) {
logger.trace("all child nodes of {} have been eliminated", in);
if (loopbackCount == MAX_LOOPBACK_COUNT) {
// we looped back the maximum times we specified; we give up search, and exit
// (retryOrigin is still false here, so the outer loop ends as well)
escape = true;
break;
}
loopbackCount++;
logger.trace("looping back to the original parent node ({})", parent);
retryOrigin = true;
} else {
retryNode = true; // go back and reselect on the same parent
}
failure++;
} else if (nodeIsOut(out)) {
logger.trace("{} is marked as out (failed or over the maximum assignment) for data " +
"{}! looping back to the original parent node", out, input);
failure++;
if (loopbackCount == MAX_LOOPBACK_COUNT) {
// we looped back the maximum times we specified; we give up search, and exit
// (retryOrigin is still false here, so the outer loop ends as well)
escape = true;
break;
}
loopbackCount++;
// re-selection on the same parent is detrimental in case of node failure: loop back
// to the origin
retryOrigin = true;
} else {
// we got a successful selection
break;
}
}
} while (retryNode);
} while (retryOrigin);
if (escape) {
// cannot find a node under this parent; return a smaller set than was intended
logger.debug("we could not select a node for data {} under parent {}; a smaller data set " +
"than is requested will be returned", input, parent);
continue;
}
logger.trace("{} was selected for data {}", out, input);
selected.add(out);
}
// remember the last rPrime used so that a later call for the same input starts from it
// NOTE(review): when the loop above never runs (count < 1) rPrime is still 0 here and
// replaces whatever offset was stored for this input -- confirm that this is intended
if (keepOffset) {
roundOffset.put(input, rPrime);
}
return selected;
}