in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sourceimpl/AdbpgDataScanFunction.java [106:136]
public void open(InputSplit inputSplit) throws IOException {
sourceInTps = MetricUtils.registerNumRecordsInRate(getRuntimeContext());
try {
if (inputSplit != null && parameterValues != null) {
int pl = parameterValues[inputSplit.getSplitNumber()].length;
String[] bound = new String[pl];
for (int i = 0; i < pl; i++) {
bound[i] = parameterValues[inputSplit.getSplitNumber()][i].toString();
}
queryTemplate =
String.format(queryTemplate, bound);
}
String query =
String.format(
"COPY (%s ) TO STDOUT WITH DELIMITER e'%s'",
queryTemplate, "\u0002");
LOG.info(
String.format(
"Executing '%s' ",
query));
Class.forName(driverClassName).newInstance();
connection = DriverManager.getConnection(url, username, password);
in = new PGCopyInputStream((PGConnection) connection, query);
streamReader = new InputStreamReader(in);
reader = new BufferedReader(streamReader);
} catch (Exception se) {
throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
}
}