in src/main/java/org/apache/doris/kafka/connector/utils/ConfigCheckUtils.java [55:219]
/**
 * Validates the connector configuration and returns the connector name.
 *
 * <p>Every check is executed even after an earlier one fails, so that all problems are logged
 * in a single pass; an exception is thrown only once at the end.
 *
 * @param config raw connector configuration, keyed by the {@code DorisSinkConnectorConfig}
 *     property names
 * @return the value configured under {@code DorisSinkConnectorConfig.NAME}
 * @throws DorisException if at least one of the checks below fails
 */
public static String validateConfig(Map<String, String> config) {
    boolean configIsValid = true; // verify all config

    // unique name of this connector instance
    String connectorName = config.getOrDefault(DorisSinkConnectorConfig.NAME, "");
    if (connectorName.isEmpty() || !isValidDorisApplicationName(connectorName)) {
        LOG.error(
                "{} is empty or invalid. It should match doris object identifier syntax. Please see "
                        + "the documentation.",
                DorisSinkConnectorConfig.NAME);
        configIsValid = false;
    }

    // Exactly one of topics / topics regex must be provided.
    String topics = config.getOrDefault(DorisSinkConnectorConfig.TOPICS, "");
    String topicsRegex = config.getOrDefault(DorisSinkConnectorConfig.TOPICS_REGEX, "");
    if (topics.isEmpty() && topicsRegex.isEmpty()) {
        LOG.error(
                "{} or {} cannot be empty.",
                DorisSinkConnectorConfig.TOPICS,
                DorisSinkConnectorConfig.TOPICS_REGEX);
        configIsValid = false;
    }
    if (!topics.isEmpty() && !topicsRegex.isEmpty()) {
        LOG.error(
                "{} and {} cannot be set at the same time.",
                DorisSinkConnectorConfig.TOPICS,
                DorisSinkConnectorConfig.TOPICS_REGEX);
        configIsValid = false;
    }

    // The topic-to-table mapping is optional, but when present it must parse.
    if (config.containsKey(DorisSinkConnectorConfig.TOPICS_TABLES_MAP)
            && parseTopicToTableMap(config.get(DorisSinkConnectorConfig.TOPICS_TABLES_MAP))
                    == null) {
        LOG.error("{} is empty or invalid.", DorisSinkConnectorConfig.TOPICS_TABLES_MAP);
        configIsValid = false;
    }

    // Required connection settings: only non-emptiness is checked here.
    String dorisUrls = config.getOrDefault(DorisSinkConnectorConfig.DORIS_URLS, "");
    if (dorisUrls.isEmpty()) {
        LOG.error("{} cannot be empty.", DorisSinkConnectorConfig.DORIS_URLS);
        configIsValid = false;
    }
    String queryPort = config.getOrDefault(DorisSinkConnectorConfig.DORIS_QUERY_PORT, "");
    if (queryPort.isEmpty()) {
        LOG.error("{} cannot be empty.", DorisSinkConnectorConfig.DORIS_QUERY_PORT);
        configIsValid = false;
    }
    String httpPort = config.getOrDefault(DorisSinkConnectorConfig.DORIS_HTTP_PORT, "");
    if (httpPort.isEmpty()) {
        LOG.error("{} cannot be empty.", DorisSinkConnectorConfig.DORIS_HTTP_PORT);
        configIsValid = false;
    }
    String dorisUser = config.getOrDefault(DorisSinkConnectorConfig.DORIS_USER, "");
    if (dorisUser.isEmpty()) {
        LOG.error("{} cannot be empty.", DorisSinkConnectorConfig.DORIS_USER);
        configIsValid = false;
    }

    // Optional flag: when set it must spell "true" or "false" (case-insensitive).
    String autoDirect = config.getOrDefault(DorisSinkConnectorConfig.AUTO_REDIRECT, "");
    if (!autoDirect.isEmpty()
            && !("true".equalsIgnoreCase(autoDirect) || "false".equalsIgnoreCase(autoDirect))) {
        LOG.error("autoDirect non-boolean type, {}", autoDirect);
        configIsValid = false;
    }

    // Buffer thresholds.
    String bufferCountRecords = config.get(DorisSinkConnectorConfig.BUFFER_COUNT_RECORDS);
    if (!isNumeric(bufferCountRecords)) {
        LOG.error(
                "{} cannot be empty or not a number.",
                DorisSinkConnectorConfig.BUFFER_COUNT_RECORDS);
        configIsValid = false;
    }
    String bufferSizeBytes = config.get(DorisSinkConnectorConfig.BUFFER_SIZE_BYTES);
    if (!isNumeric(bufferSizeBytes)
            || isIllegalRange(
                    bufferSizeBytes, DorisSinkConnectorConfig.BUFFER_SIZE_BYTES_MIN)) {
        LOG.error(
                "{} cannot be empty or not a number or less than 1.",
                DorisSinkConnectorConfig.BUFFER_SIZE_BYTES);
        configIsValid = false;
    }
    String bufferFlushTime = config.get(DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
    if (!isNumeric(bufferFlushTime)
            || isIllegalRange(
                    bufferFlushTime, DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC_MIN)) {
        LOG.error(
                "{} cannot be empty or not a number or less than 10.",
                DorisSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC);
        configIsValid = false;
    }

    // Enum-valued settings. Each error logs the value that was actually checked
    // (previously three of these logged loadModel by mistake).
    String loadModel = config.get(DorisSinkConnectorConfig.LOAD_MODEL);
    if (!validateEnumInstances(loadModel, LoadModel.instances())) {
        LOG.error(
                "The value of {} is an illegal parameter of {}.",
                loadModel,
                DorisSinkConnectorConfig.LOAD_MODEL);
        configIsValid = false;
    }
    String deliveryGuarantee = config.get(DorisSinkConnectorConfig.DELIVERY_GUARANTEE);
    if (!validateEnumInstances(deliveryGuarantee, DeliveryGuarantee.instances())) {
        LOG.error(
                "The value of {} is an illegal parameter of {}.",
                deliveryGuarantee,
                DorisSinkConnectorConfig.DELIVERY_GUARANTEE);
        configIsValid = false;
    }
    String converterMode = config.get(DorisSinkConnectorConfig.CONVERTER_MODE);
    if (!validateEnumInstances(converterMode, ConverterMode.instances())) {
        LOG.error(
                "The value of {} is an illegal parameter of {}.",
                converterMode,
                DorisSinkConnectorConfig.CONVERTER_MODE);
        configIsValid = false;
    }
    String schemaEvolutionMode = config.get(DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION);
    if (!validateEnumInstances(schemaEvolutionMode, SchemaEvolutionMode.instances())) {
        LOG.error(
                "The value of {} is an illegal parameter of {}.",
                schemaEvolutionMode,
                DorisSinkConnectorConfig.DEBEZIUM_SCHEMA_EVOLUTION);
        configIsValid = false;
    }

    // Retry settings.
    String maxRetries = config.get(DorisSinkConnectorConfig.MAX_RETRIES);
    if (!isNumeric(maxRetries) || isIllegalRange(maxRetries, 0)) {
        LOG.error(
                "{} cannot be empty or not a number or less than 0.",
                DorisSinkConnectorConfig.MAX_RETRIES);
        configIsValid = false;
    }
    String retryIntervalMs = config.get(DorisSinkConnectorConfig.RETRY_INTERVAL_MS);
    if (!isNumeric(retryIntervalMs) || isIllegalRange(retryIntervalMs, 0)) {
        LOG.error(
                "{} cannot be empty or not a number or less than 0.",
                DorisSinkConnectorConfig.RETRY_INTERVAL_MS);
        configIsValid = false;
    }

    String behaviorOnNullValues = config.get(DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES);
    if (!validateEnumInstances(behaviorOnNullValues, BehaviorOnNullValues.instances())) {
        LOG.error(
                "The value of {} is an illegal parameter of {}.",
                behaviorOnNullValues,
                DorisSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES);
        configIsValid = false;
    }

    // Fail once, after every problem has been logged above.
    if (!configIsValid) {
        throw new DorisException(
                "input kafka connector configuration is null, missing required values, or wrong input value");
    }
    return connectorName;
}