in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/utils/AdbpgOptions.java [383:431]
/**
 * Builds a {@link DruidDataSource} from the connector options held in {@code config}.
 *
 * <p>The URL, user name and password are read from {@code config}; the URL must use the
 * {@code jdbc:postgresql://} scheme. Pool sizing ({@code CONNECTION_MAX_ACTIVE}) and the
 * maximum wait ({@code CONNECTION_MAX_WAIT}) are also read from {@code config}; all other
 * pool settings are fixed in this method. The effective options are logged at INFO level,
 * with the password masked so that credentials never reach the log output.
 *
 * @param config the options to read connection and pool settings from
 * @return a newly created, configured data source
 * @throws IllegalArgumentException if the configured URL does not start with
 *     {@code jdbc:postgresql://}
 */
public static DruidDataSource buildDataSourceFromOptions(ReadableConfig config) {
    String url = config.get(AdbpgOptions.URL);
    String userName = config.get(AdbpgOptions.USERNAME);
    String password = config.get(AdbpgOptions.PASSWORD);
    if (!url.startsWith("jdbc:postgresql://")) {
        throw new IllegalArgumentException(
                String.format(
                        "url of %s must starts with jdbc:postgresql:// format, but actual url is %s",
                        CONNECTOR_TYPE, url));
    }
    int connectionMaxActive = config.get(CONNECTION_MAX_ACTIVE);
    long connectionMaxWait = config.get(CONNECTION_MAX_WAIT);

    DruidDataSource dataSource = new DruidDataSource();
    dataSource.setUrl(url);
    dataSource.setUsername(userName);
    dataSource.setPassword(password);
    dataSource.setDriverClassName(AdbpgOptions.DRIVER_CLASS);
    dataSource.setMaxActive(connectionMaxActive);
    dataSource.setMaxWait(connectionMaxWait);
    dataSource.setInitialSize(1);
    dataSource.setPoolPreparedStatements(false);
    dataSource.setTestWhileIdle(true);
    dataSource.setValidationQuery("select 'adbpg_flink_connector'");
    // NOTE(review): the negative eviction interval together with the near-Long.MAX_VALUE
    // idle limits presumably keeps pooled connections from ever being evicted for idleness;
    // confirm against the Druid version in use.
    dataSource.setTimeBetweenEvictionRunsMillis(-1);
    dataSource.setMinEvictableIdleTimeMillis(Long.MAX_VALUE - 1);
    dataSource.setMaxEvictableIdleTimeMillis(Long.MAX_VALUE);

    // Never write the real password to the log; only indicate whether one was supplied.
    // NOTE(review): the JDBC url is still logged verbatim; if callers embed credentials in
    // the url query string they would be exposed here as well.
    String maskedPassword = password == null ? "null" : "******";
    LOG.info("connector " + CONNECTOR_TYPE + " created using url=" + url + ", "
            + "tableName=" + config.get(TABLE_NAME) + ", "
            + "userName=" + userName + ", "
            + "password=" + maskedPassword + ", "
            + "maxRetries=" + config.get(MAX_RETRY_TIMES) + ", "
            + "retryWaitTime=" + config.get(RETRY_WAIT_TIME) + ", "
            + "batchSize=" + config.get(BATCH_SIZE) + ", "
            + "connectionMaxActive=" + connectionMaxActive + ", "
            + "batchWriteTimeoutMs=" + config.get(BATCH_WRITE_TIMEOUT_MS) + ", "
            + "conflictMode=" + config.get(CONFLICT_MODE) + ", "
            + "timeZone=" + "Asia/Shanghai" + ", "
            + "useCopy=" + config.get(USE_COPY) + ", "
            + "targetSchema=" + config.get(TARGET_SCHEMA) + ", "
            + "exceptionMode=" + config.get(EXCEPTION_MODE) + ", "
            + "reserveMs=" + config.get(RESERVEMS) + ", "
            + "caseSensitive=" + config.get(CASE_SENSITIVE) + ", "
            + "writeMode=" + config.get(WRITE_MODE) + ", "
            + "verbose=" + config.get(VERBOSE));
    return dataSource;
}