public static FunctionDetails convert()

in pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java [74:283]


    /**
     * Builds the {@link FunctionDetails} describing a sink from its {@link SinkConfig} and the
     * class/type information carried by {@code sinkDetails}.
     *
     * <p>Defaults applied when the corresponding config value is {@code null}: parallelism {@code 1},
     * processing guarantees {@code ATLEAST_ONCE}, auto-ack {@code true} and cleanup-subscription
     * {@code true}. When {@code sinkDetails} carries no function class name, {@link IdentityFunction}
     * is used. The subscription position is {@code EARLIEST} only when the config says so explicitly;
     * every other value (including {@code null}) yields {@code LATEST}.
     *
     * <p>Input specs are written into a single map keyed by topic, in this order: inputs, topics
     * pattern, topic-to-serde, topic-to-schema-type, explicit input specs. A later step writing the
     * same key presumably replaces the earlier entry (protobuf map semantics - confirm if relied upon).
     *
     * @param sinkConfig  the sink configuration to translate
     * @param sinkDetails the sink class name, optional transform-function class name and type argument
     * @return the built details, as returned by {@code FunctionConfigUtils.validateFunctionDetails}
     * @throws IOException declared for callers; no statement in this method raises it directly
     */
    public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetails sinkDetails) throws IOException {
        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();

        // The sink counts as builtin when its archive string starts with the BUILTIN prefix.
        boolean isBuiltin =
                !org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive()
                        .startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);

        // Identity and general settings; unset (null) values are simply left off the builder.
        if (sinkConfig.getTenant() != null) {
            functionDetailsBuilder.setTenant(sinkConfig.getTenant());
        }
        if (sinkConfig.getNamespace() != null) {
            functionDetailsBuilder.setNamespace(sinkConfig.getNamespace());
        }
        if (sinkConfig.getName() != null) {
            functionDetailsBuilder.setName(sinkConfig.getName());
        }
        if (sinkConfig.getLogTopic() != null) {
            functionDetailsBuilder.setLogTopic(sinkConfig.getLogTopic());
        }
        functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
        if (sinkConfig.getParallelism() != null) {
            functionDetailsBuilder.setParallelism(sinkConfig.getParallelism());
        } else {
            functionDetailsBuilder.setParallelism(1);
        }
        // Without a transform function class the identity function is used as the function class.
        if (sinkDetails.getFunctionClassName() != null) {
            functionDetailsBuilder.setClassName(sinkDetails.getFunctionClassName());
        } else {
            functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
        }
        if (sinkConfig.getTransformFunctionConfig() != null) {
            functionDetailsBuilder.setUserConfig(sinkConfig.getTransformFunctionConfig());
        }
        if (sinkConfig.getProcessingGuarantees() != null) {
            functionDetailsBuilder.setProcessingGuarantees(
                    convertProcessingGuarantee(sinkConfig.getProcessingGuarantees()));
        } else {
            functionDetailsBuilder.setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE);
        }

        // set source spec
        // source spec classname should be empty so that the default pulsar source will be used
        // (the subscription type is decided once, further below, from the ordering settings)
        Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder();
        if (sinkConfig.getInputs() != null) {
            sinkConfig.getInputs().forEach(topicName ->
                    sourceSpecBuilder.putInputSpecs(topicName,
                            Function.ConsumerSpec.newBuilder()
                                    .setIsRegexPattern(false)
                                    .build()));
        }
        if (!StringUtils.isEmpty(sinkConfig.getTopicsPattern())) {
            sourceSpecBuilder.putInputSpecs(sinkConfig.getTopicsPattern(),
                    Function.ConsumerSpec.newBuilder()
                            .setIsRegexPattern(true)
                            .build());
        }
        if (sinkConfig.getTopicToSerdeClassName() != null) {
            sinkConfig.getTopicToSerdeClassName().forEach((topicName, serde) -> {
                sourceSpecBuilder.putInputSpecs(topicName,
                        Function.ConsumerSpec.newBuilder()
                                .setSerdeClassName(serde == null ? "" : serde)
                                .setIsRegexPattern(false)
                                .build());
            });
        }
        if (sinkConfig.getTopicToSchemaType() != null) {
            sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
                sourceSpecBuilder.putInputSpecs(topicName,
                        Function.ConsumerSpec.newBuilder()
                                .setSchemaType(schemaType == null ? "" : schemaType)
                                .setIsRegexPattern(false)
                                .build());
            });
        }
        if (sinkConfig.getInputSpecs() != null) {
            sinkConfig.getInputSpecs().forEach((topic, spec) -> {
                Function.ConsumerSpec.Builder consumerSpecBuilder = Function.ConsumerSpec.newBuilder()
                        .setIsRegexPattern(spec.isRegexPattern());
                // A schema type takes precedence; the serde class is only used when no schema type is given.
                if (StringUtils.isNotBlank(spec.getSchemaType())) {
                    consumerSpecBuilder.setSchemaType(spec.getSchemaType());
                } else if (StringUtils.isNotBlank(spec.getSerdeClassName())) {
                    consumerSpecBuilder.setSerdeClassName(spec.getSerdeClassName());
                }
                if (spec.getReceiverQueueSize() != null) {
                    consumerSpecBuilder.setReceiverQueueSize(Function.ConsumerSpec.ReceiverQueueSize.newBuilder()
                            .setValue(spec.getReceiverQueueSize()).build());
                }
                if (spec.getCryptoConfig() != null) {
                    consumerSpecBuilder.setCryptoSpec(CryptoUtils.convert(spec.getCryptoConfig()));
                }
                if (spec.getMessagePayloadProcessorConfig() != null) {
                    consumerSpecBuilder.setMessagePayloadProcessorSpec(
                            MessagePayloadProcessorUtils.convert(spec.getMessagePayloadProcessorConfig()));
                }
                // Guard like every other optional field of the spec: a null properties map handed to
                // putAll would otherwise abort the whole conversion with a NullPointerException.
                if (spec.getConsumerProperties() != null) {
                    consumerSpecBuilder.putAllConsumerProperties(spec.getConsumerProperties());
                }
                consumerSpecBuilder.setPoolMessages(spec.isPoolMessages());
                sourceSpecBuilder.putInputSpecs(topic, consumerSpecBuilder.build());
            });
        }

        if (sinkDetails.getTypeArg() != null) {
            sourceSpecBuilder.setTypeClassName(sinkDetails.getTypeArg());
        }
        if (isNotBlank(sinkConfig.getSourceSubscriptionName())) {
            sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
        }

        // Set subscription type: retain-ordering or EFFECTIVELY_ONCE wins (FAILOVER), then
        // retain-key-ordering (KEY_SHARED), otherwise SHARED.
        Function.SubscriptionType subType;
        if ((sinkConfig.getRetainOrdering() != null && sinkConfig.getRetainOrdering())
                || FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE.equals(sinkConfig.getProcessingGuarantees())) {
            subType = Function.SubscriptionType.FAILOVER;
        } else if (sinkConfig.getRetainKeyOrdering() != null && sinkConfig.getRetainKeyOrdering()) {
            subType = Function.SubscriptionType.KEY_SHARED;
        } else {
            subType = Function.SubscriptionType.SHARED;
        }
        sourceSpecBuilder.setSubscriptionType(subType);

        if (sinkConfig.getAutoAck() != null) {
            functionDetailsBuilder.setAutoAck(sinkConfig.getAutoAck());
        } else {
            functionDetailsBuilder.setAutoAck(true);
        }

        if (sinkConfig.getTimeoutMs() != null) {
            sourceSpecBuilder.setTimeoutMs(sinkConfig.getTimeoutMs());
        }
        if (sinkConfig.getCleanupSubscription() != null) {
            sourceSpecBuilder.setCleanupSubscription(sinkConfig.getCleanupSubscription());
        } else {
            sourceSpecBuilder.setCleanupSubscription(true);
        }
        // Only a strictly positive delay is copied over; null, zero and negative values are ignored.
        if (sinkConfig.getNegativeAckRedeliveryDelayMs() != null && sinkConfig.getNegativeAckRedeliveryDelayMs() > 0) {
            sourceSpecBuilder.setNegativeAckRedeliveryDelayMs(sinkConfig.getNegativeAckRedeliveryDelayMs());
        }

        if (sinkConfig.getSourceSubscriptionPosition() == SubscriptionInitialPosition.Earliest) {
            sourceSpecBuilder.setSubscriptionPosition(Function.SubscriptionPosition.EARLIEST);
        } else {
            sourceSpecBuilder.setSubscriptionPosition(Function.SubscriptionPosition.LATEST);
        }

        functionDetailsBuilder.setSource(sourceSpecBuilder);

        if (sinkConfig.getRetainKeyOrdering() != null) {
            functionDetailsBuilder.setRetainKeyOrdering(sinkConfig.getRetainKeyOrdering());
        }
        if (sinkConfig.getRetainOrdering() != null) {
            functionDetailsBuilder.setRetainOrdering(sinkConfig.getRetainOrdering());
        }

        // Retry details (and with them the dead letter topic) are only set for a positive retry count.
        if (sinkConfig.getMaxMessageRetries() != null && sinkConfig.getMaxMessageRetries() > 0) {
            Function.RetryDetails.Builder retryDetails = Function.RetryDetails.newBuilder();
            retryDetails.setMaxMessageRetries(sinkConfig.getMaxMessageRetries());
            if (StringUtils.isNotBlank(sinkConfig.getDeadLetterTopic())) {
                retryDetails.setDeadLetterTopic(sinkConfig.getDeadLetterTopic());
            }
            functionDetailsBuilder.setRetryDetails(retryDetails);
        }

        // set up sink spec
        Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder();
        if (sinkDetails.getSinkClassName() != null) {
            sinkSpecBuilder.setClassName(sinkDetails.getSinkClassName());
        }

        if (isBuiltin) {
            // Store the builtin name without its "builtin://" scheme.
            String builtin = sinkConfig.getArchive().replaceFirst("^builtin://", "");
            sinkSpecBuilder.setBuiltin(builtin);
        }

        // A builtin transform function is recorded on the function details, not on the sink spec.
        if (!isEmpty(sinkConfig.getTransformFunction())
                && sinkConfig.getTransformFunction().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
            functionDetailsBuilder.setBuiltin(sinkConfig.getTransformFunction().replaceFirst("^builtin://", ""));
        }

        // Sink configs and secrets are stored as JSON strings.
        if (sinkConfig.getConfigs() != null) {
            sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs()));
        }
        if (sinkConfig.getSecrets() != null && !sinkConfig.getSecrets().isEmpty()) {
            functionDetailsBuilder.setSecretsMap(new Gson().toJson(sinkConfig.getSecrets()));
        }
        if (sinkDetails.getTypeArg() != null) {
            sinkSpecBuilder.setTypeClassName(sinkDetails.getTypeArg());
        }
        functionDetailsBuilder.setSink(sinkSpecBuilder);

        // use default resources if resources not set
        Resources resources = Resources.mergeWithDefault(sinkConfig.getResources());

        Function.Resources.Builder resourcesBuilder = Function.Resources.newBuilder();
        resourcesBuilder.setCpu(resources.getCpu());
        resourcesBuilder.setRam(resources.getRam());
        resourcesBuilder.setDisk(resources.getDisk());
        functionDetailsBuilder.setResources(resourcesBuilder);

        if (isNotBlank(sinkConfig.getRuntimeFlags())) {
            functionDetailsBuilder.setRuntimeFlags(sinkConfig.getRuntimeFlags());
        }

        functionDetailsBuilder.setComponentType(FunctionDetails.ComponentType.SINK);

        if (!StringUtils.isEmpty(sinkConfig.getCustomRuntimeOptions())) {
            functionDetailsBuilder.setCustomRuntimeOptions(sinkConfig.getCustomRuntimeOptions());
        }

        return FunctionConfigUtils.validateFunctionDetails(functionDetailsBuilder.build());
    }