public ConsumerConnector()

in pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/consumer/ConsumerConnector.java [71:107]


    public ConsumerConnector(ConsumerConfig config) {
        checkNotNull(config, "ConsumerConfig can't be null");
        clientId = config.clientId();
        groupId = config.groupId();
        isAutoCommit = config.autoCommitEnable();
        if ("largest".equalsIgnoreCase(config.autoOffsetReset())) {
            strategy = SubscriptionInitialPosition.Latest;
        } else if ("smallest".equalsIgnoreCase(config.autoOffsetReset())) {
            strategy = SubscriptionInitialPosition.Earliest;
        }
        String consumerId = !config.consumerId().isEmpty() ? config.consumerId().get() : null;
        int maxMessage = config.queuedMaxMessages();
        String serviceUrl = config.zkConnect();

        Properties properties = config.props() != null && config.props().props() != null ? config.props().props()
                : new Properties();
        try {
            client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).build();
        } catch (PulsarClientException e) {
            throw new IllegalArgumentException(
                    "Failed to create pulsar-client using url = " + serviceUrl + ", properties = " + properties, e);
        }

        topicStreams = Sets.newConcurrentHashSet();
        consumerBuilder = client.newConsumer();
        consumerBuilder.subscriptionName(groupId);
        if (properties.containsKey("queued.max.message.chunks") && config.queuedMaxMessages() > 0) {
            consumerBuilder.receiverQueueSize(maxMessage);
        }
        if (consumerId != null) {
            consumerBuilder.consumerName(consumerId);
        }
        if (properties.containsKey("auto.commit.interval.ms") && config.autoCommitIntervalMs() > 0) {
            consumerBuilder.acknowledgmentGroupTime(config.autoCommitIntervalMs(), TimeUnit.MILLISECONDS);
        }
        this.executor = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-kafka"));
    }