public static List getGoInstanceCmd()

in pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java [132:303]


    /**
     * Builds the command-line arguments used to launch a Go function instance.
     *
     * <p>The settings found in {@code instanceConfig}, {@code authConfig} and the service URLs are copied
     * into a {@code GoInstanceConfig}, which is serialized to JSON and passed as the value of the
     * {@code -instance-conf} option. The returned list is
     * {@code [originalCodeFileName, "-instance-conf", <json>]}.
     *
     * @param instanceConfig         instance settings; its function details are read unconditionally
     * @param authConfig             client authentication and TLS settings; when {@code null} none of the
     *                               authentication/TLS fields are set
     * @param originalCodeFileName   first element of the returned command (presumably the path of the Go
     *                               executable - confirm against the caller)
     * @param pulsarServiceUrl       Pulsar service URL; skipped when {@code null}
     * @param stateStorageServiceUrl state storage service URL; skipped when {@code null}
     * @param pulsarWebServiceUrl    Pulsar web service URL; only set when the instance config enables the
     *                               Pulsar admin client and the URL is not blank
     * @param k8sRuntime             when {@code true} the JSON is emitted as one single-quoted shell word,
     *                               with embedded single quotes escaped; otherwise the raw JSON is added
     * @return the command arguments, in order
     * @throws IOException if the function details, a consumer spec or the Go instance config cannot be
     *                     serialized to JSON
     */
    public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
                                                AuthenticationConfig authConfig,
                                                String originalCodeFileName,
                                                String pulsarServiceUrl,
                                                String stateStorageServiceUrl,
                                                String pulsarWebServiceUrl,
                                                boolean k8sRuntime) throws IOException {
        final List<String> args = new LinkedList<>();
        final GoInstanceConfig goInstanceConfig = new GoInstanceConfig();
        final Function.FunctionDetails functionDetails = instanceConfig.getFunctionDetails();

        // Pass the raw function details directly so that we don't need to assemble the
        // `instanceConf.funcDetails` manually in the Go instance.
        String functionDetailsJson =
                JsonFormat.printer().omittingInsignificantWhitespace().print(functionDetails);
        goInstanceConfig.setFunctionDetails(functionDetailsJson);

        if (instanceConfig.getClusterName() != null) {
            goInstanceConfig.setClusterName(instanceConfig.getClusterName());
        }

        if (null != stateStorageServiceUrl) {
            goInstanceConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
        }

        if (instanceConfig.isExposePulsarAdminClientEnabled() && StringUtils.isNotBlank(pulsarWebServiceUrl)) {
            goInstanceConfig.setPulsarWebServiceUrl(pulsarWebServiceUrl);
        }

        if (instanceConfig.getInstanceId() != 0) {
            goInstanceConfig.setInstanceID(instanceConfig.getInstanceId());
        }

        if (instanceConfig.getFunctionId() != null) {
            goInstanceConfig.setFuncID(instanceConfig.getFunctionId());
        }

        if (instanceConfig.getFunctionVersion() != null) {
            goInstanceConfig.setFuncVersion(instanceConfig.getFunctionVersion());
        }

        // Identity and behaviour of the function itself.
        if (functionDetails.getAutoAck()) {
            goInstanceConfig.setAutoAck(functionDetails.getAutoAck());
        }

        if (functionDetails.getTenant() != null) {
            goInstanceConfig.setTenant(functionDetails.getTenant());
        }

        if (functionDetails.getNamespace() != null) {
            goInstanceConfig.setNameSpace(functionDetails.getNamespace());
        }

        if (functionDetails.getName() != null) {
            goInstanceConfig.setName(functionDetails.getName());
        }

        if (functionDetails.getLogTopic() != null) {
            goInstanceConfig.setLogTopic(functionDetails.getLogTopic());
        }
        if (functionDetails.getProcessingGuarantees() != null) {
            goInstanceConfig.setProcessingGuarantees(functionDetails.getProcessingGuaranteesValue());
        }
        if (functionDetails.getRuntime() != null) {
            goInstanceConfig.setRuntime(functionDetails.getRuntimeValue());
        }
        if (functionDetails.getSecretsMap() != null) {
            goInstanceConfig.setSecretsMap(functionDetails.getSecretsMap());
        }
        if (functionDetails.getUserConfig() != null) {
            goInstanceConfig.setUserConfig(functionDetails.getUserConfig());
        }
        if (functionDetails.getParallelism() != 0) {
            goInstanceConfig.setParallelism(functionDetails.getParallelism());
        }

        if (authConfig != null) {
            // The plugin and its parameters are only forwarded as a pair.
            if (isNotBlank(authConfig.getClientAuthenticationPlugin())
                    && isNotBlank(authConfig.getClientAuthenticationParameters())) {
                goInstanceConfig.setClientAuthenticationPlugin(authConfig.getClientAuthenticationPlugin());
                goInstanceConfig.setClientAuthenticationParameters(authConfig.getClientAuthenticationParameters());
            }
            goInstanceConfig.setTlsAllowInsecureConnection(
                    authConfig.isTlsAllowInsecureConnection());
            goInstanceConfig.setTlsHostnameVerificationEnable(
                    authConfig.isTlsHostnameVerificationEnable());
            if (isNotBlank(authConfig.getTlsTrustCertsFilePath())) {
                goInstanceConfig.setTlsTrustCertsFilePath(
                        authConfig.getTlsTrustCertsFilePath());
            }
        }

        if (instanceConfig.getMaxBufferedTuples() != 0) {
            goInstanceConfig.setMaxBufTuples(instanceConfig.getMaxBufferedTuples());
        }

        if (pulsarServiceUrl != null) {
            goInstanceConfig.setPulsarServiceURL(pulsarServiceUrl);
        }

        // Source (input) settings.
        final Function.SourceSpec sourceSpec = functionDetails.getSource();
        if (sourceSpec.getCleanupSubscription()) {
            goInstanceConfig.setCleanupSubscription(sourceSpec.getCleanupSubscription());
        }
        if (sourceSpec.getSubscriptionName() != null) {
            goInstanceConfig.setSubscriptionName(sourceSpec.getSubscriptionName());
        }
        goInstanceConfig.setSubscriptionPosition(sourceSpec.getSubscriptionPosition().getNumber());

        if (sourceSpec.getInputSpecsMap() != null) {
            Map<String, String> sourceInputSpecs = new HashMap<>();
            for (Map.Entry<String, Function.ConsumerSpec> entry : sourceSpec.getInputSpecsMap().entrySet()) {
                String topic = entry.getKey();
                Function.ConsumerSpec spec = entry.getValue();
                sourceInputSpecs.put(topic, JsonFormat.printer().omittingInsignificantWhitespace().print(spec));
                // Overwritten on every iteration: the last topic in iteration order is the one kept.
                goInstanceConfig.setSourceSpecsTopic(topic);
            }
            goInstanceConfig.setSourceInputSpecs(sourceInputSpecs);
        }

        if (sourceSpec.getTimeoutMs() != 0) {
            goInstanceConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
        }

        // Sink (output) settings.
        if (functionDetails.getSink().getTopic() != null) {
            goInstanceConfig.setSinkSpecsTopic(functionDetails.getSink().getTopic());
        }

        // Resource settings; zero values are left unset.
        if (functionDetails.getResources().getCpu() != 0) {
            goInstanceConfig.setCpu(functionDetails.getResources().getCpu());
        }

        if (functionDetails.getResources().getRam() != 0) {
            goInstanceConfig.setRam(functionDetails.getResources().getRam());
        }

        if (functionDetails.getResources().getDisk() != 0) {
            goInstanceConfig.setDisk(functionDetails.getResources().getDisk());
        }

        // Retry / dead-letter settings.
        if (functionDetails.getRetryDetails().getDeadLetterTopic() != null) {
            goInstanceConfig.setDeadLetterTopic(functionDetails.getRetryDetails().getDeadLetterTopic());
        }

        if (functionDetails.getRetryDetails().getMaxMessageRetries() != 0) {
            goInstanceConfig.setMaxMessageRetries(functionDetails.getRetryDetails().getMaxMessageRetries());
        }

        if (instanceConfig.hasValidMetricsPort()) {
            goInstanceConfig.setMetricsPort(instanceConfig.getMetricsPort());
        }

        goInstanceConfig.setKillAfterIdleMs(0);
        goInstanceConfig.setPort(instanceConfig.getPort());

        // Serialize the contents of goInstanceConfig into a JSON string.
        ObjectMapper objectMapper = ObjectMapperFactory.getMapper().getObjectMapper();
        String configContent = objectMapper.writeValueAsString(goInstanceConfig);

        args.add(originalCodeFileName);
        args.add("-instance-conf");
        if (k8sRuntime) {
            // The JSON is emitted as a single-quoted shell word. JSON serialization does not escape
            // single quotes, so a quote inside any value (user config, secrets, topic names, ...) would
            // terminate the word early and corrupt the command. Replace each ' with '\'' (close the
            // quote, emit an escaped quote, reopen) so the whole JSON stays one literal word.
            args.add("'" + configContent.replace("'", "'\\''") + "'");
        } else {
            args.add(configContent);
        }
        return args;
    }