in hologres-connector-flink-base/src/main/java/com/alibaba/ververica/connectors/hologres/jdbc/copy/HologresJDBCCopyWriter.java [288:375]
/**
 * Opens a JDBC connection to Hologres and prepares the session for COPY writing.
 *
 * <p>Builds the connection URL (optionally pinning a specific frontend and/or using a direct
 * inner endpoint), creates the connection, applies session-level GUCs (statement timeout,
 * serverless computing, target shard list, on-conflict handling for bulk load), and creates
 * the {@code CopyManager}. On any failure the connection is closed and internal state is
 * reset before rethrowing, so no connection is leaked.
 *
 * @param param connection and behavior options for this writer
 * @return this context, fully initialized
 * @throws RuntimeException if connection setup or any session configuration fails
 */
public CopyContext init(HologresConnectionParam param) {
    Connection conn = null;
    String url = param.getJdbcOptions().getDbUrl();
    // Copy is generally used in scenarios with large parallelism, but load balancing of
    // hologres vip endpoints may not be good. We randomize an offset and distribute
    // connections evenly to each fe.
    if (numFrontends > 0) {
        int choseFrontendId = chooseFrontendId();
        LOG.info(
                "taskNumber {}, number of frontends {}, frontend id offset {}, frontend id chose {}",
                taskNumber,
                numFrontends,
                frontendOffset,
                choseFrontendId);
        url += ("?options=fe=" + choseFrontendId);
        // For non public cloud, we connect to holo fe with inner ip:port directly.
        if (param.isDirectConnect()) {
            url = JDBCUtils.getJdbcDirectConnectionUrl(param.getJdbcOptions(), url);
            LOG.info("will connect directly to fe id {} with url {}", choseFrontendId, url);
        }
    }
    try {
        conn =
                JDBCUtils.createConnection(
                        param.getJdbcOptions(),
                        url, /*sslModeConnection*/
                        true, /*maxRetryCount*/
                        3, /*appName*/
                        "hologres-connector-flink-" + copyMode);
        LOG.info("init conn success to fe {}", url);
        pgConn = conn.unwrap(PgConnection.class);
        LOG.info("init unwrap conn success");
        // Set statement_timeout at the session level to avoid being affected by the db-level
        // configuration (for example, a value smaller than the checkpoint interval).
        // NOTE(review): PostgreSQL treats a bare statement_timeout value as milliseconds,
        // but the getter is named getStatementTimeoutSeconds — confirm the unit is intended.
        executeSql(
                pgConn,
                String.format(
                        "set statement_timeout = %s", param.getStatementTimeoutSeconds()));
        if (param.isEnableServerlessComputing()) {
            executeSql(pgConn, "set hg_computing_resource = 'serverless';");
            executeSql(
                    pgConn,
                    String.format(
                            "set hg_experimental_serverless_computing_query_priority = '%d';",
                            param.getServerlessComputingQueryPriority()));
        }
        // Do not throw on failure: COPY does not need the affected-row count, so this GUC is
        // disabled by default. It is only supported by some server versions, and failing to
        // set it does not affect execution.
        executeSql(
                pgConn,
                "set hg_experimental_enable_fixed_dispatcher_affected_rows = off;",
                false);
        if (param.isEnableReshuffleByHolo()) {
            // Restrict this connection to the shards assigned to this subtask so data
            // reshuffled by the connector lands on the correct shard.
            String result =
                    getTargetShardList().stream()
                            .map(String::valueOf)
                            .collect(Collectors.joining(","));
            LOG.info(
                    "enable target-shards, numTasks: {}, this taskNumber: {}, target shard list: {}",
                    numTasks,
                    taskNumber,
                    result);
            executeSql(
                    pgConn,
                    String.format("set hg_experimental_target_shard_list = '%s'", result));
        }
        if (copyMode.equals(CopyMode.BULK_LOAD_ON_CONFLICT)) {
            executeSql(pgConn, "set hg_experimental_copy_enable_on_conflict = on;", true);
            executeSql(
                    pgConn,
                    "set hg_experimental_affect_row_multiple_times_keep_last = on;",
                    true);
        }
        manager = new CopyManager(pgConn);
        LOG.info("init new manager success");
    } catch (SQLException | RuntimeException e) {
        // Catch RuntimeException too: helpers such as executeSql may throw unchecked
        // exceptions, and the previous code leaked the open connection in that case.
        if (null != conn) {
            try {
                conn.close();
            } catch (SQLException ignored) {
                // Best-effort cleanup; the original failure is the one worth reporting.
            }
        }
        pgConn = null;
        manager = null;
        if (e instanceof RuntimeException) {
            throw (RuntimeException) e;
        }
        throw new RuntimeException(e);
    }
    return this;
}