private void refreshBuffer()

in org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiverImpl.java [116:146]


    /**
     * Reloads the in-memory message buffer from the collection, starting at the given index.
     *
     * <p>Documents whose index is greater than or equal to {@code index} are read in cursor
     * order and must form a gap-free run of consecutive indexes. When {@code index} equals
     * {@code FIRST_AVAILABLE}, the index of the first document returned becomes the start of
     * the run. At most {@code fetchLimit} documents are requested per call; the MongoDB driver
     * treats a limit of {@code 0} as "no limit".
     *
     * <p>On success {@code buffer} and {@code firstIndex} are replaced, and
     * {@code lastReceived} is updated when at least one message was read. The fields are only
     * assigned after the cursor has been fully consumed, so they are left untouched when an
     * exception is thrown.
     *
     * @param index index of the first message to load, or {@code FIRST_AVAILABLE}
     * @throws NoSuchElementException if the first document returned does not carry the
     *         requested index
     * @throws IllegalStateException if a gap is found after the first document, or a document
     *         lacks its index or payload field
     */
    private void refreshBuffer(long index) {
        long startIndex = index;
        // Bound the query: fetchLimit used to be only a capacity hint, so every call
        // pulled the complete tail of the collection into memory.
        try (MongoCursor<Document> cursor =
                     col.find(Filters.gte(INDEX, startIndex)).limit(fetchLimit).iterator()) {
            List<Message> collected = new ArrayList<>(fetchLimit);
            while (cursor.hasNext()) {
                // Offset of the document about to be read, relative to startIndex.
                int i = collected.size();
                Document document = cursor.next();
                Long rawIdx = document.get(INDEX, Long.class);
                if (rawIdx == null) {
                    // Fail with context instead of an NPE from auto-unboxing.
                    throw new IllegalStateException("Document at offset [" + i + "] has no [" + INDEX + "] field");
                }
                long idx = rawIdx;
                if (startIndex == FIRST_AVAILABLE) {
                    // Caller asked for "whatever is first": anchor the run on the first document seen.
                    startIndex = idx;
                }
                if (idx == startIndex + i) {
                    Binary payload = document.get(PAYLOAD, Binary.class);
                    if (payload == null) {
                        throw new IllegalStateException("Document [" + idx + "] has no [" + PAYLOAD + "] field");
                    }
                    // The stored properties are expected to be a string-to-string map; the cast
                    // cannot be checked at runtime because of erasure.
                    @SuppressWarnings("unchecked")
                    Map<String, String> props = (Map<String, String>) document.get(Fields.PROPS);
                    Message message = new Message(payload.getData(), props);
                    collected.add(message);
                } else {
                    if (i == 0) {
                        // The very first document is not the requested one.
                        throw new NoSuchElementException("Element [" + startIndex + "] has been evicted from the log. Oldest available: [" + idx + "]");
                    } else {
                        // A hole in the middle of the run.
                        throw new IllegalStateException("Missing element at [" + (startIndex + i) + "]. Next available at [" + idx + "]");
                    }
                }
            }
            buffer = collected;
            // Still FIRST_AVAILABLE here means the query returned nothing; fall back to 0.
            firstIndex = (startIndex == FIRST_AVAILABLE) ? 0L : startIndex;
            if (!collected.isEmpty()) {
                lastReceived = currentTimeMillis();
            }
        }
    }