in emr-dynamodb-hadoop/src/main/java/org/apache/hadoop/dynamodb/preader/PageResultMultiplexer.java [72:116]
public V next() throws IOException {
if (itemsReturned % 10000 == 0) {
log.info("Pagemux stats: items=" + itemsReturned + ", pages=" + pageCount.get() + ", cap="
+ capacity);
}
synchronized (removeItemLock) {
// Now, this could block for a while as we wait for enough pages to
// multiplex. As long as we're really doing work in the background (
// scans/queries), then the dynamodb client/retrier will tick the
// reporter to signal liveness.
waitForMuxCondition();
// Loop until we find the queue in draining mode and empty
while (!(draining && pageCount.get() == 0)) {
PageResults<V> nextPage;
try {
nextPage = pageIterator.next();
} catch (NoSuchElementException e) {
pageIterator = pages.iterator();
continue;
}
if (nextPage.exception != null) {
throw new IOException(nextPage.exception);
}
V nextItem = nextPage.next();
// Remove the page if we've emptied it
if (!nextPage.hasMore()) {
// Order: lower counter, remove page
pageCount.decrementAndGet();
pageIterator.remove();
}
if (nextItem != null) {
itemsReturned++;
return nextItem;
}
}
}
return null;
}