private SerializedBatchHolder readPageBatch()

in logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java [638:691]


    /**
     * Collects up to {@code limit} serialized elements from the given page.
     *
     * <p>Whenever the page is a fully read head page, this waits on {@code notEmpty} for at most
     * {@code timeout} milliseconds before looking at the page again. Collection stops when the
     * requested number of elements has been gathered, when a wait times out and the page is still
     * fully read, when the waiting thread is interrupted, when {@code isClosed()} reports true
     * after a wait, or when the page is a fully read tail page. A fully read tail page is passed
     * to {@code removeUnreadPage} before returning.
     *
     * <p>NOTE(review): {@code notEmpty} / {@code notFull} look like lock conditions, so the caller
     * presumably holds the associated lock - confirm against the call sites.
     *
     * @param p the page to read from
     * @param limit the maximum number of elements to collect
     * @param timeout the duration, in milliseconds, of each individual wait for new data
     * @return a holder carrying the collected elements and the sequence number of the first
     *         element read, or {@code -1} as that sequence number when nothing was read
     * @throws IOException propagated from the page and queue operations invoked here
     */
    private SerializedBatchHolder readPageBatch(Page p, int limit, long timeout) throws IOException {
        final List<byte[]> collected = new ArrayList<>(limit);
        long firstSeqNum = -1L;
        int remaining = limit;

        // NOTE: p can be a head page on entry and yet be a tail page once the notEmpty.await
        // call below returns, which is why the head/tail checks are re-evaluated on every pass.
        while (remaining > 0) {
            if (isHeadPage(p) && p.isFullyRead()) {
                // nothing left to read, but a head page can still be written to: wait for data
                final boolean signalled;
                try {
                    signalled = notEmpty.await(timeout, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // restore the interrupt status and return whatever was collected so far
                    Thread.currentThread().interrupt();
                    break;
                }

                final boolean timedOutWithoutData = !signalled && p.isFullyRead();
                if (timedOutWithoutData || isClosed()) {
                    break;
                }
            }

            if (!p.isFullyRead()) {
                // sample the limit state before reading so notFull waiters can be signalled after
                final boolean fullBeforeRead = isMaxUnreadReached();

                final SequencedList<byte[]> chunk = p.read(remaining);
                final int count = chunk.getElements().size();
                assert count > 0 : "page read returned 0 elements";
                collected.addAll(chunk.getElements());
                if (firstSeqNum == -1L) {
                    // only the very first chunk determines the batch's first sequence number
                    firstSeqNum = chunk.getSeqNums().get(0);
                }

                remaining -= count;
                this.unreadCount -= count;

                if (fullBeforeRead) {
                    notFull.signalAll();
                }
            }

            // a fully read tail page cannot yield anything more, so stop looping
            final boolean tailExhausted = isTailPage(p) && p.isFullyRead();
            if (tailExhausted) {
                break;
            }
        }

        // re-check after the loop: every exit path must drop a fully read tail page
        final boolean dropPage = isTailPage(p) && p.isFullyRead();
        if (dropPage) {
            removeUnreadPage(p);
        }

        return new SerializedBatchHolder(collected, firstSeqNum);
    }