public static ExtractedSinkDetails validateAndExtractDetails()

in pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java [418:585]


    /**
     * Validates a sink configuration and extracts the class/type details derived from it.
     *
     * <p>The checks run in a fixed order and the first one that fails aborts the call:
     * tenant/namespace/name presence, input topics, log topic, parallelism, resources,
     * timeout, sink class resolution, optional transform function resolution,
     * serde/schema/consumer-spec validation against the input type, ordering/guarantee
     * conflicts and, finally, the optional user-config validation.
     *
     * @param sinkConfig              the sink configuration to validate
     * @param sinkFunction            package used to resolve the sink class; its connector
     *                                definition supplies the class name when the config has none
     * @param transformFunction       package of the transform function, or {@code null} when the
     *                                sink has none; when present, the input type is taken from the
     *                                function instead of from the sink class
     * @param validateConnectorConfig when {@code true}, additionally runs
     *                                {@code validateSinkConfig} provided the sink package has
     *                                classloading enabled (otherwise a warning is logged)
     * @return the sink class name, the erased type name of the input type and the transform
     *         function class name (as configured, or as found in the function definition)
     * @throws IllegalArgumentException if any of the checks fails
     */
    public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConfig,
                                                                 ValidatableFunctionPackage sinkFunction,
                                                                 ValidatableFunctionPackage transformFunction,
                                                                 boolean validateConnectorConfig) {
        // Fully qualified identity of the sink: tenant, namespace and name are all mandatory.
        if (isEmpty(sinkConfig.getTenant())) {
            throw new IllegalArgumentException("Sink tenant cannot be null");
        }
        if (isEmpty(sinkConfig.getNamespace())) {
            throw new IllegalArgumentException("Sink namespace cannot be null");
        }
        if (isEmpty(sinkConfig.getName())) {
            throw new IllegalArgumentException("Sink name cannot be null");
        }

        // At least one input must be configured, and every input must be a valid topic name.
        final Collection<String> inputTopics = collectAllInputTopics(sinkConfig);
        if (inputTopics.isEmpty()) {
            throw new IllegalArgumentException("Must specify at least one topic of input via topicToSerdeClassName, "
                    + "topicsPattern, topicToSchemaType or inputSpecs");
        }
        for (String inputTopic : inputTopics) {
            if (TopicName.isValid(inputTopic)) {
                continue;
            }
            throw new IllegalArgumentException(String.format("Input topic %s is invalid", inputTopic));
        }

        // The log topic is optional, but must be a valid topic name when given.
        final String logTopic = sinkConfig.getLogTopic();
        if (!isEmpty(logTopic) && !TopicName.isValid(logTopic)) {
            throw new IllegalArgumentException(
                    String.format("LogTopic topic %s is invalid", logTopic));
        }

        final Integer parallelism = sinkConfig.getParallelism();
        if (parallelism != null && parallelism <= 0) {
            throw new IllegalArgumentException("Sink parallelism must be a positive number");
        }

        if (sinkConfig.getResources() != null) {
            ResourceConfigUtils.validate(sinkConfig.getResources());
        }

        // Note: zero is accepted here; only negative timeouts are rejected.
        final Long timeoutMs = sinkConfig.getTimeoutMs();
        if (timeoutMs != null && timeoutMs < 0) {
            throw new IllegalArgumentException("Sink timeout must be a positive number");
        }

        // Without an explicit class name this should be a built-in sink, so the class name
        // is looked up in the connector definition shipped with the package.
        String sinkClassName = sinkConfig.getClassName();
        if (sinkClassName == null) {
            final ConnectorDefinition connectorDef = sinkFunction.getFunctionMetaData(ConnectorDefinition.class);
            if (connectorDef == null) {
                throw new IllegalArgumentException(
                        "Sink package doesn't contain the META-INF/services/pulsar-io.yaml file.");
            }
            sinkClassName = connectorDef.getSinkClass();
            if (sinkClassName == null) {
                throw new IllegalArgumentException("Failed to extract sink class from archive");
            }
        }

        // The sink class must be resolvable from the sink package, with or without a transform function.
        final TypeDefinition sinkType;
        try {
            sinkType = sinkFunction.resolveType(sinkClassName);
        } catch (TypePool.Resolution.NoSuchTypeException e) {
            throw new IllegalArgumentException(
                    String.format("Sink class %s not found", sinkClassName), e);
        }

        String transformClassName = sinkConfig.getTransformFunctionClassName();
        final TypeDefinition inputType;
        final ValidatableFunctionPackage typeSource;
        if (transformFunction == null) {
            // No transform function: the sink itself determines the input type.
            inputType = getSinkType(sinkType);
            typeSource = sinkFunction;
        } else {
            // Without an explicit class name this should be a built-in function, so the class
            // name is looked up in the function definition shipped with the package.
            if (transformClassName == null) {
                final FunctionDefinition functionDef =
                        transformFunction.getFunctionMetaData(FunctionDefinition.class);
                if (functionDef == null) {
                    throw new IllegalArgumentException(
                            "Function package doesn't contain the META-INF/services/pulsar-io.yaml file.");
                }
                transformClassName = functionDef.getFunctionClass();
                if (transformClassName == null) {
                    throw new IllegalArgumentException("Transform function class name must be set");
                }
            }

            final TypeDefinition transformType;
            try {
                transformType = transformFunction.resolveType(transformClassName);
            } catch (TypePool.Resolution.NoSuchTypeException e) {
                throw new IllegalArgumentException(
                        String.format("Function class %s not found", transformClassName), e);
            }

            // The function feeds the sink, so its output (second type) must be a Record.
            final boolean producesRecord =
                    getRawFunctionTypes(transformType, false)[1].asErasure().isAssignableTo(Record.class);
            if (!producesRecord) {
                throw new IllegalArgumentException("Sink transform function output must be of type Record");
            }
            // The function's input (first type) is what the consumers have to deliver.
            inputType = getFunctionTypes(transformType, false)[0];
            typeSource = transformFunction;
        }

        if (sinkConfig.getTopicToSerdeClassName() != null) {
            for (String serde : sinkConfig.getTopicToSerdeClassName().values()) {
                ValidatorUtils.validateSerde(serde, inputType, typeSource.getTypePool(), true);
            }
        }

        if (sinkConfig.getTopicToSchemaType() != null) {
            for (String schema : sinkConfig.getTopicToSchemaType().values()) {
                ValidatorUtils.validateSchema(schema, inputType, typeSource.getTypePool(), true);
            }
        }

        // topicsPattern does not need checks

        if (sinkConfig.getInputSpecs() != null) {
            for (ConsumerConfig spec : sinkConfig.getInputSpecs().values()) {
                final boolean hasSerde = !isEmpty(spec.getSerdeClassName());
                final boolean hasSchema = !isEmpty(spec.getSchemaType());
                // serde and schema type are mutually exclusive
                if (hasSerde && hasSchema) {
                    throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
                }
                if (hasSerde) {
                    ValidatorUtils.validateSerde(spec.getSerdeClassName(), inputType,
                            typeSource.getTypePool(), true);
                } else if (hasSchema) {
                    ValidatorUtils.validateSchema(spec.getSchemaType(), inputType,
                            typeSource.getTypePool(), true);
                }
                if (spec.getCryptoConfig() != null) {
                    ValidatorUtils.validateCryptoKeyReader(spec.getCryptoConfig(),
                            typeSource.getTypePool(), false);
                }
                if (spec.getMessagePayloadProcessorConfig() != null) {
                    ValidatorUtils.validateMessagePayloadProcessor(spec.getMessagePayloadProcessorConfig(),
                            typeSource.getTypePool());
                }
            }
        }

        // Retaining key ordering conflicts with effectively-once processing and with retain ordering.
        if (Boolean.TRUE.equals(sinkConfig.getRetainKeyOrdering())) {
            if (sinkConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                throw new IllegalArgumentException(
                        "When effectively once processing guarantee is specified, retain Key ordering cannot be set");
            }
            if (Boolean.TRUE.equals(sinkConfig.getRetainOrdering())) {
                throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set");
            }
        }

        // Validate the user defined config only if requested and classloading is enabled.
        if (validateConnectorConfig) {
            if (!sinkFunction.isEnableClassloading()) {
                log.warn("Skipping annotation based validation of sink config as classloading is disabled");
            } else {
                validateSinkConfig(sinkConfig, sinkFunction);
            }
        }

        final String inputTypeName = inputType.asErasure().getTypeName();
        return new ExtractedSinkDetails(sinkClassName, inputTypeName, transformClassName);
    }