public void subscribe()

in pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [207:276]


    public void subscribe(List<String> topics, ConsumerRebalanceListener callback) {
        List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();

        List<TopicPartition> topicPartitions = new ArrayList<>();
        try {
            for (String topic : topics) {
                // Create individual subscription on each partition, that way we can keep using the
                // acknowledgeCumulative()
                int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();

                ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
                consumerBuilder.subscriptionType(SubscriptionType.Failover);
                consumerBuilder.messageListener(this);
                consumerBuilder.subscriptionName(groupId);
                if (numberOfPartitions > 1) {
                    // Subscribe to each partition
                    consumerBuilder.consumerName(ConsumerName.generateRandomName());
                    for (int i = 0; i < numberOfPartitions; i++) {
                        String partitionName = TopicName.get(topic).getPartition(i).toString();
                        CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
                                .topic(partitionName).subscribeAsync();
                        int partitionIndex = i;
                        TopicPartition tp = new TopicPartition(
                                TopicName.get(topic).getPartitionedTopicName(),
                                partitionIndex);
                        futures.add(future.thenApply(consumer -> {
                            log.info("Add consumer {} for partition {}", consumer, tp);
                            consumers.putIfAbsent(tp, consumer);
                            return consumer;
                        }));
                        topicPartitions.add(tp);
                    }
                } else {
                    // Topic has a single partition
                    CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
                            .subscribeAsync();
                    TopicPartition tp = new TopicPartition(
                            TopicName.get(topic).getPartitionedTopicName(),
                            0);
                    futures.add(future.thenApply(consumer -> {
                        log.info("Add consumer {} for partition {}", consumer, tp);
                        consumers.putIfAbsent(tp, consumer);
                        return consumer;
                    }));
                    topicPartitions.add(tp);
                }
            }
            unpolledPartitions.addAll(topicPartitions);

            // Wait for all consumers to be ready
            futures.forEach(CompletableFuture::join);

            // Notify the listener is now owning all topics/partitions
            if (callback != null) {
                callback.onPartitionsAssigned(topicPartitions);
            }

        } catch (Exception e) {
            // Close all consumer that might have been successfully created
            futures.forEach(f -> {
                try {
                    f.get().close();
                } catch (Exception e1) {
                    // Ignore. Consumer already had failed
                }
            });

            throw new RuntimeException(e);
        }
    }