in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerConnector.java [71:107]
public ConsumerConnector(ConsumerConfig config) {
checkNotNull(config, "ConsumerConfig can't be null");
clientId = config.clientId();
groupId = config.groupId();
isAutoCommit = config.autoCommitEnable();
if ("largest".equalsIgnoreCase(config.autoOffsetReset())) {
strategy = SubscriptionInitialPosition.Latest;
} else if ("smallest".equalsIgnoreCase(config.autoOffsetReset())) {
strategy = SubscriptionInitialPosition.Earliest;
}
String consumerId = !config.consumerId().isEmpty() ? config.consumerId().get() : null;
int maxMessage = config.queuedMaxMessages();
String serviceUrl = config.zkConnect();
Properties properties = config.props() != null && config.props().props() != null ? config.props().props()
: new Properties();
try {
client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).build();
} catch (PulsarClientException e) {
throw new IllegalArgumentException(
"Failed to create pulsar-client using url = " + serviceUrl + ", properties = " + properties, e);
}
topicStreams = Sets.newConcurrentHashSet();
consumerBuilder = client.newConsumer();
consumerBuilder.subscriptionName(groupId);
if (properties.containsKey("queued.max.message.chunks") && config.queuedMaxMessages() > 0) {
consumerBuilder.receiverQueueSize(maxMessage);
}
if (consumerId != null) {
consumerBuilder.consumerName(consumerId);
}
if (properties.containsKey("auto.commit.interval.ms") && config.autoCommitIntervalMs() > 0) {
consumerBuilder.acknowledgmentGroupTime(config.autoCommitIntervalMs(), TimeUnit.MILLISECONDS);
}
this.executor = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-kafka"));
}