pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [279:289]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                        futures.add(future.thenApply(consumer -> {
                            log.info("Add consumer {} for partition {}", consumer, tp);
                            consumers.putIfAbsent(tp, consumer);
                            return consumer;
                        }));
                        topicPartitions.add(tp);
                    }
                } else {
                    // Topic has a single partition
                    CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
                            .subscribeAsync();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [232:242]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                        futures.add(future.thenApply(consumer -> {
                            log.info("Add consumer {} for partition {}", consumer, tp);
                            consumers.putIfAbsent(tp, consumer);
                            return consumer;
                        }));
                        topicPartitions.add(tp);
                    }
                } else {
                    // Topic has a single partition
                    CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
                            .subscribeAsync();
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



