in core/src/main/java/org/apache/stormcrawler/persistence/AbstractQueryingSpout.java [178:217]
public void nextTuple() {
    // Emits at most one buffered entry per call. When the buffer is empty,
    // either waits briefly (a query is running or queries are throttled) or
    // asks for the buffer to be re-populated. Does nothing while the spout
    // is not active.
    if (!active) {
        return;
    }

    // A refresh can be requested even though the buffer still holds
    // entries, as long as no query is currently flagged as running.
    final boolean queryRunning = isInQuery.get();
    if (!queryRunning && triggerQueries()) {
        populateBuffer();
        timeLastQuerySent = System.currentTimeMillis();
    }

    if (!buffer.hasNext()) {
        // remember when the buffer was first seen empty
        if (timestampEmptyBuffer == -1) {
            timestampEmptyBuffer = System.currentTimeMillis();
        }

        if (isInQuery.get() || throttleQueries() > 0) {
            // short pause only, so that ack/fail get a chance to run
            LOG.trace("isInQuery {}", isInQuery);
            Utils.sleep(10);
            return;
        }

        // nothing running and no throttling: re-populate the buffer
        populateBuffer();
        timeLastQuerySent = System.currentTimeMillis();
        return;
    }

    // The buffer has content again: report how long it had been empty
    // and reset the marker.
    if (timestampEmptyBuffer != -1) {
        eventCounter
                .scope("empty.buffer")
                .incrBy(System.currentTimeMillis() - timestampEmptyBuffer);
        timestampEmptyBuffer = -1;
    }

    // The first field of the entry doubles as the message id of the tuple.
    final List<Object> tuple = buffer.next();
    final String url = tuple.get(0).toString();
    _collector.emit(tuple, url);
    beingProcessed.put(url, null);
    eventCounter.scope("emitted").incrBy(1);
}