in src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java [148:177]
GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId,
EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion,
boolean isDurable) {
CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
cqAttributesFactory.addCqListener(listener);
CqAttributes cqAttributes = cqAttributesFactory.create();
try {
if (loadEntireRegion) {
CqResults<?> events =
geodeContext.newCqWithInitialResults(generateCqName(taskId, cqPrefix, regionName),
"select * from /" + regionName, cqAttributes,
isDurable);
eventBuffer.get()
.addAll(events.stream().map(
e -> new GeodeEvent(regionName, ((Struct) e).get("key"), ((Struct) e).get("value")))
.collect(Collectors.toList()));
} else {
final CqQuery cqQuery = geodeContext.newCq(generateCqName(taskId, cqPrefix, regionName),
"select * from /" + regionName, cqAttributes,
isDurable);
if (cqQuery == null) {
throw new ConnectException("Unable to executed queries on the Apache Geode server");
}
}
} finally {
listener.signalInitialResultsLoaded();
}
return listener;
}