in pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java [305:533]
public static List<String> getCmd(InstanceConfig instanceConfig,
String instanceFile,
String extraDependenciesDir, /* extra dependencies for running instances */
String logDirectory,
String originalCodeFileName,
String originalTransformFunctionFileName,
String pulsarServiceUrl,
String stateStorageServiceUrl,
AuthenticationConfig authConfig,
String shardId,
Integer grpcPort,
Long expectedHealthCheckInterval,
String logConfigFile,
String secretsProviderClassName,
String secretsProviderConfig,
Boolean installUserCodeDependencies,
String pythonDependencyRepository,
String pythonExtraDependencyRepository,
String narExtractionDirectory,
String functionInstanceClassPath,
boolean k8sRuntime,
String pulsarWebServiceUrl) throws Exception {
final List<String> args = new LinkedList<>();
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) {
return getGoInstanceCmd(instanceConfig, authConfig, originalCodeFileName,
pulsarServiceUrl, stateStorageServiceUrl, pulsarWebServiceUrl,
k8sRuntime);
}
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
args.add("java");
args.add("-cp");
String classpath = instanceFile;
if (StringUtils.isNotEmpty(extraDependenciesDir)) {
classpath = classpath + ":" + extraDependenciesDir + "/*";
}
args.add(classpath);
if (StringUtils.isNotEmpty(extraDependenciesDir)) {
args.add(String.format("-D%s=%s", FUNCTIONS_EXTRA_DEPS_PROPERTY, extraDependenciesDir));
}
if (StringUtils.isNotEmpty(functionInstanceClassPath)) {
args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, functionInstanceClassPath));
} else {
// add complete classpath for broker/worker so that the function instance can load
// the functions instance dependencies separately from user code dependencies
String systemFunctionInstanceClasspath = System.getProperty(FUNCTIONS_INSTANCE_CLASSPATH);
if (systemFunctionInstanceClasspath == null) {
log.warn("Property {} is not set. Falling back to using classpath of current JVM",
FUNCTIONS_INSTANCE_CLASSPATH);
systemFunctionInstanceClasspath = System.getProperty("java.class.path");
}
args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, systemFunctionInstanceClasspath));
}
args.add("-Dlog4j.configurationFile=" + logConfigFile);
args.add("-Dpulsar.function.log.dir=" + genFunctionLogFolder(logDirectory, instanceConfig));
args.add("-Dpulsar.function.log.file=" + String.format(
"%s-%s",
instanceConfig.getFunctionDetails().getName(),
shardId));
// Needed for optimized Netty direct byte buffer support
args.add("-Dio.netty.tryReflectionSetAccessible=true");
// Handle possible shaded Netty versions
args.add("-Dorg.apache.pulsar.shade.io.netty.tryReflectionSetAccessible=true");
args.add("-Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true");
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_11)) {
// Needed for optimized Netty direct byte buffer support
args.add("--add-opens");
args.add("java.base/java.nio=ALL-UNNAMED");
args.add("--add-opens");
args.add("java.base/jdk.internal.misc=ALL-UNNAMED");
}
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
// Needed for optimized checksum calculation when com.scurrilous.circe.checksum.Java9IntHash
// is used. That gets used when the native library libcirce-checksum is not available or cannot
// be loaded.
args.add("--add-opens");
args.add("java.base/java.util.zip=ALL-UNNAMED");
}
if (instanceConfig.getAdditionalJavaRuntimeArguments() != null) {
args.addAll(instanceConfig.getAdditionalJavaRuntimeArguments());
}
if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
Collections.addAll(args, splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags()));
}
if (instanceConfig.getFunctionDetails().getResources() != null) {
Function.Resources resources = instanceConfig.getFunctionDetails().getResources();
if (resources.getRam() != 0) {
args.add("-Xmx" + String.valueOf(resources.getRam()));
}
}
args.add("org.apache.pulsar.functions.instance.JavaInstanceMain");
args.add("--jar");
args.add(originalCodeFileName);
if (isNotEmpty(originalTransformFunctionFileName)) {
args.add("--transform_function_jar");
args.add(originalTransformFunctionFileName);
args.add("--transform_function_id");
args.add(instanceConfig.getTransformFunctionId());
}
} else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
args.add("python3");
if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
Collections.addAll(args, splitRuntimeArgs(instanceConfig.getFunctionDetails().getRuntimeFlags()));
}
args.add(instanceFile);
args.add("--py");
args.add(originalCodeFileName);
args.add("--logging_directory");
args.add(logDirectory);
args.add("--logging_file");
args.add(instanceConfig.getFunctionDetails().getName());
// set logging config file
args.add("--logging_config_file");
args.add(logConfigFile);
// `installUserCodeDependencies` is only valid for python runtime
if (installUserCodeDependencies != null && installUserCodeDependencies) {
args.add("--install_usercode_dependencies");
args.add("True");
}
if (!isEmpty(pythonDependencyRepository)) {
args.add("--dependency_repository");
args.add(pythonDependencyRepository);
}
if (!isEmpty(pythonExtraDependencyRepository)) {
args.add("--extra_dependency_repository");
args.add(pythonExtraDependencyRepository);
}
// TODO:- Find a platform independent way of controlling memory for a python application
}
args.add("--instance_id");
args.add(shardId);
args.add("--function_id");
args.add(instanceConfig.getFunctionId());
args.add("--function_version");
args.add(instanceConfig.getFunctionVersion());
args.add("--function_details");
args.add("'" + JsonFormat.printer().omittingInsignificantWhitespace()
.print(instanceConfig.getFunctionDetails()) + "'");
args.add("--pulsar_serviceurl");
args.add(pulsarServiceUrl);
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
// TODO: for now only Java function context exposed pulsar admin, so python/go no need to pass this argument
// until pulsar admin client enabled in python/go function context.
// For backward compatibility, pass `--web_serviceurl` parameter only if
// exposed pulsar admin client enabled.
if (instanceConfig.isExposePulsarAdminClientEnabled() && StringUtils.isNotBlank(pulsarWebServiceUrl)) {
args.add("--web_serviceurl");
args.add(pulsarWebServiceUrl);
args.add("--expose_pulsaradmin");
}
}
if (authConfig != null) {
if (isNotBlank(authConfig.getClientAuthenticationPlugin())
&& isNotBlank(authConfig.getClientAuthenticationParameters())) {
args.add("--client_auth_plugin");
args.add(authConfig.getClientAuthenticationPlugin());
args.add("--client_auth_params");
args.add(authConfig.getClientAuthenticationParameters());
}
args.add("--use_tls");
args.add(Boolean.toString(authConfig.isUseTls()));
args.add("--tls_allow_insecure");
args.add(Boolean.toString(authConfig.isTlsAllowInsecureConnection()));
args.add("--hostname_verification_enabled");
args.add(Boolean.toString(authConfig.isTlsHostnameVerificationEnable()));
if (isNotBlank(authConfig.getTlsTrustCertsFilePath())) {
args.add("--tls_trust_cert_path");
args.add(authConfig.getTlsTrustCertsFilePath());
}
}
args.add("--max_buffered_tuples");
args.add(String.valueOf(instanceConfig.getMaxBufferedTuples()));
args.add("--port");
args.add(String.valueOf(grpcPort));
args.add("--metrics_port");
args.add(String.valueOf(instanceConfig.getMetricsPort()));
// params supported only by the Java instance runtime.
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
args.add("--pending_async_requests");
args.add(String.valueOf(instanceConfig.getMaxPendingAsyncRequests()));
if (instanceConfig.isIgnoreUnknownConfigFields()) {
args.add("--ignore_unknown_config_fields");
}
}
// state storage configs
if (null != stateStorageServiceUrl) {
args.add("--state_storage_serviceurl");
args.add(stateStorageServiceUrl);
}
args.add("--expected_healthcheck_interval");
args.add(String.valueOf(expectedHealthCheckInterval));
if (!StringUtils.isEmpty(secretsProviderClassName)) {
args.add("--secrets_provider");
args.add(secretsProviderClassName);
if (!StringUtils.isEmpty(secretsProviderConfig)) {
args.add("--secrets_provider_config");
args.add("'" + secretsProviderConfig + "'");
}
}
args.add("--cluster_name");
args.add(instanceConfig.getClusterName());
if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
if (!StringUtils.isEmpty(narExtractionDirectory)) {
args.add("--nar_extraction_directory");
args.add(narExtractionDirectory);
}
}
return args;
}