public static void doCommonChecks()

in pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java [779:934]


    /**
     * Runs the validation steps of a {@link FunctionConfig} that are shared by this method's callers.
     *
     * <p>The checks run in a fixed order and the first violation aborts validation:
     * identity fields, topic names, parallelism, input/output topic clash, windowing,
     * resources, timeout/retry/ordering consistency, python and go package files,
     * per-input consumer settings and finally producer crypto settings.
     *
     * @param functionConfig the configuration to validate
     * @throws IllegalArgumentException if any of the checks performed here fails; the delegated
     *         validators ({@code verifyNoTopicClash}, {@code WindowConfigUtils.validate},
     *         {@code ResourceConfigUtils.validate}) may raise their own exceptions
     */
    public static void doCommonChecks(FunctionConfig functionConfig) {
        requireIdentityFields(functionConfig);

        Collection<String> allInputTopics = collectAllInputTopics(functionConfig);
        requireValidTopicNames(functionConfig, allInputTopics);

        if (functionConfig.getParallelism() != null && functionConfig.getParallelism() <= 0) {
            throw new IllegalArgumentException("Function parallelism must be a positive number");
        }
        // Ensure that topics aren't being used as both input and output
        verifyNoTopicClash(allInputTopics, functionConfig.getOutput());

        requireValidWindowingAndResources(functionConfig);
        requireConsistentDeliverySettings(functionConfig);

        requireExistingPackageFile(functionConfig.getPy(), "The supplied python file does not exist");
        requireExistingPackageFile(functionConfig.getGo(), "The supplied go file does not exist");

        requireValidInputSpecs(functionConfig);
        requireValidProducerCrypto(functionConfig);
    }

    /** Checks that tenant, namespace and name are set, plus the class name for the python runtime. */
    private static void requireIdentityFields(FunctionConfig functionConfig) {
        if (isEmpty(functionConfig.getTenant())) {
            throw new IllegalArgumentException("Function tenant cannot be null");
        }
        if (isEmpty(functionConfig.getNamespace())) {
            throw new IllegalArgumentException("Function namespace cannot be null");
        }
        if (isEmpty(functionConfig.getName())) {
            throw new IllegalArgumentException("Function name cannot be null");
        }
        // go doesn't need className. Java className is done in doJavaChecks.
        if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON
                && isEmpty(functionConfig.getClassName())) {
            throw new IllegalArgumentException("Function classname cannot be null");
        }
    }

    /** Checks that at least one input topic exists and that every configured topic name is valid. */
    private static void requireValidTopicNames(FunctionConfig functionConfig,
                                               Collection<String> allInputTopics) {
        if (allInputTopics.isEmpty()) {
            throw new IllegalArgumentException("No input topic(s) specified for the function");
        }
        for (String inputTopic : allInputTopics) {
            if (!TopicName.isValid(inputTopic)) {
                throw new IllegalArgumentException(String.format("Input topic %s is invalid", inputTopic));
            }
        }

        requireValidOptionalTopic(functionConfig.getOutput(), "Output topic %s is invalid");
        requireValidOptionalTopic(functionConfig.getLogTopic(), "LogTopic topic %s is invalid");
        requireValidOptionalTopic(functionConfig.getDeadLetterTopic(), "DeadLetter topic %s is invalid");
    }

    /** Rejects a non-empty topic name that is not valid; an empty or null topic is accepted. */
    private static void requireValidOptionalTopic(String topic, String invalidMessageFormat) {
        if (!isEmpty(topic) && !TopicName.isValid(topic)) {
            throw new IllegalArgumentException(String.format(invalidMessageFormat, topic));
        }
    }

    /** Validates the window configuration and the resource configuration when they are present. */
    private static void requireValidWindowingAndResources(FunctionConfig functionConfig) {
        WindowConfig windowConfig = functionConfig.getWindowConfig();
        if (windowConfig != null) {
            // Auto ack must not be explicitly enabled together with windowing: the windowing
            // framework is responsible for acking, not the function framework.
            if (Boolean.TRUE.equals(functionConfig.getAutoAck())) {
                throw new IllegalArgumentException("Cannot enable auto ack when using windowing functionality");
            }
            WindowConfigUtils.validate(windowConfig);
        }

        if (functionConfig.getResources() != null) {
            ResourceConfigUtils.validate(functionConfig.getResources());
        }
    }

    /** Checks timeout, retry, dead-letter and ordering options against the processing guarantee. */
    private static void requireConsistentDeliverySettings(FunctionConfig functionConfig) {
        FunctionConfig.ProcessingGuarantees guarantees = functionConfig.getProcessingGuarantees();
        boolean effectivelyOnce = guarantees == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE;

        if (functionConfig.getTimeoutMs() != null && functionConfig.getTimeoutMs() <= 0) {
            throw new IllegalArgumentException("Function timeout must be a positive number");
        }

        // An unset guarantee is accepted here; only an explicit non-ATLEAST_ONCE value is rejected.
        if (functionConfig.getTimeoutMs() != null
                && guarantees != null
                && guarantees != FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) {
            throw new IllegalArgumentException("Message timeout can only be specified with processing guarantee is "
                    + FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE.name());
        }

        // A null or negative retry count is what the dead-letter check below reports as "infinity".
        boolean boundedRetries = functionConfig.getMaxMessageRetries() != null
                && functionConfig.getMaxMessageRetries() >= 0;
        if (boundedRetries && effectivelyOnce) {
            throw new IllegalArgumentException("MaxMessageRetries and Effectively once don't gel well");
        }
        if (!boundedRetries
                && !org.apache.commons.lang3.StringUtils.isEmpty(functionConfig.getDeadLetterTopic())) {
            throw new IllegalArgumentException("Dead Letter Topic specified, however max retries is set to infinity");
        }

        boolean retainKeyOrdering = Boolean.TRUE.equals(functionConfig.getRetainKeyOrdering());
        if (retainKeyOrdering && effectivelyOnce) {
            throw new IllegalArgumentException(
                    "When effectively once processing guarantee is specified, retain Key ordering cannot be set");
        }
        if (retainKeyOrdering && Boolean.TRUE.equals(functionConfig.getRetainOrdering())) {
            throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set");
        }
    }

    /**
     * Checks a python/go package path that is non-empty, is not a supported package URL and
     * starts with {@code BUILTIN}: it must not contain ".." and must exist on the local file system.
     * Any other path is accepted without inspection.
     */
    private static void requireExistingPackageFile(String path, String missingFileMessage) {
        // NOTE(review): the file checks only run for paths that start with BUILTIN, exactly as
        // before this refactoring - confirm that this (rather than the negation) is intended.
        if (isEmpty(path)
                || org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(path)
                || !path.startsWith(BUILTIN)) {
            return;
        }
        if (path.contains("..")) {
            throw new IllegalArgumentException("Invalid filename: " + path);
        }
        if (!new File(path).exists()) {
            throw new IllegalArgumentException(missingFileMessage);
        }
    }

    /** Checks queue size, crypto reader and payload processor settings of every input spec. */
    private static void requireValidInputSpecs(FunctionConfig functionConfig) {
        if (functionConfig.getInputSpecs() == null) {
            return;
        }
        functionConfig.getInputSpecs().forEach((topicName, conf) -> {
            // receiver queue size should be >= 0
            if (conf.getReceiverQueueSize() != null && conf.getReceiverQueueSize() < 0) {
                throw new IllegalArgumentException(
                    "Receiver queue size should be >= zero");
            }
            if (conf.getCryptoConfig() != null
                    && isBlank(conf.getCryptoConfig().getCryptoKeyReaderClassName())) {
                throw new IllegalArgumentException(
                        "CryptoKeyReader class name required");
            }
            if (conf.getMessagePayloadProcessorConfig() != null
                    && isBlank(conf.getMessagePayloadProcessorConfig().getClassName())) {
                throw new IllegalArgumentException(
                        "MessagePayloadProcessor class name required");
            }
        });
    }

    /** Checks that a producer crypto config names a key reader class and at least one encryption key. */
    private static void requireValidProducerCrypto(FunctionConfig functionConfig) {
        if (functionConfig.getProducerConfig() == null
                || functionConfig.getProducerConfig().getCryptoConfig() == null) {
            return;
        }
        if (isBlank(functionConfig.getProducerConfig().getCryptoConfig().getCryptoKeyReaderClassName())) {
            throw new IllegalArgumentException("CryptoKeyReader class name required");
        }
        if (functionConfig.getProducerConfig().getCryptoConfig().getEncryptionKeys() == null
                || functionConfig.getProducerConfig().getCryptoConfig().getEncryptionKeys().length == 0) {
            throw new IllegalArgumentException("Must provide encryption key name for crypto key reader");
        }
    }