in pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java [58:84]
/**
 * Creates a receiver that is configured from the given consumer configuration.
 *
 * <p>If {@code conf} has no message listener, a default one is installed on the passed-in
 * {@code conf} object itself (the caller's instance is modified). The default listener hands
 * each message payload to {@code store(...)} and then acknowledges the message asynchronously;
 * if either step throws, the failure is logged and the message is negatively acknowledged.
 *
 * @param storageLevel   storage level forwarded to the superclass constructor
 * @param serviceUrl     service URL; must not be null
 * @param conf           consumer configuration; must not be null, must contain at least one
 *                       topic name and a non-null subscription name
 * @param authentication authentication to use; stored as given (not null-checked here)
 */
public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
                                    String serviceUrl,
                                    ConsumerConfigurationData<byte[]> conf,
                                    Authentication authentication) {
    super(storageLevel);
    checkNotNull(serviceUrl, "serviceUrl must not be null");
    checkNotNull(conf, "ConsumerConfigurationData must not be null");
    checkArgument(!conf.getTopicNames().isEmpty(), "TopicNames must be set a value.");
    checkNotNull(conf.getSubscriptionName(), "SubscriptionName must not be null");
    this.serviceUrl = serviceUrl;
    this.authentication = authentication;
    if (conf.getMessageListener() == null) {
        // The intersection cast keeps the lambda serializable alongside the rest of the config.
        conf.setMessageListener((MessageListener<byte[]> & Serializable) (consumer, msg) -> {
            try {
                store(msg.getData());
                consumer.acknowledgeAsync(msg);
            } catch (Exception e) {
                // Pass the throwable as the trailing argument so the exception type and
                // stack trace are logged; e.getMessage() alone may be null and hides the cause.
                LOG.error("Failed to store a message : {}", e.getMessage(), e);
                consumer.negativeAcknowledge(msg);
            }
        });
    }
    this.conf = conf;
}