in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sourceimpl/AdbpgRowDataLookupFunction.java [221:285]
/**
 * Runs the lookup query for {@code keyRow} and appends the converted result rows to
 * {@code rows}, retrying failed attempts.
 *
 * <p>Every attempt opens its own connection (from {@code dataSource} when
 * {@code usePool == 1}, otherwise through {@link DriverManager}), binds the key with
 * {@code lookupKeyRowConverter}, executes {@code queryTemplate} and reads at most
 * {@code joinMaxRows} rows. A failed attempt is repeated up to {@code maxRetryTime} more
 * times, waiting {@code retryWaitTime} milliseconds in between. Rows appended by a failed
 * attempt are removed before the next attempt so that a retry cannot duplicate results.
 *
 * <p>When all attempts fail, the last error is logged and rethrown only if
 * {@code exceptionMode} equals {@code "strict"} (ignoring case); otherwise the method returns
 * normally. If the thread is interrupted while waiting, the interrupt flag is restored and no
 * further attempt is made.
 *
 * @param keyRow lookup key that is bound to the prepared statement
 * @param rows list that receives the rows produced by the query
 * @throws Exception if a connection cannot be obtained (never retried, independent of
 *     {@code exceptionMode}), or the last query error when running in strict mode
 */
private void retryExecuteQuery(RowData keyRow, ArrayList<RowData> rows) throws Exception {
    // Whatever is already in the list belongs to the caller; only rows added by a failed
    // attempt are rolled back before retrying.
    final int initialRowCount = rows.size();
    Exception lastError = null;
    int attemptNum = 0;
    while (true) {
        long startTime = System.currentTimeMillis();

        // Connection acquisition stays outside the try block on purpose: a failure here
        // propagates straight to the caller, exactly as it did before retries existed.
        Connection connection;
        String connfrom;
        if (1 == this.usePool) {
            connection = (Connection) this.dataSource.getConnection();
            connfrom = "from Pool";
        } else {
            Class.forName(driverClassName).newInstance();
            connection = DriverManager.getConnection(url, username, password);
            connfrom = "from Driver";
        }
        if (1 == this.verbose) {
            long endTime = System.currentTimeMillis();
            LOG.info("getConnection " + connfrom + " cost:" + (endTime - startTime) + " ms。");
        }

        PreparedStatement statement = null;
        ResultSet resultSet = null;
        try {
            statement = connection.prepareStatement(queryTemplate);
            statement.clearParameters();
            statement = lookupKeyRowConverter.toExternal(keyRow, statement);
            resultSet = statement.executeQuery();
            int cnt = 0;
            while (cnt < joinMaxRows) {
                if (resultSet.isClosed()) {
                    rows.clear();
                    throw new RuntimeException("result closed before collect.");
                }
                if (!resultSet.next()) {
                    break;
                }
                rows.add(jdbcRowConverter.toInternal(resultSet, this.exceptionMode));
                cnt++;
            }
            if (1 == this.verbose) {
                long endTime = System.currentTimeMillis();
                LOG.info("ADBPG rt time is: " + (endTime - startTime) + " ms");
            }
            return;
        } catch (Exception e) {
            LOG.warn("Error happens when query ADBPG, try for the {} time.", attemptNum, e);
            lastError = e;
        } finally {
            // Close in reverse order of creation. Close failures are only logged so they can
            // neither hide the real query error nor fail a lookup that already succeeded.
            closeLookupResourceQuietly(resultSet);
            closeLookupResourceQuietly(statement);
            closeLookupResourceQuietly(connection);
        }

        if (attemptNum >= maxRetryTime) {
            // Retry budget exhausted.
            break;
        }
        attemptNum++;

        // Drop the partial output of the failed attempt. The size check is required because
        // the "result closed" path above may already have emptied the list.
        if (rows.size() > initialRowCount) {
            rows.subList(initialRowCount, rows.size()).clear();
        }

        try {
            if (retryWaitTime > 0) {
                Thread.sleep(retryWaitTime);
            }
        } catch (InterruptedException interrupted) {
            // Keep the interrupt visible to the caller and stop retrying.
            Thread.currentThread().interrupt();
            lastError.addSuppressed(interrupted);
            break;
        }
    }

    LOG.info("error orrcured where execute " + queryTemplate, lastError);
    if ("strict".equalsIgnoreCase(exceptionMode)) {
        throw lastError;
    }
}

/** Closes {@code resource} if it is non-null, logging (not rethrowing) any close failure. */
private void closeLookupResourceQuietly(AutoCloseable resource) {
    if (resource == null) {
        return;
    }
    try {
        resource.close();
    } catch (Exception closeError) {
        LOG.warn("Failed to close {} after ADBPG lookup.",
                resource.getClass().getSimpleName(), closeError);
    }
}