in pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java [613:743]
public static SinkConfig validateUpdate(SinkConfig existingConfig, SinkConfig newConfig) {
SinkConfig mergedConfig = clone(existingConfig);
if (!existingConfig.getTenant().equals(newConfig.getTenant())) {
throw new IllegalArgumentException("Tenants differ");
}
if (!existingConfig.getNamespace().equals(newConfig.getNamespace())) {
throw new IllegalArgumentException("Namespaces differ");
}
if (!existingConfig.getName().equals(newConfig.getName())) {
throw new IllegalArgumentException("Sink Names differ");
}
if (!StringUtils.isEmpty(newConfig.getClassName())) {
mergedConfig.setClassName(newConfig.getClassName());
}
if (!StringUtils.isEmpty(newConfig.getSourceSubscriptionName()) && !newConfig.getSourceSubscriptionName()
.equals(existingConfig.getSourceSubscriptionName())) {
throw new IllegalArgumentException("Subscription Name cannot be altered");
}
if (newConfig.getInputSpecs() == null) {
newConfig.setInputSpecs(new HashMap<>());
}
if (mergedConfig.getInputSpecs() == null) {
mergedConfig.setInputSpecs(new HashMap<>());
}
if (!StringUtils.isEmpty(newConfig.getLogTopic())) {
mergedConfig.setLogTopic(newConfig.getLogTopic());
}
if (newConfig.getInputs() != null) {
newConfig.getInputs().forEach((topicName -> {
newConfig.getInputSpecs().putIfAbsent(topicName,
ConsumerConfig.builder().isRegexPattern(false).build());
}));
}
if (newConfig.getTopicsPattern() != null && !newConfig.getTopicsPattern().isEmpty()) {
newConfig.getInputSpecs().put(newConfig.getTopicsPattern(),
ConsumerConfig.builder()
.isRegexPattern(true)
.build());
}
if (newConfig.getTopicToSerdeClassName() != null) {
newConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
newConfig.getInputSpecs().put(topicName,
ConsumerConfig.builder()
.serdeClassName(serdeClassName)
.isRegexPattern(false)
.build());
});
}
if (newConfig.getTopicToSchemaType() != null) {
newConfig.getTopicToSchemaType().forEach((topicName, schemaClassname) -> {
newConfig.getInputSpecs().put(topicName,
ConsumerConfig.builder()
.schemaType(schemaClassname)
.isRegexPattern(false)
.build());
});
}
if (!newConfig.getInputSpecs().isEmpty()) {
SinkConfig finalMergedConfig = mergedConfig;
newConfig.getInputSpecs().forEach((topicName, consumerConfig) -> {
if (!existingConfig.getInputSpecs().containsKey(topicName)) {
throw new IllegalArgumentException("Input Topics cannot be altered");
}
if (consumerConfig.isRegexPattern() != existingConfig.getInputSpecs().get(topicName).isRegexPattern()) {
throw new IllegalArgumentException(
"isRegexPattern for input topic " + topicName + " cannot be altered");
}
finalMergedConfig.getInputSpecs().put(topicName, consumerConfig);
});
}
if (newConfig.getProcessingGuarantees() != null && !newConfig.getProcessingGuarantees()
.equals(existingConfig.getProcessingGuarantees())) {
throw new IllegalArgumentException("Processing Guarantees cannot be altered");
}
if (newConfig.getConfigs() != null) {
mergedConfig.setConfigs(newConfig.getConfigs());
}
if (newConfig.getSecrets() != null) {
mergedConfig.setSecrets(newConfig.getSecrets());
}
if (newConfig.getParallelism() != null) {
mergedConfig.setParallelism(newConfig.getParallelism());
}
if (newConfig.getRetainOrdering() != null && !newConfig.getRetainOrdering()
.equals(existingConfig.getRetainOrdering())) {
throw new IllegalArgumentException("Retain Ordering cannot be altered");
}
if (newConfig.getRetainKeyOrdering() != null && !newConfig.getRetainKeyOrdering()
.equals(existingConfig.getRetainKeyOrdering())) {
throw new IllegalArgumentException("Retain Key Ordering cannot be altered");
}
if (newConfig.getAutoAck() != null && !newConfig.getAutoAck().equals(existingConfig.getAutoAck())) {
throw new IllegalArgumentException("AutoAck cannot be altered");
}
if (newConfig.getResources() != null) {
mergedConfig
.setResources(ResourceConfigUtils.merge(existingConfig.getResources(), newConfig.getResources()));
}
if (newConfig.getTimeoutMs() != null) {
mergedConfig.setTimeoutMs(newConfig.getTimeoutMs());
}
if (newConfig.getCleanupSubscription() != null) {
mergedConfig.setCleanupSubscription(newConfig.getCleanupSubscription());
}
if (!StringUtils.isEmpty(newConfig.getArchive())) {
mergedConfig.setArchive(newConfig.getArchive());
}
if (!StringUtils.isEmpty(newConfig.getRuntimeFlags())) {
mergedConfig.setRuntimeFlags(newConfig.getRuntimeFlags());
}
if (!StringUtils.isEmpty(newConfig.getCustomRuntimeOptions())) {
mergedConfig.setCustomRuntimeOptions(newConfig.getCustomRuntimeOptions());
}
if (newConfig.getTransformFunction() != null) {
mergedConfig.setTransformFunction(newConfig.getTransformFunction());
}
if (newConfig.getTransformFunctionClassName() != null) {
mergedConfig.setTransformFunctionClassName(newConfig.getTransformFunctionClassName());
}
if (newConfig.getTransformFunctionConfig() != null) {
mergedConfig.setTransformFunctionConfig(newConfig.getTransformFunctionConfig());
}
if (newConfig.getSourceSubscriptionPosition() != null) {
mergedConfig.setSourceSubscriptionPosition(newConfig.getSourceSubscriptionPosition());
}
return mergedConfig;
}