in org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessaging.java [126:149]
public Subscription subscribe(SubscribeRequestBuilder requestBuilder) {
SubscribeRequest request = requestBuilder.build();
KafkaConsumer<String, byte[]> consumer = buildKafkaConsumer(request.getSeek());
TopicPartition topicPartition = new TopicPartition(request.getTopic(), PARTITION);
Collection<TopicPartition> topicPartitions = singleton(topicPartition);
consumer.assign(topicPartitions);
if (request.getPosition() != null) {
consumer.seek(topicPartition, asKafkaPosition(request.getPosition()).getOffset());
} else if (request.getSeek() == Seek.earliest) {
consumer.seekToBeginning(topicPartitions);
} else {
consumer.seekToEnd(topicPartitions);
}
KafkaSubscription subscription = new KafkaSubscription(consumer, request.getCallback());
// TODO pool the threads
Thread thread = new Thread(subscription);
thread.setDaemon(true);
thread.start();
return subscription;
}