in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java [289:345]
/**
 * Validates the JDBC connector options held in {@code config}.
 *
 * <p>The checks run in this order, and the first failing one aborts validation:
 *
 * <ol>
 *   <li>a dialect is loaded via {@link JdbcFactoryLoader} for the configured URL and
 *       compatible mode (the returned dialect is discarded here);
 *   <li>username/password are passed to {@code checkAllOrNone};
 *   <li>the four scan-partition options are passed to {@code checkAllOrNone};
 *   <li>when both partition bounds are present, the lower bound must not exceed the upper one;
 *   <li>the lookup cache max-rows/TTL options are passed to {@code checkAllOrNone};
 *   <li>lookup and sink max-retries must not be negative;
 *   <li>the max retry timeout must be at least one whole second.
 * </ol>
 *
 * <p>NOTE(review): {@code checkAllOrNone} is defined outside this block; presumably it rejects a
 * group of options that is only partially configured - confirm against its implementation.
 *
 * @param config the options to validate
 * @param classLoader class loader handed to the dialect lookup
 * @throws IllegalArgumentException if the partition bounds are inverted, a retry count is
 *     negative, or the retry timeout is shorter than one second
 */
private void validateConfigOptions(ReadableConfig config, ClassLoader classLoader) {
    // The result is intentionally unused: only the lookup itself matters at this point.
    JdbcFactoryLoader.loadDialect(config.get(URL), classLoader, config.get(COMPATIBLE_MODE));

    checkAllOrNone(config, new ConfigOption[] {USERNAME, PASSWORD});
    checkAllOrNone(
            config,
            new ConfigOption[] {
                SCAN_PARTITION_COLUMN,
                SCAN_PARTITION_NUM,
                SCAN_PARTITION_LOWER_BOUND,
                SCAN_PARTITION_UPPER_BOUND
            });

    // Only compare the bounds when both of them were explicitly configured.
    boolean bothBoundsConfigured =
            config.getOptional(SCAN_PARTITION_LOWER_BOUND).isPresent()
                    && config.getOptional(SCAN_PARTITION_UPPER_BOUND).isPresent();
    if (bothBoundsConfigured) {
        long partitionLower = config.get(SCAN_PARTITION_LOWER_BOUND);
        long partitionUpper = config.get(SCAN_PARTITION_UPPER_BOUND);
        if (partitionLower > partitionUpper) {
            String boundsMessage =
                    String.format(
                            "'%s'='%s' must not be larger than '%s'='%s'.",
                            SCAN_PARTITION_LOWER_BOUND.key(),
                            partitionLower,
                            SCAN_PARTITION_UPPER_BOUND.key(),
                            partitionUpper);
            throw new IllegalArgumentException(boundsMessage);
        }
    }

    checkAllOrNone(config, new ConfigOption[] {LOOKUP_CACHE_MAX_ROWS, LOOKUP_CACHE_TTL});

    if (config.get(LOOKUP_MAX_RETRIES) < 0) {
        String lookupRetriesMessage =
                String.format(
                        "The value of '%s' option shouldn't be negative, but is %s.",
                        LOOKUP_MAX_RETRIES.key(), config.get(LOOKUP_MAX_RETRIES));
        throw new IllegalArgumentException(lookupRetriesMessage);
    }

    if (config.get(SINK_MAX_RETRIES) < 0) {
        String sinkRetriesMessage =
                String.format(
                        "The value of '%s' option shouldn't be negative, but is %s.",
                        SINK_MAX_RETRIES.key(), config.get(SINK_MAX_RETRIES));
        throw new IllegalArgumentException(sinkRetriesMessage);
    }

    // Anything that does not amount to at least one whole second is rejected.
    if (config.get(MAX_RETRY_TIMEOUT).getSeconds() < 1) {
        // Read the same key again as a plain string for the message; presumably this echoes
        // the value as it was written in the configuration rather than the parsed form.
        String configuredTimeout =
                config.get(
                        ConfigOptions.key(MAX_RETRY_TIMEOUT.key())
                                .stringType()
                                .noDefaultValue());
        String timeoutMessage =
                String.format(
                        "The value of '%s' option must be in second granularity and shouldn't be smaller than 1 second, but is %s.",
                        MAX_RETRY_TIMEOUT.key(),
                        configuredTimeout);
        throw new IllegalArgumentException(timeoutMessage);
    }
}