in pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java [132:303]
public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
AuthenticationConfig authConfig,
String originalCodeFileName,
String pulsarServiceUrl,
String stateStorageServiceUrl,
String pulsarWebServiceUrl,
boolean k8sRuntime) throws IOException {
final List<String> args = new LinkedList<>();
GoInstanceConfig goInstanceConfig = new GoInstanceConfig();
// pass the raw functino details directly so that we don't need to assemble the `instanceConf.funcDetails`
// manually in Go instance
String functionDetails =
JsonFormat.printer().omittingInsignificantWhitespace().print(instanceConfig.getFunctionDetails());
goInstanceConfig.setFunctionDetails(functionDetails);
if (instanceConfig.getClusterName() != null) {
goInstanceConfig.setClusterName(instanceConfig.getClusterName());
}
if (null != stateStorageServiceUrl) {
goInstanceConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
}
if (instanceConfig.isExposePulsarAdminClientEnabled() && StringUtils.isNotBlank(pulsarWebServiceUrl)) {
goInstanceConfig.setPulsarWebServiceUrl(pulsarWebServiceUrl);
}
if (instanceConfig.getInstanceId() != 0) {
goInstanceConfig.setInstanceID(instanceConfig.getInstanceId());
}
if (instanceConfig.getFunctionId() != null) {
goInstanceConfig.setFuncID(instanceConfig.getFunctionId());
}
if (instanceConfig.getFunctionVersion() != null) {
goInstanceConfig.setFuncVersion(instanceConfig.getFunctionVersion());
}
if (instanceConfig.getFunctionDetails().getAutoAck()) {
goInstanceConfig.setAutoAck(instanceConfig.getFunctionDetails().getAutoAck());
}
if (instanceConfig.getFunctionDetails().getTenant() != null) {
goInstanceConfig.setTenant(instanceConfig.getFunctionDetails().getTenant());
}
if (instanceConfig.getFunctionDetails().getNamespace() != null) {
goInstanceConfig.setNameSpace(instanceConfig.getFunctionDetails().getNamespace());
}
if (instanceConfig.getFunctionDetails().getName() != null) {
goInstanceConfig.setName(instanceConfig.getFunctionDetails().getName());
}
if (instanceConfig.getFunctionDetails().getLogTopic() != null) {
goInstanceConfig.setLogTopic(instanceConfig.getFunctionDetails().getLogTopic());
}
if (instanceConfig.getFunctionDetails().getProcessingGuarantees() != null) {
goInstanceConfig
.setProcessingGuarantees(instanceConfig.getFunctionDetails().getProcessingGuaranteesValue());
}
if (instanceConfig.getFunctionDetails().getRuntime() != null) {
goInstanceConfig.setRuntime(instanceConfig.getFunctionDetails().getRuntimeValue());
}
if (instanceConfig.getFunctionDetails().getSecretsMap() != null) {
goInstanceConfig.setSecretsMap(instanceConfig.getFunctionDetails().getSecretsMap());
}
if (instanceConfig.getFunctionDetails().getUserConfig() != null) {
goInstanceConfig.setUserConfig(instanceConfig.getFunctionDetails().getUserConfig());
}
if (instanceConfig.getFunctionDetails().getParallelism() != 0) {
goInstanceConfig.setParallelism(instanceConfig.getFunctionDetails().getParallelism());
}
if (authConfig != null) {
if (isNotBlank(authConfig.getClientAuthenticationPlugin())
&& isNotBlank(authConfig.getClientAuthenticationParameters())) {
goInstanceConfig.setClientAuthenticationPlugin(authConfig.getClientAuthenticationPlugin());
goInstanceConfig.setClientAuthenticationParameters(authConfig.getClientAuthenticationParameters());
}
goInstanceConfig.setTlsAllowInsecureConnection(
authConfig.isTlsAllowInsecureConnection());
goInstanceConfig.setTlsHostnameVerificationEnable(
authConfig.isTlsHostnameVerificationEnable());
if (isNotBlank(authConfig.getTlsTrustCertsFilePath())){
goInstanceConfig.setTlsTrustCertsFilePath(
authConfig.getTlsTrustCertsFilePath());
}
}
if (instanceConfig.getMaxBufferedTuples() != 0) {
goInstanceConfig.setMaxBufTuples(instanceConfig.getMaxBufferedTuples());
}
if (pulsarServiceUrl != null) {
goInstanceConfig.setPulsarServiceURL(pulsarServiceUrl);
}
if (instanceConfig.getFunctionDetails().getSource().getCleanupSubscription()) {
goInstanceConfig
.setCleanupSubscription(instanceConfig.getFunctionDetails().getSource().getCleanupSubscription());
}
if (instanceConfig.getFunctionDetails().getSource().getSubscriptionName() != null) {
goInstanceConfig.setSubscriptionName(instanceConfig.getFunctionDetails().getSource().getSubscriptionName());
}
goInstanceConfig.setSubscriptionPosition(
instanceConfig.getFunctionDetails().getSource().getSubscriptionPosition().getNumber());
if (instanceConfig.getFunctionDetails().getSource().getInputSpecsMap() != null) {
Map<String, String> sourceInputSpecs = new HashMap<>();
for (Map.Entry<String, Function.ConsumerSpec> entry :
instanceConfig.getFunctionDetails().getSource().getInputSpecsMap().entrySet()) {
String topic = entry.getKey();
Function.ConsumerSpec spec = entry.getValue();
sourceInputSpecs.put(topic, JsonFormat.printer().omittingInsignificantWhitespace().print(spec));
goInstanceConfig.setSourceSpecsTopic(topic);
}
goInstanceConfig.setSourceInputSpecs(sourceInputSpecs);
}
if (instanceConfig.getFunctionDetails().getSource().getTimeoutMs() != 0) {
goInstanceConfig.setTimeoutMs(instanceConfig.getFunctionDetails().getSource().getTimeoutMs());
}
if (instanceConfig.getFunctionDetails().getSink().getTopic() != null) {
goInstanceConfig.setSinkSpecsTopic(instanceConfig.getFunctionDetails().getSink().getTopic());
}
if (instanceConfig.getFunctionDetails().getResources().getCpu() != 0) {
goInstanceConfig.setCpu(instanceConfig.getFunctionDetails().getResources().getCpu());
}
if (instanceConfig.getFunctionDetails().getResources().getRam() != 0) {
goInstanceConfig.setRam(instanceConfig.getFunctionDetails().getResources().getRam());
}
if (instanceConfig.getFunctionDetails().getResources().getDisk() != 0) {
goInstanceConfig.setDisk(instanceConfig.getFunctionDetails().getResources().getDisk());
}
if (instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic() != null) {
goInstanceConfig
.setDeadLetterTopic(instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
}
if (instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries() != 0) {
goInstanceConfig
.setMaxMessageRetries(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
}
if (instanceConfig.hasValidMetricsPort()) {
goInstanceConfig.setMetricsPort(instanceConfig.getMetricsPort());
}
goInstanceConfig.setKillAfterIdleMs(0);
goInstanceConfig.setPort(instanceConfig.getPort());
// Parse the contents of goInstanceConfig into json form string
ObjectMapper objectMapper = ObjectMapperFactory.getMapper().getObjectMapper();
String configContent = objectMapper.writeValueAsString(goInstanceConfig);
args.add(originalCodeFileName);
args.add("-instance-conf");
if (k8sRuntime) {
args.add("'" + configContent + "'");
} else {
args.add(configContent);
}
return args;
}