in core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java [385:435]
/**
 * Returns the next available {@link FetchItem}, visiting the per-key queues in turn.
 *
 * <p>On each pass the first entry of {@code queues} is removed. If its queue has neither
 * pending nor in-progress items it is dropped for good; otherwise it is re-inserted so that
 * it is visited after the other queues (this assumes {@code queues} iterates in insertion
 * order, e.g. a {@code LinkedHashMap} -- confirm against the field declaration). The search
 * stops as soon as a queue hands out an item, or once the first non-empty queue visited comes
 * around a second time.
 *
 * @return the next item to fetch, or {@code null} if there are no queues or none of them
 *     yielded an item during one full rotation
 */
public synchronized FetchItem getFetchItem() {
    // First non-empty queue visited during this call; seeing it again means a full rotation.
    FetchItemQueue start = null;

    while (!queues.isEmpty()) {
        Iterator<Entry<String, FetchItemQueue>> i = queues.entrySet().iterator();
        if (!i.hasNext()) {
            return null;
        }

        // Read key and value BEFORE removing: the Map.Entry contract leaves the entry's
        // behaviour undefined once the backing map has been modified.
        Entry<String, FetchItemQueue> head = i.next();
        String key = head.getKey();
        FetchItemQueue fiq = head.getValue();

        // Take the entry off the front of the map
        i.remove();

        // Reap empty queues: nothing pending and nothing in progress
        if (fiq.getQueueSize() == 0 && fiq.getInProgressSize() == 0) {
            continue;
        }

        // Put the entry back at the end no matter the result
        queues.put(key, fiq);

        // Detect that we have looped over every remaining queue
        if (start == null) {
            start = fiq;
        } else if (fiq == start) {
            return null;
        }

        FetchItem fit = fiq.getFetchItem();
        if (fit != null) {
            // One fewer item waiting in the queues
            inQueues.decrementAndGet();
            return fit;
        }
    }

    return null;
}