in src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java [134:150]
public Closeable createPoller(String topicName, Reset reset, @Nullable String assign, HandlerAdapter<?>... adapters) {
log.info("Creating poller for topic={}, reset={}, assing={} with adapters {}.", topicName, reset, assign, adapters);
KafkaConsumer<String, String> consumer = createConsumer(StringDeserializer.class, reset);
TopicPartition topicPartition = new TopicPartition(topicName, PARTITION);
Collection<TopicPartition> topicPartitions = singleton(topicPartition);
consumer.assign(topicPartitions);
if (assign != null) {
AssignDetails assignDetails = new AssignDetails(assign);
long offset = assignDetails.getOffset(consumer, topicPartition);
consumer.seek(topicPartition, offset);
} else if (reset == Reset.earliest) {
consumer.seekToBeginning(topicPartitions);
} else {
consumer.seekToEnd(topicPartitions);
}
return new KafkaPoller(consumer, eventSender, Arrays.asList(adapters));
}