in logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java [638:691]
private SerializedBatchHolder readPageBatch(Page p, int limit, long timeout) throws IOException {
int left = limit;
final List<byte[]> elements = new ArrayList<>(limit);
// NOTE: the tricky thing here is that upon entering this method, if p is initially a head page
// it could become a tail page upon returning from the notEmpty.await call.
long firstSeqNum = -1L;
while (left > 0) {
if (isHeadPage(p) && p.isFullyRead()) {
boolean elapsed;
// a head page is fully read but can be written to so let's wait for more data
try {
elapsed = !notEmpty.await(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// set back the interrupted flag
Thread.currentThread().interrupt();
break;
}
if ((elapsed && p.isFullyRead()) || isClosed()) {
break;
}
}
if (! p.isFullyRead()) {
boolean wasFull = isMaxUnreadReached();
final SequencedList<byte[]> serialized = p.read(left);
int n = serialized.getElements().size();
assert n > 0 : "page read returned 0 elements";
elements.addAll(serialized.getElements());
if (firstSeqNum == -1L) {
firstSeqNum = serialized.getSeqNums().get(0);
}
this.unreadCount -= n;
left -= n;
if (wasFull) {
notFull.signalAll();
}
}
if (isTailPage(p) && p.isFullyRead()) {
break;
}
}
if (isTailPage(p) && p.isFullyRead()) {
removeUnreadPage(p);
}
return new SerializedBatchHolder(elements, firstSeqNum);
}