public void open()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [331:365]


    public void open(int taskNumber, int numTasks) throws IOException {

        dataSource = AdbpgOptions.buildDataSourceFromOptions(config);
        try {
            dataSource.init();
            executeSql("set optimizer to off");
            rawConn = (DruidPooledConnection) connection;
            baseConn = (BaseConnection) (rawConn.getConnection());
            copyManager = new CopyManager(baseConn);
        } catch (SQLException e) {
            LOG.error("Init DataSource Or Get Connection Error!", e);
            throw new IOException("cannot get connection for url: " + url + ", userName: " + userName + ", password: " + password, e);
        }

        executorService = new ScheduledThreadPoolExecutor(1,
                new BasicThreadFactory.Builder().namingPattern("adbpg-flusher-%d").daemon(true).build());
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    if (System.currentTimeMillis() - lastWriteTime >= batchWriteTimeout) {
                        sync();
                    }
                } catch (Exception e) {
                    LOG.error("flush buffer to ADBPG failed", e);
                }
            }
        }, batchWriteTimeout, batchWriteTimeout, TimeUnit.MILLISECONDS);

        outRps = MetricUtils.registerNumRecordsOutRate(getRuntimeContext());
        outBps = MetricUtils.registerNumBytesOutRate(getRuntimeContext(), CONNECTOR_TYPE);
        latencyGauge = MetricUtils.registerCurrentSendTime(getRuntimeContext());
        sinkSkipCounter = MetricUtils.registerNumRecordsOutErrors(getRuntimeContext());
        deleteCounter = MetricUtils.registerSinkDeleteCounter(getRuntimeContext());
    }