in pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java [973:1116]
/**
 * Validates an update request against the existing function configuration and builds the
 * configuration that results from applying the permitted changes.
 *
 * <p>The result starts as a copy of {@code existingConfig}. Fields of {@code newConfig} that are set
 * (non-null, or non-empty for strings) either overwrite the copied value or, for fields that must not
 * change, are required to equal the existing value.
 *
 * <p>Overwritable fields: class name, jar, log topic, output, user config, secrets, max message
 * retries, dead letter topic, parallelism, resources (merged through {@code ResourceConfigUtils.merge}),
 * window config, timeout, cleanup subscription, runtime flags, custom runtime options and producer
 * config. The consumer settings of an already existing input topic may also be replaced.
 *
 * <p>Side effect on {@code newConfig}: its {@code inputSpecs} map is created when absent and then
 * populated from {@code inputs}, {@code topicsPattern}, {@code customSerdeInputs} and
 * {@code customSchemaInputs}, in that order. A later source overwrites an earlier entry for the same
 * topic, including an entry that was supplied directly in {@code inputSpecs}.
 *
 * <p>{@code existingConfig}'s input specs are only read; the merged configuration works on its own
 * copy of that map.
 *
 * @param existingConfig the configuration currently in effect
 * @param newConfig      the requested update; unset fields mean "keep the existing value"
 * @return the merged configuration
 * @throws IllegalArgumentException if tenant, namespace or name differ; if an input topic is added or
 *                                  its regex flag changes; or if output serde, output schema type,
 *                                  processing guarantees, retain ordering, retain key ordering,
 *                                  runtime, auto-ack or subscription name are set to a value that
 *                                  differs from the existing one
 */
public static FunctionConfig validateUpdate(FunctionConfig existingConfig, FunctionConfig newConfig) {
    FunctionConfig mergedConfig = existingConfig.toBuilder().build();

    // --- Identity of the function: must match exactly. ---
    if (!existingConfig.getTenant().equals(newConfig.getTenant())) {
        throw new IllegalArgumentException("Tenants differ");
    }
    if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) {
        throw new IllegalArgumentException("Namespaces differ");
    }
    if (!existingConfig.getName().equals(newConfig.getName())) {
        throw new IllegalArgumentException("Function Names differ");
    }

    if (!StringUtils.isEmpty(newConfig.getClassName())) {
        mergedConfig.setClassName(newConfig.getClassName());
    }
    if (!StringUtils.isEmpty(newConfig.getJar())) {
        mergedConfig.setJar(newConfig.getJar());
    }

    // --- Input topics. ---
    if (newConfig.getInputSpecs() == null) {
        newConfig.setInputSpecs(new HashMap<>());
    }
    // Give the merged config a map of its own. The copy produced by toBuilder().build() may share the
    // map instance with existingConfig (NOTE(review): confirm builder semantics), in which case the
    // put() calls below would silently modify the caller's existing configuration.
    if (existingConfig.getInputSpecs() == null) {
        mergedConfig.setInputSpecs(new HashMap<>());
    } else {
        mergedConfig.setInputSpecs(new HashMap<>(existingConfig.getInputSpecs()));
    }

    // Fold every alternative way of declaring inputs into newConfig's inputSpecs so that a single
    // validation pass below covers all of them.
    if (newConfig.getInputs() != null) {
        newConfig.getInputs().forEach(topicName ->
                newConfig.getInputSpecs().put(topicName,
                        ConsumerConfig.builder().isRegexPattern(false).build()));
    }
    if (newConfig.getTopicsPattern() != null && !newConfig.getTopicsPattern().isEmpty()) {
        newConfig.getInputSpecs().put(newConfig.getTopicsPattern(),
                ConsumerConfig.builder()
                        .isRegexPattern(true)
                        .build());
    }
    if (newConfig.getCustomSerdeInputs() != null) {
        newConfig.getCustomSerdeInputs().forEach((topicName, serdeClassName) ->
                newConfig.getInputSpecs().put(topicName,
                        ConsumerConfig.builder()
                                .serdeClassName(serdeClassName)
                                .isRegexPattern(false)
                                .build()));
    }
    if (newConfig.getCustomSchemaInputs() != null) {
        newConfig.getCustomSchemaInputs().forEach((topicName, schemaClassname) ->
                newConfig.getInputSpecs().put(topicName,
                        ConsumerConfig.builder()
                                .schemaType(schemaClassname)
                                .isRegexPattern(false)
                                .build()));
    }

    // The set of input topics is frozen: an update may only replace the consumer settings of a topic
    // that already exists, and may not flip its regex flag.
    newConfig.getInputSpecs().forEach((topicName, consumerConfig) -> {
        // An existing config without any input specs has no topics, so every requested topic is new.
        // Checking for null here reports that as a validation error instead of a NullPointerException.
        if (existingConfig.getInputSpecs() == null
                || !existingConfig.getInputSpecs().containsKey(topicName)) {
            throw new IllegalArgumentException("Input Topics cannot be altered");
        }
        ConsumerConfig existingSpec = existingConfig.getInputSpecs().get(topicName);
        if (consumerConfig.isRegexPattern() != existingSpec.isRegexPattern()) {
            throw new IllegalArgumentException(
                    "isRegexPattern for input topic " + topicName + " cannot be altered");
        }
        mergedConfig.getInputSpecs().put(topicName, consumerConfig);
    });

    // --- Output typing: may be restated but not changed. ---
    if (!StringUtils.isEmpty(newConfig.getOutputSerdeClassName())
            && !newConfig.getOutputSerdeClassName().equals(existingConfig.getOutputSerdeClassName())) {
        throw new IllegalArgumentException("Output Serde mismatch");
    }
    if (!StringUtils.isEmpty(newConfig.getOutputSchemaType())
            && !newConfig.getOutputSchemaType().equals(existingConfig.getOutputSchemaType())) {
        throw new IllegalArgumentException("Output Schema mismatch");
    }
    if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
        mergedConfig.setLogTopic(newConfig.getLogTopic());
    }

    // --- Delivery semantics: immutable once the function exists. ---
    if (newConfig.getProcessingGuarantees() != null
            && !newConfig.getProcessingGuarantees().equals(existingConfig.getProcessingGuarantees())) {
        throw new IllegalArgumentException("Processing Guarantees cannot be altered");
    }
    if (newConfig.getRetainOrdering() != null
            && !newConfig.getRetainOrdering().equals(existingConfig.getRetainOrdering())) {
        throw new IllegalArgumentException("Retain Ordering cannot be altered");
    }
    if (newConfig.getRetainKeyOrdering() != null
            && !newConfig.getRetainKeyOrdering().equals(existingConfig.getRetainKeyOrdering())) {
        throw new IllegalArgumentException("Retain Key Ordering cannot be altered");
    }

    if (!StringUtils.isEmpty(newConfig.getOutput())) {
        mergedConfig.setOutput(newConfig.getOutput());
    }
    // User config and secrets are replaced wholesale, not merged key by key.
    if (newConfig.getUserConfig() != null) {
        mergedConfig.setUserConfig(newConfig.getUserConfig());
    }
    if (newConfig.getSecrets() != null) {
        mergedConfig.setSecrets(newConfig.getSecrets());
    }
    if (newConfig.getRuntime() != null && !newConfig.getRuntime().equals(existingConfig.getRuntime())) {
        throw new IllegalArgumentException("Runtime cannot be altered");
    }
    if (newConfig.getAutoAck() != null && !newConfig.getAutoAck().equals(existingConfig.getAutoAck())) {
        throw new IllegalArgumentException("AutoAck cannot be altered");
    }
    if (newConfig.getMaxMessageRetries() != null) {
        mergedConfig.setMaxMessageRetries(newConfig.getMaxMessageRetries());
    }
    if (!StringUtils.isEmpty(newConfig.getDeadLetterTopic())) {
        mergedConfig.setDeadLetterTopic(newConfig.getDeadLetterTopic());
    }
    if (!StringUtils.isEmpty(newConfig.getSubName())
            && !newConfig.getSubName().equals(existingConfig.getSubName())) {
        throw new IllegalArgumentException("Subscription Name cannot be altered");
    }

    // --- Runtime tuning: freely overridable. ---
    if (newConfig.getParallelism() != null) {
        mergedConfig.setParallelism(newConfig.getParallelism());
    }
    if (newConfig.getResources() != null) {
        mergedConfig
                .setResources(ResourceConfigUtils.merge(existingConfig.getResources(), newConfig.getResources()));
    }
    if (newConfig.getWindowConfig() != null) {
        mergedConfig.setWindowConfig(newConfig.getWindowConfig());
    }
    if (newConfig.getTimeoutMs() != null) {
        mergedConfig.setTimeoutMs(newConfig.getTimeoutMs());
    }
    if (newConfig.getCleanupSubscription() != null) {
        mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription());
    }
    if (!StringUtils.isEmpty(newConfig.getRuntimeFlags())) {
        mergedConfig.setRuntimeFlags(newConfig.getRuntimeFlags());
    }
    if (!StringUtils.isEmpty(newConfig.getCustomRuntimeOptions())) {
        mergedConfig.setCustomRuntimeOptions(newConfig.getCustomRuntimeOptions());
    }
    if (newConfig.getProducerConfig() != null) {
        mergedConfig.setProducerConfig(newConfig.getProducerConfig());
    }
    return mergedConfig;
}