in curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/QueueSharder.java [220:256]
/**
 * Walks every child node under {@code queuePath}, registers it through
 * {@code addNewQueueIfNeeded}, and updates {@code preferredQueues} from its child count:
 * <ul>
 *   <li>count at or above {@code policies.getNewQueueThreshold()}: the queue is removed
 *       from {@code preferredQueues} and a new queue is requested;</li>
 *   <li>count at or below half the threshold (integer division): the queue is added to
 *       {@code preferredQueues};</li>
 *   <li>anything in between: {@code preferredQueues} is left unchanged for that queue.</li>
 * </ul>
 * When a new queue was requested and this instance holds leadership, one is added unless
 * {@code queues} already holds {@code policies.getMaxQueues()} entries, in which case a
 * warning is logged instead.
 *
 * <p>A child whose node no longer exists when it is stat'ed is skipped, so a queue that
 * disappears mid-scan does not abort the check for the remaining queues.
 *
 * <p>No exception propagates to the caller: any failure is passed to
 * {@code ThreadUtils.checkInterrupted} and then logged.
 */
private void checkThreshold() {
    try {
        // Read once so every queue in this pass is compared against the same value.
        final int threshold = policies.getNewQueueThreshold();
        boolean addAQueueIfLeader = false;
        int size = 0;
        List<String> children = client.getChildren().forPath(queuePath);
        for (String child : children) {
            // Named childPath so it does not shadow the queuePath field.
            String childPath = ZKPaths.makePath(queuePath, child);
            addNewQueueIfNeeded(childPath);
            Stat stat = client.checkExists().forPath(childPath);
            if (stat == null) {
                // checkExists() yields null when the node is gone (e.g. removed after the
                // getChildren() listing above); there is nothing to measure for it.
                log.debug("Queue path {} no longer exists - skipping threshold check", childPath);
                continue;
            }
            int numChildren = stat.getNumChildren();
            if (numChildren >= threshold) {
                // Only the most recently seen over-threshold size is kept, for the log message.
                size = numChildren;
                addAQueueIfLeader = true;
                preferredQueues.remove(childPath);
            } else if (numChildren <= (threshold / 2)) {
                preferredQueues.add(childPath);
            }
        }
        if (addAQueueIfLeader && leaderLatch.hasLeadership()) {
            if (queues.size() < policies.getMaxQueues()) {
                log.info(
                        "Adding a queue due to exceeded threshold. Queue Size: {} - Threshold: {}",
                        size,
                        threshold);
                // A null path asks addNewQueueIfNeeded to create a brand-new queue.
                addNewQueueIfNeeded(null);
            } else {
                log.warn(
                        "Max number of queues ({}) reached. Consider increasing the max.", policies.getMaxQueues());
            }
        }
    } catch (Exception e) {
        ThreadUtils.checkInterrupted(e);
        log.error("Checking queue counts against threshold", e);
    }
}