private void executeSqlWithPrepareStatement()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/AdbpgOutputFormat.java [901:954]


    /**
     * Executes {@code sql} as a single JDBC batch, binding one parameter set per row of
     * {@code valueList}, and retries the whole batch up to {@code maxRetryTime} times.
     *
     * <p>Each attempt obtains a fresh connection from {@code dataSource}. When the last attempt
     * fails, the error is rethrown if {@code exceptionMode} is {@code "strict"} (case-insensitive);
     * otherwise it is logged, {@code sinkSkipCounter} is incremented and the batch is dropped.
     * Metrics are reported via {@code reportMetric} whether or not the batch succeeded.
     *
     * @param sql              the parameterized statement to execute
     * @param valueList        rows to bind, one batch entry per row
     * @param rowDataConverter converter that binds a row onto the prepared statement
     * @param del              {@code true} for a delete; when the table has primary keys only the
     *                         primary-key projection of each row is bound
     * @throws SQLException if every attempt fails and {@code exceptionMode} is {@code "strict"}
     */
    private void executeSqlWithPrepareStatement(
            String sql, List<RowData> valueList, JdbcRowConverter rowDataConverter, boolean del) throws SQLException {
        long start = System.currentTimeMillis();
        boolean succeeded = false;
        int retryTime = 0;
        while (retryTime++ < maxRetryTime) {
            Connection connection = null;
            PreparedStatement preparedStatement = null;
            try {
                connection = dataSource.getConnection();
                preparedStatement = connection.prepareStatement(sql);
                for (RowData rowData : valueList) {
                    if (existsPrimaryKeys && del) {
                        rowDataConverter.toExternal(getPrimaryKey(rowData), preparedStatement);
                    } else {
                        rowDataConverter.toExternal(rowData, preparedStatement);
                    }
                    preparedStatement.addBatch();
                }
                preparedStatement.executeBatch();
                succeeded = true;
                break;
            } catch (SQLException exception) {
                // A BatchUpdateException usually chains the driver's real error as its "next"
                // exception, but the chain may be empty; never replace the error with null.
                SQLException cause = exception;
                if (exception instanceof BatchUpdateException && exception.getNextException() != null) {
                    cause = exception.getNextException();
                }
                LOG.error(String.format("Execute sql error, sql: %s, retryTimes: %d", sql, retryTime), cause);
                if (retryTime == maxRetryTime) {
                    if ("strict".equalsIgnoreCase(exceptionMode)) {
                        throw cause;
                    }
                    LOG.warn("Ignore exception {} when execute sql {}", cause, sql);
                    sinkSkipCounter.inc();
                } else {
                    // Fixed back-off before the next attempt; pointless after the final one.
                    try {
                        Thread.sleep(retryWaitTime);
                    } catch (InterruptedException interrupted) {
                        // Keep the interrupt visible to the caller instead of swallowing it.
                        Thread.currentThread().interrupt();
                    } catch (Exception ignored) {
                        // Best effort: an invalid wait time must not mask the SQL failure.
                    }
                }
            } finally {
                // Close explicitly and quietly: a failing close() after a successful
                // executeBatch() must not trigger a retry (which would duplicate the writes).
                if (preparedStatement != null) {
                    try {
                        preparedStatement.close();
                    } catch (Exception closeError) {
                        LOG.debug("close statement error", closeError);
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception closeError) {
                        LOG.debug("close connection error", closeError);
                    }
                }
            }
        }

        long end = System.currentTimeMillis();
        // NOTE(review): the last argument looks like a rough byte-size estimate — confirm against reportMetric.
        reportMetric(valueList, start, end, (long) valueList.size() * sql.length() * 2);
        if (succeeded) {
            LOG.debug("{} operation succeed on {} records ", del ? "Delete" : "Write", valueList.size());
        }
    }