in pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java [447:620]
void processArguments() throws Exception {
    // Builds this.sinkConfig for the command: starts from the config file when one is
    // given (otherwise from an empty SinkConfig), then applies every option that was
    // supplied on the command line on top of it, and finally validates the result.
    super.processArguments();
    // fold the deprecated arguments into their replacements before reading any option
    mergeArgs();

    if (sinkConfigFile == null) {
        this.sinkConfig = new SinkConfig();
    } else {
        this.sinkConfig = CmdUtils.loadConfig(sinkConfigFile, SinkConfig.class);
    }

    // --- identity -------------------------------------------------------------
    if (tenant != null) {
        sinkConfig.setTenant(tenant);
    }
    if (namespace != null) {
        sinkConfig.setNamespace(namespace);
    }
    if (className != null) {
        sinkConfig.setClassName(className);
    }
    if (name != null) {
        sinkConfig.setName(name);
    }

    // --- delivery semantics ---------------------------------------------------
    if (processingGuarantees != null) {
        sinkConfig.setProcessingGuarantees(processingGuarantees);
    }
    if (cleanupSubscription != null) {
        sinkConfig.setCleanupSubscription(cleanupSubscription);
    }
    if (retainOrdering != null) {
        sinkConfig.setRetainOrdering(retainOrdering);
    }
    if (retainKeyOrdering != null) {
        sinkConfig.setRetainKeyOrdering(retainKeyOrdering);
    }

    // --- inputs ---------------------------------------------------------------
    if (inputs != null) {
        // comma-separated list of input topics
        String[] inputTopics = inputs.split(",");
        sinkConfig.setInputs(Arrays.asList(inputTopics));
    }
    if (customSerdeInputString != null) {
        // JSON object mapping topic -> serde class name
        Type serdeMapType = new TypeToken<Map<String, String>>(){}.getType();
        Map<String, String> topicToSerde = new Gson().fromJson(customSerdeInputString, serdeMapType);
        sinkConfig.setTopicToSerdeClassName(topicToSerde);
    }
    if (customSchemaInputString != null) {
        // JSON object mapping topic -> schema type
        Type schemaMapType = new TypeToken<Map<String, String>>(){}.getType();
        Map<String, String> topicToSchema = new Gson().fromJson(customSchemaInputString, schemaMapType);
        sinkConfig.setTopicToSchemaType(topicToSchema);
    }
    if (inputSpecs != null) {
        // JSON object mapping topic -> ConsumerConfig
        Type specMapType = new TypeToken<Map<String, ConsumerConfig>>(){}.getType();
        Map<String, ConsumerConfig> topicToSpec = new Gson().fromJson(inputSpecs, specMapType);
        sinkConfig.setInputSpecs(topicToSpec);
    }

    // NOTE(review): unlike every other option here, this value is applied without a
    // null guard. If maxMessageRetries is a nullable wrapper, a value loaded from the
    // config file would be replaced by null when the flag is omitted -- confirm
    // against the field declaration.
    sinkConfig.setMaxMessageRetries(maxMessageRetries);
    if (deadLetterTopic != null) {
        sinkConfig.setDeadLetterTopic(deadLetterTopic);
    }
    if (isNotBlank(subsName)) {
        sinkConfig.setSourceSubscriptionName(subsName);
    }
    if (subsPosition != null) {
        sinkConfig.setSourceSubscriptionPosition(subsPosition);
    }
    if (topicsPattern != null) {
        sinkConfig.setTopicsPattern(topicsPattern);
    }
    if (parallelism != null) {
        sinkConfig.setParallelism(parallelism);
    }

    // --- archive / sink type --------------------------------------------------
    // An explicit archive cannot be combined with a sink type, whether the sink type
    // comes from the command line or from the loaded config.
    if (archive != null) {
        if (sinkType != null || sinkConfig.getSinkType() != null) {
            throw new ParameterException("Cannot specify both archive and sink-type");
        }
        sinkConfig.setArchive(archive);
    }
    // The command-line sink type wins over the one already present in the config;
    // either way the archive is set to whatever validateSinkType returns for it.
    if (sinkType != null) {
        sinkConfig.setArchive(validateSinkType(sinkType));
    } else if (sinkConfig.getSinkType() != null) {
        sinkConfig.setArchive(validateSinkType(sinkConfig.getSinkType()));
    }

    // --- resources ------------------------------------------------------------
    // Reuse the Resources already on the config; allocate one only when at least one
    // of cpu/ram/disk was supplied and none exists yet.
    Resources effectiveResources = sinkConfig.getResources();
    if (effectiveResources == null && (cpu != null || ram != null || disk != null)) {
        effectiveResources = new Resources();
    }
    if (cpu != null) {
        effectiveResources.setCpu(cpu);
    }
    if (ram != null) {
        effectiveResources.setRam(ram);
    }
    if (disk != null) {
        effectiveResources.setDisk(disk);
    }
    if (effectiveResources != null) {
        sinkConfig.setResources(effectiveResources);
    }

    // --- sink-specific configs ------------------------------------------------
    if (sinkConfigString != null) {
        try {
            sinkConfig.setConfigs(parseConfigs(sinkConfigString));
        } catch (Exception parseFailure) {
            // surface any failure as an argument error, keeping the cause
            throw new IllegalArgumentException("Cannot parse sink-config", parseFailure);
        }
    }

    // --- acknowledgement / runtime --------------------------------------------
    if (autoAck != null) {
        sinkConfig.setAutoAck(autoAck);
    }
    if (timeoutMs != null) {
        sinkConfig.setTimeoutMs(timeoutMs);
    }
    // only strictly positive delays are applied
    if (negativeAckRedeliveryDelayMs != null && negativeAckRedeliveryDelayMs > 0) {
        sinkConfig.setNegativeAckRedeliveryDelayMs(negativeAckRedeliveryDelayMs);
    }
    if (customRuntimeOptions != null) {
        sinkConfig.setCustomRuntimeOptions(customRuntimeOptions);
    }
    if (secretsString != null) {
        Type secretsMapType = new TypeToken<Map<String, Object>>() {}.getType();
        Map<String, Object> parsedSecrets = new Gson().fromJson(secretsString, secretsMapType);
        // a null parse result is stored as an empty map rather than null
        sinkConfig.setSecrets(parsedSecrets != null
                ? parsedSecrets
                : Collections.<String, Object>emptyMap());
    }

    // --- transform function ---------------------------------------------------
    if (transformFunction != null) {
        sinkConfig.setTransformFunction(transformFunction);
    }
    if (transformFunctionClassName != null) {
        sinkConfig.setTransformFunctionClassName(transformFunctionClassName);
    }
    if (transformFunctionConfig != null) {
        sinkConfig.setTransformFunctionConfig(transformFunctionConfig);
    }

    // --- logging / flags ------------------------------------------------------
    if (logTopic != null) {
        sinkConfig.setLogTopic(logTopic);
    }
    if (runtimeFlags != null) {
        sinkConfig.setRuntimeFlags(runtimeFlags);
    }

    // final sanity check of the assembled configuration
    validateSinkConfigs(sinkConfig);
}