in pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java [256:322]
public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();
List<TopicPartition> topicPartitions = new ArrayList<>();
try {
for (String topic : topics) {
// Create individual subscription on each partition, that way we can keep using the
// acknowledgeCumulative()
int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();
ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
consumerBuilder.subscriptionType(SubscriptionType.Failover);
consumerBuilder.messageListener(this);
consumerBuilder.subscriptionName(groupId);
if (numberOfPartitions > 1) {
// Subscribe to each partition
consumerBuilder.consumerName(ConsumerName.generateRandomName());
for (int i = 0; i < numberOfPartitions; i++) {
String partitionName = TopicName.get(topic).getPartition(i).toString();
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
.topic(partitionName).subscribeAsync();
int partitionIndex = i;
TopicPartition tp = normalizedTopicPartition(topic, partitionIndex);
futures.add(future.thenApply(consumer -> {
log.info("Add consumer {} for partition {}", consumer, tp);
consumers.putIfAbsent(tp, consumer);
return consumer;
}));
topicPartitions.add(tp);
}
} else {
// Topic has a single partition
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
.subscribeAsync();
TopicPartition tp = normalizedTopicPartition(topic, 0);
futures.add(future.thenApply(consumer -> {
log.info("Add consumer {} for partition {}", consumer, tp);
consumers.putIfAbsent(tp, consumer);
return consumer;
}));
topicPartitions.add(tp);
}
}
unpolledPartitions.addAll(topicPartitions);
// Wait for all consumers to be ready
futures.forEach(CompletableFuture::join);
PulsarConsumerCoordinator.invokePartitionsAssigned(groupId, consumerCfg, Lists.newArrayList(consumers.keySet()));
// Notify the listener is now owning all topics/partitions
if (callback != null) {
callback.onPartitionsAssigned(topicPartitions);
}
} catch (Exception e) {
// Close all consumer that might have been successfully created
futures.forEach(f -> {
try {
f.get().close();
} catch (Exception e1) {
// Ignore. Consumer already had failed
}
});
throw new RuntimeException(e);
}
}