in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java [125:166]
public Collection<RowData> lookup(RowData keyRow) {
    // Executes the prepared lookup statement with the values of keyRow bound as parameters
    // and returns every row of the result set converted through jdbcRowConverter.
    //
    // A SQLException triggers a retry; attempts are numbered 0..maxRetryTimes. When the
    // last attempt fails, the SQLException is rethrown wrapped in a RuntimeException.
    // Between attempts the connection is checked and, if invalid, re-established, and the
    // thread sleeps for (1000 * retry) milliseconds, i.e. no delay after the first failure.
    for (int retry = 0; retry <= maxRetryTimes; retry++) {
        try {
            statement.clearParameters();
            statement = lookupKeyRowConverter.toExternal(keyRow, statement);
            try (ResultSet resultSet = statement.executeQuery()) {
                ArrayList<RowData> rows = new ArrayList<>();
                while (resultSet.next()) {
                    rows.add(jdbcRowConverter.toInternal(resultSet));
                }
                rows.trimToSize();
                return rows;
            }
        } catch (SQLException e) {
            // The failing call is executeQuery(), so the log message names that operation.
            LOG.error(String.format("JDBC executeQuery error, retry times = %d", retry), e);
            if (retry >= maxRetryTimes) {
                throw new RuntimeException("Execution of JDBC statement failed.", e);
            }
            try {
                if (!connectionProvider.isConnectionValid()) {
                    // Closing a statement that belongs to an invalid connection may itself
                    // fail; treat the close as best-effort so the reconnect below still runs.
                    try {
                        statement.close();
                    } catch (SQLException closeFailure) {
                        LOG.warn(
                                "Failed to close JDBC statement of an invalid connection",
                                closeFailure);
                    }
                    connectionProvider.closeConnection();
                    establishConnectionAndStatement();
                }
            } catch (SQLException | ClassNotFoundException exception) {
                LOG.error(
                        "JDBC connection is not valid, and reestablish connection failed",
                        exception);
                throw new RuntimeException("Reestablish JDBC connection failed", exception);
            }
            try {
                Thread.sleep(1000L * retry);
            } catch (InterruptedException e1) {
                // Restore the interrupt flag so code further up the stack can observe the
                // interruption even though it is surfaced as an unchecked exception here.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e1);
            }
        }
    }
    // Reached only when the loop body never runs, i.e. maxRetryTimes is negative.
    return Collections.emptyList();
}