public Closeable createPoller()

in src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java [134:150]


    public Closeable createPoller(String topicName, Reset reset, @Nullable String assign, HandlerAdapter<?>... adapters) {
        log.info("Creating poller for topic={}, reset={}, assing={} with adapters {}.", topicName, reset, assign, adapters);
        KafkaConsumer<String, String> consumer = createConsumer(StringDeserializer.class, reset);
        TopicPartition topicPartition = new TopicPartition(topicName, PARTITION);
        Collection<TopicPartition> topicPartitions = singleton(topicPartition);
        consumer.assign(topicPartitions);
        if (assign != null) {
            AssignDetails assignDetails = new AssignDetails(assign);
            long offset = assignDetails.getOffset(consumer, topicPartition);
            consumer.seek(topicPartition, offset);
        } else if (reset == Reset.earliest) {
            consumer.seekToBeginning(topicPartitions);
        } else {
            consumer.seekToEnd(topicPartitions);
        }
        return new KafkaPoller(consumer, eventSender, Arrays.asList(adapters));
    }