GeodeKafkaSourceListener installListenersToRegion()

in src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceTask.java [148:177]


  /**
   * Creates a {@link GeodeKafkaSourceListener} for {@code regionName}, registers it as the CQ
   * listener of a new continuous query obtained from {@code geodeContext}, and returns it.
   *
   * <p>
   * The CQ is named by {@code generateCqName(taskId, cqPrefix, regionName)} and uses the query
   * {@code select * from /<regionName>}. When {@code loadEntireRegion} is {@code true} the CQ is
   * requested together with its initial results; each result is read as a {@code Struct} with
   * {@code "key"} and {@code "value"} fields, wrapped in a {@link GeodeEvent}, and appended to
   * the buffer returned by {@code eventBuffer.get()}.
   *
   * <p>
   * {@code listener.signalInitialResultsLoaded()} is always invoked before this method exits,
   * whether it returns normally or throws.
   *
   * @param geodeContext context used to create the continuous query
   * @param taskId id of the task, used to build the CQ name
   * @param eventBuffer supplier of the buffer that receives the initial-result events
   * @param regionName name of the region to query and listen on
   * @param cqPrefix prefix used to build the CQ name
   * @param loadEntireRegion whether to request the initial results and push them into the buffer
   * @param isDurable passed through to {@code geodeContext} when the CQ is created
   * @return the listener that was attached to the CQ
   * @throws ConnectException if {@code geodeContext} returns {@code null} instead of a CQ or its
   *         initial results
   */
  GeodeKafkaSourceListener installListenersToRegion(GeodeContext geodeContext, int taskId,
      EventBufferSupplier eventBuffer, String regionName, String cqPrefix, boolean loadEntireRegion,
      boolean isDurable) {
    CqAttributesFactory cqAttributesFactory = new CqAttributesFactory();
    GeodeKafkaSourceListener listener = new GeodeKafkaSourceListener(eventBuffer, regionName);
    cqAttributesFactory.addCqListener(listener);
    CqAttributes cqAttributes = cqAttributesFactory.create();
    try {
      // Built inside the try block so that the listener is still signalled if naming fails.
      final String cqName = generateCqName(taskId, cqPrefix, regionName);
      final String query = "select * from /" + regionName;
      if (loadEntireRegion) {
        CqResults<?> events =
            geodeContext.newCqWithInitialResults(cqName, query, cqAttributes, isDurable);
        if (events == null) {
          // Mirror the non-initial-results branch instead of failing with a bare NPE below.
          throw new ConnectException("Unable to execute queries on the Apache Geode server");
        }
        eventBuffer.get()
            .addAll(events.stream().map(
                e -> new GeodeEvent(regionName, ((Struct) e).get("key"), ((Struct) e).get("value")))
                .collect(Collectors.toList()));
      } else {
        final CqQuery cqQuery = geodeContext.newCq(cqName, query, cqAttributes, isDurable);
        if (cqQuery == null) {
          throw new ConnectException("Unable to execute queries on the Apache Geode server");
        }
      }
    } finally {
      // Signal on both success and failure so the listener is never left waiting.
      listener.signalInitialResultsLoaded();
    }
    return listener;
  }