in core/src/main/java/org/apache/stormcrawler/persistence/urlbuffer/SchedulingURLBuffer.java [74:126]
/**
 * Returns the next URL that may be released, rotating through the per-key queues.
 *
 * <p>On each step the first entry of {@code queues} is removed; if {@code canRelease} accepts
 * its key, the head of that queue is polled. A queue that still holds items is put back into
 * {@code queues} (at the end, assuming the map iterates in insertion order — confirm against
 * its declaration); otherwise the {@code listener}, if set, is told the queue is empty.
 *
 * <p>At most one full rotation over the queues present when the call starts is performed, so
 * the call returns instead of spinning while holding the monitor when no queue is currently
 * releasable.
 *
 * @return a {@code Values} built from the URL and its metadata, or {@code null} when there are
 *     no queues or no queue yielded an item during the rotation
 */
public synchronized Values next() {
    if (queues.isEmpty()) {
        LOG.trace("Empty iterator");
        return null;
    }

    // Each step moves the first entry to the back (or drops it), so queues.size() steps
    // visit every queue exactly once. Bounding the scan avoids a busy loop under the lock
    // when canRelease() rejects every queue.
    for (int remaining = queues.size(); remaining > 0; remaining--) {
        Iterator<Entry<String, Queue<URLMetadata>>> i = queues.entrySet().iterator();
        if (!i.hasNext()) {
            // defensive: the map was emptied during this rotation
            LOG.trace("Empty iterator");
            return null;
        }

        Map.Entry<String, Queue<URLMetadata>> nextEntry = i.next();
        Queue<URLMetadata> queue = nextEntry.getValue();
        String queueName = nextEntry.getKey();

        // remove the entry, gets added back later if it still has content
        i.remove();

        LOG.trace("Next queue {}", queueName);

        URLMetadata item = null;

        // is this queue ready to be processed?
        if (canRelease(queueName)) {
            // try the first element; poll() returns null when the queue is empty
            item = queue.poll();
            if (item != null) {
                LOG.trace("Item {}", item.url);
            } else {
                LOG.trace("Queue {} has no item to release", queueName);
            }
        } else {
            LOG.trace("Queue {} not ready to release yet", queueName);
        }

        if (!queue.isEmpty()) {
            // any left? add to the end of the iteration order
            LOG.debug("Adding to the back of the queue {}", queueName);
            queues.put(queueName, queue);
        } else if (listener != null) {
            // notify that the queue is empty
            listener.emptyQueue(queueName);
        }

        if (item != null) {
            // one timestamp so both bookkeeping entries agree
            Instant now = Instant.now();
            lastReleased.put(queueName, now);
            unacked.put(item.url, new Object[] {now, queueName});
            // remove it from the list of URLs in the queue
            in_buffer.remove(item.url);
            return new Values(item.url, item.metadata);
        }
    }

    // a full rotation found nothing releasable
    return null;
}