in curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java [492:540]
/**
 * Walks the given child node names, hands every eligible one to the executor and then
 * blocks until the completion semaphore holds {@code children.size()} permits.
 *
 * <p>Permits are produced in three ways: one for each node that is skipped, one by each
 * submitted task when it finishes (successfully or not), and {@code children.size()} at
 * once when the loop is abandoned early. Because the bulk release alone satisfies the
 * final acquire, an early exit does not wait for tasks that were already submitted.
 *
 * @param children       names of the child nodes to examine, in processing order
 * @param currentVersion version the caller observed; compared against
 *                       {@code childrenCache.getData().version} to detect a newer listing
 * @throws Exception propagated from the helpers invoked here or from the interruptible
 *                   wait on the semaphore
 */
private void processChildren(List<String> children, long currentVersion) throws Exception {
    final int total = children.size();
    final Semaphore completions = new Semaphore(0);
    final boolean lockSafe = (lockPath != null);

    // Number of queue items still to be looked at before the refresh check is consulted.
    int itemsBeforeRefreshCheck = minItemsBeforeRefresh;

    for (final String node : children) {
        if (Thread.currentThread().isInterrupted()) {
            // Give up on the remaining nodes; the bulk release covers the acquire below.
            completions.release(total);
            break;
        }

        if (!node.startsWith(QUEUE_ITEM_NAME)) {
            // Not a queue item: report it and count it as done.
            log.warn("Foreign node in queue path: " + node);
            completions.release();
        } else if ((itemsBeforeRefreshCheck-- <= 0)
                && refreshOnWatch
                && (currentVersion != childrenCache.getData().version)) {
            // The cached listing has a different version than the one being processed: stop here.
            completions.release(total);
            break;
        } else if (getDelay(node) > 0) {
            // Positive delay reported for this item: skip it on this pass.
            completions.release();
        } else {
            final Runnable task = new Runnable() {
                @Override
                public void run() {
                    try {
                        if (lockSafe) {
                            processWithLockSafety(node, ProcessType.NORMAL);
                        } else {
                            processNormally(node, ProcessType.NORMAL);
                        }
                    } catch (Exception e) {
                        // A failure on one item is logged and does not affect the others.
                        ThreadUtils.checkInterrupted(e);
                        log.error("Error processing message at " + node, e);
                    } finally {
                        // Always account for this item so the waiting thread can proceed.
                        completions.release();
                    }
                }
            };
            executor.execute(task);
        }
    }

    completions.acquire(total);
}