in pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java [418:585]
/**
 * Validates a sink configuration against the sink package (and the optional transform function
 * package) and extracts the class and type information needed to build the sink.
 *
 * <p>Checks are performed in a fixed order, and the first failing check aborts validation:
 * identity fields (tenant, namespace, name), input topics, log topic, parallelism, resources,
 * timeout, sink class resolution, transform function resolution, serde/schema/input-spec
 * validation, ordering constraints and, finally, the optional connector config validation.
 *
 * @param sinkConfig              the sink configuration to validate
 * @param sinkFunction            the package the sink class is resolved from
 * @param transformFunction       the package of the transform function, or {@code null} when the
 *                                sink has no transform function; when present, the input type is
 *                                taken from the transform function instead of the sink class
 * @param validateConnectorConfig when {@code true}, {@code validateSinkConfig} is invoked if the
 *                                sink package has classloading enabled; if classloading is
 *                                disabled a warning is logged and that validation is skipped
 * @return the resolved sink class name, the erased type name of the input type, and the transform
 *         function class name (as configured, or as resolved from the transform package)
 * @throws IllegalArgumentException if any of the checks above fails
 */
public static ExtractedSinkDetails validateAndExtractDetails(SinkConfig sinkConfig,
                                                             ValidatableFunctionPackage sinkFunction,
                                                             ValidatableFunctionPackage transformFunction,
                                                             boolean validateConnectorConfig) {
    // --- identity: tenant, namespace and name are all mandatory ---
    if (isEmpty(sinkConfig.getTenant())) {
        throw new IllegalArgumentException("Sink tenant cannot be null");
    }
    if (isEmpty(sinkConfig.getNamespace())) {
        throw new IllegalArgumentException("Sink namespace cannot be null");
    }
    if (isEmpty(sinkConfig.getName())) {
        throw new IllegalArgumentException("Sink name cannot be null");
    }

    // --- inputs: make sure we have at least one source of input, and that every topic is valid ---
    Collection<String> inputTopics = collectAllInputTopics(sinkConfig);
    if (inputTopics.isEmpty()) {
        throw new IllegalArgumentException("Must specify at least one topic of input via topicToSerdeClassName, "
                + "topicsPattern, topicToSchemaType or inputSpecs");
    }
    inputTopics.forEach(inputTopic -> {
        if (!TopicName.isValid(inputTopic)) {
            throw new IllegalArgumentException(String.format("Input topic %s is invalid", inputTopic));
        }
    });

    // --- log topic is optional, but must be a valid topic name when given ---
    if (!isEmpty(sinkConfig.getLogTopic()) && !TopicName.isValid(sinkConfig.getLogTopic())) {
        throw new IllegalArgumentException(
                String.format("LogTopic topic %s is invalid", sinkConfig.getLogTopic()));
    }

    // --- runtime settings ---
    if (sinkConfig.getParallelism() != null && sinkConfig.getParallelism() <= 0) {
        throw new IllegalArgumentException("Sink parallelism must be a positive number");
    }
    if (sinkConfig.getResources() != null) {
        ResourceConfigUtils.validate(sinkConfig.getResources());
    }
    // NOTE(review): only negative timeouts are rejected (zero passes) although the message says
    // "positive" -- confirm which of the two is intended before changing either.
    if (sinkConfig.getTimeoutMs() != null && sinkConfig.getTimeoutMs() < 0) {
        throw new IllegalArgumentException("Sink timeout must be a positive number");
    }

    // --- sink class ---
    // When the config carries no class name, fall back to the sink class declared in the
    // connector definition shipped inside the sink package.
    String resolvedSinkClassName = sinkConfig.getClassName();
    if (resolvedSinkClassName == null) {
        ConnectorDefinition connectorDefinition = sinkFunction.getFunctionMetaData(ConnectorDefinition.class);
        if (connectorDefinition == null) {
            throw new IllegalArgumentException(
                    "Sink package doesn't contain the META-INF/services/pulsar-io.yaml file.");
        }
        resolvedSinkClassName = connectorDefinition.getSinkClass();
        if (resolvedSinkClassName == null) {
            throw new IllegalArgumentException("Failed to extract sink class from archive");
        }
    }

    // The sink class must be resolvable from the sink package, even if a transform function
    // ends up providing the input type.
    TypeDefinition sinkClass;
    try {
        sinkClass = sinkFunction.resolveType(resolvedSinkClassName);
    } catch (TypePool.Resolution.NoSuchTypeException e) {
        throw new IllegalArgumentException(
                String.format("Sink class %s not found", resolvedSinkClassName), e);
    }

    // --- input type: taken from the transform function if there is one, else from the sink ---
    String transformClassName = sinkConfig.getTransformFunctionClassName();
    final TypeDefinition inputType;
    final ValidatableFunctionPackage typeSource;
    if (transformFunction == null) {
        inputType = getSinkType(sinkClass);
        typeSource = sinkFunction;
    } else {
        // When the config carries no transform class name, fall back to the function class
        // declared in the function definition shipped inside the transform package.
        if (transformClassName == null) {
            FunctionDefinition functionDefinition =
                    transformFunction.getFunctionMetaData(FunctionDefinition.class);
            if (functionDefinition == null) {
                throw new IllegalArgumentException(
                        "Function package doesn't contain the META-INF/services/pulsar-io.yaml file.");
            }
            transformClassName = functionDefinition.getFunctionClass();
            if (transformClassName == null) {
                throw new IllegalArgumentException("Transform function class name must be set");
            }
        }

        TypeDefinition transformClass;
        try {
            transformClass = transformFunction.resolveType(transformClassName);
        } catch (TypePool.Resolution.NoSuchTypeException e) {
            throw new IllegalArgumentException(
                    String.format("Function class %s not found", transformClassName), e);
        }

        // The transform function's output (second type argument) must be assignable to Record.
        if (!getRawFunctionTypes(transformClass, false)[1].asErasure().isAssignableTo(Record.class)) {
            throw new IllegalArgumentException("Sink transform function output must be of type Record");
        }
        // Its input (first type argument) is what the configured serdes/schemas must match.
        inputType = getFunctionTypes(transformClass, false)[0];
        typeSource = transformFunction;
    }

    // --- per-topic serde / schema settings (topicsPattern does not need checks) ---
    if (sinkConfig.getTopicToSerdeClassName() != null) {
        sinkConfig.getTopicToSerdeClassName().values().forEach(serdeClassName ->
                ValidatorUtils.validateSerde(serdeClassName, inputType, typeSource.getTypePool(), true));
    }
    if (sinkConfig.getTopicToSchemaType() != null) {
        sinkConfig.getTopicToSchemaType().values().forEach(schemaType ->
                ValidatorUtils.validateSchema(schemaType, inputType, typeSource.getTypePool(), true));
    }
    if (sinkConfig.getInputSpecs() != null) {
        for (ConsumerConfig inputSpec : sinkConfig.getInputSpecs().values()) {
            boolean hasSerde = !isEmpty(inputSpec.getSerdeClassName());
            boolean hasSchema = !isEmpty(inputSpec.getSchemaType());
            // A spec may name a serde or a schema type, never both.
            if (hasSerde && hasSchema) {
                throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
            }
            if (hasSerde) {
                ValidatorUtils.validateSerde(inputSpec.getSerdeClassName(), inputType,
                        typeSource.getTypePool(), true);
            }
            if (hasSchema) {
                ValidatorUtils.validateSchema(inputSpec.getSchemaType(), inputType,
                        typeSource.getTypePool(), true);
            }
            if (inputSpec.getCryptoConfig() != null) {
                ValidatorUtils.validateCryptoKeyReader(inputSpec.getCryptoConfig(),
                        typeSource.getTypePool(), false);
            }
            if (inputSpec.getMessagePayloadProcessorConfig() != null) {
                ValidatorUtils.validateMessagePayloadProcessor(inputSpec.getMessagePayloadProcessorConfig(),
                        typeSource.getTypePool());
            }
        }
    }

    // --- ordering constraints: retainKeyOrdering excludes effectively-once and retainOrdering ---
    boolean retainKeyOrdering = Boolean.TRUE.equals(sinkConfig.getRetainKeyOrdering());
    if (retainKeyOrdering
            && sinkConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
        throw new IllegalArgumentException(
                "When effectively once processing guarantee is specified, retain Key ordering cannot be set");
    }
    if (retainKeyOrdering && Boolean.TRUE.equals(sinkConfig.getRetainOrdering())) {
        throw new IllegalArgumentException("Only one of retain ordering or retain key ordering can be set");
    }

    // --- user-defined connector config: validated only on request and with classloading enabled ---
    if (validateConnectorConfig) {
        if (!sinkFunction.isEnableClassloading()) {
            log.warn("Skipping annotation based validation of sink config as classloading is disabled");
        } else {
            validateSinkConfig(sinkConfig, sinkFunction);
        }
    }

    return new ExtractedSinkDetails(resolvedSinkClassName, inputType.asErasure().getTypeName(),
            transformClassName);
}