public SparkStreamingPulsarReceiver()

in pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java [58:84]


    /**
     * Creates a receiver that consumes from Pulsar using the supplied consumer configuration.
     *
     * <p>If {@code conf} has no message listener, a default one is installed on it; note that the
     * passed-in configuration object is modified in place. The default listener stores each
     * message payload through {@code store(...)} and then acknowledges the message
     * asynchronously. If either step throws, the failure is logged and the message is
     * negatively acknowledged.
     *
     * @param storageLevel storage level handed to the superclass constructor
     * @param serviceUrl service URL to connect to; must not be null
     * @param conf consumer configuration; must not be null, must contain at least one topic name
     *     and must have a subscription name
     * @param authentication authentication to use; stored as given and not validated here
     */
    public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
        String serviceUrl,
        ConsumerConfigurationData<byte[]> conf,
        Authentication authentication) {
        super(storageLevel);

        // Validate everything up front so a misconfigured receiver fails at construction time.
        checkNotNull(serviceUrl, "serviceUrl must not be null");
        checkNotNull(conf, "ConsumerConfigurationData must not be null");
        checkArgument(conf.getTopicNames().size() > 0, "TopicNames must be set a value.");
        checkNotNull(conf.getSubscriptionName(), "SubscriptionName must not be null");

        this.serviceUrl = serviceUrl;
        this.authentication = authentication;

        // Only install the default listener when the caller did not provide one.
        if (conf.getMessageListener() == null) {
            // The intersection cast makes the lambda serializable
            // (presumably so the configuration can be serialized along with the receiver).
            conf.setMessageListener((MessageListener<byte[]> & Serializable) (consumer, msg) -> {
                try {
                    store(msg.getData());
                    consumer.acknowledgeAsync(msg);
                } catch (Exception e) {
                    // Pass the exception itself as the last argument so the logger records the
                    // stack trace and cause chain, not just the (possibly null) message text.
                    LOG.error("Failed to store a message : {}", e.getMessage(), e);
                    consumer.negativeAcknowledge(msg);
                }
            });
        }
        this.conf = conf;
    }