in pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java [440:692]
/**
 * Populates {@code functionConfig} from the optional config file and the command-line
 * options, records the user code file, and passes the result to
 * {@code validateFunctionConfigs}.
 *
 * <p>The config file (if any) is loaded first; every option that was supplied on the
 * command line is then written on top of it, so command-line values take precedence
 * over values from the file.
 *
 * @throws ParameterException if the FQFN does not split into exactly three
 *         {@code /}-separated parts, or if both a jar and a function type are given
 * @throws Exception if loading the config file, parsing a JSON option, or validation fails
 */
void processArguments() throws Exception {
    // Fold the deprecated options into their replacements before reading anything.
    mergeArgs();

    // Base config: empty unless a YAML config file was supplied.
    if (fnConfigFile == null) {
        functionConfig = new FunctionConfig();
    } else {
        functionConfig = CmdUtils.loadConfig(fnConfigFile, FunctionConfig.class);
    }

    // Function identity: an FQFN, when present, is used instead of the individual options.
    if (fqfn == null) {
        if (tenant != null) {
            functionConfig.setTenant(tenant);
        }
        if (namespace != null) {
            functionConfig.setNamespace(namespace);
        }
        if (functionName != null) {
            functionConfig.setName(functionName);
        }
    } else {
        String[] fqfnParts = fqfn.split("/");
        if (fqfnParts.length != 3) {
            throw new ParameterException("Fully qualified function names (FQFNs) must "
                    + "be of the form tenant/namespace/name");
        }
        functionConfig.setTenant(fqfnParts[0]);
        functionConfig.setNamespace(fqfnParts[1]);
        functionConfig.setName(fqfnParts[2]);
    }

    // One parser and one type token per JSON shape, shared by all JSON-valued options below.
    final Gson gson = new Gson();
    final Type stringMapType = new TypeToken<Map<String, String>>() {}.getType();
    final Type objectMapType = new TypeToken<Map<String, Object>>() {}.getType();

    if (cleanupSubscription != null) {
        functionConfig.setCleanupSubscription(cleanupSubscription);
    }

    // Input side.
    if (inputs != null) {
        // Comma-separated list of topics; entries are taken verbatim.
        List<String> topicList = Arrays.asList(inputs.split(","));
        functionConfig.setInputs(topicList);
    }
    if (customSerdeInputString != null) {
        Map<String, String> serdeByInput = gson.fromJson(customSerdeInputString, stringMapType);
        functionConfig.setCustomSerdeInputs(serdeByInput);
    }
    if (customSchemaInputString != null) {
        Map<String, String> schemaByInput = gson.fromJson(customSchemaInputString, stringMapType);
        functionConfig.setCustomSchemaInputs(schemaByInput);
    }
    if (customSchemaOutputString != null) {
        Map<String, String> schemaByOutput = gson.fromJson(customSchemaOutputString, stringMapType);
        functionConfig.setCustomSchemaOutputs(schemaByOutput);
    }
    if (inputSpecs != null) {
        Type consumerMapType = new TypeToken<Map<String, ConsumerConfig>>() {}.getType();
        functionConfig.setInputSpecs(gson.fromJson(inputSpecs, consumerMapType));
    }
    if (inputTypeClassName != null) {
        functionConfig.setInputTypeClassName(inputTypeClassName);
    }
    if (topicsPattern != null) {
        functionConfig.setTopicsPattern(topicsPattern);
    }

    // Output side.
    if (output != null) {
        functionConfig.setOutput(output);
    }
    if (outputTypeClassName != null) {
        functionConfig.setOutputTypeClassName(outputTypeClassName);
    }
    if (producerConfig != null) {
        Type producerType = new TypeToken<ProducerConfig>() {}.getType();
        functionConfig.setProducerConfig(gson.fromJson(producerConfig, producerType));
    }
    if (logTopic != null) {
        functionConfig.setLogTopic(logTopic);
    }
    if (className != null) {
        functionConfig.setClassName(className);
    }
    if (outputSerdeClassName != null) {
        functionConfig.setOutputSerdeClassName(outputSerdeClassName);
    }
    if (schemaType != null) {
        functionConfig.setOutputSchemaType(schemaType);
    }

    // Delivery and subscription behaviour.
    if (processingGuarantees != null) {
        functionConfig.setProcessingGuarantees(processingGuarantees);
    }
    if (retainOrdering != null) {
        functionConfig.setRetainOrdering(retainOrdering);
    }
    if (retainKeyOrdering != null) {
        functionConfig.setRetainKeyOrdering(retainKeyOrdering);
    }
    if (isNotBlank(batchBuilder)) {
        functionConfig.setBatchBuilder(batchBuilder);
    }
    if (forwardSourceMessageProperty != null) {
        functionConfig.setForwardSourceMessageProperty(forwardSourceMessageProperty);
    }
    if (isNotBlank(subsName)) {
        functionConfig.setSubName(subsName);
    }
    if (subsPosition != null) {
        functionConfig.setSubscriptionPosition(subsPosition);
    }
    if (skipToLatest != null) {
        functionConfig.setSkipToLatest(skipToLatest);
    }

    if (userConfigString != null) {
        Map<String, Object> parsedUserConfig = gson.fromJson(userConfigString, objectMapType);
        if (parsedUserConfig == null) {
            // The parser returned null for this input; store an empty, mutable map instead.
            parsedUserConfig = new HashMap<>();
        }
        functionConfig.setUserConfig(parsedUserConfig);
    }
    if (parallelism != null) {
        functionConfig.setParallelism(parallelism);
    }

    // Resources: reuse the instance already on the config, creating one only if some
    // resource option was actually supplied.
    Resources res = functionConfig.getResources();
    if (res == null && (cpu != null || ram != null || disk != null)) {
        res = new Resources();
    }
    if (cpu != null) {
        res.setCpu(cpu);
    }
    if (ram != null) {
        res.setRam(ram);
    }
    if (disk != null) {
        res.setDisk(disk);
    }
    if (res != null) {
        functionConfig.setResources(res);
    }

    if (timeoutMs != null) {
        functionConfig.setTimeoutMs(timeoutMs);
    }
    if (customRuntimeOptions != null) {
        functionConfig.setCustomRuntimeOptions(customRuntimeOptions);
    }
    if (secretsString != null) {
        Map<String, Object> parsedSecrets = gson.fromJson(secretsString, objectMapType);
        if (parsedSecrets == null) {
            parsedSecrets = Collections.emptyMap();
        }
        functionConfig.setSecrets(parsedSecrets);
    }

    // Window settings: same reuse-or-create rule as resources, but the (possibly null)
    // window config is always written back.
    WindowConfig window = functionConfig.getWindowConfig();
    boolean anyWindowOption = windowLengthCount != null
            || windowLengthDurationMs != null
            || slidingIntervalCount != null
            || slidingIntervalDurationMs != null;
    if (window == null && anyWindowOption) {
        window = new WindowConfig();
    }
    if (windowLengthCount != null) {
        window.setWindowLengthCount(windowLengthCount);
    }
    if (windowLengthDurationMs != null) {
        window.setWindowLengthDurationMs(windowLengthDurationMs);
    }
    if (slidingIntervalCount != null) {
        window.setSlidingIntervalCount(slidingIntervalCount);
    }
    if (slidingIntervalDurationMs != null) {
        window.setSlidingIntervalDurationMs(slidingIntervalDurationMs);
    }
    functionConfig.setWindowConfig(window);

    if (autoAck != null) {
        functionConfig.setAutoAck(autoAck);
    }
    if (maxMessageRetries != null) {
        functionConfig.setMaxMessageRetries(maxMessageRetries);
    }
    if (deadLetterTopic != null) {
        functionConfig.setDeadLetterTopic(deadLetterTopic);
    }

    // Code package: a jar and a function type may not both come from the command line.
    if (functionType != null && jarFile != null) {
        throw new ParameterException("Cannot specify both jar and function-type");
    }
    if (jarFile != null) {
        functionConfig.setJar(jarFile);
    }
    // A function type (command line first, then the loaded config) replaces the jar
    // with a builtin:// reference.
    if (functionType != null) {
        functionConfig.setJar("builtin://" + functionType);
    } else if (functionConfig.getFunctionType() != null) {
        functionConfig.setJar("builtin://" + functionConfig.getFunctionType());
    }
    if (pyFile != null) {
        functionConfig.setPy(pyFile);
    }
    if (goFile != null) {
        functionConfig.setGo(goFile);
    }

    // Remember the first package that is set, checked in the order jar, py, go.
    if (functionConfig.getJar() != null) {
        userCodeFile = functionConfig.getJar();
    } else if (functionConfig.getPy() != null) {
        userCodeFile = functionConfig.getPy();
    } else if (functionConfig.getGo() != null) {
        userCodeFile = functionConfig.getGo();
    }

    if (runtimeFlags != null) {
        functionConfig.setRuntimeFlags(runtimeFlags);
    }

    // Final step: run the assembled config through validation.
    validateFunctionConfigs(functionConfig);
}