public void open()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sourceimpl/AdbpgDataScanFunction.java [106:136]


    /**
     * Opens a {@code COPY ... TO STDOUT} stream for the given input split.
     *
     * <p>Registers the records-in rate metric, substitutes the split's row of
     * {@code parameterValues} into {@code queryTemplate} (only when both the split and the
     * parameter values are non-null), wraps the resulting query in a COPY statement that uses
     * the U+0002 control character as the column delimiter, and exposes the COPY output through
     * {@code reader}.
     *
     * @param inputSplit the split to read; when {@code null} the template is used as-is
     * @throws IOException declared by the signature; failures inside this method are rethrown
     *     as {@link IllegalArgumentException} instead
     * @throws IllegalArgumentException if building the query, loading the driver, connecting,
     *     or starting the COPY fails; the original exception is kept as the cause
     */
    public void open(InputSplit inputSplit) throws IOException {
        sourceInTps = MetricUtils.registerNumRecordsInRate(getRuntimeContext());
        // Tracks whether this call obtained a connection, so the failure path only closes
        // a connection that this call itself opened.
        boolean connectionOpened = false;
        try {
            // Format into a local variable. Writing the result back to the queryTemplate field
            // would remove its placeholders, and String.format ignores surplus arguments, so a
            // later open() for a different split would silently re-run this split's query.
            String splitQuery = queryTemplate;
            if (inputSplit != null && parameterValues != null) {
                Object[] splitParams = parameterValues[inputSplit.getSplitNumber()];
                // Object[] (not String[]) so the array is passed as the varargs array itself.
                Object[] bound = new Object[splitParams.length];
                for (int i = 0; i < splitParams.length; i++) {
                    // toString() is kept so a null parameter still fails fast here.
                    bound[i] = splitParams[i].toString();
                }
                splitQuery = String.format(queryTemplate, bound);
            }

            String query =
                    String.format(
                            "COPY (%s ) TO STDOUT WITH DELIMITER e'%s'",
                            splitQuery, "\u0002");
            LOG.info(
                    String.format(
                            "Executing '%s' ",
                            query));

            // Load the driver class before asking DriverManager for a connection.
            Class.forName(driverClassName).newInstance();
            connection = DriverManager.getConnection(url, username, password);
            connectionOpened = true;
            in = new PGCopyInputStream((PGConnection) connection, query);
            // NOTE(review): decodes as UTF-8 instead of the platform default charset; this
            // assumes the PostgreSQL JDBC driver (implied by the PGConnection cast), which
            // runs the session with client_encoding UTF8 - confirm for this deployment.
            streamReader = new InputStreamReader(in, java.nio.charset.StandardCharsets.UTF_8);
            reader = new BufferedReader(streamReader);
        } catch (Exception se) {
            if (connectionOpened) {
                // Do not leak the connection when starting the COPY fails after connecting.
                try {
                    connection.close();
                } catch (Exception closeFailure) {
                    se.addSuppressed(closeFailure);
                }
            }
            throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
        }
    }