in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/JdbcDynamicTableFactory.java [142:164]
private InternalJdbcConnectionOptions getJdbcOptions(
ReadableConfig readableConfig, ClassLoader classLoader) {
final String url = readableConfig.get(URL);
final InternalJdbcConnectionOptions.Builder builder =
InternalJdbcConnectionOptions.builder()
.setClassLoader(classLoader)
.setDBUrl(url)
.setTableName(readableConfig.get(TABLE_NAME))
.setDialect(
JdbcFactoryLoader.loadDialect(
url, classLoader, readableConfig.get(COMPATIBLE_MODE)))
.setParallelism(readableConfig.getOptional(SINK_PARALLELISM).orElse(null))
.setConnectionCheckTimeoutSeconds(
(int) readableConfig.get(MAX_RETRY_TIMEOUT).getSeconds());
readableConfig.getOptional(DRIVER).ifPresent(builder::setDriverName);
readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
readableConfig.getOptional(PASSWORD).ifPresent(builder::setPassword);
readableConfig.getOptional(COMPATIBLE_MODE).ifPresent(builder::setCompatibleMode);
getConnectionProperties(readableConfig)
.forEach((key, value) -> builder.setProperty(key.toString(), value.toString()));
return builder.build();
}