in pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java [779:934]
public static void doCommonChecks(FunctionConfig functionConfig) {
if (isEmpty(functionConfig.getTenant())) {
throw new IllegalArgumentException("Function tenant cannot be null");
}
if (isEmpty(functionConfig.getNamespace())) {
throw new IllegalArgumentException("Function namespace cannot be null");
}
if (isEmpty(functionConfig.getName())) {
throw new IllegalArgumentException("Function name cannot be null");
}
// go doesn't need className. Java className is done in doJavaChecks.
if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
if (isEmpty(functionConfig.getClassName())) {
throw new IllegalArgumentException("Function classname cannot be null");
}
}
Collection<String> allInputTopics = collectAllInputTopics(functionConfig);
if (allInputTopics.isEmpty()) {
throw new IllegalArgumentException("No input topic(s) specified for the function");
}
for (String topic : allInputTopics) {
if (!TopicName.isValid(topic)) {
throw new IllegalArgumentException(String.format("Input topic %s is invalid", topic));
}
}
if (!isEmpty(functionConfig.getOutput())) {
if (!TopicName.isValid(functionConfig.getOutput())) {
throw new IllegalArgumentException(
String.format("Output topic %s is invalid", functionConfig.getOutput()));
}
}
if (!isEmpty(functionConfig.getLogTopic())) {
if (!TopicName.isValid(functionConfig.getLogTopic())) {
throw new IllegalArgumentException(
String.format("LogTopic topic %s is invalid", functionConfig.getLogTopic()));
}
}
if (!isEmpty(functionConfig.getDeadLetterTopic())) {
if (!TopicName.isValid(functionConfig.getDeadLetterTopic())) {
throw new IllegalArgumentException(
String.format("DeadLetter topic %s is invalid", functionConfig.getDeadLetterTopic()));
}
}
if (functionConfig.getParallelism() != null && functionConfig.getParallelism() <= 0) {
throw new IllegalArgumentException("Function parallelism must be a positive number");
}
// Ensure that topics aren't being used as both input and output
verifyNoTopicClash(allInputTopics, functionConfig.getOutput());
WindowConfig windowConfig = functionConfig.getWindowConfig();
if (windowConfig != null) {
// set auto ack to false since windowing framework is responsible
// for acking and not the function framework
if (functionConfig.getAutoAck() != null && functionConfig.getAutoAck()) {
throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
}
WindowConfigUtils.validate(windowConfig);
}
if (functionConfig.getResources() != null) {
ResourceConfigUtils.validate(functionConfig.getResources());
}
if (functionConfig.getTimeoutMs() != null && functionConfig.getTimeoutMs() <= 0) {
throw new IllegalArgumentException("Function timeout must be a positive number");
}
if (functionConfig.getTimeoutMs() != null
&& functionConfig.getProcessingGuarantees() != null
&& functionConfig.getProcessingGuarantees() != FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) {
throw new IllegalArgumentException("Message timeout can only be specified with processing guarantee is "
+ FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE.name());
}
if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0
&& functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
throw new IllegalArgumentException("MaxMessageRetries and Effectively once don't gel well");
}
if ((functionConfig.getMaxMessageRetries() == null || functionConfig.getMaxMessageRetries() < 0)
&& !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getDeadLetterTopic())) {
throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity");
}
if (functionConfig.getRetainKeyOrdering() != null
&& functionConfig.getRetainKeyOrdering()
&& functionConfig.getProcessingGuarantees() != null
&& functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
throw new IllegalArgumentException(
"When effectively once processing guarantee is specified, retain Key ordering cannot be set");
}
if (functionConfig.getRetainKeyOrdering() != null && functionConfig.getRetainKeyOrdering()
&& functionConfig.getRetainOrdering() != null && functionConfig.getRetainOrdering()) {
throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set");
}
if (!isEmpty(functionConfig.getPy()) && !org.apache.pulsar.common.functions.Utils
.isFunctionPackageUrlSupported(functionConfig.getPy())
&& functionConfig.getPy().startsWith(BUILTIN)) {
String filename = functionConfig.getPy();
if (filename.contains("..")) {
throw new IllegalArgumentException("Invalid filename: " + filename);
}
if (!new File(filename).exists()) {
throw new IllegalArgumentException("The supplied python file does not exist");
}
}
if (!isEmpty(functionConfig.getGo()) && !org.apache.pulsar.common.functions.Utils
.isFunctionPackageUrlSupported(functionConfig.getGo())
&& functionConfig.getGo().startsWith(BUILTIN)) {
String filename = functionConfig.getGo();
if (filename.contains("..")) {
throw new IllegalArgumentException("Invalid filename: " + filename);
}
if (!new File(filename).exists()) {
throw new IllegalArgumentException("The supplied go file does not exist");
}
}
if (functionConfig.getInputSpecs() != null) {
functionConfig.getInputSpecs().forEach((topicName, conf) -> {
// receiver queue size should be >= 0
if (conf.getReceiverQueueSize() != null && conf.getReceiverQueueSize() < 0) {
throw new IllegalArgumentException(
"Receiver queue size should be >= zero");
}
if (conf.getCryptoConfig() != null && isBlank(conf.getCryptoConfig().getCryptoKeyReaderClassName())) {
throw new IllegalArgumentException(
"CryptoKeyReader class name required");
}
if (conf.getMessagePayloadProcessorConfig() != null && isBlank(
conf.getMessagePayloadProcessorConfig().getClassName())) {
throw new IllegalArgumentException(
"MessagePayloadProcessor class name required");
}
});
}
if (functionConfig.getProducerConfig() != null
&& functionConfig.getProducerConfig().getCryptoConfig() != null) {
if (isBlank(functionConfig.getProducerConfig().getCryptoConfig().getCryptoKeyReaderClassName())) {
throw new IllegalArgumentException("CryptoKeyReader class name required");
}
if (functionConfig.getProducerConfig().getCryptoConfig().getEncryptionKeys() == null
|| functionConfig.getProducerConfig().getCryptoConfig().getEncryptionKeys().length == 0) {
throw new IllegalArgumentException("Must provide encryption key name for crypto key reader");
}
}
}