pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [291:302]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                    futures.add(future.thenApply(consumer -> {
                        log.info("Add consumer {} for partition {}", consumer, tp);
                        consumers.putIfAbsent(tp, consumer);
                        return consumer;
                    }));
                    topicPartitions.add(tp);
                }
            }
            unpolledPartitions.addAll(topicPartitions);
            
            // Wait for all consumers to be ready
            futures.forEach(CompletableFuture::join);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [246:257]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                    futures.add(future.thenApply(consumer -> {
                        log.info("Add consumer {} for partition {}", consumer, tp);
                        consumers.putIfAbsent(tp, consumer);
                        return consumer;
                    }));
                    topicPartitions.add(tp);
                }
            }
            unpolledPartitions.addAll(topicPartitions);

            // Wait for all consumers to be ready
            futures.forEach(CompletableFuture::join);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



