in src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java [109:127]
public List<SourceRecord> poll() {
  // Drains up to batchSize buffered Geode events and turns each one into one
  // SourceRecord per topic mapped to the event's region. Returns null (not an
  // empty list) when nothing was drained from the buffer.
  logger.trace("Polling for new data");

  ArrayList<GeodeEvent> drained = new ArrayList<>(batchSize);
  int drainedCount = eventBufferSupplier.get().drainTo(drained, batchSize);
  if (drainedCount <= 0) {
    // Nothing buffered right now; signal "no data" to the caller.
    return null;
  }

  logger.debug("Geode events polled :" + drained.size());
  ArrayList<SourceRecord> result = new ArrayList<>(batchSize);
  for (GeodeEvent geodeEvent : drained) {
    String region = geodeEvent.getRegionName();
    List<String> targetTopics = regionToTopics.get(region);
    // Fan out: the same event is published once to every topic bound to its region.
    for (String targetTopic : targetTopics) {
      SourceRecord record = new SourceRecord(
          sourcePartitions.get(region),
          OFFSET_DEFAULT,
          targetTopic,
          null,
          geodeEvent.getKey(),
          null,
          geodeEvent.getValue());
      result.add(record);
    }
  }
  return result;
}