in org.apache.aries.events.mongo/src/main/java/org/apache/aries/events/mongo/MessageReceiverImpl.java [116:146]
/**
 * Replaces the in-memory buffer with a run of consecutive messages read from the
 * collection, starting at the requested log index.
 *
 * <p>At most {@code fetchLimit} documents are requested per refresh, so a single call
 * cannot pull the whole remaining log into memory. A {@code fetchLimit} of {@code 0}
 * is treated by MongoDB as "no limit".
 *
 * <p>On success {@code buffer} holds the collected messages and {@code firstIndex} holds
 * the index of the first of them. If {@code index} was {@code FIRST_AVAILABLE} and no
 * document was returned, {@code firstIndex} is set to {@code 0}. {@code lastReceived} is
 * updated only when at least one message was collected.
 *
 * @param index index of the first message to load, or {@code FIRST_AVAILABLE} to start
 *              from the index of the first document the query returns
 * @throws NoSuchElementException if the first returned document does not carry the
 *                                requested index
 * @throws IllegalStateException  if a later document does not carry the index that
 *                                directly follows its predecessor
 */
private void refreshBuffer(long index) {
    long startIndex = index;
    // Bound the query by fetchLimit; previously only the list capacity used it and the
    // cursor was drained to the end of the collection.
    // NOTE(review): the contiguity check below assumes documents arrive in ascending
    // index order; no explicit sort is requested here - confirm this holds for the collection.
    try (MongoCursor<Document> cursor =
                 col.find(Filters.gte(INDEX, startIndex)).limit(fetchLimit).iterator()) {
        List<Message> collected = new ArrayList<>(fetchLimit);
        while (cursor.hasNext()) {
            Document document = cursor.next();
            long idx = document.get(INDEX, Long.class);
            if (startIndex == FIRST_AVAILABLE) {
                // No explicit start requested: anchor on the first document returned.
                startIndex = idx;
            }
            long expected = startIndex + collected.size();
            if (idx != expected) {
                if (collected.isEmpty()) {
                    throw new NoSuchElementException("Element [" + startIndex + "] has been evicted from the log. Oldest available: [" + idx + "]");
                }
                throw new IllegalStateException("Missing element at [" + expected + "]. Next available at [" + idx + "]");
            }
            Binary payload = document.get(PAYLOAD, Binary.class);
            // Document values are untyped, so this cast cannot be checked by the compiler.
            @SuppressWarnings("unchecked")
            Map<String, String> props = (Map<String, String>) document.get(Fields.PROPS);
            collected.add(new Message(payload.getData(), props));
        }
        buffer = collected;
        firstIndex = (startIndex == FIRST_AVAILABLE) ? 0L : startIndex;
        if (!collected.isEmpty()) {
            lastReceived = currentTimeMillis();
        }
    }
}