private void retryExecuteQuery()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sourceimpl/AdbpgRowDataLookupFunction.java [221:285]


    /**
     * Runs the lookup query ({@code queryTemplate}) for {@code keyRow} and appends the resulting
     * rows to {@code rows}, retrying when an attempt fails.
     *
     * <p>At least one attempt is always made. After a failed attempt the method waits
     * {@code retryWaitTime} milliseconds and tries again with a fresh connection, for at most
     * {@code maxRetryTime} retries. At most {@code joinMaxRows} rows are collected per attempt.
     *
     * <p>{@code rows} is modified only when an attempt succeeds: rows are buffered per attempt so
     * that a failed or retried attempt never leaves partial or duplicated results behind.
     *
     * <p>If the thread is interrupted while waiting between attempts, the interrupt flag is
     * restored and no further attempts are made.
     *
     * @param keyRow lookup key values bound to the prepared statement by
     *     {@code lookupKeyRowConverter}
     * @param rows output list that receives the converted result rows on success
     * @throws Exception if obtaining a connection fails (propagated immediately, never retried,
     *     regardless of {@code exceptionMode}), or the error of the last failed attempt when
     *     {@code exceptionMode} equals {@code "strict"} (case-insensitive); in any other mode the
     *     last error is only logged and the method returns normally
     */
    private void retryExecuteQuery(RowData keyRow, ArrayList<RowData> rows) throws Exception {
        Exception lastError = null;
        int attemptNum = 0;
        while (true) {
            long startTime = System.currentTimeMillis();

            // Connection acquisition stays outside the guarded section on purpose: a failure here
            // propagates to the caller directly instead of being retried or swallowed.
            Connection connection;
            String connfrom;
            if (1 == this.usePool) {
                connection = (Connection) this.dataSource.getConnection();
                connfrom = "from Pool";
            } else {
                Class.forName(driverClassName).newInstance();
                connection = DriverManager.getConnection(url, username, password);
                connfrom = "from Driver";
            }
            if (1 == this.verbose) {
                long endTime = System.currentTimeMillis();
                LOG.info("getConnection " + connfrom + " cost:" + (endTime - startTime) + " ms。");
            }

            // Buffer this attempt's rows; they are published to `rows` only if the attempt succeeds.
            ArrayList<RowData> fetched = new ArrayList<>();
            Exception attemptError = null;
            try (Connection managedConnection = connection;
                    PreparedStatement prepared =
                            managedConnection.prepareStatement(queryTemplate)) {
                prepared.clearParameters();
                PreparedStatement bound = lookupKeyRowConverter.toExternal(keyRow, prepared);
                try (ResultSet resultSet = bound.executeQuery()) {
                    while (fetched.size() < joinMaxRows) {
                        if (resultSet.isClosed()) {
                            throw new RuntimeException("result closed before collect.");
                        }
                        if (!resultSet.next()) {
                            break;
                        }
                        fetched.add(jdbcRowConverter.toInternal(resultSet, this.exceptionMode));
                    }
                }
            } catch (Exception e) {
                // Also reached when closing a resource fails; that counts as a failed attempt.
                LOG.warn("Error happens when query ADBPG, try for the {} time.", attemptNum, e);
                attemptError = e;
            }

            if (attemptError == null) {
                rows.addAll(fetched);
                if (1 == this.verbose) {
                    long endTime = System.currentTimeMillis();
                    LOG.info("ADBPG rt time is: " + (endTime - startTime) + " ms");
                }
                return;
            }
            lastError = attemptError;

            if (attemptNum >= maxRetryTime) {
                // Retry budget exhausted.
                break;
            }
            // Back off before the next attempt.
            try {
                if (retryWaitTime > 0) {
                    Thread.sleep(retryWaitTime);
                }
            } catch (InterruptedException interrupted) {
                // Preserve the interrupt for the caller and stop retrying.
                Thread.currentThread().interrupt();
                break;
            }
            attemptNum++;
        }

        LOG.info("error orrcured where execute " + queryTemplate, lastError);
        if ("strict".equalsIgnoreCase(exceptionMode)) {
            throw lastError;
        }
    }