public static FunctionDetails convert()

in pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java [93:374]


    public static FunctionDetails convert(FunctionConfig functionConfig, ExtractedFunctionDetails extractedDetails)
             throws IllegalArgumentException {

        boolean isBuiltin = !StringUtils.isEmpty(functionConfig.getJar())
                && functionConfig.getJar().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);

        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();

        // Setup source
        Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder();
        if (functionConfig.getInputs() != null) {
            functionConfig.getInputs().forEach((topicName ->
                sourceSpecBuilder.putInputSpecs(topicName,
                        Function.ConsumerSpec.newBuilder()
                                .setIsRegexPattern(false)
                                .build())
            ));
        }
        if (functionConfig.getTopicsPattern() != null && !functionConfig.getTopicsPattern().isEmpty()) {
            sourceSpecBuilder.putInputSpecs(functionConfig.getTopicsPattern(),
                    Function.ConsumerSpec.newBuilder()
                            .setIsRegexPattern(true)
                            .build());
        }
        if (functionConfig.getCustomSerdeInputs() != null) {
            functionConfig.getCustomSerdeInputs().forEach((topicName, serdeClassName) -> {
                sourceSpecBuilder.putInputSpecs(topicName,
                        Function.ConsumerSpec.newBuilder()
                                .setSerdeClassName(serdeClassName)
                                .setIsRegexPattern(false)
                                .build());
            });
        }
        if (functionConfig.getCustomSchemaInputs() != null) {
            functionConfig.getCustomSchemaInputs().forEach((topicName, conf) -> {
                try {
                    ConsumerConfig consumerConfig = OBJECT_MAPPER.readValue(conf, ConsumerConfig.class);
                    sourceSpecBuilder.putInputSpecs(topicName,
                            Function.ConsumerSpec.newBuilder()
                                    .setSchemaType(consumerConfig.getSchemaType())
                                    .putAllSchemaProperties(consumerConfig.getSchemaProperties())
                                    .putAllConsumerProperties(consumerConfig.getConsumerProperties())
                                    .setIsRegexPattern(false)
                                    .build());
                } catch (JsonProcessingException e) {
                    throw new IllegalArgumentException(
                            String.format("Incorrect custom schema inputs,Topic %s ", topicName));
                }
            });
        }
        if (functionConfig.getInputSpecs() != null) {
            functionConfig.getInputSpecs().forEach((topicName, consumerConf) -> {
                Function.ConsumerSpec.Builder bldr = Function.ConsumerSpec.newBuilder()
                        .setIsRegexPattern(consumerConf.isRegexPattern());
                if (isNotBlank(consumerConf.getSchemaType())) {
                    bldr.setSchemaType(consumerConf.getSchemaType());
                } else if (isNotBlank(consumerConf.getSerdeClassName())) {
                    bldr.setSerdeClassName(consumerConf.getSerdeClassName());
                }
                if (consumerConf.getReceiverQueueSize() != null) {
                    bldr.setReceiverQueueSize(Function.ConsumerSpec.ReceiverQueueSize.newBuilder()
                            .setValue(consumerConf.getReceiverQueueSize()).build());
                }
                if (consumerConf.getSchemaProperties() != null) {
                    bldr.putAllSchemaProperties(consumerConf.getSchemaProperties());
                }
                if (consumerConf.getCryptoConfig() != null) {
                    bldr.setCryptoSpec(CryptoUtils.convert(consumerConf.getCryptoConfig()));
                }
                if (consumerConf.getMessagePayloadProcessorConfig() != null) {
                    bldr.setMessagePayloadProcessorSpec(
                            MessagePayloadProcessorUtils.convert(consumerConf.getMessagePayloadProcessorConfig()));
                }
                bldr.putAllConsumerProperties(consumerConf.getConsumerProperties());
                bldr.setPoolMessages(consumerConf.isPoolMessages());
                sourceSpecBuilder.putInputSpecs(topicName, bldr.build());
            });
        }

        // Set subscription type
        Function.SubscriptionType subType;
        if ((functionConfig.getRetainOrdering() != null && functionConfig.getRetainOrdering())
                || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE
                .equals(functionConfig.getProcessingGuarantees())) {
            subType = Function.SubscriptionType.FAILOVER;
        } else if (functionConfig.getRetainKeyOrdering() != null && functionConfig.getRetainKeyOrdering()) {
            subType = Function.SubscriptionType.KEY_SHARED;
        } else {
            subType = Function.SubscriptionType.SHARED;
        }
        sourceSpecBuilder.setSubscriptionType(subType);

        // Set subscription name
        if (isNotBlank(functionConfig.getSubName())) {
            sourceSpecBuilder.setSubscriptionName(functionConfig.getSubName());
        }

        // Set subscription position
        if (functionConfig.getSubscriptionPosition() != null) {
            Function.SubscriptionPosition subPosition;
            if (SubscriptionInitialPosition.Earliest == functionConfig.getSubscriptionPosition()) {
                subPosition = Function.SubscriptionPosition.EARLIEST;
            } else {
                subPosition = Function.SubscriptionPosition.LATEST;
            }
            sourceSpecBuilder.setSubscriptionPosition(subPosition);
        }

        if (functionConfig.getSkipToLatest() != null) {
            sourceSpecBuilder.setSkipToLatest(functionConfig.getSkipToLatest());
        } else {
            sourceSpecBuilder.setSkipToLatest(false);
        }

        if (extractedDetails.getTypeArg0() != null) {
            sourceSpecBuilder.setTypeClassName(extractedDetails.getTypeArg0());
        } else if (StringUtils.isNotEmpty(functionConfig.getInputTypeClassName())) {
            sourceSpecBuilder.setTypeClassName(functionConfig.getInputTypeClassName());
        }
        if (functionConfig.getTimeoutMs() != null) {
            sourceSpecBuilder.setTimeoutMs(functionConfig.getTimeoutMs());
            // We use negative acks for fast tracking failures
            sourceSpecBuilder.setNegativeAckRedeliveryDelayMs(functionConfig.getTimeoutMs());
        }
        if (functionConfig.getCleanupSubscription() != null) {
            sourceSpecBuilder.setCleanupSubscription(functionConfig.getCleanupSubscription());
        } else {
            sourceSpecBuilder.setCleanupSubscription(true);
        }
        functionDetailsBuilder.setSource(sourceSpecBuilder);

        // Setup sink
        Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder();
        if (functionConfig.getOutput() != null) {
            sinkSpecBuilder.setTopic(functionConfig.getOutput());
        }
        if (!StringUtils.isBlank(functionConfig.getOutputSerdeClassName())) {
            sinkSpecBuilder.setSerDeClassName(functionConfig.getOutputSerdeClassName());
        }
        if (!StringUtils.isBlank(functionConfig.getOutputSchemaType())) {
            sinkSpecBuilder.setSchemaType(functionConfig.getOutputSchemaType());
        }
        if (functionConfig.getForwardSourceMessageProperty() == Boolean.TRUE) {
            sinkSpecBuilder.setForwardSourceMessageProperty(functionConfig.getForwardSourceMessageProperty());
        }
        if (functionConfig.getCustomSchemaOutputs() != null && functionConfig.getOutput() != null) {
            String conf = functionConfig.getCustomSchemaOutputs().get(functionConfig.getOutput());
            try {
                if (StringUtils.isNotEmpty(conf)) {
                    ConsumerConfig consumerConfig = OBJECT_MAPPER.readValue(conf, ConsumerConfig.class);
                    sinkSpecBuilder.putAllSchemaProperties(consumerConfig.getSchemaProperties());
                    sinkSpecBuilder.putAllConsumerProperties(consumerConfig.getConsumerProperties());
                }
            } catch (JsonProcessingException e) {
                throw new IllegalArgumentException(
                        String.format("Incorrect custom schema outputs,Topic %s ", functionConfig.getOutput()));
            }
        }
        if (extractedDetails.getTypeArg1() != null) {
            sinkSpecBuilder.setTypeClassName(extractedDetails.getTypeArg1());
        } else if (StringUtils.isNotEmpty(functionConfig.getOutputTypeClassName())) {
            sinkSpecBuilder.setTypeClassName(functionConfig.getOutputTypeClassName());
        }
        if (functionConfig.getProducerConfig() != null) {
            sinkSpecBuilder.setProducerSpec(convertProducerConfigToProducerSpec(functionConfig.getProducerConfig()));
        }
        if (functionConfig.getBatchBuilder() != null) {
            Function.ProducerSpec.Builder builder = sinkSpecBuilder.getProducerSpec() != null
                    ? sinkSpecBuilder.getProducerSpec().toBuilder()
                    : Function.ProducerSpec.newBuilder();
            sinkSpecBuilder.setProducerSpec(builder.setBatchBuilder(functionConfig.getBatchBuilder()).build());
        }
        functionDetailsBuilder.setSink(sinkSpecBuilder);

        if (functionConfig.getTenant() != null) {
            functionDetailsBuilder.setTenant(functionConfig.getTenant());
        }
        if (functionConfig.getNamespace() != null) {
            functionDetailsBuilder.setNamespace(functionConfig.getNamespace());
        }
        if (functionConfig.getName() != null) {
            functionDetailsBuilder.setName(functionConfig.getName());
        }
        if (functionConfig.getLogTopic() != null) {
            functionDetailsBuilder.setLogTopic(functionConfig.getLogTopic());
        }
        if (functionConfig.getRuntime() != null) {
            functionDetailsBuilder.setRuntime(FunctionCommon.convertRuntime(functionConfig.getRuntime()));
        }
        if (functionConfig.getProcessingGuarantees() != null) {
            functionDetailsBuilder.setProcessingGuarantees(
                    FunctionCommon.convertProcessingGuarantee(functionConfig.getProcessingGuarantees()));
        }
        if (functionConfig.getRetainKeyOrdering() != null) {
            functionDetailsBuilder.setRetainKeyOrdering(functionConfig.getRetainKeyOrdering());
        }
        if (functionConfig.getRetainOrdering() != null) {
            functionDetailsBuilder.setRetainOrdering(functionConfig.getRetainOrdering());
        }

        if (functionConfig.getMaxMessageRetries() != null && functionConfig.getMaxMessageRetries() >= 0) {
            Function.RetryDetails.Builder retryBuilder = Function.RetryDetails.newBuilder();
            retryBuilder.setMaxMessageRetries(functionConfig.getMaxMessageRetries());
            if (isNotEmpty(functionConfig.getDeadLetterTopic())) {
                retryBuilder.setDeadLetterTopic(functionConfig.getDeadLetterTopic());
            }
            functionDetailsBuilder.setRetryDetails(retryBuilder);
        }

        Map<String, Object> configs = new HashMap<>();
        if (functionConfig.getUserConfig() != null) {
            configs.putAll(functionConfig.getUserConfig());
        }

        // windowing related
        WindowConfig windowConfig = functionConfig.getWindowConfig();
        if (windowConfig != null) {
            // Windows Function not support MANUAL and EFFECTIVELY_ONCE.
            if (functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE
                    || functionConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.MANUAL) {
                throw new IllegalArgumentException(
                        "Windows Function not support "
                                + functionConfig.getProcessingGuarantees() + " delivery semantics.");
            } else {
                // Override functionConfig.getProcessingGuarantees to MANUAL, and set windowsFunction is guarantees
                windowConfig.setProcessingGuarantees(WindowConfig.ProcessingGuarantees
                        .valueOf(functionDetailsBuilder.getProcessingGuarantees().name()));
                functionDetailsBuilder.setProcessingGuarantees(Function.ProcessingGuarantees.MANUAL);
            }
            windowConfig.setActualWindowFunctionClassName(extractedDetails.getFunctionClassName());
            configs.put(WindowConfig.WINDOW_CONFIG_KEY, windowConfig);
            // set class name to window function executor
            functionDetailsBuilder.setClassName("org.apache.pulsar.functions.windowing.WindowFunctionExecutor");
        } else {
            if (extractedDetails.getFunctionClassName() != null) {
                functionDetailsBuilder.setClassName(extractedDetails.getFunctionClassName());
            }
        }
        if (!configs.isEmpty()) {
            functionDetailsBuilder.setUserConfig(new Gson().toJson(configs));
        }
        if (functionConfig.getSecrets() != null && !functionConfig.getSecrets().isEmpty()) {
            functionDetailsBuilder.setSecretsMap(new Gson().toJson(functionConfig.getSecrets()));
        }

        if (functionConfig.getAutoAck() != null) {
            functionDetailsBuilder.setAutoAck(functionConfig.getAutoAck());
        } else {
            functionDetailsBuilder.setAutoAck(true);
        }
        if (functionConfig.getParallelism() != null) {
            functionDetailsBuilder.setParallelism(functionConfig.getParallelism());
        } else {
            functionDetailsBuilder.setParallelism(1);
        }

        // use default resources if resources not set
        Resources resources = Resources.mergeWithDefault(functionConfig.getResources());

        Function.Resources.Builder bldr = Function.Resources.newBuilder();
        bldr.setCpu(resources.getCpu());
        bldr.setRam(resources.getRam());
        bldr.setDisk(resources.getDisk());
        functionDetailsBuilder.setResources(bldr);

        if (!StringUtils.isEmpty(functionConfig.getRuntimeFlags())) {
            functionDetailsBuilder.setRuntimeFlags(functionConfig.getRuntimeFlags());
        }

        functionDetailsBuilder.setComponentType(FunctionDetails.ComponentType.FUNCTION);

        if (!StringUtils.isEmpty(functionConfig.getCustomRuntimeOptions())) {
            functionDetailsBuilder.setCustomRuntimeOptions(functionConfig.getCustomRuntimeOptions());
        }

        if (isBuiltin) {
            String builtin = functionConfig.getJar().replaceFirst("^builtin://", "");
            functionDetailsBuilder.setBuiltin(builtin);
        }

        return validateFunctionDetails(functionDetailsBuilder.build());
    }