public Subscription subscribe()

in org.apache.aries.events.kafka/src/main/java/org/apache/aries/events/kafka/KafkaMessaging.java [126:149]


    public Subscription subscribe(SubscribeRequestBuilder requestBuilder) {
        SubscribeRequest request = requestBuilder.build();
        KafkaConsumer<String, byte[]> consumer = buildKafkaConsumer(request.getSeek());

        TopicPartition topicPartition = new TopicPartition(request.getTopic(), PARTITION);

        Collection<TopicPartition> topicPartitions = singleton(topicPartition);
        consumer.assign(topicPartitions);

        if (request.getPosition() != null) {
            consumer.seek(topicPartition, asKafkaPosition(request.getPosition()).getOffset());
        } else if (request.getSeek() == Seek.earliest) {
            consumer.seekToBeginning(topicPartitions);
        } else {
            consumer.seekToEnd(topicPartitions);
        }

        KafkaSubscription subscription = new KafkaSubscription(consumer, request.getCallback());
        // TODO pool the threads
        Thread thread = new Thread(subscription);
        thread.setDaemon(true);
        thread.start();
        return subscription;
    }