public CopyContext init()

in hologres-connector-flink-base/src/main/java/com/alibaba/ververica/connectors/hologres/jdbc/copy/HologresJDBCCopyWriter.java [288:375]


        /**
         * Opens the JDBC connection for this copy context, applies the session-level settings the
         * copy needs, and creates the {@code CopyManager} on top of it.
         *
         * <p>When {@code numFrontends > 0}, a frontend id is picked via {@code chooseFrontendId()}
         * and appended to the URL as {@code options=fe=<id>}; if {@code param.isDirectConnect()} is
         * set, the URL is additionally rewritten by {@code JDBCUtils.getJdbcDirectConnectionUrl}.
         *
         * <p>If anything fails once the connection has been opened, the connection is closed and
         * {@code pgConn} / {@code manager} are reset to {@code null} before the exception
         * propagates, so a failed init does not leave an open connection behind.
         *
         * @param param connection parameters (JDBC options, statement timeout, feature switches)
         * @return this context
         * @throws RuntimeException wrapping the {@link SQLException} raised while connecting or
         *     configuring the session; runtime exceptions raised during setup are rethrown as-is
         */
        public CopyContext init(HologresConnectionParam param) {
            Connection conn = null;
            String url = param.getJdbcOptions().getDbUrl();
            // Copy is generally used in scenarios with large parallelism, but load balancing of
            // hologres vip endpoints may not be good. We randomize an offset and distribute
            // connections evenly to each fe.
            if (numFrontends > 0) {
                int choseFrontendId = chooseFrontendId();
                LOG.info(
                        "taskNumber {}, number of frontends {}, frontend id offset {}, frontend id chose {}",
                        taskNumber,
                        numFrontends,
                        frontendOffset,
                        choseFrontendId);
                // Use '&' when the base url already carries a query string, so the fe option is
                // not fused into the value of an existing parameter.
                String separator = url.contains("?") ? "&" : "?";
                url += separator + "options=fe=" + choseFrontendId;
                // For non-public cloud, we connect to the holo fe with its inner ip:port directly.
                if (param.isDirectConnect()) {
                    url = JDBCUtils.getJdbcDirectConnectionUrl(param.getJdbcOptions(), url);
                    LOG.info("will connect directly to fe id {} with url {}", choseFrontendId, url);
                }
            }
            try {
                conn =
                        JDBCUtils.createConnection(
                                param.getJdbcOptions(),
                                url,
                                /*sslModeConnection*/ true,
                                /*maxRetryCount*/ 3,
                                /*appName*/ "hologres-connector-flink-" + copyMode);
                LOG.info("init conn success to fe {}", url);
                pgConn = conn.unwrap(PgConnection.class);
                LOG.info("init unwrap conn success");
                // Set statement_timeout at the session level to avoid being affected by db level
                // configuration (for example, a value smaller than the checkpoint time).
                executeSql(
                        pgConn,
                        String.format(
                                "set statement_timeout = %s", param.getStatementTimeoutSeconds()));
                if (param.isEnableServerlessComputing()) {
                    executeSql(pgConn, "set hg_computing_resource = 'serverless';");
                    executeSql(
                            pgConn,
                            String.format(
                                    "set hg_experimental_serverless_computing_query_priority = '%d';",
                                    param.getServerlessComputingQueryPriority()));
                }
                // Do not throw on failure: copy does not need the affected-row count, so it is
                // turned off by default. This GUC is only supported by some versions, and failing
                // to set it does not affect the job.
                executeSql(
                        pgConn,
                        "set hg_experimental_enable_fixed_dispatcher_affected_rows = off;",
                        false);
                if (param.isEnableReshuffleByHolo()) {
                    String result =
                            getTargetShardList().stream()
                                    .map(String::valueOf)
                                    .collect(Collectors.joining(","));
                    LOG.info(
                            "enable target-shards, numTasks: {}, this taskNumber: {}, target shard list: {}",
                            numTasks,
                            taskNumber,
                            result);
                    executeSql(
                            pgConn,
                            String.format("set hg_experimental_target_shard_list = '%s'", result));
                }
                if (copyMode.equals(CopyMode.BULK_LOAD_ON_CONFLICT)) {
                    executeSql(pgConn, "set hg_experimental_copy_enable_on_conflict = on;", true);
                    executeSql(
                            pgConn,
                            "set hg_experimental_affect_row_multiple_times_keep_last = on;",
                            true);
                }
                manager = new CopyManager(pgConn);
                LOG.info("init new manager success");
            } catch (SQLException | RuntimeException e) {
                // Release the connection on every failure path, not only on SQLException, so an
                // unchecked failure during session setup cannot leak it.
                if (conn != null) {
                    try {
                        conn.close();
                    } catch (SQLException closeFailure) {
                        // Keep the close failure visible without masking the primary cause.
                        e.addSuppressed(closeFailure);
                    }
                }
                pgConn = null;
                manager = null;
                if (e instanceof RuntimeException) {
                    throw (RuntimeException) e;
                }
                throw new RuntimeException(e);
            }
            return this;
        }