void processArguments()

in pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java [447:620]


        void processArguments() throws Exception {
            // Builds this.sinkConfig for the command: starts from the sink config file when one
            // was given (otherwise from an empty SinkConfig), overlays every option that was
            // supplied on the command line, and finally validates the merged result.
            //
            // Throws ParameterException when both an archive and a sink type are specified, and
            // IllegalArgumentException when the sink-config string cannot be parsed. Any other
            // exception raised by the helpers called here propagates unchanged.
            super.processArguments();
            // merge deprecated args with new args
            mergeArgs();

            // Base configuration: the config file when present, an empty config otherwise.
            if (null != sinkConfigFile) {
                this.sinkConfig = CmdUtils.loadConfig(sinkConfigFile, SinkConfig.class);
            } else {
                this.sinkConfig = new SinkConfig();
            }

            // From here on, a command-line value overrides the base configuration only when
            // the corresponding option was actually supplied (i.e. is non-null).
            if (null != tenant) {
                sinkConfig.setTenant(tenant);
            }

            if (null != namespace) {
                sinkConfig.setNamespace(namespace);
            }

            if (null != className) {
                sinkConfig.setClassName(className);
            }

            if (null != name) {
                sinkConfig.setName(name);
            }
            if (null != processingGuarantees) {
                sinkConfig.setProcessingGuarantees(processingGuarantees);
            }

            if (null != cleanupSubscription) {
                sinkConfig.setCleanupSubscription(cleanupSubscription);
            }

            if (retainOrdering != null) {
                sinkConfig.setRetainOrdering(retainOrdering);
            }

            if (retainKeyOrdering != null) {
                sinkConfig.setRetainKeyOrdering(retainKeyOrdering);
            }

            // Inputs are given as a single comma-separated string.
            if (null != inputs) {
                sinkConfig.setInputs(Arrays.asList(inputs.split(",")));
            }

            // The following three options are JSON objects keyed by a string.
            if (null != customSerdeInputString) {
                Type type = new TypeToken<Map<String, String>>(){}.getType();
                Map<String, String> customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type);
                sinkConfig.setTopicToSerdeClassName(customSerdeInputMap);
            }

            if (null != customSchemaInputString) {
                Type type = new TypeToken<Map<String, String>>(){}.getType();
                Map<String, String> customSchemaInputMap = new Gson().fromJson(customSchemaInputString, type);
                sinkConfig.setTopicToSchemaType(customSchemaInputMap);
            }

            if (null != inputSpecs) {
                Type type = new TypeToken<Map<String, ConsumerConfig>>(){}.getType();
                sinkConfig.setInputSpecs(new Gson().fromJson(inputSpecs, type));
            }

            // Only override when the option was supplied, like every other option here;
            // setting it unconditionally would replace a value loaded from the config file
            // with the unset command-line value.
            if (null != maxMessageRetries) {
                sinkConfig.setMaxMessageRetries(maxMessageRetries);
            }
            if (null != deadLetterTopic) {
                sinkConfig.setDeadLetterTopic(deadLetterTopic);
            }

            // A blank subscription name is treated the same as an absent one.
            if (isNotBlank(subsName)) {
                sinkConfig.setSourceSubscriptionName(subsName);
            }

            if (null != subsPosition) {
                sinkConfig.setSourceSubscriptionPosition(subsPosition);
            }

            if (null != topicsPattern) {
                sinkConfig.setTopicsPattern(topicsPattern);
            }

            if (parallelism != null) {
                sinkConfig.setParallelism(parallelism);
            }

            // An archive and a sink type (from the command line or the config file) are
            // mutually exclusive.
            if (archive != null && (sinkType != null || sinkConfig.getSinkType() != null)) {
                throw new ParameterException("Cannot specify both archive and sink-type");
            }

            if (null != archive) {
                sinkConfig.setArchive(archive);
            }

            // When a sink type is used, the value returned by validateSinkType becomes the
            // archive; the command-line sink type wins over the one from the config file.
            if (sinkType != null) {
                sinkConfig.setArchive(validateSinkType(sinkType));
            } else if (sinkConfig.getSinkType() != null) {
                sinkConfig.setArchive(validateSinkType(sinkConfig.getSinkType()));
            }

            // Resources: reuse the object from the base configuration when it exists and
            // create one only if at least one of cpu/ram/disk was supplied.
            Resources resources = sinkConfig.getResources();
            if (resources == null && (cpu != null || ram != null || disk != null)) {
                resources = new Resources();
            }
            if (cpu != null) {
                resources.setCpu(cpu);
            }
            if (ram != null) {
                resources.setRam(ram);
            }
            if (disk != null) {
                resources.setDisk(disk);
            }
            if (resources != null) {
                sinkConfig.setResources(resources);
            }

            if (null != sinkConfigString) {
                try {
                    sinkConfig.setConfigs(parseConfigs(sinkConfigString));
                } catch (Exception ex) {
                    // Keep the original failure as the cause.
                    throw new IllegalArgumentException("Cannot parse sink-config", ex);
                }
            }

            if (autoAck != null) {
                sinkConfig.setAutoAck(autoAck);
            }
            if (timeoutMs != null) {
                sinkConfig.setTimeoutMs(timeoutMs);
            }
            // Non-positive redelivery delays are ignored.
            if (negativeAckRedeliveryDelayMs != null && negativeAckRedeliveryDelayMs > 0) {
                sinkConfig.setNegativeAckRedeliveryDelayMs(negativeAckRedeliveryDelayMs);
            }

            if (customRuntimeOptions != null) {
                sinkConfig.setCustomRuntimeOptions(customRuntimeOptions);
            }

            if (secretsString != null) {
                Type type = new TypeToken<Map<String, Object>>() {}.getType();
                Map<String, Object> secretsMap = new Gson().fromJson(secretsString, type);
                // The parse result may be null; fall back to an empty map in that case.
                if (secretsMap == null) {
                    secretsMap = Collections.emptyMap();
                }
                sinkConfig.setSecrets(secretsMap);
            }

            if (transformFunction != null) {
                sinkConfig.setTransformFunction(transformFunction);
            }

            if (transformFunctionClassName != null) {
                sinkConfig.setTransformFunctionClassName(transformFunctionClassName);
            }

            if (transformFunctionConfig != null) {
                sinkConfig.setTransformFunctionConfig(transformFunctionConfig);
            }
            if (null != logTopic) {
                sinkConfig.setLogTopic(logTopic);
            }
            if (null != runtimeFlags) {
                sinkConfig.setRuntimeFlags(runtimeFlags);
            }

            // check if configs are valid
            validateSinkConfigs(sinkConfig);
        }