in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/simple/consumer/PulsarKafkaSimpleConsumer.java [93:114]
public PulsarKafkaSimpleConsumer(String host, int port, int soTimeout, int bufferSize, String clientId,
Properties properties) {
super(host, port, soTimeout, bufferSize, clientId);
this.host = host;
this.port = port;
this.clientId = clientId;
try {
client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(host).build();
} catch (PulsarClientException e) {
log.warn("Failed to create pulsar client for {} and properties {}", host, properties);
throw new RuntimeException("Failed to create pulsar client " + host, e);
}
try {
String url = properties.getProperty(HTTP_SERVICE_URL, host);
admin = PulsarClientKafkaConfig.getAdminBuilder(url, properties).build();
} catch (PulsarClientException e) {
log.warn("Failed to create pulsar admin for {} and properties {}", host, properties);
throw new RuntimeException("Failed to create pulsar admin " + host, e);
}
this.topicConsumerMap = new ConcurrentHashMap<>(8, 0.75f, 1);
this.subscriptionType = getSubscriptionType(properties);
}