public void onEvent()

in src/main/java/org/apache/geode/kafka/source/GeodeKafkaSourceListener.java [40:60]


  /**
   * Wraps the given CQ event in a {@link GeodeEvent} and places it on the event buffer.
   *
   * <p>
   * The call first spins until {@code initialResultsLoaded} is set. It then offers the event to
   * the buffer with a two second timeout, retrying until the buffer accepts it, so an event is
   * neither dropped when the buffer is full nor enqueued more than once.
   *
   * <p>
   * If the calling thread is interrupted while waiting, the interruption is logged, the offer is
   * retried, and the thread's interrupt status is restored before this method returns.
   *
   * @param aCqEvent the continuous query event to buffer
   */
  public void onEvent(CqEvent aCqEvent) {
    while (!initialResultsLoaded) {
      Thread.yield();
    }

    // Build the event once; every retry offers the same instance.
    final GeodeEvent event = new GeodeEvent(regionName, aCqEvent);
    boolean interrupted = false;
    try {
      while (true) {
        try {
          if (eventBufferSupplier.get().offer(event, 2, TimeUnit.SECONDS)) {
            return;
          }
          // offer() timed out: the buffer is still full, so try again.
          logger.info("GeodeKafkaSource Queue is full, waiting to offer");
        } catch (InterruptedException e) {
          // Throwing InterruptedException clears the interrupt flag, so the retry does not spin.
          // Remember the interruption and re-assert it once the event has been enqueued.
          interrupted = true;
          logger.info("Thread interrupted while updating buffer", e);
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }