in src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java [40:60]
/**
 * Hands a continuous-query event to the event buffer, retrying until it is accepted.
 *
 * <p>The call first spins (via {@link Thread#yield()}) until the {@code initialResultsLoaded}
 * flag is set. It then wraps the CQ event in a {@link GeodeEvent} for {@code regionName} and
 * offers it to the buffer returned by {@code eventBufferSupplier}, waiting up to two seconds per
 * attempt. If the buffer is still full when an attempt times out, the offer is repeated, so the
 * event is neither dropped nor enqueued more than once.
 *
 * <p>An interrupt received while waiting does not abort delivery; it is remembered and the
 * thread's interrupt status is restored before this method returns, so callers can still observe
 * it.
 *
 * @param aCqEvent the CQ event to enqueue
 */
public void onEvent(CqEvent aCqEvent) {
  // NOTE(review): busy-wait on a field declared outside this view; presumably it is volatile (or
  // otherwise safely published) so the update becomes visible here -- confirm.
  while (!initialResultsLoaded) {
    Thread.yield();
  }

  // The wrapped event is identical for every attempt, so build it once.
  final GeodeEvent event = new GeodeEvent(regionName, aCqEvent);
  boolean interrupted = false;
  try {
    while (true) {
      try {
        // Re-read the supplier on every attempt, as the original code did.
        if (eventBufferSupplier.get().offer(event, 2, TimeUnit.SECONDS)) {
          return;
        }
        // offer() returned false: the timeout elapsed with no free capacity.
        logger.info("GeodeKafkaSource Queue is full, waiting to offer");
      } catch (InterruptedException e) {
        // The interrupt flag is cleared when the exception is thrown, so the next offer()
        // blocks normally; remember the interrupt and re-assert it once delivery is done.
        interrupted = true;
        logger.info("Thread interrupted while updating buffer", e);
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}